[jira] [Commented] (FLINK-9190) YarnResourceManager sometimes does not request new Containers

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463386#comment-16463386
 ] 

ASF GitHub Bot commented on FLINK-9190:
---

Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5931
  
@GJL In Blink, we solve this problem as follows. 
When a container completes, we first check whether the container has 
already registered. If it had registered before, the RM will not request a new 
container, as the job master will ask for one during failover. If it had not 
registered, the RM will request a new one.
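
For illustration, a minimal sketch of the strategy described above (hypothetical names; not the actual Blink code):

{code:java}
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: only request a replacement container when the
// completed container never registered a TaskExecutor with the RM.
public class ContainerCompletionHandler {
    private final Set<String> registeredContainers = new HashSet<>();

    public void onTaskExecutorRegistered(String containerId) {
        registeredContainers.add(containerId);
    }

    public void onContainerCompleted(String containerId) {
        if (registeredContainers.remove(containerId)) {
            // The TaskExecutor had registered: the job master will re-request
            // its slots during failover, so the RM stays passive here.
        } else {
            // The container died before registering: the job master never saw
            // it, so the RM must request a replacement itself.
            requestNewContainer();
        }
    }

    private void requestNewContainer() {
        // placeholder for the actual YARN container request
    }
}
{code}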


> YarnResourceManager sometimes does not request new Containers
> -
>
> Key: FLINK-9190
> URL: https://issues.apache.org/jira/browse/FLINK-9190
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.5.0
> Environment: Hadoop 2.8.3
> ZooKeeper 3.4.5
> Flink 71c3cd2781d36e0a03d022a38cc4503d343f7ff8
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
> Attachments: yarn-logs
>
>
> *Description*
> The {{YarnResourceManager}} does not request new containers if 
> {{TaskManagers}} are killed rapidly in succession. After 5 minutes the job is 
> restarted due to {{NoResourceAvailableException}}, and the job runs normally 
> afterwards. I suspect that {{TaskManager}} failures are not registered if the 
> failure occurs before the {{TaskManager}} registers with the master. Logs are 
> attached; I added additional log statements to 
> {{YarnResourceManager.onContainersCompleted}} and 
> {{YarnResourceManager.onContainersAllocated}}.
> *Expected Behavior*
> The {{YarnResourceManager}} should recognize that the container is completed 
> and keep requesting new containers. The job should run as soon as resources 
> are available. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5931: [FLINK-9190][flip6,yarn] Request new container if contain...

2018-05-03 Thread shuai-xu
Github user shuai-xu commented on the issue:

https://github.com/apache/flink/pull/5931
  
@GJL In Blink, we solve this problem as follows. 
When a container completes, we first check whether the container has 
already registered. If it had registered before, the RM will not request a new 
container, as the job master will ask for one during failover. If it had not 
registered, the RM will request a new one.


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463384#comment-16463384
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
I think even with the `UnloadableTypeSerializerException` exception 
bubbling approach, we actually still need a flag in the serialization proxy to 
decide how to handle the exception.

The serialization proxy handles deserialization of all metadata of all 
registered keyed states, so it is the highest level at which we can 
decide whether or not to use the dummy serializer.

If we want to hand this control out to an even higher level (i.e., the 
backend), we would need to break the deserialization logic out of the 
serialization proxy, which IMO isn't appropriate.
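
A rough sketch of the flag-based handling described above; apart from `UnloadableTypeSerializerException` and the presence flag, which appear in the discussion, the names are hypothetical:

{code:java}
// Hypothetical sketch: the serialization proxy either fails fast or falls
// back to a dummy serializer when the original serializer cannot be loaded.
private TypeSerializer<?> readSerializer(DataInputView in) throws IOException {
    try {
        return tryReadSerializer(in); // may throw if the serializer class is missing
    } catch (UnloadableTypeSerializerException e) {
        if (isSerializerPresenceRequired) {
            // restoring cannot proceed without the real serializer
            throw new IOException("Missing serializer for restored state", e);
        }
        return createDummySerializer(); // placeholder until the state is re-registered
    }
}
{code}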


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported observing the following exception when restoring a Flink 
> job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create a 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose 
> {{stateSerializer}} can be {{null}}. This by itself is not the problem; however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, 
> where we null-check the state serializer. This then fails with an 
> uninformative NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-03 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
I think even with the `UnloadableTypeSerializerException` exception 
bubbling approach, we actually still need a flag in the serialization proxy to 
decide how to handle the exception.

The serialization proxy handles deserialization of all metadata of all 
registered keyed states, so it is the highest level at which we can 
decide whether or not to use the dummy serializer.

If we want to hand this control out to an even higher level (i.e., the 
backend), we would need to break the deserialization logic out of the 
serialization proxy, which IMO isn't appropriate.


---


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-03 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
@StefanRRichter yes, now that you mention it, the 
`isSerializerPresenceRequiredFlag` does seem a bit awkward in the 
serialization proxy. Essentially, all it does is serve as a switch to decide 
whether or not to fail - something the caller could do itself.

I'll quickly try your suggested approach and see how it turns out.
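
A sketch of the alternative being tried, with the switch moved out of the proxy and into the caller (the backend); the helper names here are hypothetical:

{code:java}
// Hypothetical sketch: the proxy always bubbles the exception, and the
// backend (the caller) decides whether the missing serializer is fatal.
try {
    serializationProxy.read(in);
} catch (UnloadableTypeSerializerException e) {
    if (isSerializerPresenceRequired) {
        throw new IOException("Serializer of restored state could not be loaded", e);
    }
    // otherwise: continue the restore with a dummy serializer
}
{code}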


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463363#comment-16463363
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5950
  
@StefanRRichter yes, now that you mention it, the 
`isSerializerPresenceRequiredFlag` does seem a bit awkward in the 
serialization proxy. Essentially, all it does is serve as a switch to decide 
whether or not to fail - something the caller could do itself.

I'll quickly try your suggested approach and see how it turns out.


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported observing the following exception when restoring a Flink 
> job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create a 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose 
> {{stateSerializer}} can be {{null}}. This by itself is not the problem; however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, 
> where we null-check the state serializer. This then fails with an 
> uninformative NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9294:
-
Description: 
Most UDF function signatures that include composite types such as 
*{{MAP}}*, *{{ARRAY}}*, etc. require the user to explicitly override the 
*{{getParameterType}}* or *{{getResultType}}* method. 

It should be possible to resolve the composite type from the function 
signature. For example:

{code:java}
public List eval(Map mapArg) {
  //...
}
{code}
should automatically resolve:
- *{{ObjectArrayTypeInfo}}* to be the result type.
- *{{MapTypeInfo}}* to be the parameter type.


  was:
Most UDF function signatures that include composite types such as 
*{{MAP}}*, *{{ARRAY}}*, etc. require the user to explicitly override the 
*{{getParameterType}}* or *{{getResultType}}* method. 

It should be possible to resolve the composite type from the function 
signature. For example:

{code:java}
public List eval(Map mapArg) {
  //...
}
{code}
should automatically resolve *{{ObjectArrayTypeInfo}}* and 
*{{MapTypeInfo}}* to be the result type and parameter type.



> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Most UDF function signatures that include composite types such as 
> *{{MAP}}*, *{{ARRAY}}*, etc. require the user to explicitly override the 
> *{{getParameterType}}* or *{{getResultType}}* method. 
> It should be possible to resolve the composite type from the function 
> signature. For example:
> {code:java}
> public List eval(Map mapArg) {
>   //...
> }
> {code}
> should automatically resolve:
> - *{{ObjectArrayTypeInfo}}* to be the result type.
> - *{{MapTypeInfo}}* to be the parameter type.
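
For illustration, Java's reflection API already exposes the generic signature that such an improvement could build on; the concrete {{Double}}/{{String}} types below are assumptions, since the generics were lost in this archive:

{code:java}
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Illustrative only: deriving composite parameter/result types from the
// eval() signature instead of overriding getParameterType()/getResultType().
public class SignatureInference {
    public static class Udf {
        public List<Double> eval(Map<String, Double> mapArg) { // assumed generics
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) throws Exception {
        Method eval = Udf.class.getMethod("eval", Map.class);
        Type resultType = eval.getGenericReturnType();        // java.util.List<java.lang.Double>
        Type paramType = eval.getGenericParameterTypes()[0];  // java.util.Map<java.lang.String, java.lang.Double>
        System.out.println("result type:    " + resultType);
        System.out.println("parameter type: " + paramType);
    }
}
{code}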



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9294:
-
Description: 
Most UDF function signatures that include composite types such as 
*{{MAP}}*, *{{ARRAY}}*, etc. require the user to explicitly override the 
*{{getParameterType}}* or *{{getResultType}}* method. 

It should be possible to resolve the composite type from the function 
signature. For example:

{code:java}
public List eval(Map mapArg) {
  //...
}
{code}
should automatically resolve *{{ObjectArrayTypeInfo}}* and 
*{{MapTypeInfo}}* to be the result type and parameter type.


  was:
Most UDF function signatures that include composite types such as 
*{{MAP}}*, *{{ARRAY}}*, etc. require the user to explicitly override the 
*{{getParameterType}}* or *{{getResultType}}* method. 

It should be possible to resolve the composite type from the function signature.


> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Most UDF function signatures that include composite types such as 
> *{{MAP}}*, *{{ARRAY}}*, etc. require the user to explicitly override the 
> *{{getParameterType}}* or *{{getResultType}}* method. 
> It should be possible to resolve the composite type from the function 
> signature. For example:
> {code:java}
> public List eval(Map mapArg) {
>   //...
> }
> {code}
> should automatically resolve *{{ObjectArrayTypeInfo}}* and 
> *{{MapTypeInfo}}* to be the result type and parameter type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7795) Utilize error-prone to discover common coding mistakes

2018-05-03 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7795?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16345955#comment-16345955
 ] 

Ted Yu edited comment on FLINK-7795 at 5/3/18 11:40 PM:


error-prone has a JDK 8 dependency.


was (Author: yuzhih...@gmail.com):
error-prone has JDK 8 dependency.

> Utilize error-prone to discover common coding mistakes
> --
>
> Key: FLINK-7795
> URL: https://issues.apache.org/jira/browse/FLINK-7795
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Priority: Major
>
> http://errorprone.info/ is a tool which detects common coding mistakes.
> We should incorporate it into the Flink build process.
> Here are the dependencies:
> {code}
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_annotation</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
> </dependency>
> <dependency>
>   <groupId>com.google.auto.service</groupId>
>   <artifactId>auto-service</artifactId>
>   <version>1.0-rc3</version>
>   <optional>true</optional>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>error_prone_check_api</artifactId>
>   <version>${error-prone.version}</version>
>   <scope>provided</scope>
>   <exclusions>
>     <exclusion>
>       <groupId>com.google.code.findbugs</groupId>
>       <artifactId>jsr305</artifactId>
>     </exclusion>
>   </exclusions>
> </dependency>
> <dependency>
>   <groupId>com.google.errorprone</groupId>
>   <artifactId>javac</artifactId>
>   <version>9-dev-r4023-3</version>
>   <scope>provided</scope>
> </dependency>
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8554) Upgrade AWS SDK

2018-05-03 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-8554:
---

Assignee: vinoyang

> Upgrade AWS SDK
> ---
>
> Key: FLINK-8554
> URL: https://issues.apache.org/jira/browse/FLINK-8554
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> AWS SDK 1.11.271 fixes a lot of bugs, one of which manifests as the 
> following:
> {code}
> Caused by: java.lang.NullPointerException
>   at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729)
>   at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67)
>   at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-6719) Add details about fault-tolerance of timers to ProcessFunction docs

2018-05-03 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-6719.

   Resolution: Fixed
Fix Version/s: (was: 1.5.1)
   (was: 1.6.0)
   1.4.3
   1.5.0

Fixed for 1.6.0 with 2cfd89c845ce2341c94a88e5cea7a4c86419b25f
Fixed for 1.5.0 with 9435cd4fe54fd8600d661175dcf00d6b4464e200
Fixed for 1.4.3 with b84cdda4d5fa4ded807c13138f5742e379be453b

> Add details about fault-tolerance of timers to ProcessFunction docs
> ---
>
> Key: FLINK-6719
> URL: https://issues.apache.org/jira/browse/FLINK-6719
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Documentation
>Affects Versions: 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0, 1.4.3
>
>
> The fault-tolerance of timers is a frequently asked question on the mailing 
> lists. We should add details about the topic to the {{ProcessFunction}} docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6719) Add details about fault-tolerance of timers to ProcessFunction docs

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463147#comment-16463147
 ] 

ASF GitHub Bot commented on FLINK-6719:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5887


> Add details about fault-tolerance of timers to ProcessFunction docs
> ---
>
> Key: FLINK-6719
> URL: https://issues.apache.org/jira/browse/FLINK-6719
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Documentation
>Affects Versions: 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.6.0, 1.5.1
>
>
> The fault-tolerance of timers is a frequently asked question on the mailing 
> lists. We should add details about the topic to the {{ProcessFunction}} docs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5887: [FLINK-6719] [docs] Add details about fault-tolera...

2018-05-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5887


---


[jira] [Resolved] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id

2018-05-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-9293.
-
Resolution: Fixed

Fixed in
  - 1.5.0 via df1eda8646a769b419388db2cf699cc53b009849
  - 1.6.0 via bbaf82ebe245d4758e73aa928d79a3708c816311

> SlotPool should check slot id when accepting a slot offer with existing 
> allocation id
> -
>
> Key: FLINK-9293
> URL: https://issues.apache.org/jira/browse/FLINK-9293
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For flip-6, two or more slots may be assigned to the same slot 
> allocation. For example, taskExecutor1 registers and assigns allocationID1 to 
> its slot1, but from taskExecutor1's side the registration times out, so it 
> registers again. The RM then fails allocationID1 and assigns slot2 on 
> taskExecutor2 to it, but taskExecutor1 has already accepted allocationID1. 
> So taskExecutor1 and taskExecutor2 both offer a slot to the job master with 
> allocationID1. Currently the slot pool simply accepts all slot offers, which 
> may leak one slot.
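
A hypothetical sketch of the check described by the issue title (not the actual SlotPool code): an offer for an allocation ID that is already in use is only accepted again if it comes from the same slot.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: only idempotent re-offers of the same slot are
// accepted for an allocation ID that the pool already accepted.
public class SlotOfferCheck {
    private final Map<String, String> allocationToSlot = new HashMap<>(); // allocationId -> slotId

    public boolean offerSlot(String allocationId, String slotId) {
        String existingSlot = allocationToSlot.get(allocationId);
        if (existingSlot != null) {
            // A different slot with the same allocation ID is rejected,
            // so the task executor can free it and no slot leaks.
            return existingSlot.equals(slotId);
        }
        allocationToSlot.put(allocationId, slotId);
        return true;
    }
}
{code}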



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id

2018-05-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-9293.
---

> SlotPool should check slot id when accepting a slot offer with existing 
> allocation id
> -
>
> Key: FLINK-9293
> URL: https://issues.apache.org/jira/browse/FLINK-9293
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For flip-6, two or more slots may be assigned to the same slot 
> allocation. For example, taskExecutor1 registers and assigns allocationID1 to 
> its slot1, but from taskExecutor1's side the registration times out, so it 
> registers again. The RM then fails allocationID1 and assigns slot2 on 
> taskExecutor2 to it, but taskExecutor1 has already accepted allocationID1. 
> So taskExecutor1 and taskExecutor2 both offer a slot to the job master with 
> allocationID1. Currently the slot pool simply accepts all slot offers, which 
> may leak one slot.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463122#comment-16463122
 ] 

ASF GitHub Bot commented on FLINK-9293:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5951


> SlotPool should check slot id when accepting a slot offer with existing 
> allocation id
> -
>
> Key: FLINK-9293
> URL: https://issues.apache.org/jira/browse/FLINK-9293
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For flip-6, two or more slots may be assigned to the same slot 
> allocation. For example, taskExecutor1 registers and assigns allocationID1 to 
> its slot1, but from taskExecutor1's side the registration times out, so it 
> registers again. The RM then fails allocationID1 and assigns slot2 on 
> taskExecutor2 to it, but taskExecutor1 has already accepted allocationID1. 
> So taskExecutor1 and taskExecutor2 both offer a slot to the job master with 
> allocationID1. Currently the slot pool simply accepts all slot offers, which 
> may leak one slot.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5951: [FLINK-9293] [runtime] SlotPool should check slot ...

2018-05-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5951


---


[jira] [Commented] (FLINK-9141) Calling getSideOutput() and split() on one DataStream causes NPE

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463093#comment-16463093
 ] 

ASF GitHub Bot commented on FLINK-9141:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5836#discussion_r185942219
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}.
+ */
+public class SplitSideOutputTest {
+
+   private static final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
+
+   @Test
+   public void testSideOutputAfterSelectIsForbidden() {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.split(Collections::singleton);
+
+   try {
+   processInput.getSideOutput(outputTag);
+   } catch (UnsupportedOperationException expected){
+   // expected
+   }
+   }
+
+   @Test
+   public void testSelectAfterSideOutputIsForbidden() {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.getSideOutput(outputTag);
+
+   try {
+   processInput.split(Collections::singleton);
--- End diff --

same as above


> Calling getSideOutput() and split() on one DataStream causes NPE
> 
>
> Key: FLINK-9141
> URL: https://issues.apache.org/jira/browse/FLINK-9141
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> Calling both {{getSideOutput()}} and {{split()}} on one DataStream causes a 
> {{NullPointerException}} to be thrown at runtime.
> As a workaround, one can add a no-op map function before the {{split()}} call.
> Exception:
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:79)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:128)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Reproducer:
> {code}
> private static final OutputTag<String> tag = new OutputTag<String>("tag") {};
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   DataStream<String> dataStream1 = env.fromElements("foo");
>   SingleOutputStreamOperator<String> processedStream = dataStream1
>   .process(new ProcessFunction<String, String>() {
>   @Override
>  

[jira] [Commented] (FLINK-9141) Calling getSideOutput() and split() on one DataStream causes NPE

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463092#comment-16463092
 ] 

ASF GitHub Bot commented on FLINK-9141:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5836#discussion_r185942147
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}.
+ */
+public class SplitSideOutputTest {
+
+   private static final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
+
+   @Test
+   public void testSideOutputAfterSelectIsForbidden() {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.split(Collections::singleton);
+
+   try {
+   processInput.getSideOutput(outputTag);
--- End diff --

add `Assert.fail();` after `processInput.getSideOutput(outputTag);` to 
ensure that the test fails if no exception is thrown.


> Calling getSideOutput() and split() on one DataStream causes NPE
> 
>
> Key: FLINK-9141
> URL: https://issues.apache.org/jira/browse/FLINK-9141
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
>
> Calling both {{getSideOutput()}} and {{split()}} on one DataStream causes a 
> {{NullPointerException}} to be thrown at runtime.
> As a workaround, one can add a no-op map function before the {{split()}} call.
> Exception:
> {code}
> Caused by: java.lang.NullPointerException
>   at 
> org.apache.flink.streaming.api.collector.selector.DirectedOutput.<init>(DirectedOutput.java:79)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:326)
>   at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:128)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:274)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> Reproducer:
> {code}
> private static final OutputTag<String> tag = new OutputTag<String>("tag") {};
> public static void main(String[] args) throws Exception {
>   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>   DataStream<String> dataStream1 = env.fromElements("foo");
>   SingleOutputStreamOperator<String> processedStream = dataStream1
>   .process(new ProcessFunction<String, String>() {
>   @Override
>   public void processElement(String value, Context ctx, Collector<String> out) {
>   }
>   });
>   processedStream.getSideOutput(tag)
>   .print();
>   processedStream
>   .split(Collections::singletonList)
>   .select("bar")
>   .print();
>   env.execute();
> }
> {code}
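
A minimal sketch of the work-around mentioned in the description (a no-op {{map()}} before {{split()}}), under the same assumed {{String}} types as the reproducer:

{code:java}
// The no-op map() breaks the operator chain, so getSideOutput() and
// split() no longer act on the same stream and the NPE is avoided.
processedStream.getSideOutput(tag).print();
processedStream
        .map(value -> value) // no-op map as the work-around
        .split(Collections::singletonList)
        .select("bar")
        .print();
{code}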



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5836: [FLINK-9141][datastream] Fail early when using bot...

2018-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5836#discussion_r185942219
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}.
+ */
+public class SplitSideOutputTest {
+
+   private static final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
+
+   @Test
+   public void testSideOutputAfterSelectIsForbidden() {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.split(Collections::singleton);
+
+   try {
+   processInput.getSideOutput(outputTag);
+   } catch (UnsupportedOperationException expected){
+   // expected
+   }
+   }
+
+   @Test
+   public void testSelectAfterSideOutputIsForbidden() {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.getSideOutput(outputTag);
+
+   try {
+   processInput.split(Collections::singleton);
--- End diff --

same as above


---


[GitHub] flink pull request #5836: [FLINK-9141][datastream] Fail early when using bot...

2018-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5836#discussion_r185942147
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}.
+ */
+public class SplitSideOutputTest {
+
+   private static final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
+
+   @Test
+   public void testSideOutputAfterSelectIsForbidden() {
+   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.split(Collections::singleton);
+
+   try {
+   processInput.getSideOutput(outputTag);
--- End diff --

add `Assert.fail();` after `processInput.getSideOutput(outputTag);` to 
ensure that the test fails if no exception is thrown.


---


[jira] [Comment Edited] (FLINK-6105) Properly handle InterruptedException in HadoopInputFormatBase

2018-05-03 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307281#comment-16307281
 ] 

Ted Yu edited comment on FLINK-6105 at 5/3/18 9:24 PM:
---

In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 :

{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

Interrupt status should be restored, or an InterruptedIOException should be thrown.


was (Author: yuzhih...@gmail.com):
In 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/RollingSink.java
 :
{code}
  try {
Thread.sleep(500);
  } catch (InterruptedException e1) {
// ignore it
  }
{code}

Interrupt status should be restored, or an InterruptedIOException should be thrown.

> Properly handle InterruptedException in HadoopInputFormatBase
> -
>
> Key: FLINK-6105
> URL: https://issues.apache.org/jira/browse/FLINK-6105
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> When catching InterruptedException, we should throw InterruptedIOException 
> instead of IOException.
> The following example is from HadoopInputFormatBase :
> {code}
> try {
>   splits = this.mapreduceInputFormat.getSplits(jobContext);
> } catch (InterruptedException e) {
>   throw new IOException("Could not get Splits.", e);
> }
> {code}
> There may be other places where IOE is thrown.
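
For illustration, a minimal sketch of both remedies suggested above, mirroring the quoted snippets (the surrounding fields are assumed from those snippets):

{code:java}
import java.io.InterruptedIOException;

// Remedy 1 (RollingSink-style sleep): restore the interrupt status
// instead of swallowing the InterruptedException.
try {
    Thread.sleep(500);
} catch (InterruptedException e1) {
    Thread.currentThread().interrupt();
}

// Remedy 2 (HadoopInputFormatBase-style): rethrow as InterruptedIOException
// so the interruption is not masked as a plain IOException.
try {
    splits = this.mapreduceInputFormat.getSplits(jobContext);
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new InterruptedIOException("Interrupted while getting splits.");
}
{code}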



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8726) Code highlighting partially broken

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463056#comment-16463056
 ] 

ASF GitHub Bot commented on FLINK-8726:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5909


> Code highlighting partially broken
> --
>
> Key: FLINK-8726
> URL: https://issues.apache.org/jira/browse/FLINK-8726
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> With the recent changes around the documentation build dependencies, code 
> highlighting is no longer fully working.
> Sections like the one below are rendered without any background, [like 
> here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html].
> {code}
> ~~~bash
> # get the hadoop2 package from the Flink download page at
> # {{ site.download_url }}
> curl -O 
> tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
> cd flink-{{ site.version }}/
> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
> ./examples/batch/WordCount.jar
> ~~~
> {code}
> Sections using the {{\{% highlight java %}}} syntax are still working.
> We may have to do a sweep over the docs and port all code sections to the 
> working syntax.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5909: [FLINK-8726][docs] Fix and normalize code-highligh...

2018-05-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5909


---


[jira] [Closed] (FLINK-8726) Code highlighting partially broken

2018-05-03 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8726.
---
Resolution: Fixed

master: b13c70b60b04e0d51604d0e37868612f77e86299
1.5: 6d0775176af206edb86d8ac9cfff35654208e1e9

> Code highlighting partially broken
> --
>
> Key: FLINK-8726
> URL: https://issues.apache.org/jira/browse/FLINK-8726
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> With the recent changes around the documentation build dependencies, code 
> highlighting is no longer fully working.
> Sections like the one below are rendered without any background, [like 
> here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html].
> {code}
> ~~~bash
> # get the hadoop2 package from the Flink download page at
> # {{ site.download_url }}
> curl -O 
> tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
> cd flink-{{ site.version }}/
> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
> ./examples/batch/WordCount.jar
> ~~~
> {code}
> Sections using the {{\{% highlight java %}}} syntax are still working.
> We may have to do a sweep over the docs and port all code sections to the 
> working syntax.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5909: [FLINK-8726][docs] Fix and normalize code-highlighting

2018-05-03 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5909
  
merging.


---


[jira] [Commented] (FLINK-8726) Code highlighting partially broken

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463052#comment-16463052
 ] 

ASF GitHub Bot commented on FLINK-8726:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5909
  
merging.


> Code highlighting partially broken
> --
>
> Key: FLINK-8726
> URL: https://issues.apache.org/jira/browse/FLINK-8726
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> With the recent changes around the documentation build dependencies, code 
> highlighting is no longer fully working.
> Sections like the one below are rendered without any background, [like 
> here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html].
> {code}
> ~~~bash
> # get the hadoop2 package from the Flink download page at
> # {{ site.download_url }}
> curl -O 
> tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
> cd flink-{{ site.version }}/
> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
> ./examples/batch/WordCount.jar
> ~~~
> {code}
> Sections using the {{\{% highlight java %}}} syntax are still working.
> We may have to do a sweep over the docs and port all code sections to the 
> working syntax.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463037#comment-16463037
 ] 

ASF GitHub Bot commented on FLINK-8237:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
Thanks for the PR @pavel-shvetsov-git!
I've left a suggestion to improve the error message.
Afterwards the PR should be good to merge.


> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
>  Issue Type: Bug
>Reporter: Gábor Hermann
>Priority: Minor
>
> Users need to look into Flink code to find the cause. We could catch that 
> null before even running the job.
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added

2018-05-03 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
Thanks for the PR @pavel-shvetsov-git!
I've left a suggestion to improve the error message.
Afterwards the PR should be good to merge.


---


[jira] [Commented] (FLINK-8237) BucketingSink throws NPE when Writer.duplicate returns null

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463035#comment-16463035
 ] 

ASF GitHub Bot commented on FLINK-8237:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5927#discussion_r185932952
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -550,6 +550,9 @@ private void openNewPartFile(Path bucketPath, 
BucketState bucketState) throws
Path inProgressPath = getInProgressPathFor(partPath);
if (bucketState.writer == null) {
bucketState.writer = writerTemplate.duplicate();
+   if (bucketState.writer == null) {
+   throw new RuntimeException("Could not duplicate writer.");
--- End diff --

I would add the class name of the `writerTemplate` object and that the 
class needs to implement the `Writer.duplicate()` method.
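
A sketch of what the suggested message could look like (the exact wording is an assumption):

{code:java}
if (bucketState.writer == null) {
    throw new RuntimeException("Could not duplicate writer. The writer template of class "
        + writerTemplate.getClass().getName()
        + " must return a new instance from Writer.duplicate().");
}
{code}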


> BucketingSink throws NPE when Writer.duplicate returns null
> ---
>
> Key: FLINK-8237
> URL: https://issues.apache.org/jira/browse/FLINK-8237
> Project: Flink
>  Issue Type: Bug
>Reporter: Gábor Hermann
>Priority: Minor
>
> Users need to look into Flink code to find the cause. We could catch that 
> null before even running the job.
> {code:java}
> java.lang.NullPointerException
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:546)
> at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:441)
> at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at 
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
> at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5927: [FLINK-8237] [BucketingSink] Better error message ...

2018-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5927#discussion_r185932952
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -550,6 +550,9 @@ private void openNewPartFile(Path bucketPath, 
BucketState bucketState) throws
Path inProgressPath = getInProgressPathFor(partPath);
if (bucketState.writer == null) {
bucketState.writer = writerTemplate.duplicate();
+   if (bucketState.writer == null) {
+   throw new RuntimeException("Could not duplicate writer.");
--- End diff --

I would add the class name of the `writerTemplate` object and that the 
class needs to implement the `Writer.duplicate()` method.


---


[jira] [Commented] (FLINK-8726) Code highlighting partially broken

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8726?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16463025#comment-16463025
 ] 

ASF GitHub Bot commented on FLINK-8726:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5909
  
Thanks for cleaning up the syntax highlighting @zentol!
+1 to merge


> Code highlighting partially broken
> --
>
> Key: FLINK-8726
> URL: https://issues.apache.org/jira/browse/FLINK-8726
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Trivial
> Fix For: 1.5.0
>
>
> With the recent changes around the documentation build dependencies, code 
> highlighting is no longer fully working.
> Sections like the one below are rendered without any background, [like 
> here|https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/windows.html].
> {code}
> ~~~bash
> # get the hadoop2 package from the Flink download page at
> # {{ site.download_url }}
> curl -O 
> tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
> cd flink-{{ site.version }}/
> ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 
> ./examples/batch/WordCount.jar
> ~~~
> {code}
> Sections using the {{\{% highlight java %}}} syntax are still working.
> We may have to do a sweep over the docs and port all code sections to the 
> working syntax.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5909: [FLINK-8726][docs] Fix and normalize code-highlighting

2018-05-03 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5909
  
Thanks for cleaning up the syntax highlighting @zentol!
+1 to merge


---


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16462758#comment-16462758
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/5947
  
Agree, I added a constant check of the previous non-null state to its update 
method.
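
Presumably something along these lines (a hypothetical sketch, assuming Flink's {{ValueState}} and {{Preconditions}}; imports omitted as in the other snippets here):

{code:java}
// Hypothetical sketch of the described check: the update method verifies
// that a previous state value exists before replacing it.
private void update(ValueState<Long> state, long newValue) throws Exception {
    Long previous = state.value();
    Preconditions.checkNotNull(previous, "Expected a previous state value before update");
    state.update(newValue);
}
{code}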


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real world Flink job. 
Therefore, we should add an end-to-end test which covers exactly this 
scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5947: [FLINK-8978] Stateful generic stream job upgrade e2e test

2018-05-03 Thread azagrebin
Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/5947
  
Agree, I added a constant check of the previous non-null state to its update 
method.


---


[jira] [Updated] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode

2018-05-03 Thread Christopher Ng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Ng updated FLINK-9295:
--
Description: 
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when 
operator-chaining results in two different sinks in the same topology being 
chained into a task, and thus into each of its sub-tasks.

The problem is that {{TransactionIdsGenerator}} only takes into account the 
task name, the subtask index, the number of subtasks, and a couple of other 
things.  All the attributes are the same between different 
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same 
transaction ids and one of them ends up failing.

  was:
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when 
operator-chaining results in two different sinks in the same topology being 
chained into a single sub-task.

The problem is that {{TransactionIdsGenerator}} only takes into account the 
task name, the subtask index, the number of subtasks, and a couple of other 
things.  All the attributes are the same between different 
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same 
transaction ids and one of them ends up failing.


> FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in 
> EXACTLY_ONCE mode
> -
>
> Key: FLINK-9295
> URL: https://issues.apache.org/jira/browse/FLINK-9295
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Christopher Ng
>Priority: Major
>
> {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
> sinks are used within the same sub-task.  This can happen when 
> operator-chaining results in two different sinks in the same topology being 
> chained into a task, and thus into each of its sub-tasks.
> The problem is that {{TransactionIdsGenerator}} only takes into account the 
> task name, the subtask index, the number of subtasks, and a couple of other 
> things.  All the attributes are the same between different 
> {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same 
> transaction ids and one of them ends up failing.
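A minimal, self-contained sketch of the collision described above; the 
generator below is hypothetical and only mimics the attribute set mentioned 
(task name, subtask index, number of subtasks):

{code:java}
// If the transactional id is derived only from task-level attributes, two
// producers chained into the same sub-task inevitably receive the same id.
public class TransactionIdCollision {

    static String transactionalId(String taskName, int subtaskIndex, int numSubtasks) {
        return taskName + "-" + subtaskIndex + "-" + numSubtasks;
    }

    public static void main(String[] args) {
        // two different FlinkKafkaProducer011 sinks chained into the same sub-task
        String idSinkA = transactionalId("Sink: chained", 3, 16);
        String idSinkB = transactionalId("Sink: chained", 3, 16);
        // identical ids -> the second producer fences the first
        System.out.println(idSinkA.equals(idSinkB)); // prints: true
    }
}
{code}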



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode

2018-05-03 Thread Christopher Ng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Ng updated FLINK-9295:
--
Description: 
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when 
operator-chaining results in two different sinks in the same topology being 
chained into a single sub-task.

The problem is that {{TransactionIdsGenerator}} only takes into account the 
task name, the subtask index, the number of subtasks, and a couple of other 
things.  All the attributes are the same between different 
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same 
transaction ids and one of them ends up failing.

  was:
{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when chaining results 
in two different sinks in the same topology being chained into a single 
sub-task.

The problem is that {{TransactionIdsGenerator}} only takes into account the 
task name, the subtask index, the number of subtasks, and a couple of other 
things.  All the attributes are the same between different 
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same 
transaction ids and one of them ends up failing.


> FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in 
> EXACTLY_ONCE mode
> -
>
> Key: FLINK-9295
> URL: https://issues.apache.org/jira/browse/FLINK-9295
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.2
>Reporter: Christopher Ng
>Priority: Major
>
> {{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
> sinks are used within the same sub-task.  This can happen when 
> operator-chaining results in two different sinks in the same topology being 
> chained into a single sub-task.
> The problem is that {{TransactionIdsGenerator}} only takes into account the 
> task name, the subtask index, the number of subtasks, and a couple of other 
> things.  All the attributes are the same between different 
> {{FlinkKafkaProducer011s}} within the same sub-task, so they get the same 
> transaction ids and one of them ends up failing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9295) FlinkKafkaProducer011 sometimes throws ProducerFencedExceptions when in EXACTLY_ONCE mode

2018-05-03 Thread Christopher Ng (JIRA)
Christopher Ng created FLINK-9295:
-

 Summary: FlinkKafkaProducer011 sometimes throws 
ProducerFencedExceptions when in EXACTLY_ONCE mode
 Key: FLINK-9295
 URL: https://issues.apache.org/jira/browse/FLINK-9295
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector
Affects Versions: 1.4.2
Reporter: Christopher Ng


{{FlinkKafkaProducer011}} can throw {{ProducerFencedExceptions}} if multiple 
sinks are used within the same sub-task.  This can happen when chaining results 
in two different sinks in the same topology being chained into a single 
sub-task.

The problem is that {{TransactionIdsGenerator}} only takes into account the 
task name, the subtask index, the number of subtasks, and a couple of other 
things.  All the attributes are the same between different 
{{FlinkKafkaProducer011s}} within the same sub-task, so they get the same 
transaction ids and one of them ends up failing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462665#comment-16462665
 ] 

ASF GitHub Bot commented on FLINK-9293:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5951
  
Good catch! Thank you for the PR.

Will try to review this asap...


> SlotPool should check slot id when accepting a slot offer with existing 
> allocation id
> -
>
> Key: FLINK-9293
> URL: https://issues.apache.org/jira/browse/FLINK-9293
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For flip-6, two or more slots may be assigned to the same slot allocation. 
> For example, taskExecutor1 registers and assigns allocationID1 to its slot1, 
> but from taskExecutor1's side the registration times out and it registers 
> again; the RM will then fail allocationID1 and assign slot2 on taskExecutor2 
> to it. However, taskExecutor1 has already accepted allocationID1, so 
> taskExecutor1 and taskExecutor2 both offer a slot to the JobMaster with 
> allocationID1. The slot pool currently just accepts every slot offer, which 
> may leak one slot.
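A minimal, self-contained sketch of the proposed check; the class and method 
names are invented for illustration and do not match Flink's actual SlotPool 
API:

{code:java}
import java.util.HashMap;
import java.util.Map;

// An offer for an already-known allocation id is acknowledged only if it
// refers to the same physical slot; a second slot arriving with the same
// allocation id is rejected so that it can be freed instead of leaking.
public class SlotOfferCheckSketch {

    /** allocation id -> slot id of the offer that was accepted first. */
    private final Map<String, String> acceptedSlots = new HashMap<>();

    public boolean offerSlot(String allocationId, String slotId) {
        String knownSlotId = acceptedSlots.get(allocationId);
        if (knownSlotId != null) {
            // an idempotent re-offer of the same slot is fine; a different slot is not
            return knownSlotId.equals(slotId);
        }
        acceptedSlots.put(allocationId, slotId);
        return true;
    }

    public static void main(String[] args) {
        SlotOfferCheckSketch pool = new SlotOfferCheckSketch();
        System.out.println(pool.offerSlot("allocationID1", "taskExecutor1-slot1")); // true
        System.out.println(pool.offerSlot("allocationID1", "taskExecutor2-slot2")); // false
    }
}
{code}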



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5951: [FLINK-9293] [runtime] SlotPool should check slot id when...

2018-05-03 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5951
  
Good catch! Thank you for the PR.

Will try to review this asap...


---


[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462643#comment-16462643
 ] 

ASF GitHub Bot commented on FLINK-9235:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5953
  
R: @zentol @suez1224 

At least the secured ITCase is currently failing for legacy mode. 
Investigating...


> Add Integration test for Flink-Yarn-Kerberos integration for flip-6
> ---
>
> Key: FLINK-9235
> URL: https://issues.apache.org/jira/browse/FLINK-9235
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.5.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> We need to provide an integration test for flip-6 similar to 
> YARNSessionFIFOSecuredITCase for the legacy deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462641#comment-16462641
 ] 

ASF GitHub Bot commented on FLINK-9235:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5901
  
@suez1224 I did an alternative version in #5953. All YARN tests currently 
have the problem that they are only executed with either the "new" (FLIP-6) 
mode or the legacy mode (only the secured IT case). In my PR I change that to 
use the legacy flag that we specify on Travis, meaning that all YARN tests now 
run for both configurations.


> Add Integration test for Flink-Yarn-Kerberos integration for flip-6
> ---
>
> Key: FLINK-9235
> URL: https://issues.apache.org/jira/browse/FLINK-9235
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.5.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> We need to provide an integration test for flip-6 similar to 
> YARNSessionFIFOSecuredITCase for the legacy deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5953: [FLINK-9235] Add Integration test for Flink-Yarn-Kerberos...

2018-05-03 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5953
  
R: @zentol @suez1224 

At least the secured ITCase is currently failing for legacy mode. 
Investigating...


---


[GitHub] flink issue #5901: [FLINK-9235][Security] Add integration tests for YARN ker...

2018-05-03 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5901
  
@suez1224 I did an alternative version in #5953. All YARN tests currently 
have the problem that they are only executed with either the "new" (FLIP-6) 
mode or the legacy mode (only the secured IT case). In my PR I change that to 
use the legacy flag that we specify on Travis, meaning that all YARN tests now 
run for both configurations.


---


[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9294:
-
Description: 
Most UDF function signatures that include composite types such as *{{MAP}}*, 
*{{ARRAY}}*, etc. currently require the user to override the 
*{{getParameterType}}* or *{{getResultType}}* method explicitly. 

It should be possible to resolve the composite type based on the function signature.

  was:
Most of the UDF function signatures that includes composite types such as 
{quote}MAP{quote}, {quote}ARRAY{quote}, etc would require user to override 
{code:java}getParameterType{code} or {code:java}getResultType{code} method 
explicitly. 

It should be able to resolve the composite type based on the function signature.


> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> Most UDF function signatures that include composite types such as *{{MAP}}*, 
> *{{ARRAY}}*, etc. currently require the user to override the 
> *{{getParameterType}}* or *{{getResultType}}* method explicitly. 
> It should be possible to resolve the composite type based on the function 
> signature.
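A hedged sketch of the override that is currently required, assuming the 1.x 
Table API ({{ScalarFunction}}); the UDF name and logic are invented for 
illustration:

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.table.functions.ScalarFunction;

// A UDF returning MAP<String, Integer>: the composite result type is not
// inferred from the eval signature, so getResultType must be overridden.
public class WordCountMap extends ScalarFunction {

    public Map<String, Integer> eval(String sentence) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : sentence.split("\\s+")) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        // this boilerplate is what the issue proposes to make unnecessary
        return new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
    }
}
{code}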



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong reassigned FLINK-9294:


Assignee: Rong Rong

> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> Most UDF function signatures that include composite types such as *{{MAP}}*, 
> *{{ARRAY}}*, etc. currently require the user to override the 
> *{{getParameterType}}* or *{{getResultType}}* method explicitly. 
> It should be possible to resolve the composite type based on the function 
> signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9294:
-
Description: 
Most of the UDF function signatures that includes composite types such as 
{quote}MAP{quote}, {quote}ARRAY{quote}, etc would require user to override 
{code:java}getParameterType{code} or {code:java}getResultType{code} method 
explicitly. 

It should be able to resolve the composite type based on the function signature.

  was:
Most of the UDF function signatures that includes composite types such as 
{code:java}MAP{code}, {code:java}ARRAY{code}, etc would require user to 
override {code:java}getParameterType{code} or {code:java}getResultType{code} 
method explicitly. 

It should be able to resolve the composite type based on the function signature.


> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> Most of the UDF function signatures that includes composite types such as 
> {quote}MAP{quote}, {quote}ARRAY{quote}, etc would require user to override 
> {code:java}getParameterType{code} or {code:java}getResultType{code} method 
> explicitly. 
> It should be able to resolve the composite type based on the function 
> signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9294:
-
Description: 
Most of the UDF function signatures that includes composite types such as 
{code:java}MAP{code}, {code:java}ARRAY{code}, etc would require user to 
override {code:java}getParameterType{code} or {code:java}getResultType{code} 
method explicitly. 

It should be able to resolve the composite type based on the function signature.

  was:
Most of the UDF function signatures that includes composite types such as 
{code}MAP{/code} {{ARRAY}}, etc would require user to override 
{{getParameterType}} or {{getResultType}} method explicitly. 

It should be able to resolve the composite type based on the function signature.


> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> Most of the UDF function signatures that includes composite types such as 
> {code:java}MAP{code}, {code:java}ARRAY{code}, etc would require user to 
> override {code:java}getParameterType{code} or {code:java}getResultType{code} 
> method explicitly. 
> It should be able to resolve the composite type based on the function 
> signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9294:
-
Description: 
Most of the UDF function signatures that includes composite types such as 
{code}MAP{/code} {{ARRAY}}, etc would require user to override 
{{getParameterType}} or {{getResultType}} method explicitly. 

It should be able to resolve the composite type based on the function signature.

  was:
For now most of the UDF function signatures that includes composite types such 
as {{MAP}} {{ARRAY}}, etc would require user to override {{getParameterType}} 
or {{getResultType}} method explicitly. 

It should be able to resolve the composite type based on the function signature.


> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> Most of the UDF function signatures that includes composite types such as 
> {code}MAP{/code} {{ARRAY}}, etc would require user to override 
> {{getParameterType}} or {{getResultType}} method explicitly. 
> It should be able to resolve the composite type based on the function 
> signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9294?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong updated FLINK-9294:
-
Description: 
For now most of the UDF function signatures that includes composite types such 
as {{MAP}} {{ARRAY}}, etc would require user to override {{getParameterType}} 
or {{getResultType}} method explicitly. 

It should be able to resolve the composite type based on the function signature.

  was:
For now most of the UDF function signatures that includes composite types such 
as `MAP` `ARRAY`, etc would require user to override `getParameterType` or 
`getResultType` method explicitly. 

It should be able to resolve the composite type based on the function signature.


> Improve type inference for UDFs with composite parameter or result type 
> 
>
> Key: FLINK-9294
> URL: https://issues.apache.org/jira/browse/FLINK-9294
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Rong Rong
>Priority: Major
>
> For now most of the UDF function signatures that includes composite types 
> such as {{MAP}} {{ARRAY}}, etc would require user to override 
> {{getParameterType}} or {{getResultType}} method explicitly. 
> It should be able to resolve the composite type based on the function 
> signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9294) Improve type inference for UDFs with composite parameter or result type

2018-05-03 Thread Rong Rong (JIRA)
Rong Rong created FLINK-9294:


 Summary: Improve type inference for UDFs with composite parameter 
or result type 
 Key: FLINK-9294
 URL: https://issues.apache.org/jira/browse/FLINK-9294
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Rong Rong


For now most of the UDF function signatures that includes composite types such 
as `MAP` `ARRAY`, etc would require user to override `getParameterType` or 
`getResultType` method explicitly. 

It should be able to resolve the composite type based on the function signature.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462615#comment-16462615
 ] 

ASF GitHub Bot commented on FLINK-9235:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5953

[FLINK-9235] Add Integration test for Flink-Yarn-Kerberos integration for 
flip-6

Alternative version of #5901

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-9235-flip-6-yarn-secured-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5953.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5953


commit 9ca7852fe1a6f1a86d05ba2f7851199e43411579
Author: Aljoscha Krettek 
Date:   2018-05-03T14:23:29Z

Remove special-case krb5.conf code from YARN runners

commit b4fb889c911ccef0521cd9ed8839190a12f2d4a5
Author: Aljoscha Krettek 
Date:   2018-05-03T14:27:40Z

[FLINK-9235] Test new FLIP-6 code in YARNSessionFIFOSecuredITCase

Before, always setting mode to LEGACY_MODE when security settings are
present caused the test never to run with the new code.

For this, we also need to actually execute an example. Otherwise, no
TaskExecutors would be brought up.




> Add Integration test for Flink-Yarn-Kerberos integration for flip-6
> ---
>
> Key: FLINK-9235
> URL: https://issues.apache.org/jira/browse/FLINK-9235
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.5.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> We need to provide an integration test for flip-6 similar to 
> YARNSessionFIFOSecuredITCase for the legacy deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5953: [FLINK-9235] Add Integration test for Flink-Yarn-K...

2018-05-03 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5953

[FLINK-9235] Add Integration test for Flink-Yarn-Kerberos integration for 
flip-6

Alternative version of #5901

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink 
jira-9235-flip-6-yarn-secured-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5953.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5953


commit 9ca7852fe1a6f1a86d05ba2f7851199e43411579
Author: Aljoscha Krettek 
Date:   2018-05-03T14:23:29Z

Remove special-case krb5.conf code from YARN runners

commit b4fb889c911ccef0521cd9ed8839190a12f2d4a5
Author: Aljoscha Krettek 
Date:   2018-05-03T14:27:40Z

[FLINK-9235] Test new FLIP-6 code in YARNSessionFIFOSecuredITCase

Before, always setting mode to LEGACY_MODE when security settings are
present caused the test never to run with the new code.

For this, we also need to actually execute an example. Otherwise, no
TaskExecutors would be brought up.




---


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462608#comment-16462608
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5947
  
Please also double check, it seems there are files without license header.


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which exactly covers this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5947: [FLINK-8978] Stateful generic stream job upgrade e2e test

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5947
  
Please also double check, it seems there are files without license header.


---


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462591#comment-16462591
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185840683
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
 ---
@@ -33,21 +33,28 @@
private static final long serialVersionUID = -1205814329756790916L;
 
private transient ValueState valueState;
+   private transient boolean afterRestoration;
private final TypeSerializer typeSerializer;
private final JoinFunction stateValueGenerator;
+   private final RestoredStateVerifier restoredStateVerifier;
 
public ArtificialValueStateBuilder(
String stateName,
JoinFunction stateValueGenerator,
-   TypeSerializer typeSerializer) {
-
+   TypeSerializer typeSerializer,
+   RestoredStateVerifier restoredStateVerifier) {
super(stateName);
this.typeSerializer = typeSerializer;
this.stateValueGenerator = stateValueGenerator;
+   this.restoredStateVerifier = restoredStateVerifier;
}
 
@Override
public void artificialStateForElement(IN event) throws Exception {
+   if (afterRestoration) {
--- End diff --

As this is a test job, I think it might not hurt to just check every 
element after a restore.


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which exactly covers this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185840683
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
 ---
@@ -33,21 +33,28 @@
private static final long serialVersionUID = -1205814329756790916L;
 
private transient ValueState valueState;
+   private transient boolean afterRestoration;
private final TypeSerializer typeSerializer;
private final JoinFunction stateValueGenerator;
+   private final RestoredStateVerifier restoredStateVerifier;
 
public ArtificialValueStateBuilder(
String stateName,
JoinFunction stateValueGenerator,
-   TypeSerializer typeSerializer) {
-
+   TypeSerializer typeSerializer,
+   RestoredStateVerifier restoredStateVerifier) {
super(stateName);
this.typeSerializer = typeSerializer;
this.stateValueGenerator = stateValueGenerator;
+   this.restoredStateVerifier = restoredStateVerifier;
}
 
@Override
public void artificialStateForElement(IN event) throws Exception {
+   if (afterRestoration) {
--- End diff --

As this is a test job, I think it might not hurt to just check every 
element after a restore.


---


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462589#comment-16462589
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185840213
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
 ---
@@ -33,21 +33,28 @@
private static final long serialVersionUID = -1205814329756790916L;
 
private transient ValueState valueState;
+   private transient boolean afterRestoration;
private final TypeSerializer typeSerializer;
private final JoinFunction stateValueGenerator;
+   private final RestoredStateVerifier restoredStateVerifier;
 
public ArtificialValueStateBuilder(
String stateName,
JoinFunction stateValueGenerator,
-   TypeSerializer typeSerializer) {
-
+   TypeSerializer typeSerializer,
+   RestoredStateVerifier restoredStateVerifier) {
super(stateName);
this.typeSerializer = typeSerializer;
this.stateValueGenerator = stateValueGenerator;
+   this.restoredStateVerifier = restoredStateVerifier;
}
 
@Override
public void artificialStateForElement(IN event) throws Exception {
+   if (afterRestoration) {
--- End diff --

I find this way of checking the state rather invasive and not completely 
thorough. There is now a pretty tight coupling between creating artificial 
state and checking something about it on restore. In particular, the decision 
of when to check is now hardcoded. This makes it harder to reuse the classes 
in further test jobs that we might want to build with them. Can't we use an 
approach that is more based on composition? For example, wrap the state 
builder in a state checker? This also does only a single check, so if the 
input element has a key that we never encountered, the state is `null` and 
there might be no check.
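A hedged sketch of the composition idea; the interface and names are invented 
for illustration and do not match the classes in the PR:

{code:java}
// The restore check becomes a reusable decorator around any state builder
// instead of being hard-coded into the builder itself.
public class VerifyingStateBuilderSketch {

    interface ArtificialStateBuilder<IN> {
        void artificialStateForElement(IN event) throws Exception;
    }

    static class VerifyingStateBuilder<IN> implements ArtificialStateBuilder<IN> {
        private final ArtificialStateBuilder<IN> delegate;
        private final Runnable restoredStateCheck;

        VerifyingStateBuilder(ArtificialStateBuilder<IN> delegate, Runnable restoredStateCheck) {
            this.delegate = delegate;
            this.restoredStateCheck = restoredStateCheck;
        }

        @Override
        public void artificialStateForElement(IN event) throws Exception {
            restoredStateCheck.run(); // runs for every element, as suggested above
            delegate.artificialStateForElement(event);
        }
    }

    public static void main(String[] args) throws Exception {
        ArtificialStateBuilder<String> builder = event -> { /* create artificial state */ };
        ArtificialStateBuilder<String> checked =
            new VerifyingStateBuilder<>(builder, () -> System.out.println("state verified"));
        checked.artificialStateForElement("element-1");
    }
}
{code}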


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which exactly covers this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5947: [FLINK-8978] Stateful generic stream job upgrade e...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5947#discussion_r185840213
  
--- Diff: 
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/artificialstate/eventpayload/ArtificialValueStateBuilder.java
 ---
@@ -33,21 +33,28 @@
private static final long serialVersionUID = -1205814329756790916L;
 
private transient ValueState valueState;
+   private transient boolean afterRestoration;
private final TypeSerializer typeSerializer;
private final JoinFunction stateValueGenerator;
+   private final RestoredStateVerifier restoredStateVerifier;
 
public ArtificialValueStateBuilder(
String stateName,
JoinFunction stateValueGenerator,
-   TypeSerializer typeSerializer) {
-
+   TypeSerializer typeSerializer,
+   RestoredStateVerifier restoredStateVerifier) {
super(stateName);
this.typeSerializer = typeSerializer;
this.stateValueGenerator = stateValueGenerator;
+   this.restoredStateVerifier = restoredStateVerifier;
}
 
@Override
public void artificialStateForElement(IN event) throws Exception {
+   if (afterRestoration) {
--- End diff --

I find this way of checking the state rather invasive and not completely 
thorough. There is now a pretty tight coupling between creating artificial 
state and checking something about it on restore. In particular, the decision 
of when to check is now hardcoded. This makes it harder to reuse the classes 
in further test jobs that we might want to build with them. Can't we use an 
approach that is more based on composition? For example, wrap the state 
builder in a state checker? This also does only a single check, so if the 
input element has a key that we never encountered, the state is `null` and 
there might be no check.


---


[jira] [Commented] (FLINK-9245) Can't create a BucketingSink with a provided Configuration if no hadoop defaults

2018-05-03 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462583#comment-16462583
 ] 

Aljoscha Krettek commented on FLINK-9245:
-

I think the problem is that there really is no authority in the path you 
specify, and there is no default authority. The part between the second and 
third {{/}} is where the authority should be, usually a hostname. For example 
{{hdfs://my-namenode:50030/user/${USER}/application_name/}} or 
{{hdfs://localhost:50030/user/${USER}/application_name/}}.

Does that help?
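A minimal sketch with an explicit authority; hostname, port and path are 
placeholders, not values taken from the issue:

{code:java}
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.types.Row;

public class BucketingSinkUriSketch {
    public static void main(String[] args) {
        // "my-namenode:50030" is the authority; without it the URI falls back
        // to the default file system, as described above.
        BucketingSink<Row> sink =
            new BucketingSink<>("hdfs://my-namenode:50030/user/flink/application_name/");
        System.out.println(sink);
    }
}
{code}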

> Can't create a BucketingSink with a provided Configuration if no hadoop 
> defaults
> 
>
> Key: FLINK-9245
> URL: https://issues.apache.org/jira/browse/FLINK-9245
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.4.2
>Reporter: Julien Cuquemelle
>Priority: Minor
> Fix For: 1.6.0
>
>
> We build Integration tests using this kind of code: 
> {code:java}
> val bucketingSink = new 
> BucketingSink[Row]("hdfs:///user/${USER}/application_name/")
> bucketingSink.setFSConfig(hadoopRule.getConfiguration.hdfs)
> bucketingSink.setBucketer(new DateTimeBucketer[Row]("-MM-dd--HHmm"))
> outpuStream.addSink(bucketingSink)
> {code}
> Here, the hadoopRule provides a valid hdfs config that should allow this 
> kind of code to run on a machine with no HADOOP_HOME or HADOOP_CONF_DIR set 
> up, like a developer workstation or a Jenkins slave.
> When running this code on such a machine, the .createHadoopFileSystem(...) 
> fails with 
> {noformat}
> The given file system URI (hdfs:///user/$USER/application_name/) did not 
> describe the authority 
> at 
> org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:149)
> at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401){noformat}
> because it tries to instantiate the fileSystem from a default configuration 
> in .getUnguardedFileSystem(); as the default conf doesn't exist, the default 
> filesystem resolves to "file:///" and the consistency checks of the URI fail 
> because no authority can be found. So the whole filesystem creation fails 
> before actually trying to use the provided config.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9293) SlotPool should check slot id when accepting a slot offer with existing allocation id

2018-05-03 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-9293:

Fix Version/s: 1.5.0

> SlotPool should check slot id when accepting a slot offer with existing 
> allocation id
> -
>
> Key: FLINK-9293
> URL: https://issues.apache.org/jira/browse/FLINK-9293
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>Assignee: shuai.xu
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For flip-6, two or more slots may be assigned to the same slot allocation. 
> For example, taskExecutor1 registers and assigns allocationID1 to its slot1, 
> but from taskExecutor1's side the registration times out and it registers 
> again; the RM will then fail allocationID1 and assign slot2 on taskExecutor2 
> to it. However, taskExecutor1 has already accepted allocationID1, so 
> taskExecutor1 and taskExecutor2 both offer a slot to the JobMaster with 
> allocationID1. The slot pool currently just accepts every slot offer, which 
> may leak one slot.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462563#comment-16462563
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Please also check the travis build, some related tests seem to fail:
Tests in error: 
  
TypeSerializerSerializationUtilTest.testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures:236
 » IO
  
TypeSerializerSerializationUtilTest.testSerializerSerializationWithClassNotFound:109
 » IO
  
TypeSerializerSerializationUtilTest.testSerializerSerializationWithInvalidClass:149
 » IO
  PojoSerializerTest.testSerializerSerializationFailureResilience:570 » IO


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the 
> {{stateSerializer}} can be {{null}}. This is not the problem, however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} 
> where we null-check the state serializer. This then fails with an 
> unhelpful NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5950: [FLINK-9169] [state-backend] Allow absence of old seriali...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5950
  
Please also check the travis build, some related tests seem to fail:
Tests in error: 
  
TypeSerializerSerializationUtilTest.testSerializerAndConfigPairsSerializationWithSerializerDeserializationFailures:236
 » IO
  
TypeSerializerSerializationUtilTest.testSerializerSerializationWithClassNotFound:109
 » IO
  
TypeSerializerSerializationUtilTest.testSerializerSerializationWithInvalidClass:149
 » IO
  PojoSerializerTest.testSerializerSerializationFailureResilience:570 » 
IO


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462554#comment-16462554
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185828137
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void 
writeSerializersAndConfigsWithResilience(
for (int i = 0; i < numSerializersAndConfigSnapshots; 
i++) {
 
bufferWithPos.setPosition(offsets[i * 2]);
-   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader);
+   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader, true);
--- End diff --

The problem is that this method might mix too many things together, which is 
also visible in the complex return type; e.g. many call sites are only 
interested in the first element of the list. I wonder if we should break this 
up into dedicated steps (serializer, config) and let the callers invoke them 
one by one, so that we can handle exceptions at a higher level and decide 
whether we need a serializer there.


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the 
> {{stateSerializer}} can be {{null}}. This is not the problem, however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} 
> where we null-check the state serializer. This then fails with an 
> unhelpful NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185828137
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void 
writeSerializersAndConfigsWithResilience(
for (int i = 0; i < numSerializersAndConfigSnapshots; 
i++) {
 
bufferWithPos.setPosition(offsets[i * 2]);
-   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader);
+   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader, true);
--- End diff --

The problem is that this method might mix too many things together, which is 
also visible in the complex return type; e.g. many call sites are only 
interested in the first element of the list. I wonder if we should break this 
up into dedicated steps (serializer, config) and let the callers invoke them 
one by one, so that we can handle exceptions at a higher level and decide 
whether we need a serializer there.


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462547#comment-16462547
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185825767
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void 
writeSerializersAndConfigsWithResilience(
for (int i = 0; i < numSerializersAndConfigSnapshots; 
i++) {
 
bufferWithPos.setPosition(offsets[i * 2]);
-   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader);
+   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader, true);
--- End diff --

This could be the place where we catch an `UnloadableSerializerException`, 
but if we let the caller do the iteration from 0 to 
`numSerializersAndConfigSnapshots`, we can push it out even more. Why is it 
helpful to create a list here? Otherwise we could do the exception handling 
in the caller, in a more fine-grained way.


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the 
> {{stateSerializer}} can be {{null}}. This is not the problem, however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} 
> where we null-check the state serializer. This then fails with an 
> unhelpful NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185825767
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -200,7 +197,7 @@ public static void 
writeSerializersAndConfigsWithResilience(
for (int i = 0; i < numSerializersAndConfigSnapshots; 
i++) {
 
bufferWithPos.setPosition(offsets[i * 2]);
-   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader);
+   serializer = tryReadSerializer(bufferWrapper, 
userCodeClassLoader, true);
--- End diff --

This could be the place where we catch an `UnloadableSerializerException`, 
but if we let the caller do the iteration from 0 to 
`numSerializersAndConfigSnapshots`, we can push it out even more. Why is it 
helpful to create a list here? Otherwise we could do the exception handling 
in the caller, in a more fine-grained way.


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462534#comment-16462534
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185821296
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -373,15 +370,14 @@ public void read(DataInputView in) throws IOException 
{
 

Thread.currentThread().setContextClassLoader(userClassLoader);
typeSerializer = (TypeSerializer) 
ois.readObject();
-   } catch (ClassNotFoundException | InvalidClassException 
e) {
+   } catch (Exception e) {
if (useDummyPlaceholder) {
// we create a dummy so that all the 
information is not lost when we get a new checkpoint before receiving
// a proper typeserializer from the user
-   typeSerializer =
-   new 
UnloadableDummyTypeSerializer<>(buffer);
-   LOG.warn("Could not find requested 
TypeSerializer class in classpath. Created dummy.", e);
+   typeSerializer = new 
UnloadableDummyTypeSerializer<>(buffer);
--- End diff --

Some food for thought, even if it is not introduced by this PR: why can we 
not introduce a special `UnloadableSerializerException extends IOException` 
that holds a field with the byte array in `buffer` and let it bubble up to a 
higher-level component? If that component wants to introduce dummies, it can 
construct them from the bytes in the caught exception; if not, it can forward 
the exception. Then we would not have to hand down this flag but could let 
the higher-level component decide. What do you think?
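A minimal sketch of the exception type floated above; this is not an existing 
Flink class, just an illustration of carrying the serializer bytes upwards:

{code:java}
import java.io.IOException;

// Carries the raw serializer bytes so that a higher-level component can decide
// whether to build a dummy serializer from them or to rethrow the exception.
public class UnloadableSerializerException extends IOException {

    private final byte[] serializerBytes;

    public UnloadableSerializerException(byte[] serializerBytes, Throwable cause) {
        super("State serializer could not be deserialized from the snapshot.", cause);
        this.serializerBytes = serializerBytes.clone();
    }

    public byte[] getSerializerBytes() {
        return serializerBytes.clone();
    }
}
{code}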


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the 

[GitHub] flink pull request #5923: [FLINK-9253][network] make the maximum floating bu...

2018-05-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5923#discussion_r185817506
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -228,20 +228,19 @@ public void setupPartition(ResultPartition partition) throws IOException {
@VisibleForTesting
public void setupInputGate(SingleInputGate gate) throws IOException {
BufferPool bufferPool = null;
-   int maxNumberOfMemorySegments;
try {
if (enableCreditBased) {
-   maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-   extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-
// assign exclusive buffers to input channels directly and use the rest for floating buffers
-   gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-   bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
+   int nrExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --

Please, no abbreviations like `nrExclusiveMemorySegments`. 
`assignedExclusiveMemorySegments`? `exclusiveMemorySegments`?


---


[jira] [Commented] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462535#comment-16462535
 ] 

ASF GitHub Bot commented on FLINK-9253:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5923#discussion_r185819830
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -228,20 +228,19 @@ public void setupPartition(ResultPartition partition) throws IOException {
@VisibleForTesting
public void setupInputGate(SingleInputGate gate) throws IOException {
BufferPool bufferPool = null;
-   int maxNumberOfMemorySegments;
try {
if (enableCreditBased) {
-   maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-   extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-
// assign exclusive buffers to input channels directly and use the rest for floating buffers
-   gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-   bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
+   int nrExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
+   int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
+   gate.getNumberOfInputChannels() * networkBuffersPerChannel +
+   extraNetworkBuffersPerGate - nrExclusiveMemorySegments : Integer.MAX_VALUE;
+   bufferPool = networkBufferPool
+   .createBufferPool(0, maxNumberOfMemorySegments);
--- End diff --

I think that expressing it this way:
```
if (enableCreditBased) {
	int desiredMaxNumberOfMemorySegments = gate.getNumberOfInputChannels() * networkBuffersPerChannel + extraNetworkBuffersPerGate;
	int assignedExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
	int floatingMemorySegments = desiredMaxNumberOfMemorySegments - assignedExclusiveMemorySegments;

	bufferPool = networkBufferPool
		.createBufferPool(0, gate.getConsumedPartitionType().isBounded() ? floatingMemorySegments : Integer.MAX_VALUE);
}
```

is easier to understand and allows us to skip the redundant comment. 
In particular, the current `maxNumberOfMemorySegments` is a strange name.


> Make buffer count per InputGate always #channels*buffersPerChannel + 
> ExclusiveBuffers
> -
>
> Key: FLINK-9253
> URL: https://issues.apache.org/jira/browse/FLINK-9253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.5.0
>
>
> The credit-based flow control path assigns exclusive buffers only to remote 
> channels (which makes sense since local channels don't use any buffers of their own). 
> However, this is a bit opaque with respect to how much data may be in 
> buffers, since this depends on the actual schedule of the job and not on the job 
> graph.
> By adapting the floating buffers to use a maximum of 
> {{#channels*buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers}}, 
> we would be channel-type agnostic and keep the old behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9253) Make buffer count per InputGate always #channels*buffersPerChannel + ExclusiveBuffers

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462536#comment-16462536
 ] 

ASF GitHub Bot commented on FLINK-9253:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5923#discussion_r185817506
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -228,20 +228,19 @@ public void setupPartition(ResultPartition partition) throws IOException {
@VisibleForTesting
public void setupInputGate(SingleInputGate gate) throws IOException {
BufferPool bufferPool = null;
-   int maxNumberOfMemorySegments;
try {
if (enableCreditBased) {
-   maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-   extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-
// assign exclusive buffers to input channels directly and use the rest for floating buffers
-   gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-   bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
+   int nrExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
--- End diff --

Please, no abbreviations like `nrExclusiveMemorySegments`. 
`assignedExclusiveMemorySegments`? `exclusiveMemorySegments`?


> Make buffer count per InputGate always #channels*buffersPerChannel + 
> ExclusiveBuffers
> -
>
> Key: FLINK-9253
> URL: https://issues.apache.org/jira/browse/FLINK-9253
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Major
> Fix For: 1.5.0
>
>
> The credit-based flow control path assigns exclusive buffers only to remote 
> channels (which makes sense since local channels don't use any buffers of their own). 
> However, this is a bit opaque with respect to how much data may be in 
> buffers, since this depends on the actual schedule of the job and not on the job 
> graph.
> By adapting the floating buffers to use a maximum of 
> {{#channels*buffersPerChannel + floatingBuffersPerGate - #exclusiveBuffers}}, 
> we would be channel-type agnostic and keep the old behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5923: [FLINK-9253][network] make the maximum floating bu...

2018-05-03 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5923#discussion_r185819830
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 ---
@@ -228,20 +228,19 @@ public void setupPartition(ResultPartition partition) throws IOException {
@VisibleForTesting
public void setupInputGate(SingleInputGate gate) throws IOException {
BufferPool bufferPool = null;
-   int maxNumberOfMemorySegments;
try {
if (enableCreditBased) {
-   maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
-   extraNetworkBuffersPerGate : Integer.MAX_VALUE;
-
// assign exclusive buffers to input channels directly and use the rest for floating buffers
-   gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
-   bufferPool = networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
+   int nrExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
+   int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
+   gate.getNumberOfInputChannels() * networkBuffersPerChannel +
+   extraNetworkBuffersPerGate - nrExclusiveMemorySegments : Integer.MAX_VALUE;
+   bufferPool = networkBufferPool
+   .createBufferPool(0, maxNumberOfMemorySegments);
--- End diff --

I think that expressing it this way:
```
if (enableCreditBased) {
	int desiredMaxNumberOfMemorySegments = gate.getNumberOfInputChannels() * networkBuffersPerChannel + extraNetworkBuffersPerGate;
	int assignedExclusiveMemorySegments = gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
	int floatingMemorySegments = desiredMaxNumberOfMemorySegments - assignedExclusiveMemorySegments;

	bufferPool = networkBufferPool
		.createBufferPool(0, gate.getConsumedPartitionType().isBounded() ? floatingMemorySegments : Integer.MAX_VALUE);
}
```

is easier to understand and allows us to skip the redundant comment. 
In particular, the current `maxNumberOfMemorySegments` is a strange name.


---


[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185821296
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java
 ---
@@ -373,15 +370,14 @@ public void read(DataInputView in) throws IOException {

Thread.currentThread().setContextClassLoader(userClassLoader);
typeSerializer = (TypeSerializer) ois.readObject();
-   } catch (ClassNotFoundException | InvalidClassException e) {
+   } catch (Exception e) {
if (useDummyPlaceholder) {
// we create a dummy so that all the information is not lost when we get a new checkpoint before receiving
// a proper typeserializer from the user
-   typeSerializer =
-   new UnloadableDummyTypeSerializer<>(buffer);
-   LOG.warn("Could not find requested TypeSerializer class in classpath. Created dummy.", e);
+   typeSerializer = new UnloadableDummyTypeSerializer<>(buffer);
--- End diff --

Some food for thought, even if it is not introduced by this PR: why can we 
not introduce a special `UnloadableSerializerException extends IOException` 
that holds a field with the byte array in `buffer` and let it bubble up to a 
higher-level component? If that component wants to introduce dummies, it can 
create them from the bytes in the caught exception; if not, it forwards the 
exception. Then we would not have to hand down this flag but would let the 
higher-level component decide. What do you think?


---


[jira] [Commented] (FLINK-9088) Upgrade Nifi connector dependency to 1.6.0

2018-05-03 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9088?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462530#comment-16462530
 ] 

Ted Yu commented on FLINK-9088:
---

lgtm

> Upgrade Nifi connector dependency to 1.6.0
> --
>
> Key: FLINK-9088
> URL: https://issues.apache.org/jira/browse/FLINK-9088
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming Connectors
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
>
> Currently dependency of Nifi is 0.6.1
> We should upgrade to 1.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8554) Upgrade AWS SDK

2018-05-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-8554:
--
Description: 
AWS SDK 1.11.271 fixes a lot of bugs.

One of which would exhibit the following:

{code}
Caused by: java.lang.NullPointerException
at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729)
at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67)
at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
{code}

  was:
AWS SDK 1.11.271 fixes a lot of bugs.

One of which would exhibit the following:
{code}
Caused by: java.lang.NullPointerException
at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729)
at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67)
at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
{code}


> Upgrade AWS SDK
> ---
>
> Key: FLINK-8554
> URL: https://issues.apache.org/jira/browse/FLINK-8554
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Minor
>
> AWS SDK 1.11.271 fixes a lot of bugs.
> One of which would exhibit the following:
> {code}
> Caused by: java.lang.NullPointerException
>   at com.amazonaws.metrics.AwsSdkMetrics.getRegion(AwsSdkMetrics.java:729)
>   at com.amazonaws.metrics.MetricAdmin.getRegion(MetricAdmin.java:67)
>   at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9236) Use Apache Parent POM 19

2018-05-03 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9236:
--
Description: 
Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.

This will also fix Javadoc generation with JDK 10+

  was:Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out. 
This will also fix Javadoc generation with JDK 10+


> Use Apache Parent POM 19
> 
>
> Key: FLINK-9236
> URL: https://issues.apache.org/jira/browse/FLINK-9236
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: mingleizhang
>Priority: Major
>
> Flink is still using Apache Parent POM 18. Apache Parent POM 19 is out.
> This will also fix Javadoc generation with JDK 10+



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8978) End-to-end test: Job upgrade

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8978?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462520#comment-16462520
 ] 

ASF GitHub Bot commented on FLINK-8978:
---

Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/5947
  
Thanks for the review and the good points, @StefanRRichter. 
I updated the PR to address the comments. 
The resume-state e2e test now also checks the operator <-> state 
correspondence upon state restoration.


> End-to-end test: Job upgrade
> 
>
> Key: FLINK-8978
> URL: https://issues.apache.org/jira/browse/FLINK-8978
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Andrey Zagrebin
>Priority: Blocker
> Fix For: 1.6.0, 1.5.1
>
>
> Job upgrades usually happen during the lifetime of a real-world Flink job. 
> Therefore, we should add an end-to-end test which exactly covers this 
> scenario. I suggest doing the following:
> # run the general purpose testing job FLINK-8971
> # take a savepoint
> # Modify the job by introducing a new operator and changing the order of 
> others
> # Resume the modified job from the savepoint
> # Verify that everything went correctly



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5947: [FLINK-8978] Stateful generic stream job upgrade e2e test

2018-05-03 Thread azagrebin
Github user azagrebin commented on the issue:

https://github.com/apache/flink/pull/5947
  
Thanks for the review and the good points, @StefanRRichter. 
I updated the PR to address the comments. 
The resume-state e2e test now also checks the operator <-> state 
correspondence upon state restoration.


---


[jira] [Commented] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462480#comment-16462480
 ] 

ASF GitHub Bot commented on FLINK-9287:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5952

[FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE 
FlinkKafkaProducer011

Previously, FlinkKafkaProducer was not being closed for the AT_LEAST_ONCE and 
NONE semantics when closing FlinkKafkaProducer011. This was leading to 
resource leaks (for example, an increasing number of active threads). 
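
As a rough sketch of the idea (not the literal diff of this PR; `currentTransaction()`, `semantic`, and `KafkaTransactionState` refer to the existing producer internals):

```
// Sketch of the cleanup idea: with EXACTLY_ONCE the two-phase-commit machinery
// takes care of the transactional producers on close, but with AT_LEAST_ONCE
// and NONE there is a single long-lived producer that previously was never
// closed, leaving its kafka-producer-network-thread alive.
@Override
public void close() throws Exception {
	final KafkaTransactionState currentTransaction = currentTransaction();
	if (currentTransaction != null && semantic != Semantic.EXACTLY_ONCE) {
		// send out pending records, then release the producer's resources
		currentTransaction.producer.flush();
		currentTransaction.producer.close();
	}
	super.close();
}
```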

## Verifying this change

This bug fix might be hard to test automatically. This PR adds a new test 
proposal in a separate commit; however, it might be flaky if other tests 
are being executed in parallel.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f9287

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5952.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5952


commit c9139a90dd6a2afb25a4eb3102e1abedf90d8f5f
Author: Piotr Nowojski 
Date:   2018-05-03T13:50:53Z

[FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE 
FlinkKafkaProducer011

Previously FlinkKafkaProducer was not being closed for AT_LEAST_ONCE and 
NONE Semantics
when closing FlinkKafkaProducer011. This was leading to resources leaking 
(for example
increasing number of active threads)

commit 5a1d0962f3fb92a87ee809b5a09106f5c4d05caf
Author: Piotr Nowojski 
Date:   2018-05-03T13:53:40Z

[FLINK-9287][kafka] Ensure threads count do not grow in 
FlinkKafkaProducer011




> KafkaProducer011 seems to leak threads when not in exactly-once mode
> 
>
> Key: FLINK-9287
> URL: https://issues.apache.org/jira/browse/FLINK-9287
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Christopher Ng
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} 
> threads.  As far as I can tell it happens when it is not in EXACTLY_ONCE 
> mode, it seems that it creates a {{FlinkKafkaProducer}} but never closes it, 
> even when the {{FlinkKafkaProducer011}} itself is closed.
> I observed this when running a local cluster and submitting and then 
> cancelling a job, a lot of kafka threads were left alive afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9287) KafkaProducer011 seems to leak threads when not in exactly-once mode

2018-05-03 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9287?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-9287:
-

Assignee: Piotr Nowojski

> KafkaProducer011 seems to leak threads when not in exactly-once mode
> 
>
> Key: FLINK-9287
> URL: https://issues.apache.org/jira/browse/FLINK-9287
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Christopher Ng
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.5.0
>
>
> {{KafkaProducer011}} appears to be leaking {{kafka-producer-network-thread}} 
> threads.  As far as I can tell it happens when it is not in EXACTLY_ONCE 
> mode, it seems that it creates a {{FlinkKafkaProducer}} but never closes it, 
> even when the {{FlinkKafkaProducer011}} itself is closed.
> I observed this when running a local cluster and submitting and then 
> cancelling a job, a lot of kafka threads were left alive afterwards.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5952: [FLINK-9287][kafka] Properly clean up resources in...

2018-05-03 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5952

[FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE 
FlinkKafkaProducer011

Previously, FlinkKafkaProducer was not being closed for the AT_LEAST_ONCE and 
NONE semantics when closing FlinkKafkaProducer011. This was leading to 
resource leaks (for example, an increasing number of active threads). 

## Verifying this change

This bug fix might be hard to test automatically. This PR adds a new test 
proposal in a separate commit; however, it might be flaky if other tests 
are being executed in parallel.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f9287

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5952.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5952


commit c9139a90dd6a2afb25a4eb3102e1abedf90d8f5f
Author: Piotr Nowojski 
Date:   2018-05-03T13:50:53Z

[FLINK-9287][kafka] Properly clean up resources in non EXACTLY_ONCE 
FlinkKafkaProducer011

Previously FlinkKafkaProducer was not being closed for AT_LEAST_ONCE and 
NONE Semantics
when closing FlinkKafkaProducer011. This was leading to resources leaking 
(for example
increasing number of active threads)

commit 5a1d0962f3fb92a87ee809b5a09106f5c4d05caf
Author: Piotr Nowojski 
Date:   2018-05-03T13:53:40Z

[FLINK-9287][kafka] Ensure threads count do not grow in 
FlinkKafkaProducer011




---


[jira] [Assigned] (FLINK-9284) Update CLI page

2018-05-03 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang reassigned FLINK-9284:
---

Assignee: Triones Deng

> Update CLI page
> ---
>
> Key: FLINK-9284
> URL: https://issues.apache.org/jira/browse/FLINK-9284
> Project: Flink
>  Issue Type: Improvement
>  Components: Client, Documentation
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Triones Deng
>Priority: Critical
> Fix For: 1.5.0
>
>
> The [CLI|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html] 
> page must be updated for 1.5.
> The 
> [examples|https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#examples]
>  using the {{-m}} option must be updated to use {{8081}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-05-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8900.
---

> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully like this one (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0
>  for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer
>   - Stopped BLOB server at 0.0.0.0:46511
> 2018-03-08 16:48:39,975 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down 
> BLOB cache
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-05-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8900.
-
Resolution: Fixed
  Assignee: Stephan Ewen  (was: Gary Yao)

Fixed in
  - 1.5.0 via 5b26718404a89744fd4ddcdd963a712ec581222c
  - 1.6.0 via 545d530747067c805071935ec2bd62083299164b

> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Stephan Ewen
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully like this one (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0
>  for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer
>   - Stopped BLOB server at 0.0.0.0:46511
> 2018-03-08 16:48:39,975 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down 
> BLOB cache
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9288) clarify a few points in the event time / watermark docs

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462401#comment-16462401
 ] 

ASF GitHub Bot commented on FLINK-9288:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5949


> clarify a few points in the event time / watermark docs
> ---
>
> Key: FLINK-9288
> URL: https://issues.apache.org/jira/browse/FLINK-9288
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Minor
> Fix For: 1.5.0
>
>
> There are a few things that folks often seem to miss when reading the event 
> time and watermark docs. Adding a couple of sentences and a couple of links 
> should help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8900) YARN FinalStatus always shows as KILLED with Flip-6

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462402#comment-16462402
 ] 

ASF GitHub Bot commented on FLINK-8900:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5944


> YARN FinalStatus always shows as KILLED with Flip-6
> ---
>
> Key: FLINK-8900
> URL: https://issues.apache.org/jira/browse/FLINK-8900
> Project: Flink
>  Issue Type: Bug
>  Components: YARN
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Nico Kruber
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Whenever I run a simple word count like this one on YARN with Flip-6 
> enabled,
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> it will show up as {{KILLED}} in the {{State}} and {{FinalStatus}} columns 
> even though the program ran successfully like this one (irrespective of 
> FLINK-8899 occurring or not):
> {code}
> 2018-03-08 16:48:39,049 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (11a794d2f5dc2955d8015625ec300c20) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job 11a794d2f5dc2955d8015625ec300c20
> 2018-03-08 16:48:39,050 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:48:39,078 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> 11a794d2f5dc2955d8015625ec300c20 reached globally terminal state FINISHED.
> 2018-03-08 16:48:39,151 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager e58efd886429e8f080815ea74ddfa734 at the SlotManager.
> 2018-03-08 16:48:39,221 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(11a794d2f5dc2955d8015625ec300c20).
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> 43f725adaee14987d3ff99380701f52f: JobManager is shutting down..
> 2018-03-08 16:48:39,270 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-7-0.eu-west-1.compute.internal:34281/user/jobmanager_0
>  for job 11a794d2f5dc2955d8015625ec300c20 from the resource manager.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:48:39,349 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:48:39,775 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 4e1fb6c8f95685e24b6a4cb4b71ffb92 at the SlotManager.
> 2018-03-08 16:48:39,846 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b5bce0bdfa7fbb0f4a0905cc3ee1c233 at the SlotManager.
> 2018-03-08 16:48:39,876 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:48:39,910 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager a35b0690fdc6ec38bbcbe18a965000fd at the SlotManager.
> 2018-03-08 16:48:39,942 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 5175cabe428bea19230ac056ff2a17bb at the SlotManager.
> 2018-03-08 16:48:39,974 INFO  org.apache.flink.runtime.blob.BlobServer
>   - Stopped BLOB server at 0.0.0.0:46511
> 2018-03-08 16:48:39,975 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache  - Shutting down 
> BLOB cache
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9288) clarify a few points in the event time / watermark docs

2018-05-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-9288.
-
   Resolution: Fixed
Fix Version/s: (was: 1.6.0)

Fixed in
  - 1.5.0 via e06d36ae17dfd9e830d4f4729996cf2939fd5f75
  - 1.6.0 via 90855b638caebd1032e699528a3e0bd232b7c95a

> clarify a few points in the event time / watermark docs
> ---
>
> Key: FLINK-9288
> URL: https://issues.apache.org/jira/browse/FLINK-9288
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Minor
> Fix For: 1.5.0
>
>
> There are a few things that folks often seem to miss when reading the event 
> time and watermark docs. Adding a couple of sentences and a couple of links 
> should help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9288) clarify a few points in the event time / watermark docs

2018-05-03 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-9288.
---

> clarify a few points in the event time / watermark docs
> ---
>
> Key: FLINK-9288
> URL: https://issues.apache.org/jira/browse/FLINK-9288
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Minor
> Fix For: 1.5.0
>
>
> There are a few things that folks often seem to miss when reading the event 
> time and watermark docs. Adding a couple of sentences and a couple of links 
> should help.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5944: [FLINK-8900] [yarn] Set correct application status...

2018-05-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5944


---


[GitHub] flink pull request #5949: [FLINK-9288][docs] clarify the event time / waterm...

2018-05-03 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5949


---


[jira] [Commented] (FLINK-9235) Add Integration test for Flink-Yarn-Kerberos integration for flip-6

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462393#comment-16462393
 ] 

ASF GitHub Bot commented on FLINK-9235:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5901
  
@suez1224 Ah, I think the reason this test passes even without your fix in 
#5896 is that the test doesn't really test submission to a YARN cluster on 
different machines. The TaskManager runner will pick up the path of the keytab 
that exists on the local filesystem.


> Add Integration test for Flink-Yarn-Kerberos integration for flip-6
> ---
>
> Key: FLINK-9235
> URL: https://issues.apache.org/jira/browse/FLINK-9235
> Project: Flink
>  Issue Type: Test
>Affects Versions: 1.5.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> We need to provide an integration test for flip-6 similar to 
> YARNSessionFIFOSecuredITCase for the legacy deployment mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5901: [FLINK-9235][Security] Add integration tests for YARN ker...

2018-05-03 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5901
  
@suez1224 Ah, I think the reason this test passes even without your fix in 
#5896 is that the test doesn't really test submission to a YARN cluster on 
different machines. The TaskManager runner will pick up the path of the keytab 
that exists on the local filesystem.


---


[jira] [Commented] (FLINK-9207) Client returns SUCCESS(0) return code for canceled job

2018-05-03 Thread Amit Jain (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462366#comment-16462366
 ] 

Amit Jain commented on FLINK-9207:
--

[~aljoscha] I haven't had time to check on 1.4.x; however, I tested on 
version 1.3.2 and found the correct return code, i.e. 1.

> Client returns SUCCESS(0) return code for canceled job
> --
>
> Key: FLINK-9207
> URL: https://issues.apache.org/jira/browse/FLINK-9207
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.5.0
> Environment: Version: 1.5.0, Commit : 2af481a
>Reporter: Amit Jain
>Priority: Major
> Fix For: 1.6.0
>
>
> The Flink client returns a zero return code when a job is deliberately canceled. 
> Steps to reproduce it:
> 1. bin/flink run -p 10 -m yarn-cluster -yjm 1024 -ytm 12288 WordCount.jar
> 2. User externally canceled the job.
> 3. Job Manager marked the job as CANCELED.
> 4. Although the client code emits the following logs, it still returns a zero return code.
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Killed 
> application application_1523726493647_.
> Job schedulers like Airflow would have a hard time detecting whether the 
> submitted job was canceled or not. 
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8500) Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)

2018-05-03 Thread Fred Teunissen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462345#comment-16462345
 ] 

Fred Teunissen commented on FLINK-8500:
---

[~aljoscha] I have some time that I can spend on this, so yes, I'm interested. I 
can also update the current PR with the changes, unless you think it's better to 
make a new, fresh PR for this.

There are now 3 possible approaches:
 # Kafka {{ConsumerRecord}} as parameter in the Flink API
 # a new {{ConsumerRecordMetaInfo}} class as parameter for the {{deserialize}} 
method
 # extend the interface {{KeyedDeserializationSchema}} with a new method, 
although I don't know how to give an interface default behavior in Java.

I'm leaning towards almost the same approach as I'm using now: creating a new, 
separate interface {{RichDeserializationSchema}} with a {{deserialize}} method 
that takes a {{ConsumerRecordMetaInfo}} parameter, and also creating a 
{{RichDeserializationSchemaWrapper}} that implements the current API for 
backwards compatibility (see the sketch below).

Do you think this is the right approach?
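
For illustration only, a rough sketch of that proposal (all names are the proposed ones from above, not an existing Flink API; only {{KeyedDeserializationSchema}} is the existing interface):

```
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import java.io.IOException;
import java.io.Serializable;

/** Proposed carrier for the Kafka record and its meta information. */
interface ConsumerRecordMetaInfo {
	String getTopic();
	int getPartition();
	long getOffset();
	long getTimestamp();   // the Kafka message timestamp this ticket asks for
	byte[] getKey();
	byte[] getMessage();
}

/** Proposed richer deserialization schema that sees the full meta information. */
interface RichDeserializationSchema<T> extends Serializable {
	T deserialize(ConsumerRecordMetaInfo record) throws IOException;
}

/**
 * Proposed wrapper that adapts an existing KeyedDeserializationSchema,
 * so the current API keeps working unchanged (backwards compatibility).
 */
class RichDeserializationSchemaWrapper<T> implements RichDeserializationSchema<T> {

	private static final long serialVersionUID = 1L;

	private final KeyedDeserializationSchema<T> wrapped;

	RichDeserializationSchemaWrapper(KeyedDeserializationSchema<T> wrapped) {
		this.wrapped = wrapped;
	}

	@Override
	public T deserialize(ConsumerRecordMetaInfo record) throws IOException {
		// delegate to the old interface, dropping the extra meta information
		return wrapped.deserialize(
			record.getKey(), record.getMessage(),
			record.getTopic(), record.getPartition(), record.getOffset());
	}
}
```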

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---
>
> Key: FLINK-8500
> URL: https://issues.apache.org/jira/browse/FLINK-8500
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: yanxiaobin
>Priority: Major
> Fix For: 1.6.0
>
> Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462341#comment-16462341
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185775633
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -589,7 +589,7 @@ private void restoreKeyGroupsInStateHandle()
private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {

KeyedBackendSerializationProxy serializationProxy =
-   new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
+   new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
--- End diff --

Maybe a small comment on this line explaining why we can tolerate the absence 
of the serializer would be helpful for future maintenance. And a matching 
comment for the other option on the corresponding line in the heap backend.
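
For example, something along these lines (the wording is only a suggestion, assuming the second constructor argument is the "serializer presence required" flag discussed in this PR):

```
KeyedBackendSerializationProxy serializationProxy =
	// 'false': restore can tolerate a missing serializer class, because RocksDB
	// keeps state as serialized bytes; the serializer is only needed later, when
	// the state is actually accessed (the heap backend would pass 'true' instead,
	// since it must eagerly deserialize the state objects on restore)
	new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
```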


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the 
> {{stateSerializer}} can be {{null}}. This is not the problem, however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}} 
> where we null check the state serializer. This will then fail with an 
> indescriptive NPE.
> I think the same should happen when resuming with Flink 1.5 from a 1.4 
> savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185775633
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -589,7 +589,7 @@ private void restoreKeyGroupsInStateHandle()
private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {

KeyedBackendSerializationProxy serializationProxy =
-   new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
+   new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader, false);
--- End diff --

Maybe a small comment on this line explaining why we can tolerate the absence 
of the serializer would be helpful for future maintenance. And a matching 
comment for the other option on the corresponding line in the heap backend.


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462326#comment-16462326
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185772501
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/ArtificialCNFErrorThrowingClassLoader.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.testutils;
+
+import java.util.Set;
+
+/**
+ * Utility classloader used in tests that allows simulating {@link ClassNotFoundException}s for specific classes.
+ */
+public class ArtificialCNFErrorThrowingClassLoader extends ClassLoader {
--- End diff --

`ArtificialCNFExceptionThrowingClassLoader` might be a better fit
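
For reference, a minimal sketch of how such a (renamed) classloader could work; the constructor and field names are guesses based on the quoted javadoc, not the actual file contents:

```
package org.apache.flink.runtime.state.testutils;

import java.util.Set;

/**
 * Utility classloader used in tests that allows simulating
 * {@link ClassNotFoundException}s for specific classes.
 */
public class ArtificialCNFExceptionThrowingClassLoader extends ClassLoader {

	/** Fully qualified names of the classes whose loading should fail. */
	private final Set<String> cnfThrowingClassnames;

	public ArtificialCNFExceptionThrowingClassLoader(ClassLoader parent, Set<String> cnfThrowingClassnames) {
		super(parent);
		this.cnfThrowingClassnames = cnfThrowingClassnames;
	}

	@Override
	protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
		// simulate a missing class for the configured names
		if (cnfThrowingClassnames.contains(name)) {
			throw new ClassNotFoundException(name);
		}
		return super.loadClass(name, resolve);
	}
}
```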


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported to have observed the following exception when restoring a 
> Flink job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d65
> 6fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at 
> org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at 
> org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateB
> ackend.java:1216)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeye
> dStateBackend.java:1153)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1
> 139)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at the {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} where the 
> {{stateSerializer}} can be {{null}}. This is not the problem, however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from 

[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185772501
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/ArtificialCNFErrorThrowingClassLoader.java
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.testutils;
+
+import java.util.Set;
+
+/**
+ * Utility classloader used in tests that allows simulating {@link ClassNotFoundException}s for specific classes.
+ */
+public class ArtificialCNFErrorThrowingClassLoader extends ClassLoader {
--- End diff --

`ArtificialCNFExceptionThrowingClassLoader` might be a better fit


---


[jira] [Commented] (FLINK-9169) NPE when restoring from old savepoint and state serializer could not be deserialized

2018-05-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16462321#comment-16462321
 ] 

ASF GitHub Bot commented on FLINK-9169:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185771830
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -69,7 +69,7 @@
 * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
 *
 * If deserialization fails for any reason (corrupted serializer bytes, serializer class
-* no longer in classpath, serializer class no longer valid, etc.), {@code null} will
+* no longer in classpath, serializer class no longer valid, etc.), an {@link IOException} is thrown.
--- End diff --

The comment on the next line should be deleted; it looks like a leftover from the copy-paste.


> NPE when restoring from old savepoint and state serializer could not be 
> deserialized
> 
>
> Key: FLINK-9169
> URL: https://issues.apache.org/jira/browse/FLINK-9169
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.5.0
>
>
> A user reported observing the following exception when restoring a Flink 
> job from a 1.3 savepoint with Flink 1.4.
> {code}
> 2018-04-02 21:44:18,146 INFO  org.apache.flink.runtime.taskmanager.Task - ApplyAMUpdate (13/160) (7248adb0b85f4458ae4144963d656fa6) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.NullPointerException
> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
> at org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo.<init>(RegisteredKeyedBackendStateMetaInfo.java:53)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:1216)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBKeyedStateBackend.java:1153)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullRestoreOperation.doRestore(RocksDBKeyedStateBackend.java:1139)
> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.restore(RocksDBKeyedStateBackend.java:1034)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:773)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:283)
> ... 6 more
> {code}
> Looking at {{KeyedBackendStateMetaInfoSnapshotReaderWriters}}, we create a 
> {{RegisteredKeyedBackendStateMetaInfo.Snapshot}} whose {{stateSerializer}} 
> can be {{null}}. That by itself is not the problem; however, in 
> {{RocksDBKeyedStateBackend#restoreKVStateMetaData}} we create a 
> {{RegisteredKeyedBackendStateMetaInfo}} from the deserialized {{Snapshot}}, 
> which null-checks the state serializer. This then fails with an 
> uninformative NPE.
> I think the same will happen when resuming with Flink 1.5 from a 1.4 savepoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5950: [FLINK-9169] [state-backend] Allow absence of old ...

2018-05-03 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5950#discussion_r185771830
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationUtil.java ---
@@ -69,7 +69,7 @@
 * written using {@link #writeSerializer(DataOutputView, TypeSerializer)}.
 *
 * If deserialization fails for any reason (corrupted serializer bytes, serializer class
-* no longer in classpath, serializer class no longer valid, etc.), {@code null} will
+* no longer in classpath, serializer class no longer valid, etc.), an {@link IOException} is thrown.
--- End diff --

The comment on the next line should be deleted; it looks like a leftover from the copy-paste.


---
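
To illustrate the semantics change the javadoc diff describes (failing with an
{@link IOException} instead of returning {@code null}), here is a small,
self-contained sketch; the method name and structure are assumptions, not
Flink's actual API:

{code}
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

final class SerializerReadSketch {

    static Object readSerializer(byte[] serializedSerializer) throws IOException {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(serializedSerializer))) {
            return in.readObject();
        } catch (ClassNotFoundException e) {
            // Old behavior: swallow the failure and return null, which later
            // resurfaces as the uninformative NPE in the issue's stack trace.
            // New behavior: fail fast and keep the root cause attached.
            throw new IOException("Unable to deserialize the state serializer.", e);
        }
    }
}
{code}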

