[jira] [Commented] (FLINK-9643) Flink allowing TLS 1.1 in spite of configuring TLS 1.2

2018-07-05 Thread Viktor Vlasov (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1653#comment-1653
 ] 

Viktor Vlasov commented on FLINK-9643:
--

Thank you for the comment, [~StephanEwen].
Regarding which port exactly was checked: I forgot to mention that I used the
TM data port (configured via the taskmanager.data.port parameter). I used a
script to check all cases at once, and to simplify its logic I used the same
port each time.

For the other port I haven't found a config option yet, so it may have to be
checked manually. But I'm wondering whether we have a single point of SSL
configuration: if that is true, and what you have shown in the links works the
same way for each port, then checking it separately may not be necessary
(correct me if I'm wrong).

Anyway, taking all the information (including your links) into account, I
think I will perform some experiments with a pure Java SSL connection setup,
maybe with only Akka, to understand the background and be able to tell how it
behaves at this level.
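
As a first sketch of such an experiment (assuming a plain JSSE server with a
keystore configured via the usual javax.net.ssl.* system properties; the class
name and port are illustrative, this is not Flink code):

{code}
import javax.net.ssl.SSLServerSocket;
import javax.net.ssl.SSLServerSocketFactory;

public class TlsProtocolProbe {
    public static void main(String[] args) throws Exception {
        SSLServerSocketFactory factory =
            (SSLServerSocketFactory) SSLServerSocketFactory.getDefault();
        try (SSLServerSocket server = (SSLServerSocket) factory.createServerSocket(8443)) {
            // Restrict the handshake to TLSv1.2 only; a client offering TLSv1.1
            // should then be rejected, independently of jdk.tls.disabledAlgorithms.
            server.setEnabledProtocols(new String[] {"TLSv1.2"});
            System.out.println("Enabled protocols: "
                + String.join(", ", server.getEnabledProtocols()));
            server.accept(); // probe with: openssl s_client -connect localhost:8443 -tls1_1
        }
    }
}
{code}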

> Flink allowing TLS 1.1 in spite of configuring TLS 1.2
> --
>
> Key: FLINK-9643
> URL: https://issues.apache.org/jira/browse/FLINK-9643
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Affects Versions: 1.3.2, 1.5.0, 1.4.2
>Reporter: Vinay
>Assignee: Viktor Vlasov
>Priority: Major
> Attachments: result.csv
>
>
> I have deployed Flink 1.3.2 and enabled SSL settings. The SSL debug logs
> show that Flink is using TLSv1.2. However, based on our security scans we
> have observed that it also allows TLSv1.0 and TLSv1.1.
> In order to strictly use TLSv1.2 we have updated the following property of
> the java.security file:
> jdk.tls.disabledAlgorithms=MD5, SSLv3, DSA, RSA keySize < 2048, TLSv1,
> TLSv1.1
> But it still allows TLSv1.1; I verified this by running the following command
> from the master node:
> openssl s_client -connect taskmanager1: -tls1
> (here listening_address_port is part of
> akka.ssl.tcp://flink@taskmanager1:port/user/taskmanager)
> Now, when I run the above command against the data port, it does not allow
> TLSv1.1 and only allows TLSv1.2.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200251500
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
--- End diff --

Currently `notifyBufferDestroyed` does nothing, but we should be careful if
this method is implemented in the future in a way similar to `notifyBufferAvailable`.


---


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200252253
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
 					}
-					catch (Throwable ignored) {
-						availableMemorySegments.add(segment);
-						availableMemorySegments.notify();
--- End diff --

I am wondering whether we should rethrow this exception at the end of the
handling below.

For example: during `RemoteInputChannel#notifyBufferAvailable`, if the
`isWaitingForFloatingBuffers` flag is inconsistent, we should throw this
exception to trigger failover; otherwise we cannot find the potential bug.
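
A rough sketch of what that could look like (illustrative, simplified shapes
instead of the real Flink types; `recycle` stands in for `LocalBufferPool#recycle`):

```java
public class RethrowAfterCleanup {

    interface BufferListener {
        boolean notifyBufferAvailable(Object buffer);
    }

    private final Object availableMemorySegments = new Object();

    // Remember the Throwable instead of swallowing it, perform the same
    // cleanup under the lock, and finally rethrow so the failure surfaces
    // and can trigger failover.
    void recycle(Object segment, BufferListener listener) {
        Throwable error = null;
        boolean needMoreBuffers = false;
        try {
            needMoreBuffers = listener.notifyBufferAvailable(segment);
        } catch (Throwable t) {
            error = t;
        }

        if (error != null || needMoreBuffers) {
            synchronized (availableMemorySegments) {
                // ... cleanup as in the block above ...
            }
            if (error != null) {
                throw new RuntimeException("Buffer listener failed", error);
            }
        }
    }
}
```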


---


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1655#comment-1655
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200251500
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
--- End diff --

Currently `notifyBufferDestroyed` does nothing, but we should be careful if
this method is implemented in the future in a way similar to `notifyBufferAvailable`.


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable}} -> {color:#d04437}try to lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle can cover this case, but the deadlock probability is very low, so this UT is not stable.
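
The lock-order inversion described above can be reproduced in isolation; a
minimal, self-contained sketch (illustrative lock names, not the real Flink
classes):

{code}
public class LockOrderDeadlock {

	// poolLock stands in for LocalBufferPool#availableMemorySegments,
	// queueLock stands in for IC2#bufferQueue.
	private static final Object poolLock = new Object();
	private static final Object queueLock = new Object();

	public static void main(String[] args) {
		Thread canceler = new Thread(() -> {
			synchronized (poolLock) {            // recycle floating buffers
				sleep(100);
				synchronized (queueLock) { }     // IC2#notifyBufferAvailable
			}
		}, "task-canceler");

		Thread task = new Thread(() -> {
			synchronized (queueLock) {           // IC2#recycle
				sleep(100);
				synchronized (poolLock) { }      // floatingBuffer#recycleBuffer
			}
		}, "task-thread");

		canceler.start();
		task.start();                            // both threads now block forever
	}

	private static void sleep(long ms) {
		try { Thread.sleep(ms); } catch (InterruptedException ignored) { }
	}
}
{code}

Moving the second lock acquisition out of the first synchronized block, as the
patch does for notifyBufferAvailable(), breaks the cycle.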



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1656#comment-1656
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200252253
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
 					}
-					catch (Throwable ignored) {
-						availableMemorySegments.add(segment);
-						availableMemorySegments.notify();
--- End diff --

I am wondering whether we should rethrow this exception at the end of the
handling below.

For example: during `RemoteInputChannel#notifyBufferAvailable`, if the
`isWaitingForFloatingBuffers` flag is inconsistent, we should throw this
exception to trigger failover; otherwise we cannot find the potential bug.


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable}} -> {color:#d04437}try to lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle can cover this case, but the deadlock probability is very low, so this UT is not stable.

[GitHub] flink issue #6257: [FLINK-9676][network] clarify contracts of BufferListener...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6257
  
Thanks for fixing this bug; we also solved this problem in the same way.

This solution seems more lightweight than the approach in
[6254](https://github.com/apache/flink/pull/6254), and I also think the lock
adjustment in `6254` is a useful reference.


---


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533343#comment-16533343
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/6257
  
Thanks for fixing this bug; we also solved this problem in the same way.

This solution seems more lightweight than the approach in
[6254](https://github.com/apache/flink/pull/6254), and I also think the lock
adjustment in `6254` is a useful reference.


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable}} -> {color:#d04437}try to lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle can cover this case, but the deadlock probability is very low, so this UT is not stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9718) Add enviroment variable in start-scala-shell.sh & flink to enable remote debug

2018-07-05 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang reassigned FLINK-9718:
-

Assignee: Jeff Zhang

> Add enviroment variable in start-scala-shell.sh & flink to enable remote debug
> --
>
> Key: FLINK-9718
> URL: https://issues.apache.org/jira/browse/FLINK-9718
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Assignee: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9718) Add enviroment variable in start-scala-shell.sh & flink to enable remote debug

2018-07-05 Thread Jeff Zhang (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Zhang updated FLINK-9718:
--
Summary: Add enviroment variable in start-scala-shell.sh & flink to enable 
remote debug  (was: Add enviroment variable in start-scala-shell.sh to enable 
remote debug)

> Add enviroment variable in start-scala-shell.sh & flink to enable remote debug
> --
>
> Key: FLINK-9718
> URL: https://issues.apache.org/jira/browse/FLINK-9718
> Project: Flink
>  Issue Type: Improvement
>Reporter: Jeff Zhang
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6092: [FLINK-9352] In Standalone checkpoint recover mode many j...

2018-07-05 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6092
  
@tillrohrmann I have refactored; please review again. The Travis error is not
caused by this PR.


---


[jira] [Commented] (FLINK-9352) In Standalone checkpoint recover mode many jobs with same checkpoint interval cause IO pressure

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533407#comment-16533407
 ] 

ASF GitHub Bot commented on FLINK-9352:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6092
  
@tillrohrmann I have refactored; please review again. The Travis error is not
caused by this PR.


> In Standalone checkpoint recover mode many jobs with same checkpoint interval 
> cause IO pressure
> ---
>
> Key: FLINK-9352
> URL: https://issues.apache.org/jira/browse/FLINK-9352
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: vinoyang
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the periodic checkpoint coordinator's startCheckpointScheduler uses
> *baseInterval* as the initialDelay parameter. The *baseInterval* is also the
> checkpoint interval.
> In standalone checkpoint mode, many jobs configure the same checkpoint interval.
> When all jobs are recovered (the cluster restarts or the jobmanager leadership
> switches), all jobs' checkpoint periods tend to coincide: every job's
> CheckpointCoordinator starts and triggers at approximately the same point in time.
> This causes high IO load within the same time window in our production scenario.
> I suggest exposing scheduleAtFixedRate's initial delay parameter as an API
> config so that users can scatter checkpoints in this scenario.
>  
> cc [~StephanEwen] [~Zentol]
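
A minimal sketch of the suggested mitigation (assuming a plain
ScheduledExecutorService; the trigger body is a stand-in for the coordinator's
checkpoint trigger):

{code}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class ScatteredCheckpointScheduler {
	public static void main(String[] args) {
		long baseIntervalMs = 60_000L;
		// Instead of initialDelay == baseInterval (all recovered jobs firing
		// together), pick a random offset within one interval so that the
		// triggers of different jobs spread out over time.
		long initialDelayMs = ThreadLocalRandom.current().nextLong(baseIntervalMs);

		ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
		timer.scheduleAtFixedRate(
			() -> System.out.println("trigger checkpoint"),
			initialDelayMs, baseIntervalMs, TimeUnit.MILLISECONDS);
	}
}
{code}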



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200274038
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
 					}
-					catch (Throwable ignored) {
-						availableMemorySegments.add(segment);
-						availableMemorySegments.notify();
--- End diff --

true, that's why I created FLINK-9755 for this issue and already have code 
(have to add tests though) - expect a PR soon


---


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533408#comment-16533408
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200274038
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
 					}
-					catch (Throwable ignored) {
-						availableMemorySegments.add(segment);
-						availableMemorySegments.notify();
--- End diff --

true, that's why I created FLINK-9755 for this issue and already have code 
(have to add tests though) - expect a PR soon


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable}} -> {color:#d04437}try to lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle can cover this case, but the deadlock probability is very low, so this UT is not stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200274433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
--- End diff --

true, this ~could~ should be done outside the lock as well


---


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533409#comment-16533409
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200274433
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
--- End diff --

true, this ~could~ should be done outside the lock as well


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable}} -> {color:#d04437}try to lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle can cover this case, but the deadlock probability is very low, so this UT is not stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9759) give an irrelevant answer about the savepoint restore when stateless operators be added etc

2018-07-05 Thread lamber-ken (JIRA)
lamber-ken created FLINK-9759:
-

 Summary: give an irrelevant answer about the savepoint restore 
when stateless operators be added etc
 Key: FLINK-9759
 URL: https://issues.apache.org/jira/browse/FLINK-9759
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.4.2
Reporter: lamber-ken
Assignee: lamber-ken
 Fix For: 1.6.0, 1.5.1


give an irrelevant answer about the savepoint restore when stateless operators 
be added etc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread NicoK
Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200283214
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
--- End diff --

actually, let's do this in a follow-up PR


---


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533425#comment-16533425
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200283214
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
--- End diff --

actually, let's do this in a follow-up PR


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable}} -> {color:#d04437}try to lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle can cover this case, but the deadlock probability is very low, so this UT is not stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9759) give an irrelevant answer about the savepoint restore when stateless operators be added etc

2018-07-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9759:
--
Labels: pull-request-available  (was: )

> give an irrelevant answer about the savepoint restore when stateless 
> operators be added etc
> ---
>
> Key: FLINK-9759
> URL: https://issues.apache.org/jira/browse/FLINK-9759
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> give an irrelevant answer about the savepoint restore when stateless 
> operators be added etc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6261: [FLINK-9759] [DOCS] give an right answer about the...

2018-07-05 Thread lamber-ken
GitHub user lamber-ken opened a pull request:

https://github.com/apache/flink/pull/6261

[FLINK-9759] [DOCS] give an right answer about the savepoint restore when 
stateless operators be added, deleted etc

## What is the purpose of the change
  - fix the irrelevant answer about savepoint restore when stateless
operators are added, deleted, etc.

## Brief change log

  The question is about `stateless operators`, but the old answer discussed
`stateful operators`, and the questions about `stateful operators` have
already been answered above.

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lamber-ken/flink FLINK-9759

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6261


commit 09f5a5bfc42538f5c4ec66a26d8ca70423d00a6d
Author: lamber-ken 
Date:   2018-07-05T09:10:14Z

give an right answer about the savepoint restore when stateless operators 
be added, deleted etc




---


[jira] [Commented] (FLINK-9759) give an irrelevant answer about the savepoint restore when stateless operators be added etc

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533433#comment-16533433
 ] 

ASF GitHub Bot commented on FLINK-9759:
---

GitHub user lamber-ken opened a pull request:

https://github.com/apache/flink/pull/6261

[FLINK-9759] [DOCS] give an right answer about the savepoint restore when 
stateless operators be added, deleted etc

## What is the purpose of the change
  - fix the irrelevant answer about savepoint restore when stateless
operators are added, deleted, etc.

## Brief change log

  The question is about `stateless operators`, but the old answer discussed
`stateful operators`, and the questions about `stateful operators` have
already been answered above.

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (JavaDocs)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lamber-ken/flink FLINK-9759

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6261


commit 09f5a5bfc42538f5c4ec66a26d8ca70423d00a6d
Author: lamber-ken 
Date:   2018-07-05T09:10:14Z

give an right answer about the savepoint restore when stateless operators 
be added, deleted etc




> give an irrelevant answer about the savepoint restore when stateless 
> operators be added etc
> ---
>
> Key: FLINK-9759
> URL: https://issues.apache.org/jira/browse/FLINK-9759
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> give an irrelevant answer about the savepoint restore when stateless 
> operators be added etc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200287730
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
 					}
-					catch (Throwable ignored) {
-						availableMemorySegments.add(segment);
-						availableMemorySegments.notify();
--- End diff --

👍


---


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533439#comment-16533439
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200287796
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
--- End diff --

👍


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable}} -> {color:#d04437}try to lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle can cover this case, but the deadlock probability is very low, so this UT is not stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200287796
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
--- End diff --

👍


---


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533438#comment-16533438
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/6257#discussion_r200287730
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
 ---
@@ -251,27 +257,56 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 
 	@Override
 	public void recycle(MemorySegment segment) {
+		BufferListener listener;
 		synchronized (availableMemorySegments) {
 			if (isDestroyed || numberOfRequestedMemorySegments > currentPoolSize) {
 				returnMemorySegment(segment);
+				return;
 			}
 			else {
-				BufferListener listener = registeredListeners.poll();
+				listener = registeredListeners.poll();
 
 				if (listener == null) {
 					availableMemorySegments.add(segment);
 					availableMemorySegments.notify();
+					return;
 				}
-				else {
-					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
-						if (needMoreBuffers) {
-							registeredListeners.add(listener);
-						}
+			}
+		}
+
+		// We do not know which locks have been acquired before the recycle() or are needed in the
+		// notification and which other threads also access them.
+		// -> call notifyBufferAvailable() outside of the synchronized block to avoid a deadlock (FLINK-9676)
+		boolean success = false;
+		boolean needMoreBuffers = false;
+		try {
+			needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
+			success = true;
+		} catch (Throwable ignored) {
+			// handled below, under the lock
+		}
+
+		if (!success || needMoreBuffers) {
+			synchronized (availableMemorySegments) {
+				if (isDestroyed) {
+					// cleanup tasks how they would have been done if we only had one synchronized block
+					if (needMoreBuffers) {
+						listener.notifyBufferDestroyed();
 					}
-					catch (Throwable ignored) {
-						availableMemorySegments.add(segment);
-						availableMemorySegments.notify();
--- End diff --

👍


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> {{Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers -> {color:#d04437}lock{color}(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable}} -> {color:#d04437}try to lock{color}(IC2#bufferQueue)
> {{Task thread -> IC2#recycle -> {color:#d04437}lock{color}(IC2#bufferQueue) -> bufferQueue#addExclusiveBuffer}} -> {{floatingBuffer#recycleBuffer}} -> {color:#d04437}try to lock{color}(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle can cover this case, but the deadlock probability is very low, so this UT is not stable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533440#comment-16533440
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6090
  
Hi @xccui, sorry for being pushy but when do you think you can update this 
PR? The feature freeze is in approx. 1 week and I need to coordinate my 
work/reviewing efforts. Let me know if you want me to take over.


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest introducing a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 
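
For reference, a function class matching the `org.my.MyScalarFunction` entry
above might look like this (an illustrative sketch; the constructor argument
corresponds to the `constructor:` value 42.0):

{code}
package org.my;

import org.apache.flink.table.functions.ScalarFunction;

public class MyScalarFunction extends ScalarFunction {

	private final double factor;

	public MyScalarFunction(double factor) {
		this.factor = factor;
	}

	// Called by the Table API / SQL runtime for each row.
	public double eval(double value) {
		return value * factor;
	}
}
{code}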



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

2018-07-05 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6090
  
Hi @xccui, sorry for being pushy but when do you think you can update this 
PR? The feature freeze is in approx. 1 week and I need to coordinate my 
work/reviewing efforts. Let me know if you want me to take over.


---


[jira] [Commented] (FLINK-7344) Migrate usage of joda-time to the Java 8 DateTime API

2018-07-05 Thread Congxian Qiu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533442#comment-16533442
 ] 

Congxian Qiu commented on FLINK-7344:
-

Hi [~yew1eb], are you still working on this? If you don't have time, I could
take it over.

> Migrate usage of joda-time to the Java 8 DateTime API
> -
>
> Key: FLINK-7344
> URL: https://issues.apache.org/jira/browse/FLINK-7344
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Haohui Mai
>Assignee: Hai Zhou
>Priority: Major
>
> As the minimum Java version of Flink has been upgraded to 1.8, it is a good 
> time to migrate all usage of the joda-time package to the native Java 8 
> DateTime API.





[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-05 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533444#comment-16533444
 ] 

Timo Walther commented on FLINK-8858:
-

Hi [~liurenjie1024], would it be ok if I assign this issue to myself? FLINK-8866 is 
close to being merged and {{INSERT INTO}} should go into Flink 1.6 (the feature 
freeze is next week).

> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>
> The current design of the SQL Client embedded mode doesn't support long-running 
> queries. For simple jobs that can be expressed in a single SQL statement, it 
> would be useful to be able to submit SQL statements stored in files as 
> long-running queries. 





[GitHub] flink pull request #6262: [hotfix][filesystem] Remove incorrect equals metho...

2018-07-05 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/6262

[hotfix][filesystem] Remove incorrect equals methods in StreamWriters

This pull request removes incorrect equals methods in `StreamWriterBase` 
(and in classes that inherit from it) that were used only for tests, and moves 
their logic to the test class.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink hash

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6262.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6262


commit a0fd4ac3e07a035effe0e103af97d1c4b2173c22
Author: Piotr Nowojski 
Date:   2018-07-05T09:28:35Z

[hotfix][filesystem] Remove incorrect equals methods in StreamWriters




---


[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

2018-07-05 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/6090
  
Hi @twalthr,  please give me one more day. I will commit the changes 
tomorrow. 😄 


---


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533450#comment-16533450
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/6090
  
Hi @twalthr,  please give me one more day. I will commit the changes 
tomorrow. 😄 


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest introducing a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 





[jira] [Commented] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-05 Thread Renjie Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533453#comment-16533453
 ] 

Renjie Liu commented on FLINK-8858:
---

Hi [~twalthr] That's ok since currently I have no time for that.

> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Renjie Liu
>Priority: Major
>
> The current design of the SQL Client embedded mode doesn't support long-running 
> queries. For simple jobs that can be expressed in a single SQL statement, it 
> would be useful to be able to submit SQL statements stored in files as 
> long-running queries. 





[GitHub] flink issue #6171: [FLINK-9593] Unified After Match semantics with SQL MATCH...

2018-07-05 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6171
  
Thanks @kl0u for review. I've addressed points 1 and 3. As the second one 
touches some critical parts, let's address it in a separate JIRA.


---


[jira] [Commented] (FLINK-9593) Unify AfterMatch semantics with SQL MATCH_RECOGNIZE

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533455#comment-16533455
 ] 

ASF GitHub Bot commented on FLINK-9593:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/6171
  
Thanks @kl0u for review. I've addressed points 1 and 3. As the second one 
touches some critical parts, let's address it in a separate JIRA.


> Unify AfterMatch semantics with SQL MATCH_RECOGNIZE
> ---
>
> Key: FLINK-9593
> URL: https://issues.apache.org/jira/browse/FLINK-9593
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] flink issue #6090: [FLINK-8863] [SQL] Add user-defined function support in S...

2018-07-05 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6090
  
@xccui perfect. thank you very much!


---


[jira] [Commented] (FLINK-8863) Add user-defined function support in SQL Client

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8863?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533460#comment-16533460
 ] 

ASF GitHub Bot commented on FLINK-8863:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/6090
  
@xccui perfect. thank you very much!


> Add user-defined function support in SQL Client
> ---
>
> Key: FLINK-8863
> URL: https://issues.apache.org/jira/browse/FLINK-8863
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> This issue is a subtask of part two "Full Embedded SQL Client" of the 
> implementation plan mentioned in 
> [FLIP-24|https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client].
>  
> It should be possible to declare user-defined functions in the SQL client. 
> For now, we limit the registration to classes that implement 
> {{ScalarFunction}}, {{TableFunction}}, {{AggregateFunction}}. Functions that 
> are implemented in SQL are not part of this issue. 
> I would suggest introducing a {{functions}} top-level property. The 
> declaration could look similar to: 
> {code} 
> functions: 
>   - name: testFunction 
> from: class <-- optional, default: class 
> class: org.my.MyScalarFunction 
> constructor: <-- optional, needed for certain types of functions 
>   - 42.0 
>   - class: org.my.Class <-- possibility to create objects via properties 
> constructor: 
>   - 1 
>   - true 
>   - false 
>   - "whatever" 
>   - type: INT 
> value: 1 
> {code} 





[jira] [Created] (FLINK-9760) Return a single element from extractPatterns

2018-07-05 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-9760:
---

 Summary: Return a single element from extractPatterns
 Key: FLINK-9760
 URL: https://issues.apache.org/jira/browse/FLINK-9760
 Project: Flink
  Issue Type: Improvement
  Components: CEP
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz


Right now {{SharedBuffer#extractPatterns}} allows extracting multiple matches 
for the same ComputationState, but our NFA does not allow creating such 
matches. Thus we should optimize this method, keeping in mind that it can only 
ever return one match.
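
As an illustration of the intended simplification, a rough sketch (the shapes below are assumptions for illustration, not the actual {{SharedBuffer}} signatures):

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;

class ExtractPatternSketch<V> {
    // Before (hypothetical shape): a list of matches per ComputationState.
    // After: since the NFA produces at most one match per ComputationState,
    // collapse the list to a single (possibly empty) match.
    Map<String, List<V>> extractPattern(List<Map<String, List<V>>> matches) {
        return matches.isEmpty() ? Collections.emptyMap() : matches.get(0);
    }
}
{code}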





[jira] [Assigned] (FLINK-8858) Add support for INSERT INTO in SQL Client

2018-07-05 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther reassigned FLINK-8858:
---

Assignee: Timo Walther  (was: Renjie Liu)

> Add support for INSERT INTO in SQL Client
> -
>
> Key: FLINK-8858
> URL: https://issues.apache.org/jira/browse/FLINK-8858
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Renjie Liu
>Assignee: Timo Walther
>Priority: Major
>
> The current design of the SQL Client embedded mode doesn't support long-running 
> queries. For simple jobs that can be expressed in a single SQL statement, it 
> would be useful to be able to submit SQL statements stored in files as 
> long-running queries. 





[jira] [Commented] (FLINK-9759) give an irrelevant answer about the savepoint restore when stateless operators be added etc

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533468#comment-16533468
 ] 

ASF GitHub Bot commented on FLINK-9759:
---

Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6261
  
:100:, you are welcome, thank you for your explanation, I'll close this PR.


> give an irrelevant answer about the savepoint restore when stateless 
> operators be added etc
> ---
>
> Key: FLINK-9759
> URL: https://issues.apache.org/jira/browse/FLINK-9759
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The documentation gives an irrelevant answer about savepoint restore when 
> stateless operators are added, etc.





[GitHub] flink pull request #6261: [FLINK-9759] [DOCS] remove irrelevant answer about...

2018-07-05 Thread lamber-ken
Github user lamber-ken closed the pull request at:

https://github.com/apache/flink/pull/6261


---


[GitHub] flink issue #6261: [FLINK-9759] [DOCS] remove irrelevant answer about the sa...

2018-07-05 Thread lamber-ken
Github user lamber-ken commented on the issue:

https://github.com/apache/flink/pull/6261
  
:100:, you are welcome, thank you for your explanation, I'll close this PR.


---


[jira] [Commented] (FLINK-9759) give an irrelevant answer about the savepoint restore when stateless operators be added etc

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533470#comment-16533470
 ] 

ASF GitHub Bot commented on FLINK-9759:
---

Github user lamber-ken closed the pull request at:

https://github.com/apache/flink/pull/6261


> give an irrelevant answer about the savepoint restore when stateless 
> operators be added etc
> ---
>
> Key: FLINK-9759
> URL: https://issues.apache.org/jira/browse/FLINK-9759
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> The documentation gives an irrelevant answer about savepoint restore when 
> stateless operators are added, etc.





[GitHub] flink pull request #6257: [FLINK-9676][network] clarify contracts of BufferL...

2018-07-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6257


---


[jira] [Commented] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533479#comment-16533479
 ] 

ASF GitHub Bot commented on FLINK-9676:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6257


> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers 
> -> lock(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable 
> -> try to lock(IC2#bufferQueue)
> Task thread -> IC2#recycle -> lock(IC2#bufferQueue) -> 
> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer -> 
> try to lock(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside 
> the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle 
> can cover this case, but the deadlock probability is very low, so this UT is 
> not a stable reproducer.
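
The proposed solution can be sketched as follows; this is a simplified illustration with placeholder names, not the actual {{LocalBufferPool}} code. The listener is selected while holding the pool lock, but its callback is invoked only after the lock is released, so the pool lock and the listener's own lock are never held at the same time:

{code:java}
import java.util.ArrayDeque;

class BufferPoolSketch {
    interface Listener {
        // returns true if the listener wants to stay registered
        boolean notifyBufferAvailable(Object buffer);
    }

    private final ArrayDeque<Object> availableSegments = new ArrayDeque<>();
    private final ArrayDeque<Listener> registeredListeners = new ArrayDeque<>();

    void recycle(Object segment) {
        Listener listener;
        synchronized (availableSegments) {
            listener = registeredListeners.poll();
            if (listener == null) {
                // nobody is waiting: return the segment to the pool
                availableSegments.add(segment);
                availableSegments.notify();
                return;
            }
        }
        // The callback runs outside the pool lock, so it may safely take the
        // listener's own lock (e.g. the channel's bufferQueue) without
        // creating the lock-order cycle described above.
        if (listener.notifyBufferAvailable(segment)) {
            synchronized (availableSegments) {
                registeredListeners.add(listener);
            }
        }
    }
}
{code}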





[jira] [Updated] (FLINK-9743) PackagedProgram.extractContainedLibraries fails on Windows

2018-07-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9743:
--
Labels: pull-request-available  (was: )

> PackagedProgram.extractContainedLibraries fails on Windows
> --
>
> Key: FLINK-9743
> URL: https://issues.apache.org/jira/browse/FLINK-9743
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.4.3, 1.5.2
>
>
> Submitting a jar that contains other jars on Windows fails with an exception:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: Unknown I/O error 
> while extracting contained jar files.
> at 
> org.apache.flink.client.program.PackagedProgram.extractContainedLibraries(PackagedProgram.java:752)
> at 
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:194)
> at 
> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:833)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:201)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: An I/O 
> error occurred while creating temporary file to extract nested library 
> 'lib/antrunner.jar'.
> at 
> org.apache.flink.client.program.PackagedProgram.extractContainedLibraries(PackagedProgram.java:708)
> ... 7 more
> Caused by: java.io.IOException: Unable to create temporary file, 
> C:\Users\XXX\AppData\Local\Temp\1751416743_6922010711856647205lib\antrunner.jar
> at java.io.File$TempDirectory.generateFile(Unknown Source)
> at java.io.File.createTempFile(Unknown Source)
> at java.io.File.createTempFile(Unknown Source)
> at 
> org.apache.flink.client.program.PackagedProgram.extractContainedLibraries(PackagedProgram.java:702)
> ... 7 more
> {code}
> {{PackagedProgram.extractContainedLibraries}} tries to replace all path 
> separators using the platform-dependent {{File.separatorChar}}; however, the 
> path separator for jars (and zips, for that matter) is always {{/}}.
> {code}
> final JarEntry entry = containedJarFileEntries.get(i);
> String name = entry.getName();
> name = name.replace(File.separatorChar, '_');
> {code}
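
To illustrate the point about entry names, a small sketch using plain {{java.util.jar}} (an illustration, not the Flink code): jar entries report {{/}}-separated names on every platform, so on Windows, where {{File.separatorChar}} is {{\\}}, the replacement above never matches and the intended flattening does not happen.

{code:java}
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

class JarEntrySeparatorSketch {
    static void listEntries(String jarPath) throws Exception {
        try (JarFile jar = new JarFile(jarPath)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                // Prints e.g. "lib/antrunner.jar" on every platform: the
                // zip format mandates '/' as the entry name separator, so
                // flattening must use replace('/', '_'), never
                // File.separatorChar.
                System.out.println(entries.nextElement().getName());
            }
        }
    }
}
{code}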





[jira] [Commented] (FLINK-9743) PackagedProgram.extractContainedLibraries fails on Windows

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533480#comment-16533480
 ] 

ASF GitHub Bot commented on FLINK-9743:
---

GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6263

[FLINK-9743][Client] Use correct zip/jar path separator

## What is the purpose of the change

*This PR resolves the library extraction issue for nested jars*

## Brief change log

  - *For zip/jar entries, always use the '/' path separator*
  - *Test with generated jar emulating the real case*

## Verifying this change
  - The added test generates a fake jar with the structure
test.jar
 |- lib
 |--|- internalTest.jar 
and then calls `PackagedProgram#extractContainedLibraries` to check 
that it extracts internalTest.jar correctly (see the sketch below)
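
A rough sketch of how such a fake nested jar can be generated with plain `java.util.jar` APIs (an illustration under that assumption, not the test's actual code):

```java
import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;

class NestedJarSketch {
    static void writeTestJar() throws Exception {
        // build lib/internalTest.jar in memory first
        ByteArrayOutputStream inner = new ByteArrayOutputStream();
        try (JarOutputStream innerJar = new JarOutputStream(inner)) {
            innerJar.putNextEntry(new JarEntry("marker.txt"));
            innerJar.write("payload is irrelevant for the test".getBytes());
            innerJar.closeEntry();
        }
        // wrap it into test.jar; note the '/' in the entry name
        try (JarOutputStream outerJar =
                 new JarOutputStream(new FileOutputStream("test.jar"))) {
            outerJar.putNextEntry(new JarEntry("lib/internalTest.jar"));
            outerJar.write(inner.toByteArray());
            outerJar.closeEntry();
        }
    }
}
```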

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink FLINK_9743

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6263.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6263


commit 649b9f1a0893aea32bd2dc3cfa1702bfb77ab29f
Author: snuyanzin 
Date:   2018-07-05T08:58:33Z

[FLINK-9743] use correct zip path separator,
PackagedProgramTest#testExtractContainedLibraries to check 
PackagedProgram#extractContainedLibraries




> PackagedProgram.extractContainedLibraries fails on Windows
> --
>
> Key: FLINK-9743
> URL: https://issues.apache.org/jira/browse/FLINK-9743
> Project: Flink
>  Issue Type: Bug
>  Components: Client, Job-Submission
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: Chesnay Schepler
>Assignee: Sergey Nuyanzin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.4.3, 1.5.2
>
>
> Submitting a jar that contains other jars on Windows fails with an exception:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: Unknown I/O error 
> while extracting contained jar files.
> at 
> org.apache.flink.client.program.PackagedProgram.extractContainedLibraries(PackagedProgram.java:752)
> at 
> org.apache.flink.client.program.PackagedProgram.(PackagedProgram.java:194)
> at 
> org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:833)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:201)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1020)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1096)
> at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1096)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: An I/O 
> error occurred while creating temporary file to extract nested library 
> 'lib/antrunner.jar'.
> at 
> org.apache.flink.client.program.PackagedProgram.extractContainedLibraries(PackagedProgram.java:708)
> ... 7 more
> Caused by: java.io.IOException: Unable to create temporary file, 
> C:\Users\XXX\AppData\Local\Temp\1751416743_6922010711856647205lib\antrunner.jar
> at java.io.File$TempDirectory.generateFile(Unknown Source)
> at java.io.File.createTempFile(Unknown Source)
> at java.io.File.createTempFile(Unknown Source)
> at 
> org.apache.flink.client.program.PackagedProgram.extractContainedLibraries(PackagedProgram.java:702)
> ... 7 more
> {code}
> {{PackagedProgram.extractContainedLibraries}} tries to replace all path 
> separators using the platform-dependent {{File.separatorChar}}; however, the 
> path separator for jars (and zips, for that matter) is always {{/}}.
> {code}
> final JarEntry entry = containedJarFileEntries.get(i);
> String name = entry.getName();
> name = name.replace(File.separatorChar, '_');
> {code}

[GitHub] flink pull request #6263: [FLINK-9743][Client] Use correct zip/jar path sepa...

2018-07-05 Thread snuyanzin
GitHub user snuyanzin opened a pull request:

https://github.com/apache/flink/pull/6263

[FLINK-9743][Client] Use correct zip/jar path separator

## What is the purpose of the change

*This PR resolves the library extraction issue for nested jars*

## Brief change log

  - *For zip/jar entries, always use the '/' path separator*
  - *Test with generated jar emulating the real case*

## Verifying this change
  - The added test generates a fake jar with the structure
test.jar
 |- lib
 |--|- internalTest.jar 
and then calls `PackagedProgram#extractContainedLibraries` to check 
that it extracts internalTest.jar correctly

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/snuyanzin/flink FLINK_9743

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6263.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6263


commit 649b9f1a0893aea32bd2dc3cfa1702bfb77ab29f
Author: snuyanzin 
Date:   2018-07-05T08:58:33Z

[FLINK-9743] use correct zip path separator,
PackagedProgramTest#testExtractContainedLibraries to check 
PackagedProgram#extractContainedLibraries




---


[jira] [Resolved] (FLINK-9676) Deadlock during canceling task and recycling exclusive buffer

2018-07-05 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber resolved FLINK-9676.

Resolution: Fixed

Fixed via
1.6.0: a9b5579b397b6c56e44e21ebb8b2a6a7e6d8b2d1
1.5.0: 6ce8211bab84308a17043dc4901d6a93b2777da8

> Deadlock during canceling task and recycling exclusive buffer
> -
>
> Key: FLINK-9676
> URL: https://issues.apache.org/jira/browse/FLINK-9676
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: Nico Kruber
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.6.0, 1.5.1
>
>
> It may cause a deadlock between the task canceler thread and the task thread.
> The details are as follows:
> Task canceler thread -> IC1#releaseAllResources -> recycle floating buffers 
> -> lock(LocalBufferPool#availableMemorySegments) -> IC2#notifyBufferAvailable 
> -> try to lock(IC2#bufferQueue)
> Task thread -> IC2#recycle -> lock(IC2#bufferQueue) -> 
> bufferQueue#addExclusiveBuffer -> floatingBuffer#recycleBuffer -> 
> try to lock(LocalBufferPool#availableMemorySegments)
> One solution is that {{listener#notifyBufferAvailable}} can be called outside 
> the {{synchronized(availableMemorySegments)}} block in {{LocalBufferPool#recycle}}.
> The existing RemoteInputChannelTest#testConcurrentOnSenderBacklogAndRecycle 
> can cover this case, but the deadlock probability is very low, so this UT is 
> not a stable reproducer.





[jira] [Created] (FLINK-9761) Potential buffer leak in PartitionRequestClientHandler during job failures

2018-07-05 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-9761:
--

 Summary: Potential buffer leak in PartitionRequestClientHandler 
during job failures
 Key: FLINK-9761
 URL: https://issues.apache.org/jira/browse/FLINK-9761
 Project: Flink
  Issue Type: Bug
  Components: Network
Affects Versions: 1.5.0
Reporter: Nico Kruber
Assignee: Nico Kruber
 Fix For: 1.6.0, 1.5.2


{{PartitionRequestClientHandler#stagedMessages}} may be accessed from multiple 
threads:
1) Netty's IO thread
2) during cancellation, when 
{{PartitionRequestClientHandler.BufferListenerTask#notifyBufferDestroyed}} is 
called

If {{PartitionRequestClientHandler.BufferListenerTask#notifyBufferDestroyed}} 
sees an empty {{stagedMessages}} queue, it will not install the 
{{stagedMessagesHandler}} that consumes and releases buffers from received 
messages.
Unless some unexpected combination of code calls prevents this from happening, 
this would leak the non-recycled buffers.
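
A simplified sketch of the hazard (field and method names follow the description above; the bodies are illustrative assumptions, not the real handler code): both paths must observe the queue under the same lock, otherwise the cancellation path can see an "empty" queue while the IO thread is about to enqueue a message whose buffers are then never recycled.

{code:java}
import java.util.ArrayDeque;

class StagedMessagesSketch {
    private final ArrayDeque<Object> stagedMessages = new ArrayDeque<>();

    // Netty IO thread
    void stageMessage(Object msg) {
        synchronized (stagedMessages) {
            stagedMessages.add(msg);
        }
    }

    // cancellation path
    void notifyBufferDestroyed() {
        synchronized (stagedMessages) {
            if (!stagedMessages.isEmpty()) {
                installStagedMessagesHandler(); // hypothetical helper
            }
        }
    }

    private void installStagedMessagesHandler() {
        // would consume and release buffers from staged messages
    }
}
{code}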





[jira] [Commented] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()

2018-07-05 Thread Sihua Zhou (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533506#comment-16533506
 ] 

Sihua Zhou commented on FLINK-9736:
---

Hi [~yuzhih...@gmail.com], AFAIK {{heapOfKeyGroupHeaps}} will never be empty. 
As I mentioned above, it is created in the constructor of 
{{KeyGroupPartitionedPriorityQueue}} to maintain the timer structure based on 
the heap of each key group. The number of elements in {{heapOfKeyGroupHeaps}} 
should be equal to the number of key groups in each Task, but as a double check 
maybe [~stefanrichte...@gmail.com] could confirm this.

> Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
> ---
>
> Key: FLINK-9736
> URL: https://issues.apache.org/jira/browse/FLINK-9736
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
> final PQ headList = heapOfkeyGroupedHeaps.peek();
> final T head = headList.poll();
> {code}
> {{peek}} call may return null.
> The return value should be checked before de-referencing.
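
For reference, the defensive variant the report asks for would look roughly like this (a sketch extending the fragment above; as the comments in this thread explain, the heap of key-group heaps is in practice never empty, so the check would be redundant):

{code}
final PQ headList = heapOfkeyGroupedHeaps.peek();
if (headList == null) {
    return null; // heap of key-group heaps is empty
}
final T head = headList.poll();
{code}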





[jira] [Commented] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533509#comment-16533509
 ] 

ASF GitHub Bot commented on FLINK-8558:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6264

[FLINK-8558] [table] Add unified format interfaces and separate formats 
from connectors

## What is the purpose of the change

This PR introduces a format discovery mechanism based on Java Service 
Providers. The general `TableFormatFactory` is similar to the existing table 
source discovery mechanism. However, it allows for arbitrary format interfaces 
that might be introduced in the future. At the moment, a connector can request 
configured instances of `DeserializationSchema` and `SerializationSchema`. In 
the future we can add interfaces such as a `Writer` or 
`KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for 
the Kafka table sources and table source factories. It introduces 
descriptor-based alternatives.
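
For illustration, a rough sketch of service-provider based discovery; the `matches` hook and the placeholder interface are assumptions for illustration, not the actual `TableFormatFactory` API:

```java
import java.util.Map;
import java.util.ServiceLoader;

// Placeholder standing in for the PR's TableFormatFactory interface.
interface TableFormatFactory {
    boolean matches(Map<String, String> properties);
}

final class FormatDiscoverySketch {
    static TableFormatFactory find(Map<String, String> properties) {
        // ServiceLoader scans META-INF/services entries on the classpath,
        // which is the "Java Service Providers" mechanism referred to above.
        for (TableFormatFactory factory : ServiceLoader.load(TableFormatFactory.class)) {
            if (factory.matches(properties)) {
                return factory;
            }
        }
        throw new IllegalStateException("No matching table format factory found.");
    }
}
```

Factories advertise themselves through the usual `META-INF/services` entry for the factory interface, which is what `ServiceLoader.load` scans.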

## Brief change log

- Introduction of `TableFormatService` with `TableFormatFactory` and 
specific `DeserializationSchemaFactory` and `SerializationSchemaFactory`
- Decoupling of existing connectors (i.e. Kafka) from formats (i.e. JSON 
and Avro)
- Exposing the descriptor-based approach, deprecate the old builders and 
make table source internal


## Verifying this change

- Existing tests for coupled sources and factories are still working
- New tests for format discovery, formats, and decoupled table sources

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? more docs for the descriptors 
will follow in a separate PR


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8558

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6264.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6264


commit 9415066578d6fac420ea68edfa4509d5625f2a82
Author: Timo Walther 
Date:   2018-06-27T11:16:49Z

[FLINK-8558] [table] Add unified format interfaces and separate formats 
from connectors

This PR introduces a format discovery mechanism based on Java Service 
Providers. The general `TableFormatFactory` is similar to the existing table 
source discovery mechanism. However, it allows for arbitrary format interfaces 
that might be introduced in the future. At the moment, a connector can request 
configured instances of `DeserializationSchema` and `SerializationSchema`. In 
the future we can add interfaces such as a `Writer` or 
`KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for 
the Kafka table sources and table source factories. It introduces 
descriptor-based alternatives.




> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.





[jira] [Updated] (FLINK-8558) Add unified format interfaces and format discovery

2018-07-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-8558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-8558:
--
Labels: pull-request-available  (was: )

> Add unified format interfaces and format discovery
> --
>
> Key: FLINK-8558
> URL: https://issues.apache.org/jira/browse/FLINK-8558
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> In the last release, we introduced a new module {{flink-formats}}. Currently 
> only {{flink-avro}} is located there but we will add more formats such as 
> {{flink-json}}, {{flink-protobuf}}, and so on. For better separation of 
> concerns we want to decouple connectors from formats: e.g., remove 
> {{KafkaAvroTableSource}} and {{KafkaJsonTableSource}}.
> A newly introduced {{FormatFactory}} will use Java service loaders to 
> discover available formats in the classpath (similar to how file systems are 
> discovered now). A {{Format}} will provide a method for converting {{byte[]}} 
> to target record type.





[GitHub] flink pull request #6264: [FLINK-8558] [table] Add unified format interfaces...

2018-07-05 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6264

[FLINK-8558] [table] Add unified format interfaces and separate formats 
from connectors

## What is the purpose of the change

This PR introduces a format discovery mechanism based on Java Service 
Providers. The general `TableFormatFactory` is similar to the existing table 
source discovery mechanism. However, it allows for arbitrary format interfaces 
that might be introduced in the future. At the moment, a connector can request 
configured instances of `DeserializationSchema` and `SerializationSchema`. In 
the future we can add interfaces such as a `Writer` or 
`KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for 
the Kafka table sources and table source factories. It introduces 
descriptor-based alternatives.

## Brief change log

- Introduction of `TableFormatService` with `TableFormatFactory` and 
specific `DeserializationSchemaFactory` and `SerializationSchemaFactory`
- Decoupling of existing connectors (i.e. Kafka) from formats (i.e. JSON 
and Avro)
- Exposing the descriptor-based approach, deprecate the old builders and 
make table source internal


## Verifying this change

- Existing tests for coupled sources and factories are still working
- New tests for format discovery, formats, and decoupled table sources

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? more docs for the descriptors 
will follow in a separate PR


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-8558

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6264.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6264


commit 9415066578d6fac420ea68edfa4509d5625f2a82
Author: Timo Walther 
Date:   2018-06-27T11:16:49Z

[FLINK-8558] [table] Add unified format interfaces and separate formats 
from connectors

This PR introduces a format discovery mechanism based on Java Service 
Providers. The general `TableFormatFactory` is similar to the existing table 
source discovery mechanism. However, it allows for arbitrary format interfaces 
that might be introduced in the future. At the moment, a connector can request 
configured instances of `DeserializationSchema` and `SerializationSchema`. In 
the future we can add interfaces such as a `Writer` or 
`KeyedSerializationSchema` without breaking backwards compatibility.

This PR deprecates the existing strong coupling of connector and format for 
the Kafka table sources and table source factories. It introduces 
descriptor-based alternatives.




---


[jira] [Created] (FLINK-9762) CoreOptions.TMP_DIRS wrongly managed on Yarn

2018-07-05 Thread Oleksandr Nitavskyi (JIRA)
Oleksandr Nitavskyi created FLINK-9762:
--

 Summary: CoreOptions.TMP_DIRS wrongly managed on Yarn
 Key: FLINK-9762
 URL: https://issues.apache.org/jira/browse/FLINK-9762
 Project: Flink
  Issue Type: Bug
  Components: YARN
Affects Versions: 1.5.0
Reporter: Oleksandr Nitavskyi


The issue on YARN is that it is impossible to have different LOCAL_DIRS on the 
JobManager and the TaskManager, even though the LOCAL_DIRS value depends on the 
container.

CoreOptions.TMP_DIRS is set to its default value during JobManager 
initialization and added to the configuration object. When a TaskManager is 
launched, that configuration object is cloned, including a LOCAL_DIRS value 
that only makes sense for the JobManager container. From the TaskManager's 
point of view, CoreOptions.TMP_DIRS is therefore always equal either to the 
path in flink.yml or to the JobManager's LOCAL_DIRS (the default behaviour). 
If the TaskManager's container does not have access to those folders, i.e. the 
folders YARN allocated for the JobManager, the TaskManager cannot be started.
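
A minimal sketch of the direction a fix could take (an assumption about the approach, not an actual patch): let each TaskManager container resolve its own local directories instead of inheriting the JobManager's value.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

class TmpDirsFixSketch {
    // YARN exports each container's local directories via the LOCAL_DIRS
    // environment variable, so every container can resolve its own value.
    static void overrideTmpDirs(Configuration configuration) {
        String yarnLocalDirs = System.getenv("LOCAL_DIRS");
        if (yarnLocalDirs != null) {
            // override whatever TMP_DIRS value was cloned from the JobManager
            configuration.setString(CoreOptions.TMP_DIRS, yarnLocalDirs);
        }
    }
}
{code}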





[jira] [Closed] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()

2018-07-05 Thread Stefan Richter (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9736?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stefan Richter closed FLINK-9736.
-
Resolution: Not A Bug

> Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
> ---
>
> Key: FLINK-9736
> URL: https://issues.apache.org/jira/browse/FLINK-9736
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
> final PQ headList = heapOfkeyGroupedHeaps.peek();
> final T head = headList.poll();
> {code}
> {{peek}} call may return null.
> The return value should be checked before de-referencing.





[jira] [Commented] (FLINK-9736) Potential null reference in KeyGroupPartitionedPriorityQueue#poll()

2018-07-05 Thread Stefan Richter (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533515#comment-16533515
 ] 

Stefan Richter commented on FLINK-9736:
---

[~sihuazhou] is correct: this heap is always initialized with one or more 
objects and we never remove them. I know that static code analysis highlights 
this based on the method's annotation, but this is not a bug.

> Potential null reference in KeyGroupPartitionedPriorityQueue#poll()
> ---
>
> Key: FLINK-9736
> URL: https://issues.apache.org/jira/browse/FLINK-9736
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
> final PQ headList = heapOfkeyGroupedHeaps.peek();
> final T head = headList.poll();
> {code}
> {{peek}} call may return null.
> The return value should be checked before de-referencing.





[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Widen scope of Expression.r...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
Thanks for opening this PR @HeartSaVioR! (cc @twalthr)

I thought about this again and I don't think we should make the method 
public. The problem is that `Expression` is one of the core classes of the 
Table API. By making the method public it becomes visible to all users of the 
API.

A better approach is to provide a publicly accessible util object (in 
org.apache.flink.table.api...) that provides access to the result type (and 
possibly other properties) of an `Expression`.

What do you think? 

Best, Fabian


---


[jira] [Commented] (FLINK-9742) Widen scope of Expression.resultType to 'public'

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533519#comment-16533519
 ] 

ASF GitHub Bot commented on FLINK-9742:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
Thanks for opening this PR @HeartSaVioR! (cc @twalthr)

I thought about this again and I don't think we should make the method 
public. The problem is that `Expression` is one of the core classes of the 
Table API. By making the method public it becomes visible to all users of the 
API.

A better approach is to provide a publicly accessible util object (in 
org.apache.flink.table.api...) that provides access to the result type (and 
possibly other properties) of an `Expression`.

What do you think? 

Best, Fabian


> Widen scope of Expression.resultType to 'public'
> 
>
> Key: FLINK-9742
> URL: https://issues.apache.org/jira/browse/FLINK-9742
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> I have use case of TableSource which requires custom implementation of 
> TimestampExtractor. To ensure new TimestampExtractor to cover more general 
> use cases, accessing Expression.resultType is necessary, but its scope is now 
> defined as package private for "org.apache.flink".
> Below is the implementation of custom TimestampExtractor which leverages 
> Expression.resultType, hence had to place it to org.apache.flink package 
> (looks like a hack).
> {code:java}
> class IsoDateStringAwareExistingField(val field: String) extends 
> TimestampExtractor {
>   override def getArgumentFields: Array[String] = Array(field)
>   override def validateArgumentFields(argumentFieldTypes: 
> Array[TypeInformation[_]]): Unit = {
> val fieldType = argumentFieldTypes(0)
> fieldType match {
>   case Types.LONG => // OK
>   case Types.SQL_TIMESTAMP => // OK
>   case Types.STRING => // OK
>   case _: TypeInformation[_] =>
> throw ValidationException(
>   s"Field '$field' must be of type Long or Timestamp or String but is 
> of type $fieldType.")
> }
>   }
>   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
> Expression = {
> val fieldAccess: Expression = fieldAccesses(0)
> fieldAccess.resultType match {
>   case Types.LONG =>
> // access LONG field
> fieldAccess
>   case Types.SQL_TIMESTAMP =>
> // cast timestamp to long
> Cast(fieldAccess, Types.LONG)
>   case Types.STRING =>
> Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
> }
>   }
> }{code}
> It would be better to just make Expression.resultType public to cover other 
> cases as well. (I'm not sure other methods would be also better to be public 
> as well.)





[jira] [Created] (FLINK-9763) Flink SQL Client bat script

2018-07-05 Thread Pavel Shvetsov (JIRA)
Pavel Shvetsov created FLINK-9763:
-

 Summary: Flink SQL Client bat script
 Key: FLINK-9763
 URL: https://issues.apache.org/jira/browse/FLINK-9763
 Project: Flink
  Issue Type: Sub-task
  Components: Client
Reporter: Pavel Shvetsov
Assignee: Pavel Shvetsov


Create .bat script for flink SQL client launch.





[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Widen scope of Expression.r...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/flink/pull/6252
  
@fhueske 
Yeah, that would be great. I'm happy to revert the change and introduce new 
class. 

I'm just not sure where to place it and how to name it, since it will have 
just one method, `getReturnType(Expression): TypeInformation[_]`.

Would `org.apache.flink.table.api.ExpressionUtil` be OK for new utility 
class?

Thanks in advance!
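
For illustration, a rough sketch of what such a utility could look like, using the names proposed above (an assumption, not a settled API):

```java
package org.apache.flink.table.api;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.expressions.Expression;

public final class ExpressionUtil {

    private ExpressionUtil() {
        // utility class, no instances
    }

    // Living under org.apache.flink, this class can reach the
    // private[flink]-scoped resultType member of Expression.
    public static TypeInformation<?> getReturnType(Expression expression) {
        return expression.resultType();
    }
}
```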


---


[jira] [Commented] (FLINK-9742) Widen scope of Expression.resultType to 'public'

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533542#comment-16533542
 ] 

ASF GitHub Bot commented on FLINK-9742:
---

Github user HeartSaVioR commented on the issue:

https://github.com/apache/flink/pull/6252
  
@fhueske 
Yeah, that would be great. I'm happy to revert the change and introduce new 
class. 

I'm just not sure where to place it and how to name it, since it will have 
just one method, `getReturnType(Expression): TypeInformation[_]`.

Would `org.apache.flink.table.api.ExpressionUtil` be OK for new utility 
class?

Thanks in advance!


> Widen scope of Expression.resultType to 'public'
> 
>
> Key: FLINK-9742
> URL: https://issues.apache.org/jira/browse/FLINK-9742
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> I have use case of TableSource which requires custom implementation of 
> TimestampExtractor. To ensure new TimestampExtractor to cover more general 
> use cases, accessing Expression.resultType is necessary, but its scope is now 
> defined as package private for "org.apache.flink".
> Below is the implementation of custom TimestampExtractor which leverages 
> Expression.resultType, hence had to place it to org.apache.flink package 
> (looks like a hack).
> {code:java}
> class IsoDateStringAwareExistingField(val field: String) extends 
> TimestampExtractor {
>   override def getArgumentFields: Array[String] = Array(field)
>   override def validateArgumentFields(argumentFieldTypes: 
> Array[TypeInformation[_]]): Unit = {
> val fieldType = argumentFieldTypes(0)
> fieldType match {
>   case Types.LONG => // OK
>   case Types.SQL_TIMESTAMP => // OK
>   case Types.STRING => // OK
>   case _: TypeInformation[_] =>
> throw ValidationException(
>   s"Field '$field' must be of type Long or Timestamp or String but is 
> of type $fieldType.")
> }
>   }
>   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
> Expression = {
> val fieldAccess: Expression = fieldAccesses(0)
> fieldAccess.resultType match {
>   case Types.LONG =>
> // access LONG field
> fieldAccess
>   case Types.SQL_TIMESTAMP =>
> // cast timestamp to long
> Cast(fieldAccess, Types.LONG)
>   case Types.STRING =>
> Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
> }
>   }
> }{code}
> It would be better to just make Expression.resultType public to cover other 
> cases as well. (I'm not sure other methods would be also better to be public 
> as well.)





[jira] [Updated] (FLINK-9763) Flink SQL Client bat script

2018-07-05 Thread Pavel Shvetsov (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pavel Shvetsov updated FLINK-9763:
--
Component/s: (was: Client)
 Startup Shell Scripts

> Flink SQL Client bat script
> ---
>
> Key: FLINK-9763
> URL: https://issues.apache.org/jira/browse/FLINK-9763
> Project: Flink
>  Issue Type: Sub-task
>  Components: Startup Shell Scripts
>Reporter: Pavel Shvetsov
>Assignee: Pavel Shvetsov
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Create .bat script for flink SQL client launch.





[jira] [Commented] (FLINK-6239) Sharing of State Across Operators

2018-07-05 Thread Stephan Ewen (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-6239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533553#comment-16533553
 ] 

Stephan Ewen commented on FLINK-6239:
-

Thanks for sharing the details. Materialized table joins are in the works.

There is a design doc for adding an extended version of this (Time Versioned 
Table Join) in SQL and the Table API: 
https://docs.google.com/document/d/1KaAkPZjWFeu-ffrC9FhYuxE6CIKsatHTTxyrxSBR8Sk



> Sharing of State Across Operators
> -
>
> Key: FLINK-6239
> URL: https://issues.apache.org/jira/browse/FLINK-6239
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.1.4
>Reporter: Elias Levy
>Priority: Major
>
> Currently state cannot be shared across operators.  On a keyed stream, the 
> state is implicitly keyed by the operator id, in addition to the stream key.  
> This can make it more difficult and inefficient to implement complex 
> topologies, where multiple operators may need to access the same state.  It 
> would be valuable to be able to access keyed value and map state across 
> operators.





[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Widen scope of Expression.r...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
I think it can also be a new public method in 
`org.apache.flink.table.expressions.ExpressionUtils`.

Thanks, Fabian


---


[jira] [Commented] (FLINK-9742) Widen scope of Expression.resultType to 'public'

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533558#comment-16533558
 ] 

ASF GitHub Bot commented on FLINK-9742:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
I think it can also be a new public method in 
`org.apache.flink.table.expressions.ExpressionUtils`.

Thanks, Fabian


> Widen scope of Expression.resultType to 'public'
> 
>
> Key: FLINK-9742
> URL: https://issues.apache.org/jira/browse/FLINK-9742
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> I have use case of TableSource which requires custom implementation of 
> TimestampExtractor. To ensure new TimestampExtractor to cover more general 
> use cases, accessing Expression.resultType is necessary, but its scope is now 
> defined as package private for "org.apache.flink".
> Below is the implementation of custom TimestampExtractor which leverages 
> Expression.resultType, hence had to place it to org.apache.flink package 
> (looks like a hack).
> {code:java}
> class IsoDateStringAwareExistingField(val field: String) extends 
> TimestampExtractor {
>   override def getArgumentFields: Array[String] = Array(field)
>   override def validateArgumentFields(argumentFieldTypes: 
> Array[TypeInformation[_]]): Unit = {
> val fieldType = argumentFieldTypes(0)
> fieldType match {
>   case Types.LONG => // OK
>   case Types.SQL_TIMESTAMP => // OK
>   case Types.STRING => // OK
>   case _: TypeInformation[_] =>
> throw ValidationException(
>   s"Field '$field' must be of type Long or Timestamp or String but is 
> of type $fieldType.")
> }
>   }
>   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
> Expression = {
> val fieldAccess: Expression = fieldAccesses(0)
> fieldAccess.resultType match {
>   case Types.LONG =>
> // access LONG field
> fieldAccess
>   case Types.SQL_TIMESTAMP =>
> // cast timestamp to long
> Cast(fieldAccess, Types.LONG)
>   case Types.STRING =>
> Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
> }
>   }
> }{code}
> It would be better to just make Expression.resultType public to cover other 
> cases as well. (I'm not sure other methods would be also better to be public 
> as well.)





[jira] [Updated] (FLINK-9764) Failure in LocalRecoveryRocksDBFullITCase

2018-07-05 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber updated FLINK-9764:
---
Labels: test-stability  (was: )

> Failure in LocalRecoveryRocksDBFullITCase
> -
>
> Key: FLINK-9764
> URL: https://issues.apache.org/jira/browse/FLINK-9764
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Streaming
>Affects Versions: 1.6.0
>Reporter: Nico Kruber
>Priority: Major
>  Labels: test-stability
>
> {code}
> Running org.apache.flink.test.checkpointing.LocalRecoveryRocksDBFullITCase
> Starting null#executeTest.
> org.apache.flink.runtime.client.JobExecutionException: 
> java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but 
> was:<1209>
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
>   at 
> org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
>   at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.testTumblingTimeWindow(AbstractEventTimeWindowCheckpointingITCase.java:286)
>   at 
> org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:82)
>   at 
> org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:74)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
>   at 
> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
>   at 
> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
> Caused by: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> 
> but was:<1209>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:733)
>   at 
> org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:669)
>   at 
> org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> https://travis-ci.org/NicoK/flink/jobs/400323147



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9764) Failure in LocalRecoveryRocksDBFullITCase

2018-07-05 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-9764:
--

 Summary: Failure in LocalRecoveryRocksDBFullITCase
 Key: FLINK-9764
 URL: https://issues.apache.org/jira/browse/FLINK-9764
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing, Streaming
Affects Versions: 1.6.0
Reporter: Nico Kruber


{code}
Running org.apache.flink.test.checkpointing.LocalRecoveryRocksDBFullITCase
Starting null#executeTest.
org.apache.flink.runtime.client.JobExecutionException: 
java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> but 
was:<1209>
at 
org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at 
org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:79)
at org.apache.flink.test.util.TestUtils.tryExecute(TestUtils.java:35)
at 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.testTumblingTimeWindow(AbstractEventTimeWindowCheckpointingITCase.java:286)
at 
org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:82)
at 
org.apache.flink.test.checkpointing.AbstractLocalRecoveryITCase.executeTest(AbstractLocalRecoveryITCase.java:74)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Caused by: java.lang.AssertionError: Window start: 0 end: 100 expected:<4950> 
but was:<1209>
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.failNotEquals(Assert.java:834)
at org.junit.Assert.assertEquals(Assert.java:645)
at 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:733)
at 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase$ValidatingSink.invoke(AbstractEventTimeWindowCheckpointingITCase.java:669)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
{code}

https://travis-ci.org/NicoK/flink/jobs/400323147



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Updated] (FLINK-9765) Improve CLI responsiveness when cluster is not reachable

2018-07-05 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-9765:

Affects Version/s: 1.5.0

> Improve CLI responsiveness when cluster is not reachable
> 
>
> Key: FLINK-9765
> URL: https://issues.apache.org/jira/browse/FLINK-9765
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> If the cluster was not started or is not reachable, it takes a long time to 
> cancel a result. This should not affect the main thread. The CLI should be 
> responsive at all times.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9765) Improve CLI responsiveness when cluster is not reachable

2018-07-05 Thread Timo Walther (JIRA)
Timo Walther created FLINK-9765:
---

 Summary: Improve CLI responsiveness when cluster is not reachable
 Key: FLINK-9765
 URL: https://issues.apache.org/jira/browse/FLINK-9765
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


If the cluster was not started or is not reachable, it takes a long time to 
cancel a result. This should not affect the main thread. The CLI should be 
responsive at all times.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6253: [WIP][FLINK-8094][Table API & SQL] Support other types fo...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6253
  
It's a good question whether to add a new class or not. Right now, 
extending the implementation of `ExistingField` seems like a better approach to 
me. Once we add a `ParsingExistingField` extractor that can be configured with 
a timestamp format, the ISO date String support in `ExistingField` would be 
obsolete, but IMO that's not a big problem. 
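
To make the idea concrete, here is a rough sketch of the parsing piece such 
a `ParsingExistingField` could delegate to (hypothetical names, not part of 
Flink; assuming a `SimpleDateFormat`-style pattern):

```scala
import java.text.SimpleDateFormat

import org.apache.flink.table.functions.ScalarFunction

// Hypothetical UDF a ParsingExistingField-style extractor could call to turn
// a String field into epoch milliseconds for the rowtime attribute.
class ParseTimestamp(pattern: String) extends ScalarFunction {
  // SimpleDateFormat is not thread-safe, so keep one instance per function
  @transient private lazy val format = new SimpleDateFormat(pattern)

  def eval(s: String): Long = format.parse(s).getTime
}
```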

+1 to change the implementation to extend `ExistingField`.

Regarding the tests, I think `ExistingField` is used in a few ITCases, but 
there are no unit tests yet.
Adding unit tests is a bit tricky, because we would need to integrate it 
with the code generator, etc.
So, a big +1 if you would like to look into that, but I'd also be fine with 
adding another ITCase.

For the documentation, we might want to extend the bullet point about 
`timestampExtractor` in the [Defining a Rowtime 
Attribute](https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#defining-a-rowtime-attribute)
 section.

Thanks, Fabian


---


[jira] [Commented] (FLINK-8094) Support other types for ExistingField rowtime extractor

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533579#comment-16533579
 ] 

ASF GitHub Bot commented on FLINK-8094:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6253
  
It's a good question whether to add a new class or not. Right now, 
extending the implementation of `ExistingField` seems like a better approach to 
me. Once we add a `ParsingExistingField` extractor that can be configured with 
a timestamp format, the ISO date String support in `ExistingField` would be 
obsolete, but IMO that's not a big problem. 

+1 to change the implementation to extend `ExistingField`.

Regarding the tests, I think `ExistingField` is used in a few ITCases, but 
there are no unit tests yet.
Adding unit tests is a bit tricky, because we would need to integrate it 
with the code generator, etc.
So, a big +1 if you would like to look into that, but I'd also be fine with 
adding another ITCase.

For the documentation, we might want to extend the bullet point about 
`timestampExtractor` in the [Defining a Rowtime 
Attribute](https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#defining-a-rowtime-attribute)
 section.

Thanks, Fabian


> Support other types for ExistingField rowtime extractor
> ---
>
> Key: FLINK-8094
> URL: https://issues.apache.org/jira/browse/FLINK-8094
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the {{ExistingField}} rowtime extractor only supports {{Long}} and 
> {{Timestamp}} fields. To enable other data types (e.g., {{String}}), we can 
> provide some system extraction functions and allow users to pass some 
> parameters via the constructor of {{ExistingField}}. There's [a simple 
> demo|https://github.com/xccui/flink/commit/afcc5f1a0ad92db08294199e61be5df72c1514f8]
>  which enables the {{String}} type rowtime by adding a UDF {{str2EventTime}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6265: [FLINK-9765] [sql-client] Improve CLI responsivene...

2018-07-05 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6265

[FLINK-9765] [sql-client] Improve CLI responsiveness when cluster is not 
reachable

## What is the purpose of the change

Moves the job cancellation into the final phase of the refresh thread in 
order to keep the CLI responsive. The result of the cancellation is not used 
anyway.
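
As a minimal sketch of the pattern (illustrative names, not the actual 
sql-client classes), the potentially slow cancel call runs only after the 
refresh loop has ended, so stopping the thread returns immediately to the CLI:

```scala
class RefreshThread(cancelJob: () => Unit, refreshIntervalMs: Long) extends Thread {
  @volatile private var running = true

  override def run(): Unit = {
    while (running) {
      // ... periodically refresh the displayed results ...
      Thread.sleep(refreshIntervalMs)
    }
    // final phase: this may block for a long time if the cluster is
    // unreachable, but only this background thread waits; the result of the
    // cancellation is not used anyway
    try cancelJob() catch { case _: Exception => () }
  }

  def shutdown(): Unit = running = false // non-blocking for the CLI thread
}
```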

## Brief change log

Moves the job cancellation into the final phase of the refresh thread

## Verifying this change

Manually verified.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9765

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6265.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6265


commit 8204ae6e2a638b6c9f0d093bcd35b7710b0dbf61
Author: Timo Walther 
Date:   2018-07-05T11:25:28Z

[FLINK-9765] [sql-client] Improve CLI responsiveness when cluster is not 
reachable




---


[jira] [Commented] (FLINK-9765) Improve CLI responsiveness when cluster is not reachable

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533581#comment-16533581
 ] 

ASF GitHub Bot commented on FLINK-9765:
---

GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/6265

[FLINK-9765] [sql-client] Improve CLI responsiveness when cluster is not 
reachable

## What is the purpose of the change

Moves the job cancellation into the final phase of the refresh thread in 
order to keep the CLI responsive. The result of the cancellation is not used 
anyway.

## Brief change log

Moves the job cancellation into the final phase of the refresh thread

## Verifying this change

Manually verified.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-9765

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6265.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6265


commit 8204ae6e2a638b6c9f0d093bcd35b7710b0dbf61
Author: Timo Walther 
Date:   2018-07-05T11:25:28Z

[FLINK-9765] [sql-client] Improve CLI responsiveness when cluster is not 
reachable




> Improve CLI responsiveness when cluster is not reachable
> 
>
> Key: FLINK-9765
> URL: https://issues.apache.org/jira/browse/FLINK-9765
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> If the cluster was not started or is not reachable, it takes a long time to 
> cancel a result. This should not affect the main thread. The CLI should be 
> responsive at all times.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9765) Improve CLI responsiveness when cluster is not reachable

2018-07-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9765?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9765:
--
Labels: pull-request-available  (was: )

> Improve CLI responsiveness when cluster is not reachable
> 
>
> Key: FLINK-9765
> URL: https://issues.apache.org/jira/browse/FLINK-9765
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> If the cluster was not started or is not reachable, it takes a long time to 
> cancel a result. This should not affect the main thread. The CLI should be 
> responsive at all times.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6266: [FLINK-9682] Add setDescription to execution envir...

2018-07-05 Thread yanghua
GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6266

[FLINK-9682] Add setDescription to execution environment and display it in 
the UI

## What is the purpose of the change

*This pull request adds setDescription to the execution environment*

## Brief change log

  - *add a setDescription API to the execution environment*

## Verifying this change


This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9682

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6266


commit a03253bf5dc888d6e68afff725138c69488a5fa3
Author: yanghua 
Date:   2018-07-05T12:20:56Z

[FLINK-9682] Add setDescription to execution environment and display it in 
the UI




---


[jira] [Updated] (FLINK-9682) Add setDescription to execution environment and display it in the UI

2018-07-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-9682:
--
Labels: pull-request-available  (was: )

> Add setDescription to execution environment and display it in the UI
> 
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be executing, 
> such as a development or test environment, identifying which running job is 
> of a specific version via the UI can be difficult unless the version is 
> embedded into the job name given to {{execute}}.  But the job name is used 
> for other purposes, such as for namespacing metrics.  Thus, it is not ideal 
> to modify the job name, as that could require modifying metric dashboards and 
> monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.
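
A minimal sketch of how the proposed API could keep the two concerns apart 
(hypothetical, since {{setDescription}} does not exist yet; all names are 
illustrative):

{code}
class DescribedEnvironment {
  private var description: Option[String] = None

  // proposed: free-form text shown only in the dashboard
  def setDescription(desc: String): Unit = description = Some(desc)

  // existing: the job name keeps namespacing metrics and stays stable
  def execute(jobName: String): Unit =
    println(s"job=$jobName description=${description.getOrElse("<none>")}")
}

// Usage: the version lives in the description, not in the metric-relevant name.
// val env = new DescribedEnvironment
// env.setDescription("fraud-detection v2.3 (canary)")
// env.execute("fraud-detection")
{code}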



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9682) Add setDescription to execution environment and display it in the UI

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533585#comment-16533585
 ] 

ASF GitHub Bot commented on FLINK-9682:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/6266

[FLINK-9682] Add setDescription to execution environment and display it in 
the UI

## What is the purpose of the change

*This pull request adds setDescription to the execution environment*

## Brief change log

  - *add a setDescription API to the execution environment*

## Verifying this change


This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-9682

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6266.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6266


commit a03253bf5dc888d6e68afff725138c69488a5fa3
Author: yanghua 
Date:   2018-07-05T12:20:56Z

[FLINK-9682] Add setDescription to execution environment and display it in 
the UI




> Add setDescription to execution environment and display it in the UI
> 
>
> Key: FLINK-9682
> URL: https://issues.apache.org/jira/browse/FLINK-9682
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Elias Levy
>Assignee: vinoyang
>Priority: Major
>  Labels: pull-request-available
>
> Currently you can provide a job name to {{execute}} in the execution 
> environment.  In an environment where many versions of a job may be executing, 
> such as a development or test environment, identifying which running job is 
> of a specific version via the UI can be difficult unless the version is 
> embedded into the job name given to {{execute}}.  But the job name is used 
> for other purposes, such as for namespacing metrics.  Thus, it is not ideal 
> to modify the job name, as that could require modifying metric dashboards and 
> monitors each time versions change.
> I propose a new method be added to the execution environment, 
> {{setDescription}}, that would allow a user to pass in an arbitrary 
> description that would be displayed in the dashboard, allowing users to 
> distinguish jobs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6265: [FLINK-9765] [sql-client] Improve CLI responsiveness when...

2018-07-05 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6265
  
+1


---


[jira] [Commented] (FLINK-9765) Improve CLI responsiveness when cluster is not reachable

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533588#comment-16533588
 ] 

ASF GitHub Bot commented on FLINK-9765:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6265
  
+1


> Improve CLI responsiveness when cluster is not reachable
> 
>
> Key: FLINK-9765
> URL: https://issues.apache.org/jira/browse/FLINK-9765
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> If the cluster was not started or is not reachable, it takes a long time to 
> cancel a result. This should not affect the main thread. The CLI should be 
> responsive at all times.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9759) give an irrelevant answer about the savepoint restore when stateless operators are added etc

2018-07-05 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9759?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9759.
---
   Resolution: Not A Problem
Fix Version/s: (was: 1.5.1)
   (was: 1.6.0)

> give an irrelevant answer about the savepoint restore when stateless 
> operators are added etc
> ---
>
> Key: FLINK-9759
> URL: https://issues.apache.org/jira/browse/FLINK-9759
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: lamber-ken
>Assignee: lamber-ken
>Priority: Major
>  Labels: pull-request-available
>
> give an irrelevant answer about the savepoint restore when stateless 
> operators are added etc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Widen scope of Expression.r...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/flink/pull/6252
  
@fhueske 
Ah, I didn't notice the object already exists. Thanks for letting me 
know!


---


[jira] [Commented] (FLINK-9742) Widen scope of Expression.resultType to 'public'

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533614#comment-16533614
 ] 

ASF GitHub Bot commented on FLINK-9742:
---

Github user HeartSaVioR commented on the issue:

https://github.com/apache/flink/pull/6252
  
@fhueske 
Ah, I didn't notice the object already exists. Thanks for letting me 
know!


> Widen scope of Expression.resultType to 'public'
> 
>
> Key: FLINK-9742
> URL: https://issues.apache.org/jira/browse/FLINK-9742
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> I have a use case for a TableSource which requires a custom implementation of 
> TimestampExtractor. To ensure the new TimestampExtractor covers more general 
> use cases, access to Expression.resultType is necessary, but its scope is 
> currently defined as package-private to "org.apache.flink".
> Below is the implementation of a custom TimestampExtractor which leverages 
> Expression.resultType and hence had to be placed in the org.apache.flink 
> package (which looks like a hack).
> {code:java}
> class IsoDateStringAwareExistingField(val field: String) extends 
> TimestampExtractor {
>   override def getArgumentFields: Array[String] = Array(field)
>   override def validateArgumentFields(argumentFieldTypes: 
> Array[TypeInformation[_]]): Unit = {
> val fieldType = argumentFieldTypes(0)
> fieldType match {
>   case Types.LONG => // OK
>   case Types.SQL_TIMESTAMP => // OK
>   case Types.STRING => // OK
>   case _: TypeInformation[_] =>
> throw ValidationException(
>   s"Field '$field' must be of type Long or Timestamp or String but is 
> of type $fieldType.")
> }
>   }
>   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
> Expression = {
> val fieldAccess: Expression = fieldAccesses(0)
> fieldAccess.resultType match {
>   case Types.LONG =>
> // access LONG field
> fieldAccess
>   case Types.SQL_TIMESTAMP =>
> // cast timestamp to long
> Cast(fieldAccess, Types.LONG)
>   case Types.STRING =>
> Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
> }
>   }
> }{code}
> It would be better to just make Expression.resultType public to cover other 
> cases as well. (I'm not sure whether other methods should also be made 
> public.)
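
With an accessor exposed on the existing {{ExpressionUtils}} object (assuming 
it ends up named {{getResultType}} as suggested in review), the dispatch above 
no longer needs the package-private member, so the extractor can live outside 
org.apache.flink:

{code}
// Same dispatch as above, but via the public utility instead of the
// package-private Expression.resultType (sketch; accessor name assumed).
ExpressionUtils.getResultType(fieldAccess) match {
  case Types.LONG => fieldAccess
  case Types.SQL_TIMESTAMP => Cast(fieldAccess, Types.LONG)
  case Types.STRING => Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
}
{code}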



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6255: [FLINK-9681] [table] Make sure difference between ...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6255#discussion_r200330683
  
--- Diff: docs/dev/table/streaming.md ---
@@ -591,16 +589,14 @@ qConfig.withIdleStateRetentionTime(Time.hours(12);
 
 val qConfig: StreamQueryConfig = ???
 
-// set idle state retention time: min = 12 hour, max = 16 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16))
-// set idle state retention time. min = max = 12 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12)
+// set idle state retention time: min = 12 hour, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
 {% endhighlight %}
 
 
 
-Configuring different minimum and maximum idle state retention times is 
more efficient because it reduces the internal book-keeping of a query for when 
to remove state.
+Configuring different minimum and maximum idle state retention times is 
more efficient because it reduces the internal book-keeping of a query for when 
to remove state. The difference between minTime and maxTime should be at least 5 
minutes.
--- End diff --

The "... more efficient ..." does not apply anymore. Maybe rephrase to 

> Cleaning up state requires additional bookkeeping which becomes less 
expensive for larger differences of `minTime` and `maxTime`. The difference 
between `minTime` and `maxTime` must be at least 5 minutes.




---


[GitHub] flink pull request #6255: [FLINK-9681] [table] Make sure difference between ...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6255#discussion_r200333630
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ---
@@ -384,4 +386,12 @@ object HarnessTestBase {
   value.row.getField(selectorField).asInstanceOf[T]
 }
   }
+
+  /**
+* Test class used to test min and max retention time.
+*/
+  class StreamQueryConfigTest(min: Time, max: Time) extends 
StreamQueryConfig {
--- End diff --

I would rename the class to `TestStreamQueryConfig` because the `Test` at 
the end suggests that this class is testing something instead of being a util 
for a test.
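
For reference, the renamed util could look like this (the diff only shows the 
class header, so the body below is assumed, not taken from the PR):

```scala
class TestStreamQueryConfig(min: Time, max: Time) extends StreamQueryConfig {
  // pin the retention interval for harness tests
  override def getMinIdleStateRetentionTime: Long = min.toMilliseconds
  override def getMaxIdleStateRetentionTime: Long = max.toMilliseconds
}
```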


---


[jira] [Commented] (FLINK-9681) Make sure minRetentionTime not equal to maxRetentionTime

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533619#comment-16533619
 ] 

ASF GitHub Bot commented on FLINK-9681:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6255#discussion_r200333630
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ---
@@ -384,4 +386,12 @@ object HarnessTestBase {
   value.row.getField(selectorField).asInstanceOf[T]
 }
   }
+
+  /**
+* Test class used to test min and max retention time.
+*/
+  class StreamQueryConfigTest(min: Time, max: Time) extends 
StreamQueryConfig {
--- End diff --

I would rename the class to `TestStreamQueryConfig` because the `Test` at 
the end suggests that this class is testing something instead of being a util 
for a test.


> Make sure minRetentionTime not equal to maxRetentionTime
> 
>
> Key: FLINK-9681
> URL: https://issues.apache.org/jira/browse/FLINK-9681
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, for a group by (or other operators), if minRetentionTime equals 
> maxRetentionTime, the group by operator will register a timer for each record 
> arriving at a different time, which causes a performance problem. The reasoning 
> for having two parameters is that we can avoid registering many timers if we 
> have more freedom about when to discard state. Since min equal to max causes a 
> performance problem, it is better to make sure these two parameters are not 
> the same.
> Any suggestions are welcome.
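
A simplified sketch of the timer bookkeeping behind this (illustrative names, 
not the actual operator code): a new cleanup timer is registered only when the 
previously scheduled one would fire too early, and with min == max that is the 
case for every record with advancing time:

{code}
class CleanupTimerLogic(minRetention: Long, maxRetention: Long) {
  private var registeredCleanupTime = Long.MinValue

  /** Returns the cleanup timer to register for this record, if any. */
  def onRecord(now: Long): Option[Long] = {
    if (now + minRetention > registeredCleanupTime) {
      registeredCleanupTime = now + maxRetention // push the deadline out
      Some(registeredCleanupTime)
    } else {
      None // the existing timer is still far enough away: no new timer
    }
  }
}
{code}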



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9681) Make sure minRetentionTime not equal to maxRetentionTime

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533618#comment-16533618
 ] 

ASF GitHub Bot commented on FLINK-9681:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6255#discussion_r200330683
  
--- Diff: docs/dev/table/streaming.md ---
@@ -591,16 +589,14 @@ qConfig.withIdleStateRetentionTime(Time.hours(12);
 
 val qConfig: StreamQueryConfig = ???
 
-// set idle state retention time: min = 12 hour, max = 16 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16))
-// set idle state retention time. min = max = 12 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12)
+// set idle state retention time: min = 12 hour, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
 {% endhighlight %}
 
 
 
-Configuring different minimum and maximum idle state retention times is 
more efficient because it reduces the internal book-keeping of a query for when 
to remove state.
+Configuring different minimum and maximum idle state retention times is 
more efficient because it reduces the internal book-keeping of a query for when 
to remove state. The difference between minTime and maxTime should be at least 5 
minutes.
--- End diff --

The "... more efficient ..." does not apply anymore. Maybe rephrase to 

> Cleaning up state requires additional bookkeeping which becomes less 
expensive for larger differences of `minTime` and `maxTime`. The difference 
between `minTime` and `maxTime` must be at least 5 minutes.




> Make sure minRetentionTime not equal to maxRetentionTime
> 
>
> Key: FLINK-9681
> URL: https://issues.apache.org/jira/browse/FLINK-9681
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>  Labels: pull-request-available
>
> Currently, for a group by (or other operators), if minRetentionTime equals 
> maxRetentionTime, the group by operator will register a timer for each record 
> arriving at a different time, which causes a performance problem. The reasoning 
> for having two parameters is that we can avoid registering many timers if we 
> have more freedom about when to discard state. Since min equal to max causes a 
> performance problem, it is better to make sure these two parameters are not 
> the same.
> Any suggestions are welcome.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9742) Expose Expression.resultType to public

2018-07-05 Thread Jungtaek Lim (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated FLINK-9742:

Summary: Expose Expression.resultType to public  (was: Widen scope of 
Expression.resultType to 'public')

> Expose Expression.resultType to public
> --
>
> Key: FLINK-9742
> URL: https://issues.apache.org/jira/browse/FLINK-9742
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> I have a use case for a TableSource which requires a custom implementation of 
> TimestampExtractor. To ensure the new TimestampExtractor covers more general 
> use cases, access to Expression.resultType is necessary, but its scope is 
> currently defined as package-private to "org.apache.flink".
> Below is the implementation of a custom TimestampExtractor which leverages 
> Expression.resultType and hence had to be placed in the org.apache.flink 
> package (which looks like a hack).
> {code:java}
> class IsoDateStringAwareExistingField(val field: String) extends 
> TimestampExtractor {
>   override def getArgumentFields: Array[String] = Array(field)
>   override def validateArgumentFields(argumentFieldTypes: 
> Array[TypeInformation[_]]): Unit = {
> val fieldType = argumentFieldTypes(0)
> fieldType match {
>   case Types.LONG => // OK
>   case Types.SQL_TIMESTAMP => // OK
>   case Types.STRING => // OK
>   case _: TypeInformation[_] =>
> throw ValidationException(
>   s"Field '$field' must be of type Long or Timestamp or String but is 
> of type $fieldType.")
> }
>   }
>   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
> Expression = {
> val fieldAccess: Expression = fieldAccesses(0)
> fieldAccess.resultType match {
>   case Types.LONG =>
> // access LONG field
> fieldAccess
>   case Types.SQL_TIMESTAMP =>
> // cast timestamp to long
> Cast(fieldAccess, Types.LONG)
>   case Types.STRING =>
> Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
> }
>   }
> }{code}
> It would be better to just make Expression.resultType public to cover other 
> cases as well. (I'm not sure whether other methods should also be made 
> public.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Expose Expression.resultTyp...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/flink/pull/6252
  
@fhueske Addressed. Please take a look again.


---


[jira] [Commented] (FLINK-9742) Expose Expression.resultType to public

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533623#comment-16533623
 ] 

ASF GitHub Bot commented on FLINK-9742:
---

Github user HeartSaVioR commented on the issue:

https://github.com/apache/flink/pull/6252
  
@fhueske Addressed. Please take a look again.


> Expose Expression.resultType to public
> --
>
> Key: FLINK-9742
> URL: https://issues.apache.org/jira/browse/FLINK-9742
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> I have a use case for a TableSource which requires a custom implementation of 
> TimestampExtractor. To ensure the new TimestampExtractor covers more general 
> use cases, access to Expression.resultType is necessary, but its scope is 
> currently defined as package-private to "org.apache.flink".
> Below is the implementation of a custom TimestampExtractor which leverages 
> Expression.resultType and hence had to be placed in the org.apache.flink 
> package (which looks like a hack).
> {code:java}
> class IsoDateStringAwareExistingField(val field: String) extends 
> TimestampExtractor {
>   override def getArgumentFields: Array[String] = Array(field)
>   override def validateArgumentFields(argumentFieldTypes: 
> Array[TypeInformation[_]]): Unit = {
> val fieldType = argumentFieldTypes(0)
> fieldType match {
>   case Types.LONG => // OK
>   case Types.SQL_TIMESTAMP => // OK
>   case Types.STRING => // OK
>   case _: TypeInformation[_] =>
> throw ValidationException(
>   s"Field '$field' must be of type Long or Timestamp or String but is 
> of type $fieldType.")
> }
>   }
>   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
> Expression = {
> val fieldAccess: Expression = fieldAccesses(0)
> fieldAccess.resultType match {
>   case Types.LONG =>
> // access LONG field
> fieldAccess
>   case Types.SQL_TIMESTAMP =>
> // cast timestamp to long
> Cast(fieldAccess, Types.LONG)
>   case Types.STRING =>
> Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
> }
>   }
> }{code}
> It would be better to just make Expression.resultType public to cover other 
> cases as well. (I'm not sure whether other methods should also be made 
> public.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6252: [FLINK-9742][Table API & SQL] Expose Expression.re...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6252#discussion_r200345063
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
 ---
@@ -22,13 +22,16 @@ import java.lang.{Boolean => JBoolean, Byte => JByte, 
Double => JDouble, Float =
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
 import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo}
 
 object ExpressionUtils {
+  def getReturnType(expr: Expression): TypeInformation[_] = {
--- End diff --

Please add Scala docs since this is a public method.

Rename to `getResultType` for consistency?
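
For illustration, the documented and renamed method could look like this (a 
sketch, not the merged code):

```scala
object ExpressionUtils {

  /**
    * Returns the result type of the given expression.
    *
    * @param expr the expression to inspect
    * @return the [[TypeInformation]] describing the expression's result
    */
  def getResultType(expr: Expression): TypeInformation[_] = expr.resultType
}
```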




---


[jira] [Commented] (FLINK-9742) Expose Expression.resultType to public

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533626#comment-16533626
 ] 

ASF GitHub Bot commented on FLINK-9742:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6252#discussion_r200345063
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
 ---
@@ -22,13 +22,16 @@ import java.lang.{Boolean => JBoolean, Byte => JByte, 
Double => JDouble, Float =
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
 import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo}
 
 object ExpressionUtils {
+  def getReturnType(expr: Expression): TypeInformation[_] = {
--- End diff --

Please add Scala docs since this is a public method.

Rename to `getResultType` for consistency?




> Expose Expression.resultType to public
> --
>
> Key: FLINK-9742
> URL: https://issues.apache.org/jira/browse/FLINK-9742
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> I have a use case for a TableSource which requires a custom implementation of 
> TimestampExtractor. To ensure the new TimestampExtractor covers more general 
> use cases, access to Expression.resultType is necessary, but its scope is 
> currently defined as package-private to "org.apache.flink".
> Below is the implementation of a custom TimestampExtractor which leverages 
> Expression.resultType and hence had to be placed in the org.apache.flink 
> package (which looks like a hack).
> {code:java}
> class IsoDateStringAwareExistingField(val field: String) extends 
> TimestampExtractor {
>   override def getArgumentFields: Array[String] = Array(field)
>   override def validateArgumentFields(argumentFieldTypes: 
> Array[TypeInformation[_]]): Unit = {
> val fieldType = argumentFieldTypes(0)
> fieldType match {
>   case Types.LONG => // OK
>   case Types.SQL_TIMESTAMP => // OK
>   case Types.STRING => // OK
>   case _: TypeInformation[_] =>
> throw ValidationException(
>   s"Field '$field' must be of type Long or Timestamp or String but is 
> of type $fieldType.")
> }
>   }
>   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
> Expression = {
> val fieldAccess: Expression = fieldAccesses(0)
> fieldAccess.resultType match {
>   case Types.LONG =>
> // access LONG field
> fieldAccess
>   case Types.SQL_TIMESTAMP =>
> // cast timestamp to long
> Cast(fieldAccess, Types.LONG)
>   case Types.STRING =>
> Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
> }
>   }
> }{code}
> It would be better to just make Expression.resultType public to cover other 
> cases as well. (I'm not sure whether other methods should also be made 
> public.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Expose Expression.resultTyp...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
Looks good. Just one last comment.


---


[jira] [Commented] (FLINK-9742) Expose Expression.resultType to public

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9742?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533627#comment-16533627
 ] 

ASF GitHub Bot commented on FLINK-9742:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
Looks good. Just one last comment.


> Expose Expression.resultType to public
> --
>
> Key: FLINK-9742
> URL: https://issues.apache.org/jira/browse/FLINK-9742
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Jungtaek Lim
>Priority: Major
>  Labels: pull-request-available
>
> I have a use case for a TableSource which requires a custom implementation of 
> TimestampExtractor. To ensure the new TimestampExtractor covers more general 
> use cases, access to Expression.resultType is necessary, but its scope is 
> currently defined as package-private to "org.apache.flink".
> Below is the implementation of a custom TimestampExtractor which leverages 
> Expression.resultType and hence had to be placed in the org.apache.flink 
> package (which looks like a hack).
> {code:java}
> class IsoDateStringAwareExistingField(val field: String) extends 
> TimestampExtractor {
>   override def getArgumentFields: Array[String] = Array(field)
>   override def validateArgumentFields(argumentFieldTypes: 
> Array[TypeInformation[_]]): Unit = {
> val fieldType = argumentFieldTypes(0)
> fieldType match {
>   case Types.LONG => // OK
>   case Types.SQL_TIMESTAMP => // OK
>   case Types.STRING => // OK
>   case _: TypeInformation[_] =>
> throw ValidationException(
>   s"Field '$field' must be of type Long or Timestamp or String but is 
> of type $fieldType.")
> }
>   }
>   override def getExpression(fieldAccesses: Array[ResolvedFieldReference]): 
> Expression = {
> val fieldAccess: Expression = fieldAccesses(0)
> fieldAccess.resultType match {
>   case Types.LONG =>
> // access LONG field
> fieldAccess
>   case Types.SQL_TIMESTAMP =>
> // cast timestamp to long
> Cast(fieldAccess, Types.LONG)
>   case Types.STRING =>
> Cast(Cast(fieldAccess, SqlTimeTypeInfo.TIMESTAMP), Types.LONG)
> }
>   }
> }{code}
> It would be better to just make Expression.resultType public to cover other 
> cases as well. (I'm not sure whether other methods should also be made 
> public.)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...

2018-07-05 Thread AlexanderKoltsov
GitHub user AlexanderKoltsov opened a pull request:

https://github.com/apache/flink/pull/6267

[FLINK-5750] Incorrect parse of brackets inside VALUES subquery

## What is the purpose of the change

*This pull request adds supporting multiple inputs in DataSetUnionRule*


## Brief change log

  - *DataSetUnionRule should consider all inputs instead of only the 1st 
and 2nd*


## Verifying this change

*This change added the following test:*
*- Added a unit test, testValuesWithCast, that validates the VALUES operator with 
values that have to be cast. Such a query is transformed into a UNION of 
VALUES by the plan optimizer since the values arguments are not all literals.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AlexanderKoltsov/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6267.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6267


commit 6e0448b4039c577b017dd3bf2b09e68e0b53969f
Author: Alexander Koltsov 
Date:   2018-07-05T09:39:40Z

[FLINK-5750] Incorrect parse of brackets inside VALUES subquery

DataSetUnionRule should consider all inputs instead of only the 1st and 2nd.
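
The shape of the fix as a minimal sketch (names simplified; the actual rule
builds DataSetUnion nodes): fold *all* converted inputs into a chain of binary
unions instead of wiring only the first two.

```scala
// A VALUES with non-literal rows is planned as a union of n inputs, so the
// rule must combine every input; reduceLeft yields ((in1 + in2) + in3) + ...
def unionAll[T](inputs: Seq[T], binaryUnion: (T, T) => T): T =
  inputs.reduceLeft(binaryUnion)
```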




---


[jira] [Commented] (FLINK-5750) Incorrect parse of brackets inside VALUES subquery

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533634#comment-16533634
 ] 

ASF GitHub Bot commented on FLINK-5750:
---

GitHub user AlexanderKoltsov opened a pull request:

https://github.com/apache/flink/pull/6267

[FLINK-5750] Incorrect parse of brackets inside VALUES subquery

## What is the purpose of the change

*This pull request adds supporting multiple inputs in DataSetUnionRule*


## Brief change log

  - *DataSetUnionRule should consider all inputs instead of only the 1st 
and 2nd*


## Verifying this change

*This change added the following test:*
*- Added a unit test, testValuesWithCast, that validates the VALUES operator with 
values that have to be cast. Such a query is transformed into a UNION of 
VALUES by the plan optimizer since the values arguments are not all literals.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AlexanderKoltsov/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6267.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6267


commit 6e0448b4039c577b017dd3bf2b09e68e0b53969f
Author: Alexander Koltsov 
Date:   2018-07-05T09:39:40Z

[FLINK-5750] Incorrect parse of brackets inside VALUES subquery

DataSetUnionRule should consider all inputs instead of only the 1st and 2nd.




> Incorrect parse of brackets inside VALUES subquery
> --
>
> Key: FLINK-5750
> URL: https://issues.apache.org/jira/browse/FLINK-5750
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Mushin
>Assignee: Alexander Koltsov
>Priority: Minor
>  Labels: pull-request-available
>
> {code:java}
> @Test
>   public void testValuesWithCast() throws Exception {
>   ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>   BatchTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env, config());
>   String sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
>   "(2, cast(2 as BIGINT))," +
>   "(3, cast(3 as BIGINT))";
>   String sqlQuery2 = "VALUES (1,1)," +
>   "(2, 2)," +
>   "(3, 3)";
>   Table result = tableEnv.sql(sqlQuery);
>   DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
>   List<Row> results = resultSet.collect();
>   Table result2 = tableEnv.sql(sqlQuery2);
>   DataSet<Row> resultSet2 = tableEnv.toDataSet(result2, Row.class);
>   List<Row> results2 = resultSet2.collect();
>   String expected = "1,1\n2,2\n3,3";
>   compareResultAsText(results2, expected);
>   compareResultAsText(results, expected);
>   }
> {code}
> AR for {{results}} variable
> {noformat}
> java.lang.AssertionError: Different elements in arrays: expected 3 elements 
> and received 2
>  expected: [1,1, 2,2, 3,3]
>  received: [1,1, 2,2] 
> Expected :3
> Actual   :2
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-5750) Incorrect parse of brackets inside VALUES subquery

2018-07-05 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-5750?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-5750:
--
Labels: pull-request-available  (was: )

> Incorrect parse of brackets inside VALUES subquery
> --
>
> Key: FLINK-5750
> URL: https://issues.apache.org/jira/browse/FLINK-5750
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.2.0
>Reporter: Anton Mushin
>Assignee: Alexander Koltsov
>Priority: Minor
>  Labels: pull-request-available
>
> {code:java}
> @Test
>   public void testValuesWithCast() throws Exception {
>   ExecutionEnvironment env = 
> ExecutionEnvironment.getExecutionEnvironment();
>   BatchTableEnvironment tableEnv = 
> TableEnvironment.getTableEnvironment(env, config());
>   String sqlQuery = "VALUES (1, cast(1 as BIGINT) )," +
>   "(2, cast(2 as BIGINT))," +
>   "(3, cast(3 as BIGINT))";
>   String sqlQuery2 = "VALUES (1,1)," +
>   "(2, 2)," +
>   "(3, 3)";
>   Table result = tableEnv.sql(sqlQuery);
>   DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
>   List<Row> results = resultSet.collect();
>   Table result2 = tableEnv.sql(sqlQuery2);
>   DataSet<Row> resultSet2 = tableEnv.toDataSet(result2, Row.class);
>   List<Row> results2 = resultSet2.collect();
>   String expected = "1,1\n2,2\n3,3";
>   compareResultAsText(results2, expected);
>   compareResultAsText(results, expected);
>   }
> {code}
> AR for {{results}} variable
> {noformat}
> java.lang.AssertionError: Different elements in arrays: expected 3 elements 
> and received 2
>  expected: [1,1, 2,2, 3,3]
>  received: [1,1, 2,2] 
> Expected :3
> Actual   :2
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6253: [FLINK-8094][Table API & SQL] Support other types for Exi...

2018-07-05 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/flink/pull/6253
  
@fhueske 
Thanks for the nice feedback! Addressed your comments except the one about the 
unit test. Let me try adding a unit test later, since it might be too high a 
barrier for now.


---


[GitHub] flink issue #6262: [hotfix][filesystem] Remove incorrect equals methods in S...

2018-07-05 Thread pnowojski
Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/6262
  
Thanks, fixed the Travis failure and merged.


---


[jira] [Commented] (FLINK-8094) Support other types for ExistingField rowtime extractor

2018-07-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-8094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16533635#comment-16533635
 ] 

ASF GitHub Bot commented on FLINK-8094:
---

Github user HeartSaVioR commented on the issue:

https://github.com/apache/flink/pull/6253
  
@fhueske 
Thanks for the nice feedback! Addressed your comments except the one about the 
unit test. Let me try adding a unit test later, since it might be too high a 
barrier for now.


> Support other types for ExistingField rowtime extractor
> ---
>
> Key: FLINK-8094
> URL: https://issues.apache.org/jira/browse/FLINK-8094
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Xingcan Cui
>Priority: Major
>  Labels: pull-request-available
>
> Currently, the {{ExistingField}} rowtime extractor only supports {{Long}} and 
> {{Timestamp}} fields. To enable other data types (e.g., {{String}}), we can 
> provide some system extraction functions and allow users to pass some 
> parameters via the constructor of {{ExistingField}}. There's [a simple 
> demo|https://github.com/xccui/flink/commit/afcc5f1a0ad92db08294199e61be5df72c1514f8]
>  which enables the {{String}} type rowtime by adding a UDF {{str2EventTime}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

