[GitHub] flink issue #5409: [FLINK-8555][TableAPI & SQL] Fix TableFunction varargs le...

2018-02-05 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5409
  
Thanks for the PR @Xpray. The changes look good. I will make sure that the 
`SqlOperandTypeChecker` is also used for the other function types.


---


[jira] [Commented] (FLINK-8555) Fix TableFunction varargs length exceeds 254 for SQL

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353528#comment-16353528
 ] 

ASF GitHub Bot commented on FLINK-8555:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5409
  
Thanks for the PR @Xpray. The changes look good. I will make sure that the 
`SqlOperandTypeChecker` is also used for the other function types.


> Fix TableFunction varargs length exceeds 254 for SQL
> 
>
> Key: FLINK-8555
> URL: https://issues.apache.org/jira/browse/FLINK-8555
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API  SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> With Varargs, TableAPI can handle table function call with parameters exceeds 
> 254 correctly.
> This issue is intend to support long parameters for SQL



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8563) Support consecutive DOT operators

2018-02-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8563:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-8507

> Support consecutive DOT operators 
> --
>
> Key: FLINK-8563
> URL: https://issues.apache.org/jira/browse/FLINK-8563
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Priority: Major
>
> We added support for accessing fields of arrays of composite types in 
> FLINK-7923. However, accessing another nested subfield is not supported by 
> Calcite. See CALCITE-2162. We should fix this once we upgrade to Calcite 1.16.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5374: [FLINK-8101][flink-connectors] Elasticsearch 5.3+ (Transp...

2018-02-05 Thread cjolif
Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/5374
  
@tzulitai did you have a chance to look at this? If you have any question 
please let me know?


---


[jira] [Commented] (FLINK-8101) Elasticsearch 6.x support

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353511#comment-16353511
 ] 

ASF GitHub Bot commented on FLINK-8101:
---

Github user cjolif commented on the issue:

https://github.com/apache/flink/pull/5374
  
@tzulitai did you have a chance to look at this? If you have any question 
please let me know?


> Elasticsearch 6.x support
> -
>
> Key: FLINK-8101
> URL: https://issues.apache.org/jira/browse/FLINK-8101
> Project: Flink
>  Issue Type: New Feature
>  Components: ElasticSearch Connector
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Flavio Pompermaier
>Priority: Major
> Fix For: 1.5.0
>
>
> Recently, elasticsearch 6.0.0 was released: 
> https://www.elastic.co/blog/elasticsearch-6-0-0-released  
> The minimum version of ES6 compatible Elasticsearch Java Client is 5.6.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8563) Support consecutive DOT operators

2018-02-05 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8563:
---

 Summary: Support consecutive DOT operators 
 Key: FLINK-8563
 URL: https://issues.apache.org/jira/browse/FLINK-8563
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Timo Walther


We added support for accessing fields of arrays of composite types in 
FLINK-7923. However, accessing another nested subfield is not supported by 
Calcite. See CALCITE-2162. We should fix this once we upgrade to Calcite 1.16.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-05 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353508#comment-16353508
 ] 

Xingcan Cui commented on FLINK-8538:


Thanks for your reply, [~fhueske]. I'll think it over.

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column

2018-02-05 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353506#comment-16353506
 ] 

Timo Walther edited comment on FLINK-7923 at 2/6/18 7:32 AM:
-

Fixed in 1.5.0: 91b7d011022fc1211aebe9114dfa6b497da49553.


was (Author: twalthr):
Fixed in 91b7d011022fc1211aebe9114dfa6b497da49553.

> Support accessing subfields of a Composite element in an Object Array type 
> column
> -
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353507#comment-16353507
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5367


> Support accessing subfields of a Composite element in an Object Array type 
> column
> -
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column

2018-02-05 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-7923.
-
Resolution: Fixed

Fixed in 91b7d011022fc1211aebe9114dfa6b497da49553.

> Support accessing subfields of a Composite element in an Object Array type 
> column
> -
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5367


---


[jira] [Commented] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353486#comment-16353486
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5367
  
Btw I will remove the null tests because fields of tuples and case classes 
are not allowed to be null. The serializers would throw an exception.


> Support accessing subfields of a Composite element in an Object Array type 
> column
> -
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

2018-02-05 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5367
  
Btw I will remove the null tests because fields of tuples and case classes 
are not allowed to be null. The serializers would throw an exception.


---


[jira] [Commented] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353479#comment-16353479
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5367
  
Thanks for the update @suez1224. The changes look good. I will merge this...


> Support accessing subfields of a Composite element in an Object Array type 
> column
> -
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

2018-02-05 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5367
  
Thanks for the update @suez1224. The changes look good. I will merge this...


---


[jira] [Updated] (FLINK-8562) Fix YARNSessionFIFOSecuredITCase

2018-02-05 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-8562:
--
Description: Currently, YARNSessionFIFOSecuredITCase will not fail even if 
the current Flink YARN Kerberos integration is failing in production. Please 
see FLINK-8275.  (was: Currently, YARNSessionFIFOSecuredITCase will not fail 
even if the current Flink YARN Kerberos integration test is failing. Please see 
FLINK-8275.)

> Fix YARNSessionFIFOSecuredITCase
> 
>
> Key: FLINK-8562
> URL: https://issues.apache.org/jira/browse/FLINK-8562
> Project: Flink
>  Issue Type: Bug
>  Components: Security
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
>
> Currently, YARNSessionFIFOSecuredITCase will not fail even if the current 
> Flink YARN Kerberos integration is failing in production. Please see 
> FLINK-8275.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8562) Fix YARNSessionFIFOSecuredITCase

2018-02-05 Thread Shuyi Chen (JIRA)
Shuyi Chen created FLINK-8562:
-

 Summary: Fix YARNSessionFIFOSecuredITCase
 Key: FLINK-8562
 URL: https://issues.apache.org/jira/browse/FLINK-8562
 Project: Flink
  Issue Type: Bug
  Components: Security
Reporter: Shuyi Chen
Assignee: Shuyi Chen


Currently, YARNSessionFIFOSecuredITCase will not fail even if the current Flink 
YARN Kerberos integration test is failing. Please see FLINK-8275.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-6469) Configure Memory Sizes with units

2018-02-05 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-6469:
---

Assignee: vinoyang

> Configure Memory Sizes with units
> -
>
> Key: FLINK-6469
> URL: https://issues.apache.org/jira/browse/FLINK-6469
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> Currently, memory sizes are configured by pure numbers, the interpretation is 
> different from configuration parameter to parameter.
> For example, heap sizes are configured in megabytes, network buffer memory is 
> configured in bytes, alignment thresholds are configured in bytes.
> I propose to configure all memory parameters the same way, with units similar 
> to time. The JVM itself configured heap size similarly: {{Xmx5g}} or 
> {{Xmx2000m}}.
> {code}
> 1  -> bytes
> 10 kb
> 64 mb
> 1 gb
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353271#comment-16353271
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166175837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

I think we can change the current `CheckpointBarrierHandler` interface into 
abstract class and then add a `createBarrierHanlder` method for extracting the 
common parts in `StreamInputProcessor` and `StreamTwoInputProcessor`. Or we 
define a new class for the common method. I prefer the first way. 
What do you think?


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> it is no need to spill data for avoiding deadlock. Then we implement a new 
> {{CheckpointBarrierHandler}} for only buffering the data for blocked channels 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be configured to use or 
> not in order to rollback the original mode for unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166175837
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

I think we can change the current `CheckpointBarrierHandler` interface into 
abstract class and then add a `createBarrierHanlder` method for extracting the 
common parts in `StreamInputProcessor` and `StreamTwoInputProcessor`. Or we 
define a new class for the common method. I prefer the first way. 
What do you think?


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353265#comment-16353265
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166174743
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque queuedBuffered;
--- End diff --

I think we can not directly mix all the blocked buffers for different 
checkpoint ids into one `ArrayDeque`. It also needs the `BufferOrEventSequence` 
which indicates the blocked buffers for a specific checkpoint id, otherwise we 
can not know when the blocked buffers are exhausted after reset a specific 
checkpoint id. 

If we want to use only one `ArrayDeque` for blocking all buffers, we may 
need to insert extra hints of checkpoint id into this queue for helping when to 
stop reading blocked buffers from the queue.

For example:
channel1: [cp1,cp2,b1,cp3,b2,b3]
channel2: [cp2]

1. When reading cp1 first from channel1, [cp2,b1,cp3,b2,b3] are blocked as 
separate sequence1.

[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r166174743
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque queuedBuffered;
--- End diff --

I think we can not directly mix all the blocked buffers for different 
checkpoint ids into one `ArrayDeque`. It also needs the `BufferOrEventSequence` 
which indicates the blocked buffers for a specific checkpoint id, otherwise we 
can not know when the blocked buffers are exhausted after reset a specific 
checkpoint id. 

If we want to use only one `ArrayDeque` for blocking all buffers, we may 
need to insert extra hints of checkpoint id into this queue for helping when to 
stop reading blocked buffers from the queue.

For example:
channel1: [cp1,cp2,b1,cp3,b2,b3]
channel2: [cp2]

1. When reading cp1 first from channel1, [cp2,b1,cp3,b2,b3] are blocked as 
separate sequence1.
2. When reading cp2 from channel2, the cp1 is released and begins to read 
sequence1.
3. When reading cp2 from seq1, the following buffers will be blocked in new 
seq2.
4. When reading cp3 from seq1,the cp2 is released and the 

[jira] [Commented] (FLINK-8552) CliFrontend don't exit normal after job has been submitted with 'bin/flink run some_jar_ball'

2018-02-05 Thread Lynch Lee (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16353191#comment-16353191
 ] 

Lynch Lee commented on FLINK-8552:
--

OK. Thanks !

> CliFrontend don't exit normal after job has been submitted with 'bin/flink 
> run some_jar_ball'
> -
>
> Key: FLINK-8552
> URL: https://issues.apache.org/jira/browse/FLINK-8552
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.4.0
>Reporter: Lynch Lee
>Priority: Major
>
> I used cmd 'bin/flink run some_jar_ball' to submit my job into remote 
> cluster, but I found it the java process did not exit normally while my 
> submitting action is done and job status changed into RUNNING . 
>  
> Is this a Bug to fixed ?
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5295: [FLINK-8384] [streaming] Session Window Assigner w...

2018-02-05 Thread dyanarose
Github user dyanarose closed the pull request at:

https://github.com/apache/flink/pull/5295


---


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352692#comment-16352692
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
I can see it's gone through Travis and is now in master, so closing as 
requested


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Assignee: Dyana Rose
>Priority: Minor
> Fix For: 1.5.0
>
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...

2018-02-05 Thread dyanarose
Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
I can see it's gone through Travis and is now in master, so closing as 
requested


---


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352693#comment-16352693
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose closed the pull request at:

https://github.com/apache/flink/pull/5295


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Assignee: Dyana Rose
>Priority: Minor
> Fix For: 1.5.0
>
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-05 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352657#comment-16352657
 ] 

Fabian Hueske commented on FLINK-8538:
--

Hi [~xccui], please see my comments:
 # Yes, the idea is to make this quite modular. Not only for users that 
implement other connectors, but also for us. For instance, we'd like to reuse 
code for JSON formats when reading from Kafka, Kinesis, Pravega, Files, etc.
 # This is not clear yet and will be driven by user demand. We'll focus on the 
most popular systems for which there are source functions that we can wrap 
(Kafka, Kinesis, file system, etc.)
 # I think so too. Most of the existing sources (which are not too many) would 
need to be refactored to become more modular. For instance, it does not really 
make sense to have Kafka table sources for each version of Kafka and (Avro, 
JSON). This will become hard to maintain once we add more formats like 
ProtoBuf, CSV, or ...
 # The current version does not feature format factories, but this is something 
we are thinking about to add soon as well. A format factory would need to 
provide a format parser that parses data in the format but also returns the 
{{TypeInformation}} of the returned type.

It's great that you want to help with this effort! It would be great if we 
could have a first {{KafkaJsonTableSourceFactory}} soon, to have a Kafka source 
that can be used with the SQL CLI client. Starting from the 
+CsvTableSourceFactory+ sounds like a good idea to me. We should make sure 
though, that the things I discussed above can be later refactored without 
breaking too much API.

Thanks, Fabian

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.

2018-02-05 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8561.
-
Resolution: Fixed

> SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
> --
>
> Key: FLINK-8561
> URL: https://issues.apache.org/jira/browse/FLINK-8561
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.

2018-02-05 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352646#comment-16352646
 ] 

Kostas Kloudas commented on FLINK-8561:
---

Merged on master 3d323ba10171da6e6416efca832b76da46ea010b

and on 1.4             33efaf74a12d7fe436a9d61d5377329496c6c6a0

> SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
> --
>
> Key: FLINK-8561
> URL: https://issues.apache.org/jira/browse/FLINK-8561
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column

2018-02-05 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-7923:
--
Fix Version/s: 1.5.0

> Support accessing subfields of a Composite element in an Object Array type 
> column
> -
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-05 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352634#comment-16352634
 ] 

Xingcan Cui commented on FLINK-8538:


Thanks for the explanation [~twalthr]. I've glanced over the PR for FLINK-8240 
and tried to comprehend the work.

1. We are going to provide a unified interface, which is described by general 
string properties, to define the table sources and sinks. Ideally, the users 
can combine the descriptors (in different dimensions such as type, format, and 
schema) to make their own table sources/sinks in the future, right?

2. Do you have any plans for how many table source/sink factories we are going 
to support?

3. I got a feeling that we must do some refactorings for the existing 
connectors since the source types and formats seem to be heavily coupled.

4. When mentioned {{format}} discovery, did you mean to implement different 
{{FormatDescriptors}} and to match them just like using the {{SPI}} for the 
{{TableSourceFactories}}?

I'd like to assist to implement the whole feature :)  For now, maybe I can take 
the {{CsvTableSourceFactory}} as a demo and imitate it to implement an 
elementary {{KafkaTableSourceFactory}}.

What do you think?

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5415: Multipath merged

2018-02-05 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5415

Multipath merged

## What is the purpose of the change

* Add support to `FileInputFormat` to read from multiple paths. At the 
moment only a single path is supported that can be recursively read if the path 
is a directory.
* Maintain compatibility with existing input formats that extend 
`FileInputFormat`

This PR is an extension / rework of PR #1990.

## Brief change log

`FileInputFormat` is declared as `@Public` interface.

* deprecate the `protected` variable `FileInputFormat.filePath` of type 
`Path`. We need to store more than one path to support multiple paths.
* deprecate the method `FileInputFormat.getFilePath()`. We need to return a 
list of paths.
* add a private member variable to `FileInputFormat` to store multiple 
paths.
* add methods to set and get multiple paths.
* add a `deprecated` method `public boolean 
FileInputFormat.supportsMultiPaths()` with default implementation `return 
false;` that input formats can override if they support multiple paths. If the 
method returns `true`, the deprecated `filePath` variable is not used anymore 
and will always be `null`. 
* changed all public sub classes of `FileInputFormat` to not directly 
access the deprecated `filePath` variable but use the more generic 
`getFilePaths()` method.
* We cannot override `supportsMultiPaths` in `DelimitedInputFormat` and 
`BinaryInputFormat` because they are part of the public API and not final.

## Verifying this change

* added a couple of tests to validate that splits and statistics are 
correctly generated.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **YES**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **yes**
  - If yes, how is the feature documented? Only in JavaDocs so far.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink multipath-merged

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5415.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5415


commit 13e84ff078d6112648dace7559139dea85f46c5d
Author: Phetsarath, Sourigna 
Date:   2016-05-13T16:24:46Z

[FLINK-3655] Multiple File Paths for FileInputFormat.

commit d46b32726e3043debc94d097f2dc296841689d36
Author: Fabian Hueske 
Date:   2018-02-05T13:40:50Z

[FLINK-3655] Multiple File Paths for FileInputFormat.

- Reverted API-breaking changes.




---


[jira] [Updated] (FLINK-7923) Support accessing subfields of a Composite element in an Object Array type column

2018-02-05 Thread Shuyi Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shuyi Chen updated FLINK-7923:
--
Issue Type: New Feature  (was: Bug)
   Summary: Support accessing subfields of a Composite element in an Object 
Array type column  (was: SQL parser exception when accessing subfields of a 
Composite element in an Object Array type column)

> Support accessing subfields of a Composite element in an Object Array type 
> column
> -
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8243) OrcTableSource should recursively read all files in nested directories of the input path.

2018-02-05 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-8243.

Resolution: Fixed

Implemented for 1.4.1 with e5c1261919765876b3ad873abbd9f21bee2fe12b
Implemented for 1.5.0 with de3d85ba19d11ad8a7ab38b30b74953421e6383d

> OrcTableSource should recursively read all files in nested directories of the 
> input path.
> -
>
> Key: FLINK-8243
> URL: https://issues.apache.org/jira/browse/FLINK-8243
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{OrcTableSource}} only reads files on the first level of the provided 
> input path. 
> Hive's default behavior is to recursively read all nested files in the input 
> path. We should follow this behavior and add a switch to disable recursive 
> reading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8242) ClassCastException in OrcTableSource.toOrcPredicate

2018-02-05 Thread Fabian Hueske (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske closed FLINK-8242.

Resolution: Fixed

Fixed for 1.4.1 with 19fcd5ebadabdebc4d56716920937f229f06f5d3
Fixed for 1.5.0 with 63a19e879bfb4c4981f151e7b50481df094dcb09

> ClassCastException in OrcTableSource.toOrcPredicate
> ---
>
> Key: FLINK-8242
> URL: https://issues.apache.org/jira/browse/FLINK-8242
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{OrcTableSource}} tries to cast all predicate literals to 
> {{Serializable}} in its {{toOrcPredicate()}} method. This fails with a 
> {{ClassCastException}} if a literal is not serializable.
> Instead of failing, we should ignore the predicate and log a WARN message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5414: Cep inv

2018-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5414


---


[jira] [Commented] (FLINK-8243) OrcTableSource should recursively read all files in nested directories of the input path.

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352600#comment-16352600
 ] 

ASF GitHub Bot commented on FLINK-8243:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5344


> OrcTableSource should recursively read all files in nested directories of the 
> input path.
> -
>
> Key: FLINK-8243
> URL: https://issues.apache.org/jira/browse/FLINK-8243
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{OrcTableSource}} only reads files on the first level of the provided 
> input path. 
> Hive's default behavior is to recursively read all nested files in the input 
> path. We should follow this behavior and add a switch to disable recursive 
> reading.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8242) ClassCastException in OrcTableSource.toOrcPredicate

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8242?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352601#comment-16352601
 ] 

ASF GitHub Bot commented on FLINK-8242:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5345


> ClassCastException in OrcTableSource.toOrcPredicate
> ---
>
> Key: FLINK-8242
> URL: https://issues.apache.org/jira/browse/FLINK-8242
> Project: Flink
>  Issue Type: Bug
>  Components: Batch Connectors and Input/Output Formats
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> The {{OrcTableSource}} tries to cast all predicate literals to 
> {{Serializable}} in its {{toOrcPredicate()}} method. This fails with a 
> {{ClassCastException}} if a literal is not serializable.
> Instead of failing, we should ignore the predicate and log a WARN message.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5345: [FLINK-8242] [orc] Fix predicate push-down of OrcT...

2018-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5345


---


[GitHub] flink pull request #5344: [FLINK-8243] [orc] OrcTableSource reads input path...

2018-02-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5344


---


[GitHub] flink pull request #5414: Cep inv

2018-02-05 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5414#discussion_r166021070
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -519,8 +498,9 @@ public int hashCode() {
private final ValueTimeWrapper valueTime;
private final Set> edges;
private final SharedBufferPage page;
+
private int referenceCounter;
-   private transient int entryId;
+   private int entryId;
--- End diff --

we were not serializing it anyway and the class is not `Serializable`.


---


[GitHub] flink pull request #5414: Cep inv

2018-02-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5414#discussion_r166018249
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -609,22 +580,27 @@ public String toString() {
 
@Override
public boolean equals(Object obj) {
-   if (obj instanceof SharedBufferEntry) {
-   @SuppressWarnings("unchecked")
-   SharedBufferEntry other = 
(SharedBufferEntry) obj;
+   if (!(obj instanceof SharedBufferEntry)) {
+   return false;
+   }
 
-   return valueTime.equals(other.valueTime) &&
+   @SuppressWarnings("unchecked")
+   SharedBufferEntry other = (SharedBufferEntry) obj;
+
+   return valueTime.equals(other.valueTime) &&
getKey().equals(other.getKey()) &&
referenceCounter == 
other.referenceCounter &&
-   edges.equals(other.edges);
-   } else {
-   return false;
-   }
+   Objects.equals(edges, other.edges);
}
 
@Override
public int hashCode() {
-   return Objects.hash(valueTime, getKey(), 
referenceCounter, edges);
+   int result = 1;
--- End diff --

Why not use `Objects.hash()` here anymore?


---


[GitHub] flink pull request #5414: Cep inv

2018-02-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5414#discussion_r166020117
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -542,15 +542,12 @@ public void addEdge(SharedBufferEdge edge) {
/**
 * Remove edges with the specified targets.
 */
-   private void removeEdges(final List> 
prunedEntries) {
-   Iterator> itor = 
edges.iterator();
-   while (itor.hasNext()) {
-   SharedBufferEdge edge = itor.next();
-   for (SharedBufferEntry prunedEntry : 
prunedEntries) {
-   if (prunedEntry == edge.getTarget()) {
-   itor.remove();
-   break;
-   }
+   private void removeEdges(final Set> 
prunedEntries) {
+   Iterator> it = edges.iterator();
+   while (it.hasNext()) {
+   SharedBufferEdge edge = it.next();
+   if (prunedEntries.contains(edge.getTarget())) {
--- End diff --

Very subtle ... 😅 The `equals()` is used internally in `Set.contains()`.


---


[GitHub] flink pull request #5414: Cep inv

2018-02-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5414#discussion_r166017493
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -437,72 +425,63 @@ public void add(final ValueTimeWrapper valueTime, 
final SharedBufferEntry> prunedEntries) {
-   Iterator>> iterator = entries.entrySet().iterator();
-   boolean continuePruning = true;
-
-   while (iterator.hasNext() && continuePruning) {
-   SharedBufferEntry entry = 
iterator.next().getValue();
-
+   private void prune(final long pruningTimestamp, final 
List> prunedEntries) {
+   Iterator>> it = entries.entrySet().iterator();
+   while (it.hasNext()) {
+   SharedBufferEntry entry = 
it.next().getValue();
if (entry.getValueTime().getTimestamp() <= 
pruningTimestamp) {
prunedEntries.add(entry);
-   iterator.remove();
-   } else {
-   continuePruning = false;
+   it.remove();
--- End diff --

was the early cancel not working before? Or: why was `continuePruning` 
removed.


---


[GitHub] flink pull request #5414: Cep inv

2018-02-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5414#discussion_r166017932
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -519,8 +498,9 @@ public int hashCode() {
private final ValueTimeWrapper valueTime;
private final Set> edges;
private final SharedBufferPage page;
+
private int referenceCounter;
-   private transient int entryId;
+   private int entryId;
--- End diff --

Why is this not `transient` anymore?


---


[GitHub] flink pull request #5414: Cep inv

2018-02-05 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5414#discussion_r166018025
  
--- Diff: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
 ---
@@ -578,14 +555,8 @@ private void removeEdges(final 
List> prunedEntries) {
}
}
 
-   public boolean remove() {
-   if (page != null) {
-   page.remove(valueTime);
-
-   return true;
-   } else {
-   return false;
-   }
+   public void remove() {
+   page.remove(valueTime);
--- End diff --

Did we never need the check before?


---


[jira] [Resolved] (FLINK-7797) Add support for windowed outer joins for streaming tables

2018-02-05 Thread Xingcan Cui (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui resolved FLINK-7797.

   Resolution: Fixed
Fix Version/s: 1.5.0

> Add support for windowed outer joins for streaming tables
> -
>
> Key: FLINK-7797
> URL: https://issues.apache.org/jira/browse/FLINK-7797
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, only windowed inner joins for streaming tables are supported.
> This issue is about adding support for windowed LEFT, RIGHT, and FULL OUTER 
> joins.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5414: Cep inv

2018-02-05 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5414

Cep inv

R @aljoscha 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink cep-inv

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5414.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5414


commit 41dba903691b9b3c4e8fcc8831a1c57930ee3071
Author: kkloudas 
Date:   2018-01-25T09:59:30Z

[hotfix] [cep] SharedBuffer refactoring.

commit 7a3d9327e0877fcd9496ea2b367ba69e6220568c
Author: kkloudas 
Date:   2018-02-05T13:36:53Z

[FLINK-8561] [cep] Fix SharedBuffer.removeEdges to use .equals().




---


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352575#comment-16352575
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
Thanks a lot for working on this and iterating so quickly!  

I merged this but could you please close the PR if it doesn't close 
automatically?


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Assignee: Dyana Rose
>Priority: Minor
> Fix For: 1.5.0
>
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...

2018-02-05 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5295
  
Thanks a lot for working on this and iterating so quickly! 👍 

I merged this but could you please close the PR if it doesn't close 
automatically?


---


[jira] [Assigned] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-8384:
---

Assignee: Dyana Rose

> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Assignee: Dyana Rose
>Priority: Minor
> Fix For: 1.5.0
>
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8384.
---
Resolution: Fixed

Implemented on master in
ede4c0751b630503605248e8d22f29977f58624a

> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Assignee: Dyana Rose
>Priority: Minor
> Fix For: 1.5.0
>
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-05 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8384:

Fix Version/s: 1.5.0

> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
> Fix For: 1.5.0
>
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7803) Update savepoint Documentation

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352541#comment-16352541
 ] 

ASF GitHub Bot commented on FLINK-7803:
---

Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4809#discussion_r166008336
  
--- Diff: docs/ops/state/savepoints.md ---
@@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job 
with ID `:jobid` and cancel
 
 If you don't specify a target directory, you need to have [configured a 
default directory](#configuration). Otherwise, cancelling the job with a 
savepoint will fail.
 
+
--- End diff --

 


> Update savepoint Documentation
> --
>
> Key: FLINK-7803
> URL: https://issues.apache.org/jira/browse/FLINK-7803
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Razvan
>Assignee: Razvan
>Priority: Major
>
> Can you please update 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>  to specify the savepoint location *MUST* always be a location accessible by 
> all hosts?
> I spent quite some time believing it'S a bug and trying to find solutions, 
> see https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the 
> current documentation and other might waste time also believing it's an 
> actual issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4809: [FLINK-7803][Documentation] Add missing savepoint ...

2018-02-05 Thread uce
Github user uce commented on a diff in the pull request:

https://github.com/apache/flink/pull/4809#discussion_r166008336
  
--- Diff: docs/ops/state/savepoints.md ---
@@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job 
with ID `:jobid` and cancel
 
 If you don't specify a target directory, you need to have [configured a 
default directory](#configuration). Otherwise, cancelling the job with a 
savepoint will fail.
 
+
--- End diff --

👍 


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352526#comment-16352526
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai please see changes and couple questions.


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution become skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-05 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai please see changes and couple questions.


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352520#comment-16352520
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , thanks for reviews!

I understand your concerns and I should deduplicate some common utils in 
these tests. I will do that tomorrow together with other comments!


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> it is no need to spill data for avoiding deadlock. Then we implement a new 
> {{CheckpointBarrierHandler}} for only buffering the data for blocked channels 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be configured to use or 
> not in order to rollback the original mode for unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7803) Update savepoint Documentation

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352524#comment-16352524
 ] 

ASF GitHub Bot commented on FLINK-7803:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4809#discussion_r166001086
  
--- Diff: docs/ops/state/savepoints.md ---
@@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job 
with ID `:jobid` and cancel
 
 If you don't specify a target directory, you need to have [configured a 
default directory](#configuration). Otherwise, cancelling the job with a 
savepoint will fail.
 
+
--- End diff --

Should be something like:
```

Attention: targetDirectory has to be a 
location accessible by both the JobManager(s) and TaskManager(s), e.g., a 
location on a distributed file system.

```
because markdown is not rendered within HTML tags.

Also, the *warning* you have added is currently in the *Cancel Job with 
Savepoint* section. I think it should be moved to the section above (*Trigger a 
Savepoint*), or somewhere else since it does not only apply to the cancelation 
case.

@uce 


> Update savepoint Documentation
> --
>
> Key: FLINK-7803
> URL: https://issues.apache.org/jira/browse/FLINK-7803
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Razvan
>Assignee: Razvan
>Priority: Major
>
> Can you please update 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>  to specify the savepoint location *MUST* always be a location accessible by 
> all hosts?
> I spent quite some time believing it'S a bug and trying to find solutions, 
> see https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the 
> current documentation and other might waste time also believing it's an 
> actual issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5400: [FLINK-8547][network] Implement CheckpointBarrierHandler ...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/5400
  
@pnowojski , thanks for reviews!

I understand your concerns and I should deduplicate some common utils in 
these tests. I will do that tomorrow together with other comments!


---


[GitHub] flink pull request #4809: [FLINK-7803][Documentation] Add missing savepoint ...

2018-02-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/4809#discussion_r166001086
  
--- Diff: docs/ops/state/savepoints.md ---
@@ -120,6 +120,10 @@ This will atomically trigger a savepoint for the job 
with ID `:jobid` and cancel
 
 If you don't specify a target directory, you need to have [configured a 
default directory](#configuration). Otherwise, cancelling the job with a 
savepoint will fail.
 
+
--- End diff --

Should be something like:
```

Attention: targetDirectory has to be a 
location accessible by both the JobManager(s) and TaskManager(s), e.g., a 
location on a distributed file system.

```
because markdown is not rendered within HTML tags.

Also, the *warning* you have added is currently in the *Cancel Job with 
Savepoint* section. I think it should be moved to the section above (*Trigger a 
Savepoint*), or somewhere else since it does not only apply to the cancelation 
case.

@uce 


---


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165998584
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

yes, i will consider a proper way


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352509#comment-16352509
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165998584
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 ---
@@ -131,10 +131,14 @@ public StreamInputProcessor(
long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
if (!(maxAlign == -1 || maxAlign > 0)) {
throw new IllegalConfigurationException(
-   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
-   + " must be positive or -1 
(infinite)");
+   
TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()
--- End diff --

yes, i will consider a proper way


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> it is no need to spill data for avoiding deadlock. Then we implement a new 
> {{CheckpointBarrierHandler}} for only buffering the data for blocked channels 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be configured to use or 
> not in order to rollback the original mode for unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8020) Deadlock found in Async I/O operator

2018-02-05 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8020:

Priority: Critical  (was: Blocker)

> Deadlock found in Async I/O operator
> 
>
> Key: FLINK-8020
> URL: https://issues.apache.org/jira/browse/FLINK-8020
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming, Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode
>Reporter: Weihua Jiang
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jstack53009(2).out, jstack67976-2.log
>
>
> Our streaming job run into trouble in these days after a long time smooth 
> running. One issue we found is 
> [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this 
> one.
> After analyzing the jstack, we believe  we found a DEAD LOCK in flink:
> 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" 
> hold lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940.
> 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: 
> hbase-sink0 (8/8)" hold lock 0x0007b6aa1940 and is waiting for lock 
> 0x0007b6aa1788. 
> This DEADLOCK made the job fail to proceed. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352505#comment-16352505
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165997853
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque queuedBuffered;
--- End diff --

The current implementation keeps the same logic with `BarrierBuffer`. I am 
wondering whether it can make sense if only keeping one 
`ArrayDeque` for holding all blocking buffers for different 
checkpoint ids. Especially for the uncommon case mentioned on line 496 in 
`BarrierBuffer`. I will double check that logic and reply to you later.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue 

[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165997853
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
+ * flow control.
+ */
+@Internal
+public class CreditBasedBarrierBuffer implements CheckpointBarrierHandler {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedBarrierBuffer.class);
+
+   /** The gate that the buffer draws its input from. */
+   private final InputGate inputGate;
+
+   /** Flags that indicate whether a channel is currently 
blocked/buffered. */
+   private final boolean[] blockedChannels;
+
+   /** The total number of channels that this buffer handles data from. */
+   private final int totalNumberOfInputChannels;
+
+   /** The utility to buffer blocked data in the memory queue. */
+   private final CreditBasedBufferBlocker bufferBlocker;
+
+   /**
+* The pending blocked buffer/event sequences. Must be consumed before 
requesting further data
+* from the input gate.
+*/
+   private final ArrayDeque queuedBuffered;
--- End diff --

The current implementation keeps the same logic with `BarrierBuffer`. I am 
wondering whether it can make sense if only keeping one 
`ArrayDeque` for holding all blocking buffers for different 
checkpoint ids. Especially for the uncommon case mentioned on line 496 in 
`BarrierBuffer`. I will double check that logic and reply to you later.


---


[GitHub] flink pull request #5413: [hotfix][table][tests] Set @Ignore description for...

2018-02-05 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5413

[hotfix][table][tests] Set @Ignore description for RowCsvInputFormatT…

Trivial change that moves the reasoning for `@Ignore` from a comment into 
the annotation itself.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink hotfix_ignore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5413.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5413


commit 9a189ee22769ca34ee71d3145ee48d5cecd5271c
Author: zentol 
Date:   2018-02-05T15:01:48Z

[hotfix][table][tests] Set @Ignore description for 
RowCsvInputFormatTest#testParserCorrectness




---


[jira] [Commented] (FLINK-8471) Add Flip-6 configuration switch

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8471?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352489#comment-16352489
 ] 

ASF GitHub Bot commented on FLINK-8471:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5334
  
I have a usability concern.
It's possible to run
```
FLINK_MODE=flip6 bin/start-cluster.sh
```
This will start a cluster in flilp6 mode.

However,
```
FLINK_MODE=flip6 bin/flink list
```
won't work.

If the first way is not intended, then it doesn't matter.


> Add Flip-6 configuration switch
> ---
>
> Key: FLINK-8471
> URL: https://issues.apache.org/jira/browse/FLINK-8471
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should add a configuration switch to activate and de-activate the Flip-6 
> code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5334: [FLINK-8471] [flip6] Introduce configuration switch for F...

2018-02-05 Thread GJL
Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5334
  
I have a usability concern.
It's possible to run
```
FLINK_MODE=flip6 bin/start-cluster.sh
```
This will start a cluster in flilp6 mode.

However,
```
FLINK_MODE=flip6 bin/flink list
```
won't work.

If the first way is not intended, then it doesn't matter.


---


[jira] [Commented] (FLINK-8534) if insert too much BucketEntry into one bucket in join of iteration will cause a error (Caused : java.io.FileNotFoundException release file error)

2018-02-05 Thread zhu.qing (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352487#comment-16352487
 ] 

zhu.qing commented on FLINK-8534:
-

Do I need provide more information?





-- 

-- 

Best Regards

Qing zhu

East China Normal University


> if insert too much BucketEntry into one bucket in join of iteration will 
> cause a error (Caused : java.io.FileNotFoundException release file error)
> --
>
> Key: FLINK-8534
> URL: https://issues.apache.org/jira/browse/FLINK-8534
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
> Environment: windows, intellij idea, 8g ram, 4core i5 cpu, Flink 
> 1.4.0, and parallelism = 2 will cause problem and others will not.
>Reporter: zhu.qing
>Priority: Major
> Attachments: T2AdjSetBfs.java
>
>
> When insert too much entry into bucket (MutableHashTable insertBucketEntry() 
> line 1054 more than 255) will cause  spillPartition() (HashPartition line 
> 317). So 
> this.buildSideChannel = ioAccess.createBlockChannelWriter(targetChannel, 
> bufferReturnQueue); 
> And in 
> prepareNextPartition() of ReOpenableMutableHashTable (line 156)
> furtherPartitioning = true; 
> so in 
> finalizeProbePhase() in HashPartition (line 367)
>  this.probeSideChannel.close();
> //the file will be delete 
>  this.buildSideChannel.deleteChannel();
>  this.probeSideChannel.deleteChannel();
> after deleteChannel the next iteartion will fail.
>  
> And I use web-google(SNAP) as dataset. 
>  
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>  at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>  at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>  at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>  at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>  at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>  at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Channel to path 
> 'C:\Users\sanquan.qz\AppData\Local\Temp\flink-io-5af23edc-1ec0-4718-87a5-916ee022a8be\fc08af25b6f879b8e7bb24291c47ea1d.18.channel'
>  could not be opened.
>  at 
> org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.(AbstractFileIOChannel.java:61)
>  at 
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.(AsynchronousFileIOChannel.java:86)
>  at 
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.(AsynchronousBulkBlockReader.java:46)
>  at 
> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.(AsynchronousBulkBlockReader.java:39)
>  at 
> org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBulkBlockChannelReader(IOManagerAsync.java:294)
>  at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:880)
>  at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:637)
>  at 
> org.apache.flink.runtime.operators.hash.ReOpenableMutableHashTable.prepareNextPartition(ReOpenableMutableHashTable.java:170)
>  at 
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:675)
>  at 
> org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator.callWithNextKey(NonReusingBuildFirstHashJoinIterator.java:117)
>  at 
> org.apache.flink.runtime.operators.AbstractCachedBuildSideJoinDriver.run(AbstractCachedBuildSideJoinDriver.java:176)
>  at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>  at 
> org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:145)
>  at 
> org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:93)
>  at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>  at java.lang.Thread.run(Thread.java:745)
> 

[jira] [Commented] (FLINK-7758) Fix bug Kafka09Fetcher add offset metrics

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352461#comment-16352461
 ] 

ASF GitHub Bot commented on FLINK-7758:
---

Github user yew1eb closed the pull request at:

https://github.com/apache/flink/pull/4769


> Fix bug  Kafka09Fetcher add offset metrics 
> ---
>
> Key: FLINK-7758
> URL: https://issues.apache.org/jira/browse/FLINK-7758
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Metrics
>Affects Versions: 1.3.2
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
>
> in Kafka09Fetcher, add _KafkaConsumer_ kafkaMetricGroup. 
> No judgment that the useMetrics variable is true.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352458#comment-16352458
 ] 

ASF GitHub Bot commented on FLINK-7984:
---

Github user yew1eb closed the pull request at:

https://github.com/apache/flink/pull/5072


> Bump snappy-java to 1.1.4
> -
>
> Key: FLINK-7984
> URL: https://issues.apache.org/jira/browse/FLINK-7984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> Upgrade the snappy java version to 1.1.4(the latest, May, 2017). The older 
> version has some issues like memory leak 
> (https://github.com/xerial/snappy-java/issues/91).
> Snappy Java [Release 
> Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #4769: [FLINK-7758][kafka][hotfix] Fix bug Kafka09Fetcher...

2018-02-05 Thread yew1eb
Github user yew1eb closed the pull request at:

https://github.com/apache/flink/pull/4769


---


[GitHub] flink pull request #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4

2018-02-05 Thread yew1eb
Github user yew1eb closed the pull request at:

https://github.com/apache/flink/pull/5072


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352441#comment-16352441
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983714
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
--- End diff --

sure


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> it is no need to spill data for avoiding deadlock. Then we implement a new 
> {{CheckpointBarrierHandler}} for only buffering the data for blocked channels 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be configured to use or 
> not in order to rollback the original mode for unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983714
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import 
org.apache.flink.runtime.checkpoint.decline.AlignmentLimitExceededException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException;
+import 
org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException;
+import 
org.apache.flink.runtime.checkpoint.decline.InputEndOfStreamException;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import 
org.apache.flink.streaming.runtime.io.CreditBasedBufferBlocker.BufferOrEventSequence;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * The barrier buffer is {@link CheckpointBarrierHandler} that blocks 
inputs with barriers until
+ * all inputs have received the barrier for a given checkpoint.
+ *
+ * The BarrierBuffer continues receiving buffers from the blocked 
channels and buffered them
+ * internally until the blocks are released. It will not cause deadlocks 
based on credit-based
--- End diff --

sure


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352436#comment-16352436
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983496
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -184,6 +184,18 @@
key("taskmanager.network.detailed-metrics")
.defaultValue(false);
 
+   /**
+* Config parameter defining whether to spill data for channels with 
barrier or not in exactly-once
+* mode based on credit-based flow control.
+*
+* @deprecated Will be removed for Flink 1.6 when the old code will be 
dropped in favour of
+* credit-based flow control.
+*/
+   @Deprecated
+   public static final ConfigOption 
EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
+   key("taskmanager.exactly-once.blocking.data.enabled")
+   .defaultValue(false);
--- End diff --

yes, the default value should be true, but I think it should be changed 
after the `FLINK-7456` is merged to make the credit-based work.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> it is no need to spill data for avoiding deadlock. Then we implement a new 
> {{CheckpointBarrierHandler}} for only buffering the data for blocked channels 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be configured to use or 
> not in order to rollback the original mode for unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983496
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java 
---
@@ -184,6 +184,18 @@
key("taskmanager.network.detailed-metrics")
.defaultValue(false);
 
+   /**
+* Config parameter defining whether to spill data for channels with 
barrier or not in exactly-once
+* mode based on credit-based flow control.
+*
+* @deprecated Will be removed for Flink 1.6 when the old code will be 
dropped in favour of
+* credit-based flow control.
+*/
+   @Deprecated
+   public static final ConfigOption 
EXACTLY_ONCE_BLOCKING_DATA_ENABLED =
+   key("taskmanager.exactly-once.blocking.data.enabled")
+   .defaultValue(false);
--- End diff --

yes, the default value should be true, but I think it should be changed 
after the `FLINK-7456` is merged to make the credit-based work.


---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352438#comment-16352438
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983607
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
--- End diff --

the checkstyle failures are fixed


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently in exactly-once mode, the {{BarrierBuffer}} would block inputs with 
> barriers until all inputs have received the barrier for a given checkpoint. 
> To avoid back-pressuring the input streams which may cause distributed 
> deadlocks, the {{BarrierBuffer}} has to spill the data in disk files to 
> recycle the buffers for blocked channels.
>  
> Based on credit-based flow control, every channel has exclusive buffers, so 
> it is no need to spill data for avoiding deadlock. Then we implement a new 
> {{CheckpointBarrierHandler}} for only buffering the data for blocked channels 
> for better performance.
>  
> And this new {{CheckpointBarrierHandler}} can also be configured to use or 
> not in order to rollback the original mode for unexpected risks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-05 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/5400#discussion_r165983607
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBuffer.java
 ---
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
--- End diff --

the checkstyle failures are fixed


---


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352418#comment-16352418
 ] 

ASF GitHub Bot commented on FLINK-7608:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5161
  
merging.


> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8559) Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to get stuck

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352419#comment-16352419
 ] 

ASF GitHub Bot commented on FLINK-8559:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5412
  
merging.


> Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to 
> get stuck
> -
>
> Key: FLINK-8559
> URL: https://issues.apache.org/jira/browse/FLINK-8559
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>
> In the {{RocksDBKeyedStatebackend#snapshotIncrementally}} we can find this 
> code
>  
> {code:java}
> final RocksDBIncrementalSnapshotOperation snapshotOperation =
>   new RocksDBIncrementalSnapshotOperation<>(
>   this,
>   checkpointStreamFactory,
>   checkpointId,
>   checkpointTimestamp);
> snapshotOperation.takeSnapshot();
> return new FutureTask(
>   new Callable() {
>   @Override
>   public KeyedStateHandle call() throws Exception {
>   return snapshotOperation.materializeSnapshot();
>   }
>   }
> ) {
>   @Override
>   public boolean cancel(boolean mayInterruptIfRunning) {
>   snapshotOperation.stop();
>   return super.cancel(mayInterruptIfRunning);
>   }
>   @Override
>   protected void done() {
>   snapshotOperation.releaseResources(isCancelled());
>   }
> };
> {code}
> In the constructor of RocksDBIncrementalSnapshotOperation we call 
> {{aquireResource()}} on the RocksDB {{ResourceGuard}}. If 
> {{snapshotOperation.takeSnapshot()}} fails with an exception these resources 
> are never released. When the task is shutdown due to the exception it will 
> get stuck on releasing RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5161: [FLINK-7608][metric] Refactor latency statistics metric

2018-02-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5161
  
merging.


---


[GitHub] flink issue #5412: [FLINK-8559][RocksDB] Release resources if snapshot opera...

2018-02-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5412
  
merging.


---


[jira] [Commented] (FLINK-7984) Bump snappy-java to 1.1.4

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352409#comment-16352409
 ] 

ASF GitHub Bot commented on FLINK-7984:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5072
  
@yew1eb Could you close the PR? The issue was addressed in 
f1e4d25c11a678688064492d50ffad38c39ea877.


> Bump snappy-java to 1.1.4
> -
>
> Key: FLINK-7984
> URL: https://issues.apache.org/jira/browse/FLINK-7984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> Upgrade the snappy java version to 1.1.4(the latest, May, 2017). The older 
> version has some issues like memory leak 
> (https://github.com/xerial/snappy-java/issues/91).
> Snappy Java [Release 
> Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5072: [FLINK-7984][build] Bump snappy-java to 1.1.4

2018-02-05 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5072
  
@yew1eb Could you close the PR? The issue was addressed in 
f1e4d25c11a678688064492d50ffad38c39ea877.


---


[jira] [Closed] (FLINK-7984) Bump snappy-java to 1.1.4

2018-02-05 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7984.
---
Resolution: Fixed

master: f1e4d25c11a678688064492d50ffad38c39ea877

> Bump snappy-java to 1.1.4
> -
>
> Key: FLINK-7984
> URL: https://issues.apache.org/jira/browse/FLINK-7984
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Affects Versions: 1.4.0
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> Upgrade the snappy java version to 1.1.4(the latest, May, 2017). The older 
> version has some issues like memory leak 
> (https://github.com/xerial/snappy-java/issues/91).
> Snappy Java [Release 
> Notes|https://github.com/xerial/snappy-java/blob/master/Milestone.md]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8503) Port TaskManagerLogHandler to new REST endpoint

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352385#comment-16352385
 ] 

ASF GitHub Bot commented on FLINK-8503:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r165972923
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
+import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static 

[GitHub] flink pull request #5353: [FLINK-8503] [flip6] Display TaskExecutor logs and...

2018-02-05 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5353#discussion_r165972923
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandler.java
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.AbstractHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheLoader;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.LoadingCache;
+import 
org.apache.flink.shaded.guava18.com.google.common.cache.RemovalNotification;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
+import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+/**
+ * Base class for serving files from the {@link 

[jira] [Commented] (FLINK-8172) Remove unnecessary synchronisation in RecordSerializer

2018-02-05 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352373#comment-16352373
 ] 

Chesnay Schepler commented on FLINK-8172:
-

[~pnowojski] Was this issue subsumed by FLINK-8178?

> Remove unnecessary synchronisation in RecordSerializer
> --
>
> Key: FLINK-8172
> URL: https://issues.apache.org/jira/browse/FLINK-8172
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 1.5.0
>
>
> While writing the records, RecordSerializer is the only owner of the `Buffer` 
> into which data are written. Yet we are synchronisation twice per record 
> while accessing MemorySegment. Removing this synchronisation speeds up the 
> Network throughput in point to point benchmark by a factor of two (from 
> ~12500records/ms up to 23000 records/ms).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.

2018-02-05 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8561:

Affects Version/s: 1.5.0

> SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
> --
>
> Key: FLINK-8561
> URL: https://issues.apache.org/jira/browse/FLINK-8561
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.

2018-02-05 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8561:

Fix Version/s: 1.5.0

> SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.
> --
>
> Key: FLINK-8561
> URL: https://issues.apache.org/jira/browse/FLINK-8561
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0, 1.4.1
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8561) SharedBuffer line 573 uses == to compare BufferEntries instead of .equals.

2018-02-05 Thread Kostas Kloudas (JIRA)
Kostas Kloudas created FLINK-8561:
-

 Summary: SharedBuffer line 573 uses == to compare BufferEntries 
instead of .equals.
 Key: FLINK-8561
 URL: https://issues.apache.org/jira/browse/FLINK-8561
 Project: Flink
  Issue Type: Bug
  Components: CEP
Affects Versions: 1.4.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.4.1






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5544) Implement Internal Timer Service in RocksDB

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352365#comment-16352365
 ] 

ASF GitHub Bot commented on FLINK-5544:
---

Github user orsher commented on the issue:

https://github.com/apache/flink/pull/3359
  
Hi,

Is there any progress here?
This feature will made our life much easier!


> Implement Internal Timer Service in RocksDB
> ---
>
> Key: FLINK-5544
> URL: https://issues.apache.org/jira/browse/FLINK-5544
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Xiaogang Shi
>Assignee: Xiaogang Shi
>Priority: Major
>
> Now the only implementation of internal timer service is 
> HeapInternalTimerService which stores all timers in memory. In the cases 
> where the number of keys is very large, the timer service will cost too much 
> memory. A implementation which stores timers in RocksDB seems good to deal 
> with these cases.
> It might be a little challenging to implement a RocksDB timer service because 
> the timers are accessed in different ways. When timers are triggered, we need 
> to access timers in the order of timestamp. But when performing checkpoints, 
> we must have a method to obtain all timers of a given key group.
> A good implementation, as suggested by [~StephanEwen], follows the idea of 
> merge sorting. We can store timers in RocksDB with the format 
> {{KEY_GROUP#TIMER#KEY}}. In this way, the timers under a key group are put 
> together and are sorted. 
> Then we can deploy an in-memory heap which keeps the first timer of each key 
> group to get the next timer to trigger. When a key group's first timer is 
> updated, we can efficiently update the heap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #3359: [FLINK-5544][streaming] Add InternalTimerService implemen...

2018-02-05 Thread orsher
Github user orsher commented on the issue:

https://github.com/apache/flink/pull/3359
  
Hi,

Is there any progress here?
This feature will made our life much easier!


---


[jira] [Commented] (FLINK-8559) Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to get stuck

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352354#comment-16352354
 ] 

ASF GitHub Bot commented on FLINK-8559:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5412
  
LGTM


> Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to 
> get stuck
> -
>
> Key: FLINK-8559
> URL: https://issues.apache.org/jira/browse/FLINK-8559
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>
> In the {{RocksDBKeyedStatebackend#snapshotIncrementally}} we can find this 
> code
>  
> {code:java}
> final RocksDBIncrementalSnapshotOperation snapshotOperation =
>   new RocksDBIncrementalSnapshotOperation<>(
>   this,
>   checkpointStreamFactory,
>   checkpointId,
>   checkpointTimestamp);
> snapshotOperation.takeSnapshot();
> return new FutureTask(
>   new Callable() {
>   @Override
>   public KeyedStateHandle call() throws Exception {
>   return snapshotOperation.materializeSnapshot();
>   }
>   }
> ) {
>   @Override
>   public boolean cancel(boolean mayInterruptIfRunning) {
>   snapshotOperation.stop();
>   return super.cancel(mayInterruptIfRunning);
>   }
>   @Override
>   protected void done() {
>   snapshotOperation.releaseResources(isCancelled());
>   }
> };
> {code}
> In the constructor of RocksDBIncrementalSnapshotOperation we call 
> {{aquireResource()}} on the RocksDB {{ResourceGuard}}. If 
> {{snapshotOperation.takeSnapshot()}} fails with an exception these resources 
> are never released. When the task is shutdown due to the exception it will 
> get stuck on releasing RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5412: [FLINK-8559][RocksDB] Release resources if snapshot opera...

2018-02-05 Thread StefanRRichter
Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/5412
  
LGTM


---


[jira] [Commented] (FLINK-8559) Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to get stuck

2018-02-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8559?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16352344#comment-16352344
 ] 

ASF GitHub Bot commented on FLINK-8559:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5412

[FLINK-8559][RocksDB] Release resources if snapshot operation fails

## What is the purpose of the change

This PR ensures that RocksDB resources are released if 
`RocksDBIncrementalSnapshotOperation#takeSnapshot` throws an Exception.

We now catch the exception, cancel the SnapshotOperation, and re-throw the 
original exception.

## Verifying this change

I've verified this manually by running 
`JobManagerHACheckpointRecoveryITCase` on Windows where `takeSnapshot` fails 
due to FLINK-8557.

I couldn't come up with proper test. The method hardly does anything in the 
first place and every solution i could think of would depend a lot on 
implementation details (like mocking `Checkpoint.create()` to throw an 
exception).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

@StefanRRichter 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8559

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5412


commit 05f0ff5e353117894af4ba7dc096c3256d80450b
Author: zentol 
Date:   2018-02-05T12:15:29Z

[FLINK-8559][RocksDB] Release resources if snapshot operation fails




> Exceptions in RocksDBIncrementalSnapshotOperation#takeSnapshot cause job to 
> get stuck
> -
>
> Key: FLINK-8559
> URL: https://issues.apache.org/jira/browse/FLINK-8559
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Priority: Blocker
>
> In the {{RocksDBKeyedStatebackend#snapshotIncrementally}} we can find this 
> code
>  
> {code:java}
> final RocksDBIncrementalSnapshotOperation snapshotOperation =
>   new RocksDBIncrementalSnapshotOperation<>(
>   this,
>   checkpointStreamFactory,
>   checkpointId,
>   checkpointTimestamp);
> snapshotOperation.takeSnapshot();
> return new FutureTask(
>   new Callable() {
>   @Override
>   public KeyedStateHandle call() throws Exception {
>   return snapshotOperation.materializeSnapshot();
>   }
>   }
> ) {
>   @Override
>   public boolean cancel(boolean mayInterruptIfRunning) {
>   snapshotOperation.stop();
>   return super.cancel(mayInterruptIfRunning);
>   }
>   @Override
>   protected void done() {
>   snapshotOperation.releaseResources(isCancelled());
>   }
> };
> {code}
> In the constructor of RocksDBIncrementalSnapshotOperation we call 
> {{aquireResource()}} on the RocksDB {{ResourceGuard}}. If 
> {{snapshotOperation.takeSnapshot()}} fails with an exception these resources 
> are never released. When the task is shutdown due to the exception it will 
> get stuck on releasing RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5412: [FLINK-8559][RocksDB] Release resources if snapsho...

2018-02-05 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5412

[FLINK-8559][RocksDB] Release resources if snapshot operation fails

## What is the purpose of the change

This PR ensures that RocksDB resources are released if 
`RocksDBIncrementalSnapshotOperation#takeSnapshot` throws an Exception.

We now catch the exception, cancel the SnapshotOperation, and re-throw the 
original exception.

## Verifying this change

I've verified this manually by running 
`JobManagerHACheckpointRecoveryITCase` on Windows where `takeSnapshot` fails 
due to FLINK-8557.

I couldn't come up with proper test. The method hardly does anything in the 
first place and every solution i could think of would depend a lot on 
implementation details (like mocking `Checkpoint.create()` to throw an 
exception).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)

@StefanRRichter 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8559

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5412


commit 05f0ff5e353117894af4ba7dc096c3256d80450b
Author: zentol 
Date:   2018-02-05T12:15:29Z

[FLINK-8559][RocksDB] Release resources if snapshot operation fails




---


  1   2   >