[jira] [Commented] (FLINK-8020) Deadlock found in Async I/O operator

2018-02-01 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349926#comment-16349926
 ] 

Stephan Ewen commented on FLINK-8020:
-

I think the original diagnosis is not quite correct - the thread locks 
{{0x0007b692ad98}} (which is the checkpoint lock) and then waits on the 
lock (as in {{Object.wait()}}), which means it waits for a notification. The 
thread is not blocked on the lock (note that the Java thread state WAITING is 
not the same as BLOCKED).

I am curious whether this is still a problem. The stack trace does not actually 
show a specific problem - at first glance, it looks as if the Async I/O 
operations (to HBase?) do not complete. Because of that, the pipeline stops and 
waits for those async I/O requests.
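
For illustration, a small standalone Java sketch (not Flink code) of why such a 
thread shows up as WAITING rather than BLOCKED - calling {{Object.wait()}} inside 
a synchronized block releases the monitor and parks the thread until it is 
notified:

{code:java}
public class WaitVsBlocked {

    public static void main(String[] args) throws InterruptedException {
        final Object checkpointLock = new Object();

        Thread waiter = new Thread(() -> {
            synchronized (checkpointLock) {
                try {
                    // Releases the monitor and waits for a notification.
                    checkpointLock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "waiter");

        waiter.start();
        Thread.sleep(500);

        // Prints WAITING - the thread dump shows the thread "inside" the lock,
        // but it is not contending for it.
        System.out.println(waiter.getState());

        // Another thread can still acquire the same monitor and wake the waiter.
        synchronized (checkpointLock) {
            checkpointLock.notifyAll();
        }
        waiter.join();
    }
}
{code}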

> Deadlock found in Async I/O operator
> 
>
> Key: FLINK-8020
> URL: https://issues.apache.org/jira/browse/FLINK-8020
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming, Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode
>Reporter: Weihua Jiang
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jstack53009(2).out, jstack67976-2.log
>
>
> Our streaming job ran into trouble these days after a long period of smooth 
> running. One issue we found is 
> [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this 
> one.
> After analyzing the jstack output, we believe we found a DEADLOCK in Flink:
> 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" 
> holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940.
> 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: 
> hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock 
> 0x0007b6aa1788.
> This DEADLOCK made the job fail to proceed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5400: [FLINK-8547][network] Implement CheckpointBarrierH...

2018-02-01 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5400

[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data 
for exactly-once

## What is the purpose of the change

*Currently, in exactly-once mode, the BarrierBuffer blocks inputs that have 
received a barrier until all inputs have received the barrier for a given 
checkpoint. To avoid back-pressuring the input streams, which may cause 
distributed deadlocks, the BarrierBuffer has to spill the data to disk files in 
order to recycle the buffers for blocked channels.*

*With credit-based flow control, every channel has exclusive buffers, so there 
is no need to spill data to avoid deadlock. We therefore implement a new 
CheckpointBarrierHandler that only buffers the data for blocked channels, for 
better performance.*

*The new CheckpointBarrierHandler can also be toggled via configuration, so 
that the original mode can be restored in case of unexpected problems.*
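
As an illustration of the buffering idea, here is a simplified standalone sketch 
(assumed names and types, not the actual `CreditBasedBarrierBuffer`): records 
arriving on channels that already delivered their barrier are held in memory and 
replayed once the checkpoint is aligned, with no spilling to disk.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;

/** Simplified model of barrier alignment that buffers blocked-channel records in memory. */
class SimpleBarrierAligner<T> {

    private final int numberOfChannels;
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final Map<Integer, Queue<T>> bufferedRecords = new HashMap<>();

    SimpleBarrierAligner(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    /** Records from blocked channels are held back in memory; all others pass through. */
    void onRecord(int channel, T record, Consumer<T> emit) {
        if (blockedChannels.contains(channel)) {
            bufferedRecords.computeIfAbsent(channel, k -> new ArrayDeque<>()).add(record);
        } else {
            emit.accept(record);
        }
    }

    /** A channel that delivered its barrier stays blocked until all channels have done so. */
    void onBarrier(int channel, Consumer<T> emit) {
        blockedChannels.add(channel);
        if (blockedChannels.size() == numberOfChannels) {
            // The checkpoint is aligned: unblock all channels and replay buffered records.
            blockedChannels.clear();
            for (Queue<T> queue : bufferedRecords.values()) {
                queue.forEach(emit);
            }
            bufferedRecords.clear();
        }
    }
}
```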

## Brief change log

  - *Implement the new `CreditBasedBarrierBuffer` and 
`CreditBasedBufferBlocker` for buffering the data of blocked channels in 
exactly-once mode.*
  - *Define the parameter `taskmanager.exactly-once.blocking.data.enabled` 
for enabling or disabling the new handler.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests for the logic of `CreditBasedBarrierBuffer`*
  - *Added tests for the logic of `CreditBasedBufferBlocker`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8547

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5400.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5400


commit 4d08e5d58c732e8f835016b48edc4494f8cb26fe
Author: Zhijiang 
Date:   2018-02-02T07:45:49Z

[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data 
for exactly-once




---


[jira] [Commented] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349921#comment-16349921
 ] 

ASF GitHub Bot commented on FLINK-8547:
---

GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/5400

[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data 
for exactly-once

## What is the purpose of the change

*Currently, in exactly-once mode, the BarrierBuffer blocks inputs that have 
received a barrier until all inputs have received the barrier for a given 
checkpoint. To avoid back-pressuring the input streams, which may cause 
distributed deadlocks, the BarrierBuffer has to spill the data to disk files in 
order to recycle the buffers for blocked channels.*

*With credit-based flow control, every channel has exclusive buffers, so there 
is no need to spill data to avoid deadlock. We therefore implement a new 
CheckpointBarrierHandler that only buffers the data for blocked channels, for 
better performance.*

*The new CheckpointBarrierHandler can also be toggled via configuration, so 
that the original mode can be restored in case of unexpected problems.*

## Brief change log

  - *Implement the new `CreditBasedBarrierBuffer` and 
`CreditBasedBufferBlocker` for buffering the data of blocked channels in 
exactly-once mode.*
  - *Define the parameter `taskmanager.exactly-once.blocking.data.enabled` 
for enabling or disabling the new handler.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests for the logic of `CreditBasedBarrierBuffer`*
  - *Added tests for the logic of `CreditBasedBufferBlocker`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-8547

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5400.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5400


commit 4d08e5d58c732e8f835016b48edc4494f8cb26fe
Author: Zhijiang 
Date:   2018-02-02T07:45:49Z

[FLINK-8547][network] Implement CheckpointBarrierHandler not to spill data 
for exactly-once




> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently, in exactly-once mode, the {{BarrierBuffer}} blocks inputs that have 
> received a barrier until all inputs have received the barrier for a given 
> checkpoint. To avoid back-pressuring the input streams, which may cause 
> distributed deadlocks, the {{BarrierBuffer}} has to spill the data to disk 
> files in order to recycle the buffers for blocked channels.
>  
> With credit-based flow control, every channel has exclusive buffers, so there 
> is no need to spill data to avoid deadlock. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> The new {{CheckpointBarrierHandler}} can also be toggled via configuration, so 
> that the original mode can be restored in case of unexpected problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-01 Thread zhijiang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhijiang updated FLINK-8547:

Description: 
Currently, in exactly-once mode, the {{BarrierBuffer}} blocks inputs that have 
received a barrier until all inputs have received the barrier for a given 
checkpoint. To avoid back-pressuring the input streams, which may cause 
distributed deadlocks, the {{BarrierBuffer}} has to spill the data to disk 
files in order to recycle the buffers for blocked channels.

With credit-based flow control, every channel has exclusive buffers, so there 
is no need to spill data to avoid deadlock. We therefore implement a new 
{{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
for better performance.

The new {{CheckpointBarrierHandler}} can also be toggled via configuration, so 
that the original mode can be restored in case of unexpected problems.

  was:
Currently in exactly-once mode, the `BarrierBuffer` would block inputs with 
barriers until all inputs have received the barrier for a given checkpoint. To 
avoid back-pressuring the input streams which may cause distributed deadlocks, 
the `BarrierBuffer` has to spill the data in disk files to recycle the buffers 
for blocked channels.

 

Based on credit-based flow control, every channel has exclusive buffers, so it 
is no need to spill data for avoiding deadlock. Then we implement a new 
'CheckpointBarrierHandler` for only buffering the data for blocked channels for 
better performance.

 

And this new `CheckpointBarrierHandler` can also be configured to use or not in 
order to rollback the original mode for unexpected risks.


> Implement CheckpointBarrierHandler not to spill data for exactly-once based 
> on credit-based flow control
> 
>
> Key: FLINK-8547
> URL: https://issues.apache.org/jira/browse/FLINK-8547
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.5.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Major
>
> Currently, in exactly-once mode, the {{BarrierBuffer}} blocks inputs that have 
> received a barrier until all inputs have received the barrier for a given 
> checkpoint. To avoid back-pressuring the input streams, which may cause 
> distributed deadlocks, the {{BarrierBuffer}} has to spill the data to disk 
> files in order to recycle the buffers for blocked channels.
>  
> With credit-based flow control, every channel has exclusive buffers, so there 
> is no need to spill data to avoid deadlock. We therefore implement a new 
> {{CheckpointBarrierHandler}} that only buffers the data for blocked channels, 
> for better performance.
>  
> The new {{CheckpointBarrierHandler}} can also be toggled via configuration, so 
> that the original mode can be restored in case of unexpected problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8547) Implement CheckpointBarrierHandler not to spill data for exactly-once based on credit-based flow control

2018-02-01 Thread zhijiang (JIRA)
zhijiang created FLINK-8547:
---

 Summary: Implement CheckpointBarrierHandler not to spill data for 
exactly-once based on credit-based flow control
 Key: FLINK-8547
 URL: https://issues.apache.org/jira/browse/FLINK-8547
 Project: Flink
  Issue Type: Sub-task
  Components: Network
Affects Versions: 1.5.0
Reporter: zhijiang
Assignee: zhijiang


Currently, in exactly-once mode, the `BarrierBuffer` blocks inputs that have 
received a barrier until all inputs have received the barrier for a given 
checkpoint. To avoid back-pressuring the input streams, which may cause 
distributed deadlocks, the `BarrierBuffer` has to spill the data to disk files 
in order to recycle the buffers for blocked channels.

With credit-based flow control, every channel has exclusive buffers, so there 
is no need to spill data to avoid deadlock. We therefore implement a new 
`CheckpointBarrierHandler` that only buffers the data for blocked channels, for 
better performance.

The new `CheckpointBarrierHandler` can also be toggled via configuration, so 
that the original mode can be restored in case of unexpected problems.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8020) Deadlock found in Async I/O operator

2018-02-01 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349913#comment-16349913
 ] 

Stephan Ewen commented on FLINK-8020:
-

Updated the title to better describe the root problem.

> Deadlock found in Async I/O operator
> 
>
> Key: FLINK-8020
> URL: https://issues.apache.org/jira/browse/FLINK-8020
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming, Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode
>Reporter: Weihua Jiang
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jstack53009(2).out, jstack67976-2.log
>
>
> Our streaming job ran into trouble these days after a long period of smooth 
> running. One issue we found is 
> [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this 
> one.
> After analyzing the jstack output, we believe we found a DEADLOCK in Flink:
> 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" 
> holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940.
> 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: 
> hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock 
> 0x0007b6aa1788.
> This DEADLOCK made the job fail to proceed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8020) Deadlock found in Async I/O operator

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-8020:

Summary: Deadlock found in Async I/O operator  (was: Deadlock found in 
Flink Streaming job)

> Deadlock found in Async I/O operator
> 
>
> Key: FLINK-8020
> URL: https://issues.apache.org/jira/browse/FLINK-8020
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Streaming, Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Kafka 0.8.2 and Flink 1.3.2 on YARN mode
>Reporter: Weihua Jiang
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jstack53009(2).out, jstack67976-2.log
>
>
> Our streaming job ran into trouble these days after a long period of smooth 
> running. One issue we found is 
> [https://issues.apache.org/jira/browse/FLINK-8019] and another one is this 
> one.
> After analyzing the jstack output, we believe we found a DEADLOCK in Flink:
> 1. The thread "cache-process0 -> async-operator0 -> Sink: hbase-sink0 (8/8)" 
> holds lock 0x0007b6aa1788 and is waiting for lock 0x0007b6aa1940.
> 2. The thread "Time Trigger for cache-process0 -> async-operator0 -> Sink: 
> hbase-sink0 (8/8)" holds lock 0x0007b6aa1940 and is waiting for lock 
> 0x0007b6aa1788.
> This DEADLOCK made the job fail to proceed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6763) Inefficient PojoSerializerConfigSnapshot serialization format

2018-02-01 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349909#comment-16349909
 ] 

Stephan Ewen commented on FLINK-6763:
-

As a related comment - I think the whole snapshot procedure can be optimized a 
bit. We can create the serializer snapshot once, keep the bytes, and add those 
to every checkpoint. In programs with smaller state, the majority of the 
checkpoint time can be spent on serializer snapshots (still only milliseconds, 
but optimization potential nonetheless).

> Inefficient PojoSerializerConfigSnapshot serialization format
> -
>
> Key: FLINK-6763
> URL: https://issues.apache.org/jira/browse/FLINK-6763
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Type Serialization System
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Till Rohrmann
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The {{PojoSerializerConfigSnapshot}} stores, for each serializer, the beginning 
> offset and ending offset in the serialization stream. This information is 
> also written if the serializer serialization is supposed to be ignored. The 
> beginning and ending offsets are stored as a sequence of integers at the 
> beginning of the serialization stream. We store this information in order to 
> skip broken serializers.
> I think we don't need both offsets. Instead, I would suggest writing the 
> length of the serialized serializer into the serialization stream first, 
> followed by the serialized serializer itself. This can be done in 
> {{TypeSerializerSerializationUtil.writeSerializer}}. When reading the 
> serializer via {{TypeSerializerSerializationUtil.tryReadSerializer}}, we can 
> try to deserialize the serializer. If this operation fails, then we can skip 
> over the serialized serializer because we know how many bytes it occupies.
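
A minimal standalone sketch of the length-prefixed approach described above, 
using plain java.io streams (the method names mirror the utility methods 
mentioned, but this is an illustration, not the actual Flink implementation):

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LengthPrefixedSerializerUtil {

    // Write the serialized serializer preceded by its length, so a broken
    // serializer can later be skipped without tracking start/end offsets.
    static void writeSerializer(DataOutputStream out, Serializable serializer) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(buffer)) {
            oos.writeObject(serializer);
        }
        byte[] bytes = buffer.toByteArray();
        out.writeInt(bytes.length);  // length prefix
        out.write(bytes);            // serialized serializer
    }

    // Read the length prefix and the bytes; if deserialization fails, the bytes
    // are already consumed, so the caller can simply continue with the stream.
    static Object tryReadSerializer(DataInputStream in) throws IOException {
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readFully(bytes);
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (Exception e) {
            // Broken serializer: skipped implicitly, because exactly 'length' bytes were read.
            return null;
        }
    }
}
{code}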



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349905#comment-16349905
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165573730
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.InputStreamViewWrapper;
+
+import java.io.IOException;
+import java.io.PushbackInputStream;
+import java.util.Arrays;
+
+/**
+ * A {@link VersionedIOReadableWritable} which allows to differentiate 
whether the previous
+ * data was versioned with a {@link VersionedIOReadableWritable}. This can 
be used if previously
+ * written data was not versioned, and is to be migrated to a versioned 
format.
+ */
+@Internal
+public abstract class PostVersionedIOReadableWritable extends 
VersionedIOReadableWritable {
+
+   /** NOTE: CANNOT CHANGE! */
+   private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, 
-51, -123, -97};
+
+   /**
+* Read from the provided {@link DataInputView in}. A flag {@code 
wasVersioned} can be
+* used to determine whether or not the data to read was previously 
written
+* by a {@link VersionedIOReadableWritable}.
+*/
+   protected abstract void read(DataInputView in, boolean wasVersioned) 
throws IOException;
+
+   @Override
+   public void write(DataOutputView out) throws IOException {
+   out.write(VERSIONED_IDENTIFIER);
+   super.write(out);
+   }
+
+   /**
+* This read attempts to first identify if the input view contains the 
special
+* {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few 
bytes.
+* If identified to be versioned, the usual version resolution read path
+* in {@link VersionedIOReadableWritable#read(DataInputView)} is 
invoked.
+* Otherwise, we "reset" the input view by pushing back the read 
buffered bytes
+* into the stream.
+*/
+   @Override
+   public final void read(DataInputView in) throws IOException {
+   PushbackInputStream stream = new PushbackInputStream(new 
InputStreamViewWrapper(in), VERSIONED_IDENTIFIER.length);
+
+   byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
+   stream.read(tmp);
+
+   if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
+   super.read(in);
+   read(in, true);
+   } else {
+   stream.unread(tmp);
+   read(new DataInputViewStreamWrapper(stream), false);
+   }
--- End diff --

Not-so-nice things about the current implementation are:
1) it requires several layers of transforming back and forth between a 
`DataInputView` and an `InputStream`, and 
2) it uses a separate `read(DataInputView, boolean)` method in order to 
wrap a "reset" `DataInputView` for the remaining reads.

I think the implementation would have been much more elegant if 
`DataInputView` had an `unread(byte[])` method, though I'm not sure how 
non-trivial it would be to support this across all subclasses.
Maybe food for thought for the future.
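
For reference, a standalone sketch (plain java.io, not Flink code) of the 
pushback pattern the diff relies on - peek at a magic header and "unread" the 
bytes when it is absent:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class PushbackPeekExample {

    private static final byte[] MAGIC = {-15, -51, -123, -97};

    public static void main(String[] args) throws IOException {
        byte[] unversionedData = {1, 2, 3, 4, 5, 6};

        PushbackInputStream in =
                new PushbackInputStream(new ByteArrayInputStream(unversionedData), MAGIC.length);

        byte[] header = new byte[MAGIC.length];
        int read = in.read(header);

        if (read == MAGIC.length && Arrays.equals(header, MAGIC)) {
            System.out.println("versioned: continue with the version-aware read path");
        } else {
            // Push the peeked bytes back so the legacy read path sees the
            // stream from the very beginning again.
            in.unread(header, 0, Math.max(read, 0));
            System.out.println("first byte after unread: " + in.read()); // prints 1
        }
    }
}
```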


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: 

[GitHub] flink pull request #5362: [FLINK-8421] [DataStream] Make timer serializers r...

2018-02-01 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5362#discussion_r165573730
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/core/io/PostVersionedIOReadableWritable.java
 ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.InputStreamViewWrapper;
+
+import java.io.IOException;
+import java.io.PushbackInputStream;
+import java.util.Arrays;
+
+/**
+ * A {@link VersionedIOReadableWritable} which allows to differentiate 
whether the previous
+ * data was versioned with a {@link VersionedIOReadableWritable}. This can 
be used if previously
+ * written data was not versioned, and is to be migrated to a versioned 
format.
+ */
+@Internal
+public abstract class PostVersionedIOReadableWritable extends 
VersionedIOReadableWritable {
+
+   /** NOTE: CANNOT CHANGE! */
+   private static final byte[] VERSIONED_IDENTIFIER = new byte[] {-15, 
-51, -123, -97};
+
+   /**
+* Read from the provided {@link DataInputView in}. A flag {@code 
wasVersioned} can be
+* used to determine whether or not the data to read was previously 
written
+* by a {@link VersionedIOReadableWritable}.
+*/
+   protected abstract void read(DataInputView in, boolean wasVersioned) 
throws IOException;
+
+   @Override
+   public void write(DataOutputView out) throws IOException {
+   out.write(VERSIONED_IDENTIFIER);
+   super.write(out);
+   }
+
+   /**
+* This read attempts to first identify if the input view contains the 
special
+* {@link #VERSIONED_IDENTIFIER} by reading and buffering the first few 
bytes.
+* If identified to be versioned, the usual version resolution read path
+* in {@link VersionedIOReadableWritable#read(DataInputView)} is 
invoked.
+* Otherwise, we "reset" the input view by pushing back the read 
buffered bytes
+* into the stream.
+*/
+   @Override
+   public final void read(DataInputView in) throws IOException {
+   PushbackInputStream stream = new PushbackInputStream(new 
InputStreamViewWrapper(in), VERSIONED_IDENTIFIER.length);
+
+   byte[] tmp = new byte[VERSIONED_IDENTIFIER.length];
+   stream.read(tmp);
+
+   if (Arrays.equals(tmp, VERSIONED_IDENTIFIER)) {
+   super.read(in);
+   read(in, true);
+   } else {
+   stream.unread(tmp);
+   read(new DataInputViewStreamWrapper(stream), false);
+   }
--- End diff --

Not-so-nice things about the current implementation are:
1) it requires several layers of transforming back and forth between a 
`DataInputView` and an `InputStream`, and 
2) it uses a separate `read(DataInputView, boolean)` method in order to 
wrap a "reset" `DataInputView` for the remaining reads.

I think the implementation would have been much more elegant if 
`DataInputView` had an `unread(byte[])` method, though I'm not sure how 
non-trivial it would be to support this across all subclasses.
Maybe food for thought for the future.


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349869#comment-16349869
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai the PR is ready for review now 


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.
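
For illustration, a minimal sketch of what a pluggable shard-to-subtask 
assignment could look like (the interface and names are assumptions for this 
example, not the actual connector API):

{code:java}
/** Hypothetical assigner: maps a shard identifier to a subtask index in [0, parallelism). */
interface ShardToSubtaskAssigner {
    int assign(String shardId, int parallelism);
}

/**
 * Hash-code based distribution, similar in spirit to the behaviour described above:
 * it stays balanced only while the shard identifiers' hash codes are well spread.
 * After resharding, identifiers can cluster and the distribution skews.
 */
class HashCodeShardAssigner implements ShardToSubtaskAssigner {
    @Override
    public int assign(String shardId, int parallelism) {
        // Normalize the hash code into [0, parallelism), also for negative hash codes.
        return (shardId.hashCode() % parallelism + parallelism) % parallelism;
    }
}
{code}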



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-01 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai the PR is ready for review now 


---


[jira] [Commented] (FLINK-6206) Log task state transitions as warn/error for FAILURE scenarios

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349853#comment-16349853
 ] 

ASF GitHub Bot commented on FLINK-6206:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5399
  
Could you modify the title to `[FLINK-6206] [runtime] Use LOG.error() when 
logging failure state changes`? There's actually a JIRA ticket that covers this 
change.


> Log task state transitions as warn/error for FAILURE scenarios
> --
>
> Key: FLINK-6206
> URL: https://issues.apache.org/jira/browse/FLINK-6206
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Dan Bress
>Priority: Critical
>
> If a task fails due to an exception, I would like that to be logged at warn 
> or error level. Currently it is logged at info:
> {code}
> private boolean transitionState(ExecutionState currentState, ExecutionState 
> newState, Throwable cause) {
>   if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
>   if (cause == null) {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState);
>   } else {
>   LOG.info("{} ({}) switched from {} to {}.", 
> taskNameWithSubtask, executionId, currentState, newState, cause);
>   }
>   return true;
>   } else {
>   return false;
>   }
>   }
> {code}
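
A minimal sketch of the requested change (a hypothetical adaptation of the 
method quoted above, logging at ERROR when the task transitions to FAILED - not 
the actual committed fix):

{code:java}
private boolean transitionState(ExecutionState currentState, ExecutionState newState, Throwable cause) {
    if (STATE_UPDATER.compareAndSet(this, currentState, newState)) {
        if (cause == null) {
            LOG.info("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState);
        } else if (newState == ExecutionState.FAILED) {
            // Surface failures at ERROR so they stand out in log aggregation tools.
            LOG.error("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState, cause);
        } else {
            LOG.info("{} ({}) switched from {} to {}.",
                taskNameWithSubtask, executionId, currentState, newState, cause);
        }
        return true;
    } else {
        return false;
    }
}
{code}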



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5399: [hotfix] Use LOG.error() when logging failure state chang...

2018-02-01 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5399
  
Could you modify the title to `[FLINK-6206] [runtime] Use LOG.error() when 
logging failure state changes`? There's actually a JIRA ticket that covers this 
change.


---


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349806#comment-16349806
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165560261
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest  extends TableTestBase {
+
+  @Test
+  def testArrayElement(): Unit = {
+val util = streamTestUtil()
+util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
+
+val expected = unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select",
+"a",
+"DOT(ITEM(b, 1), '_1') AS b11"
--- End diff --

Will fix it.


> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-01 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165560261
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CalcTest.scala
 ---
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class CalcTest  extends TableTestBase {
+
+  @Test
+  def testArrayElement(): Unit = {
+val util = streamTestUtil()
+util.addTable[(Long, Array[(String, Int)])]("MyTable", 'a, 'b)
+
+val expected = unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select",
+"a",
+"DOT(ITEM(b, 1), '_1') AS b11"
--- End diff --

Will fix it.


---


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349805#comment-16349805
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165560235
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

But I agree I should refactor the code in visitFieldAccess() to reuse it as 
much as possible.


> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-01 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165560235
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

But I agree I should refactor the code in visitFieldAccess() to reuse it as 
much as possible.


---


[GitHub] flink pull request #5399: [hotfix] Use LOG.error() when logging failure stat...

2018-02-01 Thread casidiablo
GitHub user casidiablo opened a pull request:

https://github.com/apache/flink/pull/5399

[hotfix] Use LOG.error() when logging failure state changes

It's very inconvenient to have these logged with `INFO`. It makes it hard 
to detect errors when inspecting logs with, say, Kibana.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/casidiablo/flink use-error-log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5399.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5399


commit 62160ed6e3ab9c0336bdb8869b59d38a2aab30ee
Author: Cristian 
Date:   2018-02-02T03:48:45Z

Use LOG.error() when logging failure state changes




---


[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-01 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:30 AM:


Hi, [~fhueske]  Great to hear your suggestions. I agree with you, so let's 
start with the {{DataStream}} to {{Table}} conversion first. 

I like your proposal especially for the upsert check. Besides, we may also need 
to support ingesting with delete messages.  Something looks like:
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all 
deletes are on condition of deleting an existing record. I don't have a 
complete solution yet. Maybe we need some refactors to adapt this. I will think 
more about it. Thanks.

 


was (Author: hequn8128):
Hi, [~fhueske]  Great to hear your suggestions. I agree with you, so let's 
start with the {{DataStream}} to {{Table}} conversion first. 

I like your proposal especially for the upsert check. Besides, we may also need 
to support ingesting with delete messages.  Something looks like:
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all 
deletes are on condition of deleting an existing record. I don't have a 
complete solution yet. Maybe we may need some refactors to adapt this. I will 
think more about it. Thanks.

 

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager for ingesting data with upsert mode in flink 
> sql/table-api, it is valuable to enable table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed !
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8545) Implement upsert stream table source

2018-02-01 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724
 ] 

Hequn Cheng commented on FLINK-8545:


Hi, [~fhueske]  Great to hear your suggestions. I agree with you, so let's 
start with the \{{DataStream}} to \{{Table}} conversion first. 

I like your proposal especially for the upsert check. Besides, we may also need 
to support ingesting with delete messages.  Something looks like:

 
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all 
deletes are on condition of deleting an existing record. I don't have a 
complete solution yet. Maybe we may need some refactors to adapt this. I will 
think more about it. Thanks.

 

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager for ingesting data with upsert mode in flink 
> sql/table-api, it is valuable to enable table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed !
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-8545) Implement upsert stream table source

2018-02-01 Thread Hequn Cheng (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349724#comment-16349724
 ] 

Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:29 AM:


Hi, [~fhueske]  Great to hear your suggestions. I agree with you, so let's 
start with the {{DataStream}} to {{Table}} conversion first. 

I like your proposal especially for the upsert check. Besides, we may also need 
to support ingesting with delete messages.  Something looks like:
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all 
deletes are on condition of deleting an existing record. I don't have a 
complete solution yet. Maybe we may need some refactors to adapt this. I will 
think more about it. Thanks.

 


was (Author: hequn8128):
Hi, [~fhueske]  Great to hear your suggestions. I agree with you, so let's 
start with the \{{DataStream}} to \{{Table}} conversion first. 

I like your proposal especially for the upsert check. Besides, we may also need 
to support ingesting with delete messages.  Something looks like:

 
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all 
deletes are on condition of deleting an existing record. I don't have a 
complete solution yet. Maybe we may need some refactors to adapt this. I will 
think more about it. Thanks.

 

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager for ingesting data with upsert mode in flink 
> sql/table-api, it is valuable to enable table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed !
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349559#comment-16349559
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165527528
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

In Calcite, once it sees an array element access, the subsequent field access 
is translated into a DOT RexCall, not a RexFieldAccess. Therefore, we need 
custom handling for the DOT RexCall.


> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-01 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165527528
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 ---
@@ -984,6 +987,63 @@ object ScalarOperators {
 }
   }
 
+  def generateDot(codeGenerator: CodeGenerator,
--- End diff --

In Calcite, once it sees an array element access, the subsequent field access 
is translated into a DOT RexCall, not a RexFieldAccess. Therefore, we need 
custom handling for the DOT RexCall.


---


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349458#comment-16349458
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5367
  
@hequn8128 , thanks for the review. Where do you think we can add it in 
sql.md?


> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7923) SQL parser exception when accessing subfields of a Composite element in an Object Array type column

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349454#comment-16349454
 ] 

ASF GitHub Bot commented on FLINK-7923:
---

Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165515039
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testArrayElementAtFromTableForTuple(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val data = List(
+  (1, Array((12, 45), (2, 5))),
--- End diff --

Added null check.
for nested tuple input, it wont work for now due to 
https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a fix to it, 
should be available in Calcite 1.16.


> SQL parser exception when accessing subfields of a Composite element in an 
> Object Array type column
> ---
>
> Key: FLINK-7923
> URL: https://issues.apache.org/jira/browse/FLINK-7923
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: Rong Rong
>Assignee: Shuyi Chen
>Priority: Major
>
> Access type such as:
> {code:SQL}
> SELECT 
>   a[1].f0 
> FROM 
>   MyTable
> {code}
> will cause problem. 
> See following test sample for more details:
> https://github.com/walterddr/flink/commit/03c93bcb0fb30bd2d327e35b5e244322d449b06a



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5367: [FLINK-7923][Table API & SQL] Support field access of com...

2018-02-01 Thread suez1224
Github user suez1224 commented on the issue:

https://github.com/apache/flink/pull/5367
  
@hequn8128 , thanks for the review. Where do you think we can add it in 
sql.md?


---


[GitHub] flink pull request #5367: [FLINK-7923][Table API & SQL] Support field access...

2018-02-01 Thread suez1224
Github user suez1224 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5367#discussion_r165515039
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -469,6 +473,148 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testArrayElementAtFromTableForTuple(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val data = List(
+  (1, Array((12, 45), (2, 5))),
--- End diff --

Added null check.
for nested tuple input, it wont work for now due to 
https://issues.apache.org/jira/browse/CALCITE-2162. I've submitted a fix to it, 
should be available in Calcite 1.16.


---


[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349131#comment-16349131
 ] 

ASF GitHub Bot commented on FLINK-8472:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5364#discussion_r165459894
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
 ---
@@ -100,9 +100,7 @@ public static void main(String[] args) throws Exception 
{
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);
 
-   if (mode == ExecutionMode.MIGRATE || mode == 
ExecutionMode.RESTORE) {
--- End diff --

yeah it should be alright to remove that, but let's chain the uid call to 
the operator creation as we do for the others for style points.


> Extend migration tests for Flink 1.4
> 
>
> Key: FLINK-8472
> URL: https://issues.apache.org/jira/browse/FLINK-8472
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The following migration tests should be extended to cover migrating Flink 
> jobs with a 1.4 savepoint.
>  * {{WindowOperatorMigrationTest}}
>  * {{CEPMigrationTest}}
>  * {{StatefulJobSavepointMigrationTestITCase}}
>  * {{FlinkKinesisConsumerMigrationTest}}
>  * {{FlinkKafkaConsumerBaseMigrationTest}}
>  * {{ContinuousFileProcessingMigrationTest}}
>  * {{BucketingSinkMigrationTest}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5364: [FLINK-8472] [test] Extend all migration tests for...

2018-02-01 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5364#discussion_r165459894
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
 ---
@@ -100,9 +100,7 @@ public static void main(String[] args) throws Exception 
{
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);
 
-   if (mode == ExecutionMode.MIGRATE || mode == 
ExecutionMode.RESTORE) {
--- End diff --

yeah it should be alright to remove that, but let's chain the uid call to 
the operator creation as we do for the others for style points.


---


[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16349100#comment-16349100
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

GitHub user tweise reopened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer

## What is the purpose of the change

Allow the user to customize Kinesis shard to subtask assignment in the 
Kinesis consumer.

## Brief change log

Added pluggable shard assigner.

## Verifying this change

Added unit test. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

Javadoc

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5393


commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369
Author: Thomas Weise 
Date:   2018-01-31T01:44:44Z

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2, 1.5.0
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-01 Thread tweise
GitHub user tweise reopened a pull request:

https://github.com/apache/flink/pull/5393

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer

## What is the purpose of the change

Allow the user to customize Kinesis shard to subtask assignment in the 
Kinesis consumer.

## Brief change log

Added pluggable shard assigner.

## Verifying this change

Added unit test. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

Javadoc

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tweise/flink FLINK-8516.shardHashing

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5393


commit ad4dbe6fb5bf2af52726c54b6361089ef3f4e369
Author: Thomas Weise 
Date:   2018-01-31T01:44:44Z

[FLINK-8516] Allow for custom hash function for shard to subtask mapping in 
Kinesis consumer




---


[jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348936#comment-16348936
 ] 

ASF GitHub Bot commented on FLINK-8421:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5362
  
@aljoscha As discussed offline, I've:
- replaced `ByteArrayPrependedInputStream` with Java's `PushbackInputStream` (sketched below)
- used negative values in `VERSIONED_IDENTIFIER` to be extra safe
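For reference, a small hedged sketch of the pushback pattern mentioned above, assuming the 
stream is wrapped as `new PushbackInputStream(rawIn, VERSIONED_IDENTIFIER.length)`. The 
class name and marker bytes here are illustrative only, not the actual Flink constants.

{code}
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.Arrays;

final class VersionedHeaderPeek {

    // Assumed marker bytes for the example only; the real VERSIONED_IDENTIFIER differs.
    private static final byte[] VERSIONED_IDENTIFIER = {-1, -2, -3, -4};

    /**
     * Reads the first bytes, compares them with the marker and pushes them back,
     * so the caller can still read the stream from the beginning.
     */
    static boolean startsWithVersionedHeader(PushbackInputStream in) throws IOException {
        byte[] header = new byte[VERSIONED_IDENTIFIER.length];
        int read = in.read(header);
        if (read > 0) {
            in.unread(header, 0, read);
        }
        return read == header.length && Arrays.equals(header, VERSIONED_IDENTIFIER);
    }
}
{code}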


> HeapInternalTimerService should reconfigure compatible key / namespace 
> serializers on restore
> -
>
> Key: FLINK-8421
> URL: https://issues.apache.org/jira/browse/FLINK-8421
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on 
> restored / newly provided serializers for compatibility checks. This should 
> be replaced with the {{TypeSerializer::ensureCompatibility}} checks instead, 
> so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and 
> namespace serializer in the {{HeapInternalTimerService}} also needs to be 
> written to the raw state.
> For Flink 1.4.0 release and current master, this is a critical bug since the 
> {{KryoSerializer}} has different default base registrations than before due 
> to FLINK-7420, i.e. if the key of a window is serialized using the 
> {{KryoSerializer}} in 1.3.x, the restore would never succeed in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the 
> {{HeapInternalTimerService}} restore will make use of serializer 
> reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from 
> **raw** state. Apparently, the serializer compatibility checks were only 
> implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test 
> job that uses a key type which required the {{KryoSerializer}}, and uses 
> windows, would have caught this issue.
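As a hedged illustration of the check described above, assuming the pre-1.5 
{{TypeSerializer#ensureCompatibility}} / {{CompatibilityResult}} API that the issue names; 
the helper class and method below are made up for the example and are not the actual 
{{HeapInternalTimerService}} change.

{code}
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;

final class TimerSerializerRestoreSketch {

    /**
     * Instead of requiring restoredSerializer.equals(newSerializer), the new
     * serializer reconfigures itself against the config snapshot that was written
     * together with the raw timer state.
     */
    static <K> void checkRestoredKeySerializer(
            TypeSerializer<K> newKeySerializer,
            TypeSerializerConfigSnapshot restoredKeyConfig) {

        CompatibilityResult<K> compat = newKeySerializer.ensureCompatibility(restoredKeyConfig);
        if (compat.isRequiresMigration()) {
            throw new IllegalStateException(
                    "Restored key serializer for timers is incompatible with the new serializer.");
        }
    }
}
{code}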



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5362: [FLINK-8421] [DataStream] Make timer serializers reconfig...

2018-02-01 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5362
  
@aljoscha As discussed offline, I've:
- replaced `ByteArrayPrependedInputStream` with Java's `PushbackInputStream`
- used negative values in `VERSIONED_IDENTIFIER` to be extra safe


---


[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348895#comment-16348895
 ] 

ASF GitHub Bot commented on FLINK-8308:
---

Github user StevenLangbroek commented on the issue:

https://github.com/apache/flink/pull/5395
  
@alpinegizmo hawkins is back.


> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a security vulnerability of high severity.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread StevenLangbroek
Github user StevenLangbroek commented on the issue:

https://github.com/apache/flink/pull/5395
  
@alpinegizmo hawkins is back.


---


[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348869#comment-16348869
 ] 

ASF GitHub Bot commented on FLINK-8308:
---

Github user StevenLangbroek commented on the issue:

https://github.com/apache/flink/pull/5395
  
@alpinegizmo if we can't upgrade Jekyll, we can't do what the ticket set out 
to do: get rid of yajl-ruby.


> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a security vulnerability of high severity.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread StevenLangbroek
Github user StevenLangbroek commented on the issue:

https://github.com/apache/flink/pull/5395
  
@alpinegizmo if we can't upgrade Jekyll, we can't do what the ticket set out 
to do: get rid of yajl-ruby.


---


[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348859#comment-16348859
 ] 

ASF GitHub Bot commented on FLINK-8308:
---

Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
These lines need to be restored to the Gemfile. The hawkins plugin is 
needed for the incremental build and live-reload feature:

group :jekyll_plugins do
  gem 'hawkins'
end

The bundled version of jekyll (3.7.2) requires ruby >= 2.1, but our build 
machines use ruby 2.0. If we can't get the apache INFRA team to upgrade ruby, 
we'll have to rework this.

The Gemfile.lock doesn't work with ruby 2.1 -- I get this error

ruby_dep-1.5.0 requires ruby version >= 2.2.5, which is incompatible 
with the current version, ruby 2.1.10p492

but re-bundling fixes this. I think we should commit a Gemfile.lock file 
that is compatible back to Ruby 2.1, if we determine we can get ruby 2.1 -- 
otherwise we'll have to roll back jekyll and then re-bundle with ruby 2.0.

This PR works fine on ruby 2.3 and 2.4. The latest stable release of rvm 
doesn't yet support ruby 2.5, so I didn't test it.




> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a security vulnerability of high severity.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
These lines need to be restored to the Gemfile. The hawkins plugin is 
needed for the incremental build and live-reload feature:

group :jekyll_plugins do
  gem 'hawkins'
end

The bundled version of jekyll (3.7.2) requires ruby >= 2.1, but our build 
machines use ruby 2.0. If we can't get the apache INFRA team to upgrade ruby, 
we'll have to rework this.

The Gemfile.lock doesn't work with ruby 2.1 -- I get this error

ruby_dep-1.5.0 requires ruby version >= 2.2.5, which is incompatible 
with the current version, ruby 2.1.10p492

but re-bundling fixes this. I think we should commit a Gemfile.lock file 
that is compatible back to Ruby 2.1, if we determine we can get ruby 2.1 -- 
otherwise we'll have to roll back jekyll and then re-bundle with ruby 2.0.

This PR works fine on ruby 2.3 and 2.4. The latest stable release of rvm 
doesn't yet support ruby 2.5, so I didn't test it.




---


[jira] [Closed] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-5820.
---

> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The later only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state 
> currently (transitively for RocksDB as well), this means a re-structuring of 
> the storage directories as follows:
> {code}
> ..//job1-id/
>   /shared/<-- shared checkpoint data
>   /chk-1/...  <-- data exclusive to checkpoint 1
>   /chk-2/...  <-- data exclusive to checkpoint 2
>   /chk-3/...  <-- data exclusive to checkpoint 3
> ..//job2-id/
>   /shared/...
>   /chk-1/...
>   /chk-2/...
>   /chk-3/...
> ..//savepoint-1/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
>  /savepoint-2/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.
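Purely as illustration of points 4 and 5 above, a sketch of what such cleanup hooks could 
look like; the interface and method names are assumptions for this example, not the actual 
Flink interfaces.

{code}
/**
 * Illustrative sketch: the backend can drop everything belonging to one checkpoint
 * in a single call, e.g. by deleting the chk-N directory, instead of releasing
 * thousands of individual handles.
 */
interface CheckpointStorageCleanupHook {

    /** Drops all state that is exclusive to the given checkpoint. */
    void disposeCheckpoint(long checkpointId) throws Exception;

    /** Drops all state of checkpoints up to (and including) the given checkpoint id. */
    void disposeCheckpointsUpTo(long checkpointId) throws Exception;
}
{code}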



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-5820.
-
Resolution: Fixed

Fixed by resolution of all subtasks

> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The later only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state 
> currently (transitively for RocksDB as well), this means a re-structuring of 
> the storage directories as follows:
> {code}
> ..//job1-id/
>   /shared/<-- shared checkpoint data
>   /chk-1/...  <-- data exclusive to checkpoint 1
>   /chk-2/...  <-- data exclusive to checkpoint 2
>   /chk-3/...  <-- data exclusive to checkpoint 3
> ..//job2-id/
>   /shared/...
>   /chk-1/...
>   /chk-2/...
>   /chk-3/...
> ..//savepoint-1/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
>  /savepoint-2/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348831#comment-16348831
 ] 

ASF GitHub Bot commented on FLINK-8472:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5364
  
Looks good, but let's wait and see what @zentol has to say.


> Extend migration tests for Flink 1.4
> 
>
> Key: FLINK-8472
> URL: https://issues.apache.org/jira/browse/FLINK-8472
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The following migration tests should be extended to cover migrating Flink 
> jobs with a 1.4 savepoint.
>  * {{WindowOperatorMigrationTest}}
>  * {{CEPMigrationTest}}
>  * {{StatefulJobSavepointMigrationTestITCase}}
>  * {{FlinkKinesisConsumerMigrationTest}}
>  * {{FlinkKafkaConsumerBaseMigrationTest}}
>  * {{ContinuousFileProcessingMigrationTest}}
>  * {{BucketingSinkMigrationTest}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5364: [FLINK-8472] [test] Extend all migration tests for Flink ...

2018-02-01 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5364
  
Looks good, but let's wait and see what @zentol has to say.


---


[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348805#comment-16348805
 ] 

ASF GitHub Bot commented on FLINK-5820:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5396
  
Merged in 31e97e57ceeaf37264ab6db078552b73ee5121bf


> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The later only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state 
> currently (transitively for RocksDB as well), this means a re-structuring of 
> the storage directories as follows:
> {code}
> ..//job1-id/
>   /shared/<-- shared checkpoint data
>   /chk-1/...  <-- data exclusive to checkpoint 1
>   /chk-2/...  <-- data exclusive to checkpoint 2
>   /chk-3/...  <-- data exclusive to checkpoint 3
> ..//job2-id/
>   /shared/...
>   /chk-1/...
>   /chk-2/...
>   /chk-3/...
> ..//savepoint-1/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
>  /savepoint-2/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5820) Extend State Backend Abstraction to support Global Cleanup Hooks

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348806#comment-16348806
 ] 

ASF GitHub Bot commented on FLINK-5820:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5396


> Extend State Backend Abstraction to support Global Cleanup Hooks
> 
>
> Key: FLINK-5820
> URL: https://issues.apache.org/jira/browse/FLINK-5820
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.2.0
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The current state backend abstraction has the limitation that each piece of 
> state is only meaningful in the context of its state handle. There is no 
> possibility of a view onto "all state associated with checkpoint X".
> That causes several issues
>   - State might not be cleaned up in the process of failures. When a 
> TaskManager hands over a state handle to the JobManager and either of them 
> has a failure, the state handle may be lost and state lingers.
>   - State might also linger if a cleanup operation failed temporarily, and 
> the checkpoint metadata was already disposed
>   - State cleanup is more expensive than necessary in many cases. Each state 
> handle is individually released. For large jobs, this means 1000s of release 
> operations (typically file deletes) per checkpoint, which can be expensive on 
> some file systems.
>   - It is hard to guarantee cleanup of parent directories with the current 
> architecture.
> The core changes proposed here are:
>   1. Each job has one core {{StateBackend}}. In the future, operators may 
> have different {{KeyedStateBackends}} and {{OperatorStateBackends}} to mix 
> and match, for example, RocksDB storage and in-memory storage.
>   2. The JobManager needs to be aware of the {{StateBackend}}.
>   3. Storing checkpoint metadata becomes responsibility of the state backend, 
> not the "completed checkpoint store". The later only stores the pointers to 
> the available latest checkpoints (either in process or in ZooKeeper).
>   4. The StateBackend may optionally have a hook to drop all checkpointed 
> state that belongs to only one specific checkpoint (shared state comes as 
> part of incremental checkpointing).
>   5.  The StateBackend needs to have a hook to drop all checkpointed state up 
> to a specific checkpoint (for all previously discarded checkpoints).
>   6. In the future, this must support periodic cleanup hooks that track 
> orphaned shared state from incremental checkpoints.
> For the {{FsStateBackend}}, which stores most of the checkpointed state 
> currently (transitively for RocksDB as well), this means a re-structuring of 
> the storage directories as follows:
> {code}
> ..//job1-id/
>   /shared/<-- shared checkpoint data
>   /chk-1/...  <-- data exclusive to checkpoint 1
>   /chk-2/...  <-- data exclusive to checkpoint 2
>   /chk-3/...  <-- data exclusive to checkpoint 3
> ..//job2-id/
>   /shared/...
>   /chk-1/...
>   /chk-2/...
>   /chk-3/...
> ..//savepoint-1/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
>  /savepoint-2/savepoint-root
>  /file-1-uid
>  /file-2-uid
>  /file-3-uid
> {code}
> This is the umbrella issue for the individual steps needed to address this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5396: [FLINK-5820] [state backends] Split shared/exclusi...

2018-02-01 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5396


---


[GitHub] flink issue #5396: [FLINK-5820] [state backends] Split shared/exclusive stat...

2018-02-01 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5396
  
Merged in 31e97e57ceeaf37264ab6db078552b73ee5121bf


---


[jira] [Resolved] (FLINK-8539) Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal of checkpoint storage locations

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8539.
-
Resolution: Fixed

Fixed as of d85a62db13a1d90159e7e8f924c8006febad552b

  - f9dd19b584fca5932594392b0afc9f1d0eec7f1a
  - f577d2603fb0432fa7a15d54ee25f481bde95d95
  - d85a62db13a1d90159e7e8f924c8006febad552b

> Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal 
> of checkpoint storage locations
> 
>
> Key: FLINK-8539
> URL: https://issues.apache.org/jira/browse/FLINK-8539
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> The storage location of completed checkpoints lacks a proper representation. 
> Because of that, there is no place that can handle the deletion of a 
> checkpoint directory, or the dropping of a checkpoint-specific table.
> The current workaround for file systems is, for example, that every file disposal 
> checks if the parent directory is now empty, and deletes it if that is the 
> case. That is not only inefficient, but prohibitively expensive on some 
> systems, like Amazon S3.
> Properly representing the storage location for completed checkpoints allows 
> us to add a disposal call for that location.
> That {{CompletedCheckpointStorageLocation}} can also be used to capture 
> "external pointers", metadata, and even allow us to use custom serialization 
> and deserialization of the metadata in the future, making the abstraction 
> more extensible by allowing users to introduce new types of state handles.
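As a hedged illustration, a sketch of the shape such a location handle could take; the 
interface and method names are assumptions and may differ from what the issue actually 
introduces.

{code}
import java.io.IOException;

/**
 * Illustrative sketch only: a handle for the storage location of a completed
 * checkpoint that owns the disposal of the whole location.
 */
interface CompletedCheckpointLocationSketch {

    /** External pointer to the checkpoint, e.g. the path of the checkpoint directory. */
    String getExternalPointer();

    /** Disposes the whole storage location, e.g. recursively deletes the chk-N directory. */
    void disposeStorageLocation() throws IOException;
}
{code}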



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8539) Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal of checkpoint storage locations

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8539.
---

> Introduce "CompletedCheckpointStorageLocation" to explicitly handle disposal 
> of checkpoint storage locations
> 
>
> Key: FLINK-8539
> URL: https://issues.apache.org/jira/browse/FLINK-8539
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> The storage location of completed checkpoints lacks a proper representation. 
> Because of that, there is no place that can handle the deletion of a 
> checkpoint directory, or the dropping of a checkpoint-specific table.
> The current workaround for file systems is, for example, that every file disposal 
> checks if the parent directory is now empty, and deletes it if that is the 
> case. That is not only inefficient, but prohibitively expensive on some 
> systems, like Amazon S3.
> Properly representing the storage location for completed checkpoints allows 
> us to add a disposal call for that location.
> That {{CompletedCheckpointStorageLocation}} can also be used to capture 
> "external pointers", metadata, and even allow us to use custom serialization 
> and deserialization of the metadata in the future, making the abstraction 
> more extensible by allowing users to introduce new types of state handles.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8540) FileStateHandles must not attempt to delete their parent directory.

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8540.
---

> FileStateHandles must not attempt to delete their parent directory.
> ---
>
> Key: FLINK-8540
> URL: https://issues.apache.org/jira/browse/FLINK-8540
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, every file disposal checks if the parent directory is now empty, 
> and deletes it if that is the case. That is not only inefficient, but 
> prohibitively expensive on some systems, like Amazon S3.
> With the resolution of [FLINK-8539], this will no longer be necessary.
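For illustration, a sketch of the per-file parent-directory cleanup described above, 
written against the Flink {{FileSystem}} API; the helper class and method are made up for 
this example, not the actual {{FileStateHandle}} code.

{code}
import java.io.IOException;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

final class ParentDirectoryCleanupSketch {

    /**
     * Illustrative only: the per-file cleanup pattern described above. Every file
     * delete triggers a listing of the parent directory plus a possible second
     * delete, which is cheap locally but expensive on stores like Amazon S3.
     */
    static void deleteAndMaybeDropParent(FileSystem fs, Path file) throws IOException {
        fs.delete(file, false);

        Path parent = file.getParent();
        if (parent != null && fs.listStatus(parent).length == 0) {
            fs.delete(parent, false);
        }
    }
}
{code}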



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8540) FileStateHandles must not attempt to delete their parent directory.

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8540.
-
Resolution: Fixed

Fixed via 31e97e57ceeaf37264ab6db078552b73ee5121bf

> FileStateHandles must not attempt to delete their parent directory.
> ---
>
> Key: FLINK-8540
> URL: https://issues.apache.org/jira/browse/FLINK-8540
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, every file disposal checks if the parent directory is now empty, 
> and deletes it if that is the case. That is not only inefficient, but 
> prohibitively expensive on some systems, like Amazon S3.
> With the resolution of [FLINK-8539], this will no longer be necessary.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348797#comment-16348797
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165399750
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -586,7 +586,7 @@ private StreamGraph 
generateInternal(List transformatio
transform.getOutputType(),
transform.getName());
 
-   if (transform.getStateKeySelector1() != null) {
+   if (transform.getStateKeySelector1() != null || 
transform.getStateKeySelector2() != null) {
--- End diff --

Does this, in combination with the comment below on the `DataStreamTest`, 
mean that you want to move the `DataStreamTest` test into a separate `ITCase`?


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165399750
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -586,7 +586,7 @@ private StreamGraph 
generateInternal(List transformatio
transform.getOutputType(),
transform.getName());
 
-   if (transform.getStateKeySelector1() != null) {
+   if (transform.getStateKeySelector1() != null || 
transform.getStateKeySelector2() != null) {
--- End diff --

Does this, in combination with the comment below on the `DataStreamTest`, 
mean that you want to move the `DataStreamTest` test into a separate `ITCase`?


---


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348790#comment-16348790
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165397632
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -753,6 +763,182 @@ public void onTimer(
assertTrue(getOperatorForDataStream(processed) instanceof 
ProcessOperator);
}
 
+   @Test
+   public void testConnectWithBroadcastTranslation() throws Exception {
+
+   final Map<Long, String> expected = new HashMap<>();
+   expected.put(0L, "test:0");
+   expected.put(1L, "test:1");
+   expected.put(2L, "test:2");
+   expected.put(3L, "test:3");
+   expected.put(4L, "test:4");
+   expected.put(5L, "test:5");
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+   .assignTimestampsAndWatermarks(new 
CustomWmEmitter<Long>() {
+
+   @Override
+   public long extractTimestamp(Long 
element, long previousElementTimestamp) {
+   return element;
+   }
+   }).keyBy((KeySelector<Long, Long>) value -> 
value);
+
+   final DataStream<String> srcTwo = 
env.fromCollection(expected.values())
+   .assignTimestampsAndWatermarks(new 
CustomWmEmitter<String>() {
+   @Override
+   public long extractTimestamp(String 
element, long previousElementTimestamp) {
+   return 
Long.parseLong(element.split(":")[1]);
+   }
+   });
+
+   final BroadcastStream<String> broadcast = 
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+   // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
+   final DataStream<String> output = 
srcOne.connect(broadcast).process(
+   new TestBroadcastProcessFunction(10L, 
expected));
+
+   output.addSink(new DiscardingSink<>());
+   env.execute();
--- End diff --

So far, all tests in this file are purely translation tests. As I mentioned in 
another comment, it would be good to have an ITCase that actually verifies 
that using keyed state works and that the other features also work in a 
complete program. Have a look at `SideOutputITCase`, for example.  


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348792#comment-16348792
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165395860
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.api.common.state.ReadWriteBroadcastState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TwoInputStreamOperator} for executing {@link 
KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}.
+ *
+ * @param  The key type of the input keyed stream.
+ * @param  The input type of the keyed (non-broadcast) side.
+ * @param  The input type of the broadcast side.
+ * @param  The output type of the operator.
+ */
+@Internal
+public class CoBroadcastWithKeyedOperator
+   extends AbstractUdfStreamOperator>
+   implements TwoInputStreamOperator, 
Triggerable {
+
+   private static final long serialVersionUID = 5926499536290284870L;
+
+   private final List broadcastStateDescriptors;
+
+   private transient TimestampedCollector collector;
+
+   private transient Map broadcastStates;
+
+   private transient ReadWriteContextImpl rwContext;
+
+   private transient ReadOnlyContextImpl rContext;
+
+   private transient OnTimerContextImpl onTimerContext;
+
+   public CoBroadcastWithKeyedOperator(
+   final KeyedBroadcastProcessFunction 
function,
+   final List 
broadcastStateDescriptors) {
+   super(function);
+   this.broadcastStateDescriptors = 
Preconditions.checkNotNull(broadcastStateDescriptors);
+   }
+
+   @Override
+   public void open() throws Exception {
+   super.open();
+
+   InternalTimerService internalTimerService =
+   

[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348791#comment-16348791
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165394684
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.api.common.state.ReadWriteBroadcastState;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * The base class containing the functionality available to all broadcast 
process functions.
+ * These include the {@link BroadcastProcessFunction} and the {@link 
KeyedBroadcastProcessFunction}.
+ */
+@PublicEvolving
+public abstract class BaseBroadcastProcessFunction extends 
AbstractRichFunction {
+
+   private static final long serialVersionUID = -131631008887478610L;
+
+   /**
+* The base context available to all methods in a broadcast process 
function. This
+* include {@link BroadcastProcessFunction BroadcastProcessFunctions} 
and
+* {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}.
+*/
+   abstract class Context {
+
+   /**
+* Timestamp of the element currently being processed or 
timestamp of a firing timer.
+*
+* This might be {@code null}, for example if the time 
characteristic of your program
+* is set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+*/
+   public abstract Long timestamp();
+
+   /**
+* Emits a record to the side output identified by the {@link 
OutputTag}.
+*
+* @param outputTag the {@code OutputTag} that identifies the 
side output to emit to.
+* @param value The record to emit.
+*/
+   public abstract  void output(OutputTag outputTag, X 
value);
+
+   /** Returns the current processing time. */
+   public abstract long currentProcessingTime();
+
+   /** Returns the current event-time watermark. */
+   public abstract long currentWatermark();
+   }
+
+   /**
+* A base {@link Context context} available to the broadcasted stream 
side of
+* a {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream 
BroadcastConnectedStream}.
+*
+* Apart from the basic functionality of a {@link Context context},
+* this also allows getting and updating the elements stored in the
+* {@link ReadWriteBroadcastState broadcast state}.
+* In other words, it gives read/write access to the broadcast state.
+*/
+   public abstract class ReadWriteContext extends Context {
--- End diff --

I think this could be called `Context`, similar to my comment on 
`ReadWriteBroadcastState`.


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348793#comment-16348793
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165398176
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -753,6 +763,182 @@ public void onTimer(
assertTrue(getOperatorForDataStream(processed) instanceof 
ProcessOperator);
}
 
+   @Test
+   public void testConnectWithBroadcastTranslation() throws Exception {
+
+   final Map expected = new HashMap<>();
+   expected.put(0L, "test:0");
+   expected.put(1L, "test:1");
+   expected.put(2L, "test:2");
+   expected.put(3L, "test:3");
+   expected.put(4L, "test:4");
+   expected.put(5L, "test:5");
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   final DataStream srcOne = env.generateSequence(0L, 5L)
+   .assignTimestampsAndWatermarks(new 
CustomWmEmitter() {
+
+   @Override
+   public long extractTimestamp(Long 
element, long previousElementTimestamp) {
+   return element;
+   }
+   }).keyBy((KeySelector) value -> 
value);
+
+   final DataStream srcTwo = 
env.fromCollection(expected.values())
+   .assignTimestampsAndWatermarks(new 
CustomWmEmitter() {
+   @Override
+   public long extractTimestamp(String 
element, long previousElementTimestamp) {
+   return 
Long.parseLong(element.split(":")[1]);
+   }
+   });
+
+   final BroadcastStream broadcast = 
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+   // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
+   final DataStream output = 
srcOne.connect(broadcast).process(
+   new TestBroadcastProcessFunction(10L, 
expected));
+
+   output.addSink(new DiscardingSink<>());
+   env.execute();
+   }
+
+   private abstract static class CustomWmEmitter implements 
AssignerWithPunctuatedWatermarks {
+
+   @Nullable
+   @Override
+   public Watermark checkAndGetNextWatermark(T lastElement, long 
extractedTimestamp) {
+   return new Watermark(extractedTimestamp);
+   }
+   }
+
+   private static class TestBroadcastProcessFunction extends 
KeyedBroadcastProcessFunction {
+
+   private final Map expectedState;
+
+   private final long timerTimestamp;
+
+   static final MapStateDescriptor DESCRIPTOR = new 
MapStateDescriptor<>(
+   "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+   );
+
+   TestBroadcastProcessFunction(
+   final long timerTS,
+   final Map expectedBroadcastState
+   ) {
+   expectedState = expectedBroadcastState;
+   timerTimestamp = timerTS;
+   }
+
+   @Override
+   public void processElement(Long value, KeyedReadOnlyContext 
ctx, Collector out) throws Exception {
+   
ctx.timerService().registerEventTimeTimer(timerTimestamp);
+   }
+
+   @Override
+   public void processBroadcastElement(String value, 
KeyedReadWriteContext ctx, Collector out) throws Exception {
+   long key = Long.parseLong(value.split(":")[1]);
+   ctx.getBroadcastState(DESCRIPTOR).put(key, value);
+   }
+
+   @Override
+   public void onTimer(long timestamp, OnTimerContext ctx, 
Collector out) throws Exception {
+   Map map = new HashMap<>();
+   for (Map.Entry entry : 
ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) {
+   map.put(entry.getKey(), entry.getValue());
+   }

[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348789#comment-16348789
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165395176
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -586,7 +586,7 @@ private StreamGraph 
generateInternal(List transformatio
transform.getOutputType(),
transform.getName());
 
-   if (transform.getStateKeySelector1() != null) {
+   if (transform.getStateKeySelector1() != null || 
transform.getStateKeySelector2() != null) {
--- End diff --

I think we need a test verifying that keyed state actually works when only 
one of the inputs is keyed. I think we need an `ITCase` for that somewhere.


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165398176
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -753,6 +763,182 @@ public void onTimer(
assertTrue(getOperatorForDataStream(processed) instanceof 
ProcessOperator);
}
 
+   @Test
+   public void testConnectWithBroadcastTranslation() throws Exception {
+
+   final Map expected = new HashMap<>();
+   expected.put(0L, "test:0");
+   expected.put(1L, "test:1");
+   expected.put(2L, "test:2");
+   expected.put(3L, "test:3");
+   expected.put(4L, "test:4");
+   expected.put(5L, "test:5");
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   final DataStream srcOne = env.generateSequence(0L, 5L)
+   .assignTimestampsAndWatermarks(new 
CustomWmEmitter() {
+
+   @Override
+   public long extractTimestamp(Long 
element, long previousElementTimestamp) {
+   return element;
+   }
+   }).keyBy((KeySelector) value -> 
value);
+
+   final DataStream srcTwo = 
env.fromCollection(expected.values())
+   .assignTimestampsAndWatermarks(new 
CustomWmEmitter() {
+   @Override
+   public long extractTimestamp(String 
element, long previousElementTimestamp) {
+   return 
Long.parseLong(element.split(":")[1]);
+   }
+   });
+
+   final BroadcastStream broadcast = 
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+   // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
+   final DataStream output = 
srcOne.connect(broadcast).process(
+   new TestBroadcastProcessFunction(10L, 
expected));
+
+   output.addSink(new DiscardingSink<>());
+   env.execute();
+   }
+
+   private abstract static class CustomWmEmitter implements 
AssignerWithPunctuatedWatermarks {
+
+   @Nullable
+   @Override
+   public Watermark checkAndGetNextWatermark(T lastElement, long 
extractedTimestamp) {
+   return new Watermark(extractedTimestamp);
+   }
+   }
+
+   private static class TestBroadcastProcessFunction extends 
KeyedBroadcastProcessFunction {
+
+   private final Map expectedState;
+
+   private final long timerTimestamp;
+
+   static final MapStateDescriptor DESCRIPTOR = new 
MapStateDescriptor<>(
+   "broadcast-state", 
BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+   );
+
+   TestBroadcastProcessFunction(
+   final long timerTS,
+   final Map expectedBroadcastState
+   ) {
+   expectedState = expectedBroadcastState;
+   timerTimestamp = timerTS;
+   }
+
+   @Override
+   public void processElement(Long value, KeyedReadOnlyContext 
ctx, Collector out) throws Exception {
+   
ctx.timerService().registerEventTimeTimer(timerTimestamp);
+   }
+
+   @Override
+   public void processBroadcastElement(String value, 
KeyedReadWriteContext ctx, Collector out) throws Exception {
+   long key = Long.parseLong(value.split(":")[1]);
+   ctx.getBroadcastState(DESCRIPTOR).put(key, value);
+   }
+
+   @Override
+   public void onTimer(long timestamp, OnTimerContext ctx, 
Collector out) throws Exception {
+   Map map = new HashMap<>();
+   for (Map.Entry entry : 
ctx.getBroadcastState(DESCRIPTOR).immutableEntries()) {
+   map.put(entry.getKey(), entry.getValue());
+   }
+   Assert.assertEquals(expectedState, map);
+   }
+   }
+
+   /**
+* Tests that with a {@link KeyedStream} we have to provide a {@link 
KeyedBroadcastProcessFunction}.
+*/
+   

[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165395176
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 ---
@@ -586,7 +586,7 @@ private StreamGraph 
generateInternal(List transformatio
transform.getOutputType(),
transform.getName());
 
-   if (transform.getStateKeySelector1() != null) {
+   if (transform.getStateKeySelector1() != null || 
transform.getStateKeySelector2() != null) {
--- End diff --

I think we need a test verifying that keyed state actually works when only 
one of the inputs is keyed. I think we need an `ITCase` for that somewhere.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165397632
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
 ---
@@ -753,6 +763,182 @@ public void onTimer(
assertTrue(getOperatorForDataStream(processed) instanceof 
ProcessOperator);
}
 
+   @Test
+   public void testConnectWithBroadcastTranslation() throws Exception {
+
+   final Map expected = new HashMap<>();
+   expected.put(0L, "test:0");
+   expected.put(1L, "test:1");
+   expected.put(2L, "test:2");
+   expected.put(3L, "test:3");
+   expected.put(4L, "test:4");
+   expected.put(5L, "test:5");
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   final DataStream srcOne = env.generateSequence(0L, 5L)
+   .assignTimestampsAndWatermarks(new 
CustomWmEmitter() {
+
+   @Override
+   public long extractTimestamp(Long 
element, long previousElementTimestamp) {
+   return element;
+   }
+   }).keyBy((KeySelector) value -> 
value);
+
+   final DataStream srcTwo = 
env.fromCollection(expected.values())
+   .assignTimestampsAndWatermarks(new 
CustomWmEmitter() {
+   @Override
+   public long extractTimestamp(String 
element, long previousElementTimestamp) {
+   return 
Long.parseLong(element.split(":")[1]);
+   }
+   });
+
+   final BroadcastStream broadcast = 
srcTwo.broadcast(TestBroadcastProcessFunction.DESCRIPTOR);
+
+   // the timestamp should be high enough to trigger the timer 
after all the elements arrive.
+   final DataStream output = 
srcOne.connect(broadcast).process(
+   new TestBroadcastProcessFunction(10L, 
expected));
+
+   output.addSink(new DiscardingSink<>());
+   env.execute();
--- End diff --

So far, all tests in this file are purely translation tests. As I mentioned in 
another comment, it would be good to have an ITCase that actually verifies 
that using keyed state works and that the other features also work in a 
complete program. Have a look at `SideOutputITCase`, for example. 👍 


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165394684
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.api.common.state.ReadWriteBroadcastState;
+import org.apache.flink.util.OutputTag;
+
+/**
+ * The base class containing the functionality available to all broadcast 
process functions.
+ * These include the {@link BroadcastProcessFunction} and the {@link 
KeyedBroadcastProcessFunction}.
+ */
+@PublicEvolving
+public abstract class BaseBroadcastProcessFunction extends 
AbstractRichFunction {
+
+   private static final long serialVersionUID = -131631008887478610L;
+
+   /**
+* The base context available to all methods in a broadcast process 
function. These
+* include {@link BroadcastProcessFunction BroadcastProcessFunctions} 
and
+* {@link KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}.
+*/
+   abstract class Context {
+
+   /**
+* Timestamp of the element currently being processed or 
timestamp of a firing timer.
+*
+* This might be {@code null}, for example if the time 
characteristic of your program
+* is set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+*/
+   public abstract Long timestamp();
+
+   /**
+* Emits a record to the side output identified by the {@link 
OutputTag}.
+*
+* @param outputTag the {@code OutputTag} that identifies the 
side output to emit to.
+* @param value The record to emit.
+*/
+   public abstract <X> void output(OutputTag<X> outputTag, X value);
+
+   /** Returns the current processing time. */
+   public abstract long currentProcessingTime();
+
+   /** Returns the current event-time watermark. */
+   public abstract long currentWatermark();
+   }
+
+   /**
+* A base {@link Context context} available to the broadcasted stream 
side of
+* a {@link 
org.apache.flink.streaming.api.datastream.BroadcastConnectedStream 
BroadcastConnectedStream}.
+*
+* Apart from the basic functionality of a {@link Context context},
+* this also allows to get and update the elements stored in the
+* {@link ReadWriteBroadcastState broadcast state}.
+* In other words, it gives read/write access to the broadcast state.
+*/
+   public abstract class ReadWriteContext extends Context {
--- End diff --

I think this could be called `Context`, similar to my comment on 
`ReadWriteBroadcastState`.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165395860
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
 ---
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
+import org.apache.flink.api.common.state.ReadWriteBroadcastState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateFunction;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link TwoInputStreamOperator} for executing {@link 
KeyedBroadcastProcessFunction KeyedBroadcastProcessFunctions}.
+ *
+ * @param <KS> The key type of the input keyed stream.
+ * @param <IN1> The input type of the keyed (non-broadcast) side.
+ * @param <IN2> The input type of the broadcast side.
+ * @param <OUT> The output type of the operator.
+ */
+@Internal
+public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
+   extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>>
+   implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {
+
+   private static final long serialVersionUID = 5926499536290284870L;
+
+   private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
+
+   private transient TimestampedCollector<OUT> collector;
+
+   private transient Map<MapStateDescriptor<?, ?>, ReadWriteBroadcastState<?, ?>> broadcastStates;
+
+   private transient ReadWriteContextImpl rwContext;
+
+   private transient ReadOnlyContextImpl rContext;
+
+   private transient OnTimerContextImpl onTimerContext;
+
+   public CoBroadcastWithKeyedOperator(
+           final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
+           final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
+       super(function);
+       this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
+   }
+
+   @Override
+   public void open() throws Exception {
+       super.open();
+
+       InternalTimerService<VoidNamespace> internalTimerService =
+               getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+       TimerService timerService = new SimpleTimerService(internalTimerService);
+
+       collector = new TimestampedCollector<>(output);
+

[jira] [Closed] (FLINK-8531) Support separation of "Exclusive", "Shared" and "Task owned" state

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-8531.
---

> Support separation of "Exclusive", "Shared" and "Task owned" state
> --
>
> Key: FLINK-8531
> URL: https://issues.apache.org/jira/browse/FLINK-8531
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, all state created at a certain checkpoint goes into the directory 
> {{chk-id}}.
> With incremental checkpointing, some state is shared across checkpoints and is 
> referenced by newer checkpoints. That way, old {{chk-id}} directories stay 
> around, containing some shared chunks. That makes it hard for both users and 
> cleanup hooks to determine when a {{chk-x}} directory can be deleted.
> The same holds for state that can only ever be dropped by certain operators 
> on the TaskManager, never by the JobManager / CheckpointCoordinator. Examples 
> of that state are write-ahead logs, which need to be retained until the move 
> to the target system is complete, which may in some cases be later than when 
> the checkpoint that created them is disposed.
> I propose to introduce different scopes for state:
>   - **EXCLUSIVE** is for state that belongs to one checkpoint only
>   - **SHARED** is for state that is possibly part of multiple checkpoints
>   - **TASKOWNED** is for state that must never be dropped by the JobManager.
> For file based checkpoint targets, I propose that we have the following 
> directory layout:
> {code}
> /user-defined-checkpoint-dir
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
> {code}
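
A minimal sketch of how a write target could be resolved under this layout (the {{CheckpointStateScope}} enum and {{resolve}} helper below are illustrative, not the actual implementation):

{code:java}
import org.apache.flink.core.fs.Path;

enum CheckpointStateScope { EXCLUSIVE, SHARED, TASKOWNED }

final class CheckpointDirectoryLayout {

        /** Resolves the directory a piece of state is written to under the proposed layout. */
        static Path resolve(Path userCheckpointDir, long checkpointId, CheckpointStateScope scope) {
                switch (scope) {
                        case EXCLUSIVE:
                                // state referenced by exactly one checkpoint lives under chk-<id>
                                return new Path(userCheckpointDir, "chk-" + checkpointId);
                        case SHARED:
                                // state shared across checkpoints (e.g. incremental RocksDB files)
                                return new Path(userCheckpointDir, "shared");
                        case TASKOWNED:
                                // state only the TaskManager may drop (e.g. write-ahead logs)
                                return new Path(userCheckpointDir, "taskowned");
                        default:
                                throw new IllegalArgumentException("Unknown scope: " + scope);
                }
        }
}
{code}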



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8531) Support separation of "Exclusive", "Shared" and "Task owned" state

2018-02-01 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8531?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-8531.
-
Resolution: Fixed

Fixed as of 4e481a72c1ed3cc5f177b511e5a72cd8726cf976

Consists of steps
  - 99495c91ecce7141ae8b2fbc96492681a9d130bd
  - 35c7d93ee85aa8689e804b713affa65b46af1acc
  - 9903c8c42793b922549835217c586c5928999ea5
  - 5cc50934bdcf80ae1fa69abe69e2f214852653f9
  - bb19e7f5278d43cd4fd265e3d2afa2fcc793ccf5
  - 1887187f6b5c210d2091c69ef14fa8b8a5cae82c
  - fc21423e1f8f1a1661badef20f9c6f368f6daf8b
  - e0b0f45bd9c8b06bd2cda56f6859d0d3944aa00e
  - 4e481a72c1ed3cc5f177b511e5a72cd8726cf976

> Support separation of "Exclusive", "Shared" and "Task owned" state
> --
>
> Key: FLINK-8531
> URL: https://issues.apache.org/jira/browse/FLINK-8531
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, all state created at a certain checkpoint goes into the directory 
> {{chk-id}}.
> With incremental checkpointing, some state is shared across checkpoints and is 
> referenced by newer checkpoints. That way, old {{chk-id}} directories stay 
> around, containing some shared chunks. That makes it hard for both users and 
> cleanup hooks to determine when a {{chk-x}} directory can be deleted.
> The same holds for state that can only ever be dropped by certain operators 
> on the TaskManager, never by the JobManager / CheckpointCoordinator. Examples 
> of that state are write-ahead logs, which need to be retained until the move 
> to the target system is complete, which may in some cases be later than when 
> the checkpoint that created them is disposed.
> I propose to introduce different scopes for state:
>   - **EXCLUSIVE** is for state that belongs to one checkpoint only
>   - **SHARED** is for state that is possibly part of multiple checkpoints
>   - **TASKOWNED** is for state that must never be dropped by the JobManager.
> For file based checkpoint targets, I propose that we have the following 
> directory layout:
> {code}
> /user-defined-checkpoint-dir
> |
> + --shared/
> + --taskowned/
> + --chk-1/
> + --chk-2/
> + --chk-3/
> ...
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348761#comment-16348761
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165378004
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/ReadWriteBroadcastState.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A type of state that can be created to store the state of a {@code 
BroadcastStream}. This state assumes that
+ * the same elements are sent to all instances of an operator.
+ *
+ * CAUTION: the user has to guarantee that all task instances 
store the same elements in this type of state.
+ *
+ *  Each operator instance individually maintains and stores elements 
in the broadcast state. The fact that the
+ * incoming stream is a broadcast one guarantees that all instances see 
all the elements. Upon recovery
+ * or re-scaling, the same state is given to each of the instances. To 
avoid hotspots, each task reads its previous
+ * partition, and if there are more tasks (scale up), then the new 
instances read from the old instances in a round
+ * robin fashion. This is why each instance has to guarantee that it 
stores the same elements as the rest. If not,
+ * upon recovery or rescaling you may have unpredictable redistribution of 
the partitions, thus unpredictable results.
+ *
+ * @param <K> The key type of the elements in the {@link ReadWriteBroadcastState}.
+ * @param <V> The value type of the elements in the {@link ReadWriteBroadcastState}.
+ */
+@PublicEvolving
+public interface ReadWriteBroadcastState<K, V> extends ReadOnlyBroadcastState<K, V> {
--- End diff --

I think we can call this just `BroadcastState`.
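
For reference, a minimal sketch of how this state is used from a {{KeyedBroadcastProcessFunction}}, written against the names as they ended up in the released API ({{BroadcastState}} / {{getBroadcastState}}); the {{keyedStream}}, {{ruleStream}} and the descriptor below are illustrative:

{code:java}
MapStateDescriptor<String, String> ruleDescriptor = new MapStateDescriptor<>(
        "rules", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

BroadcastStream<String> broadcastRules = ruleStream.broadcast(ruleDescriptor);

keyedStream
        .connect(broadcastRules)
        .process(new KeyedBroadcastProcessFunction<Long, Long, String, String>() {

                @Override
                public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // read-only view of the broadcast state on the keyed (non-broadcast) side
                        String rule = ctx.getBroadcastState(ruleDescriptor).get("threshold");
                        out.collect(value + " with rule " + rule);
                }

                @Override
                public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                        // every parallel instance sees the same broadcast elements and must store the
                        // same entries, otherwise rescaling may redistribute the state unpredictably
                        ctx.getBroadcastState(ruleDescriptor).put("threshold", rule);
                }
        });
{code}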


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348762#comment-16348762
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165377821
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * A read-only type of state that gives read-only access to the state of a 
{@code BroadcastStream}
--- End diff --

I think the `access to the state of a BroadCast` stream bit might be a bit 
confusing because it ties it too heavily to broadcast streams.

We should also have a big warning here that you should not modify the 
result of `get()` and also that you should not modify the entries in the 
immutable iterator because this would lead to problems with the heap state 
backend.
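
To make the concern concrete, a hypothetical snippet (assuming a context {{ctx}} and a {{MapStateDescriptor<String, List<String>> desc}}, both illustrative):

{code:java}
// on the broadcast (read-write) side
List<String> tags = ctx.getBroadcastState(desc).get("tags");

// WRONG with the heap-based operator state backend: this mutates the very object
// the backend stores, behind its back
tags.add("new-tag");

// SAFE: copy, modify, and explicitly put the new value back
List<String> copy = new ArrayList<>(tags);
copy.add("new-tag");
ctx.getBroadcastState(desc).put("tags", copy);
{code}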


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348765#comment-16348765
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165385868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -148,21 +170,27 @@ public void close() throws IOException {
@Override
public void dispose() {
IOUtils.closeQuietly(closeStreamOnCancelRegistry);
-   registeredStates.clear();
+   registeredOperatorStates.clear();
+   registeredBroadcastStates.clear();
}
 
// 
---
//  State access methods
// 
---
 
+   @Override
+   public <K, V> ReadWriteBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
--- End diff --

I think we don't need this because the other `getBroadcastState()` is only 
ever called with `BROADCAST` mode.


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348764#comment-16348764
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165388777
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -77,16 +90,33 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   if (VERSION >= 3) {
--- End diff --

Wouldn't this always depend on the version of the code and not the version 
of the snapshot? That is, if we restore from a `VERSION < 3` snapshot we should 
not go into this code path.

I think here you can get that via `getReadVersion()`.
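
A sketch of what that could look like, mirroring the operator-state loop above (the broadcast-state reader methods and the {{broadcastStateMetaInfoSnapshots}} field are hypothetical names, not the actual API):

{code:java}
// branch on the version read from the snapshot, not on the current code's VERSION constant
if (getReadVersion() >= 3) {
        int numBroadcastStates = in.readShort();
        broadcastStateMetaInfoSnapshots = new ArrayList<>(numBroadcastStates);
        for (int i = 0; i < numBroadcastStates; i++) {
                broadcastStateMetaInfoSnapshots.add(
                        OperatorBackendStateMetaInfoSnapshotReaderWriters
                                .getBroadcastStateReaderForVersion(getReadVersion(), userCodeClassLoader)
                                .readBroadcastStateMetaInfo(in));
        }
}
{code}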


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8345) Iterate over keyed state on broadcast side of connect with broadcast.

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348763#comment-16348763
 ] 

ASF GitHub Bot commented on FLINK-8345:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165387915
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -57,18 +63,25 @@ public int getVersion() {
@Override
public int[] getCompatibleVersions() {
// we are compatible with version 2 (Flink 1.3.x) and version 1 
(Flink 1.2.x)
-   return new int[] {VERSION, 1};
+   return new int[] {VERSION, 2, 1};
--- End diff --

This should probably now say "we are compatible with versions 3, 2, and 1".


> Iterate over keyed state on broadcast side of connect with broadcast.
> -
>
> Key: FLINK-8345
> URL: https://issues.apache.org/jira/browse/FLINK-8345
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming
>Affects Versions: 1.5.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165387915
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -57,18 +63,25 @@ public int getVersion() {
@Override
public int[] getCompatibleVersions() {
// we are compatible with version 2 (Flink 1.3.x) and version 1 
(Flink 1.2.x)
-   return new int[] {VERSION, 1};
+   return new int[] {VERSION, 2, 1};
--- End diff --

This should probably now say "we are compatible with versions 3, 2, and 1".


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165385868
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
 ---
@@ -148,21 +170,27 @@ public void close() throws IOException {
@Override
public void dispose() {
IOUtils.closeQuietly(closeStreamOnCancelRegistry);
-   registeredStates.clear();
+   registeredOperatorStates.clear();
+   registeredBroadcastStates.clear();
}
 
// 
---
//  State access methods
// 
---
 
+   @Override
+   public <K, V> ReadWriteBroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
--- End diff --

I think we don't need this because the other `getBroadcastState()` is only 
ever called with `BROADCAST` mode.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165388777
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java
 ---
@@ -77,16 +90,33 @@ public void read(DataInputView in) throws IOException {
super.read(in);
 
int numKvStates = in.readShort();
-   stateMetaInfoSnapshots = new ArrayList<>(numKvStates);
+   operatorStateMetaInfoSnapshots = new ArrayList<>(numKvStates);
for (int i = 0; i < numKvStates; i++) {
-   stateMetaInfoSnapshots.add(
-   
OperatorBackendStateMetaInfoSnapshotReaderWriters
-   .getReaderForVersion(getReadVersion(), 
userCodeClassLoader)
-   .readStateMetaInfo(in));
+   operatorStateMetaInfoSnapshots.add(
+   
OperatorBackendStateMetaInfoSnapshotReaderWriters
+   
.getOperatorStateReaderForVersion(getReadVersion(), userCodeClassLoader)
+   
.readOperatorStateMetaInfo(in));
}
+
+   if (VERSION >= 3) {
--- End diff --

Wouldn't this always depend on the version of the code and not the version 
of the snapshot? That is, if we restore from a `VERSION < 3` snapshot we should 
not go into this code path.

I think here you can get that via `getReadVersion()`.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165378004
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/ReadWriteBroadcastState.java
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * A type of state that can be created to store the state of a {@code 
BroadcastStream}. This state assumes that
+ * the same elements are sent to all instances of an operator.
+ *
+ * CAUTION: the user has to guarantee that all task instances 
store the same elements in this type of state.
+ *
+ *  Each operator instance individually maintains and stores elements 
in the broadcast state. The fact that the
+ * incoming stream is a broadcast one guarantees that all instances see 
all the elements. Upon recovery
+ * or re-scaling, the same state is given to each of the instances. To 
avoid hotspots, each task reads its previous
+ * partition, and if there are more tasks (scale up), then the new 
instances read from the old instances in a round
+ * robin fashion. This is why each instance has to guarantee that it 
stores the same elements as the rest. If not,
+ * upon recovery or rescaling you may have unpredictable redistribution of 
the partitions, thus unpredictable results.
+ *
+ * @param <K> The key type of the elements in the {@link ReadWriteBroadcastState}.
+ * @param <V> The value type of the elements in the {@link ReadWriteBroadcastState}.
+ */
+@PublicEvolving
+public interface ReadWriteBroadcastState<K, V> extends ReadOnlyBroadcastState<K, V> {
--- End diff --

I think we can call this just `BroadcastState`.


---


[GitHub] flink pull request #5230: [FLINK-8345] Add iterator of keyed state on broadc...

2018-02-01 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5230#discussion_r165377821
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/state/ReadOnlyBroadcastState.java
 ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Map;
+
+/**
+ * A read-only type of state that gives read-only access to the state of a 
{@code BroadcastStream}
--- End diff --

I think the `access to the state of a BroadCast` stream bit might be a bit 
confusing because it ties it too heavily to broadcast streams.

We should also have a big warning here that you should not modify the 
result of `get()` and also that you should not modify the entries in the 
immutable iterator because this would lead to problems with the heap state 
backend.


---


[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348755#comment-16348755
 ] 

ASF GitHub Bot commented on FLINK-8308:
---

Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
Sure.


> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a  security vulnerability of high severity.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
Sure.


---


[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348744#comment-16348744
 ] 

ASF GitHub Bot commented on FLINK-8308:
---

Github user StevenLangbroek commented on the issue:

https://github.com/apache/flink/pull/5395
  
@alpinegizmo I tested locally with Ruby 2.3 (default on High Sierra). I'm 
having some trouble with `rvm`, could you help testing with different versions?


> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a  security vulnerability of high severity.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread StevenLangbroek
Github user StevenLangbroek commented on the issue:

https://github.com/apache/flink/pull/5395
  
@alpinegizmo I tested locally with Ruby 2.3 (default on High Sierra). I'm 
having some trouble with `rvm`, could you help testing with different versions?


---


[jira] [Commented] (FLINK-8308) Update yajl-ruby dependency to 1.3.1 or higher

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348739#comment-16348739
 ] 

ASF GitHub Bot commented on FLINK-8308:
---

Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
It looks good, but I haven't tested it. I'm wondering what versions of Ruby 
this has been tested with. At a minimum it needs to work with whatever version 
we can get on the production build infrastructure, as well as 2.3 and 2.4, 
since most developers will have one of those versions. And maybe 2.5, since 
that's out now. 


> Update yajl-ruby dependency to 1.3.1 or higher
> --
>
> Key: FLINK-8308
> URL: https://issues.apache.org/jira/browse/FLINK-8308
> Project: Flink
>  Issue Type: Task
>  Components: Project Website
>Reporter: Fabian Hueske
>Assignee: Steven Langbroek
>Priority: Critical
> Fix For: 1.5.0, 1.4.1
>
>
> We got notified that yajl-ruby < 1.3.1, a dependency which is used to build 
> the Flink website, has a  security vulnerability of high severity.
> We should update yajl-ruby to 1.3.1 or higher.
> Since the website is built offline and served as static HTML, I don't think 
> this is a super critical issue (please correct me if I'm wrong), but we 
> should resolve this soon.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5395: [FLINK-8308] Remove explicit yajl-ruby dependency, update...

2018-02-01 Thread alpinegizmo
Github user alpinegizmo commented on the issue:

https://github.com/apache/flink/pull/5395
  
It looks good, but I haven't tested it. I'm wondering what versions of Ruby 
this has been tested with. At a minimum it needs to work with whatever version 
we can get on the production build infrastructure, as well as 2.3 and 2.4, 
since most developers will have one of those versions. And maybe 2.5, since 
that's out now. 


---


[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348713#comment-16348713
 ] 

ASF GitHub Bot commented on FLINK-8472:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5364#discussion_r165385564
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
 ---
@@ -100,9 +100,7 @@ public static void main(String[] args) throws Exception 
{
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);
 
-   if (mode == ExecutionMode.MIGRATE || mode == 
ExecutionMode.RESTORE) {
--- End diff --

@zentol can you confirm whether these changes make sense?

From the discussions I see in https://github.com/apache/flink/pull/3844, I 
assume this was a leftover to write a 1.2 savepoint (when uids couldn't be 
added for each chained operator separately). So it should be ok to remove this?


> Extend migration tests for Flink 1.4
> 
>
> Key: FLINK-8472
> URL: https://issues.apache.org/jira/browse/FLINK-8472
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The following migration tests should be extended to cover migrating Flink 
> jobs with a 1.4 savepoint.
>  * {{WindowOperatorMigrationTest}}
>  * {{CEPMigrationTest}}
>  * {{StatefulJobSavepointMigrationTestITCase}}
>  * {{FlinkKinesisConsumerMigrationTest}}
>  * {{FlinkKafkaConsumerBaseMigrationTest}}
>  * {{ContinuousFileProcessingMigrationTest}}
>  * {{BucketingSinkMigrationTest}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8546) Respect savepoint settings and recover from latest checkpoint in Flip-6

2018-02-01 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8546:


 Summary: Respect savepoint settings and recover from latest 
checkpoint in Flip-6
 Key: FLINK-8546
 URL: https://issues.apache.org/jira/browse/FLINK-8546
 Project: Flink
  Issue Type: Improvement
  Components: JobManager
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


The {{JobMaster}} should respect savepoints and recover from the latest 
checkpoint if possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5364: [FLINK-8472] [test] Extend all migration tests for...

2018-02-01 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5364#discussion_r165385564
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
 ---
@@ -100,9 +100,7 @@ public static void main(String[] args) throws Exception 
{
.map(new StatefulStringStoringMap(mode, "first"))
.setParallelism(4);
 
-   if (mode == ExecutionMode.MIGRATE || mode == 
ExecutionMode.RESTORE) {
--- End diff --

@zentol can you confirm whether these changes make sense?

From the discussions I see in https://github.com/apache/flink/pull/3844, I 
assume this was a leftover to write a 1.2 savepoint (when uids couldn't be 
added for each chained operator separately). So it should be ok to remove this?


---


[jira] [Commented] (FLINK-8472) Extend migration tests for Flink 1.4

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348707#comment-16348707
 ] 

ASF GitHub Bot commented on FLINK-8472:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5364
  
@aljoscha @zentol 
could you have another quick look at commit 8882fb7? That commit extends 
`KeyedComplexChainTest`, `ChainBreakTest`, `ChainLengthIncreaseTest`, ... etc. 
also for 1.4.

Once that is good, I'll merge this to `master` and `release-1.4`.


> Extend migration tests for Flink 1.4
> 
>
> Key: FLINK-8472
> URL: https://issues.apache.org/jira/browse/FLINK-8472
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> The following migration tests should be extended to cover migrating Flink 
> jobs with a 1.4 savepoint.
>  * {{WindowOperatorMigrationTest}}
>  * {{CEPMigrationTest}}
>  * {{StatefulJobSavepointMigrationTestITCase}}
>  * {{FlinkKinesisConsumerMigrationTest}}
>  * {{FlinkKafkaConsumerBaseMigrationTest}}
>  * {{ContinuousFileProcessingMigrationTest}}
>  * {{BucketingSinkMigrationTest}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5364: [FLINK-8472] [test] Extend all migration tests for Flink ...

2018-02-01 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5364
  
@aljoscha @zentol 
could you have another quick look at commit 8882fb7? That commit extends 
`KeyedComplexChainTest`, `ChainBreakTest`, `ChainLengthIncreaseTest`, ... etc. 
also for 1.4.

Once that is good, I'll merge this to `master` and `release-1.4`.


---


[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()

2018-02-01 Thread Dyana Rose (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348691#comment-16348691
 ] 

Dyana Rose commented on FLINK-8073:
---

Another instance: https://travis-ci.org/apache/flink/jobs/336094494, 
https://api.travis-ci.org/v3/job/336094494/log.txt

> Test instability 
> FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
> -
>
> Key: FLINK-8073
> URL: https://issues.apache.org/jira/browse/FLINK-8073
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8384) Session Window Assigner with Dynamic Gaps

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348692#comment-16348692
 ] 

ASF GitHub Bot commented on FLINK-8384:
---

Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
the ci fail looks to be a known flaky test: 
FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint


> Session Window Assigner with Dynamic Gaps
> -
>
> Key: FLINK-8384
> URL: https://issues.apache.org/jira/browse/FLINK-8384
> Project: Flink
>  Issue Type: Improvement
>  Components: Streaming
>Reporter: Dyana Rose
>Priority: Minor
>
> *Reason for Improvement*
> Currently both Session Window assigners only allow a static inactivity gap. 
> Given the following scenario, this is too restrictive:
> * Given a stream of IoT events from many device types
> * Assume each device type could have a different inactivity gap
> * Assume each device type gap could change while sessions are in flight
> By allowing dynamic inactivity gaps, the correct gap can be determined in the 
> [assignWindows 
> function|https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java#L59]
>  by passing the element currently under consideration, the timestamp, and the 
> context to a user defined function. This eliminates the need to create 
> unwieldy work arounds if you only have static gaps.
> Dynamic Session Window gaps should be available for both Event Time and 
> Processing Time streams.
> (short preliminary discussion: 
> https://lists.apache.org/thread.html/08a011c0039e826343e9be0b1bb4ecfc2e12976bc65f8a43ee88@%3Cdev.flink.apache.org%3E)
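
A minimal sketch of what the dynamic-gap API could look like from the user's side (timestamp/watermark assignment omitted; the extractor interface and {{withDynamicGap}} factory below are illustrative of the proposal and may differ from the final API):

{code:java}
DataStream<Tuple2<String, Long>> events = env.fromElements(
        Tuple2.of("sensor-1", 1L), Tuple2.of("gateway-1", 2L), Tuple2.of("sensor-1", 3L));

events
        .keyBy(0)
        .window(EventTimeSessionWindows.withDynamicGap(
                new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
                        @Override
                        public long extract(Tuple2<String, Long> element) {
                                // per-element gap: sensors time out after 10s, everything else after 60s
                                return element.f0.startsWith("sensor") ? 10_000L : 60_000L;
                        }
                }))
        .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1))
        .print();
{code}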



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5295: [FLINK-8384] [streaming] Session Window Assigner with Dyn...

2018-02-01 Thread dyanarose
Github user dyanarose commented on the issue:

https://github.com/apache/flink/pull/5295
  
the ci fail looks to be a known flaky test: 
FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint


---


[jira] [Commented] (FLINK-8545) Implement upsert stream table source

2018-02-01 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348675#comment-16348675
 ] 

Fabian Hueske commented on FLINK-8545:
--

Yes, many users have been asking for this feature recently.
Great that you're taking the initiative to work on this!

I'd propose to first start with the {{DataStream}} to {{Table}} upsert 
conversion and work on the {{UpsertTableSource}} interface later because these 
will need careful API design.

The keys could be defined similarly to the time indicators, i.e.,
{code:java}
DataStream[(String, Long, Int)] input = ???
// upsert with key
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
Table table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
{code}
I think it would be good to be able to declare a time attribute that decides 
whether an upsert is performed or not. For example, this could look like this:
{code:java}
DataStream[(String, Long, Int)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key)
{code}
 

 

> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager for ingesting data with upsert mode in flink 
> sql/table-api, it is valuable to enable table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed !
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8545) Implement upsert stream table source

2018-02-01 Thread Hequn Cheng (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-8545:
---
Description: 
As more and more users are eager for ingesting data with upsert mode in flink 
sql/table-api, it is valuable to enable table source with upsert mode. I will 
provide a design doc later and we can have more discussions. Any suggestions 
are warmly welcomed !

 

  was:
As more and more users are eager for ingesting data with upsert mode, it is 
valuable to enable table source with upsert mode. I will provide a design doc 
later and we can have more discussions. Any suggestions are warmly welcomed !

 


> Implement upsert stream table source 
> -
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> As more and more users are eager for ingesting data with upsert mode in flink 
> sql/table-api, it is valuable to enable table source with upsert mode. I will 
> provide a design doc later and we can have more discussions. Any suggestions 
> are warmly welcomed !
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8545) Implement upsert stream table source

2018-02-01 Thread Hequn Cheng (JIRA)
Hequn Cheng created FLINK-8545:
--

 Summary: Implement upsert stream table source 
 Key: FLINK-8545
 URL: https://issues.apache.org/jira/browse/FLINK-8545
 Project: Flink
  Issue Type: New Feature
  Components: Table API  SQL
Reporter: Hequn Cheng
Assignee: Hequn Cheng


As more and more users are eager for ingesting data with upsert mode, it is 
valuable to enable table source with upsert mode. I will provide a design doc 
later and we can have more discussions. Any suggestions are warmly welcomed !

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5398: [hotfix][cep] Remove migration from 1.5 test

2018-02-01 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5398
  
Thanks, LGTM.

We can add it back once `release-1.5` is cut.
It might not really make any difference in the end, but we never know :)


---


[GitHub] flink issue #5398: [hotfix][cep] Remove migration from 1.5 test

2018-02-01 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5398
  
@tzulitai I removed the migration from 1.5 


---


[GitHub] flink pull request #5398: [hotfix][cep] Remove migration from 1.5 test

2018-02-01 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/5398

[hotfix][cep] Remove migration from 1.5 test

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink cep-remove-1-5-test

Alternatively you can review and apply these changes as the patch at:


[GitHub] flink pull request #:

2018-02-01 Thread dawidwys
Github user dawidwys commented on the pull request:


https://github.com/apache/flink/commit/a2533f406d46b1c5acb5f70c263f9afad839dffe#commitcomment-27262545
  
In 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java:
In 
flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
 on line 74:
You're right. Will create a hotfix PR, that removes it.


---


[jira] [Updated] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-01 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5372:
-
Affects Version/s: 1.5.0

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.1
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
> extends AbstractStreamOperator<String>
> implements OneInputStreamOperator<String, String> {
> @Override
> public void open() throws Exception {
> super.open();
> // also get the state in open, this way we are sure that it was created before
> // we trigger the test checkpoint
> ValueState<String> state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> }
> @Override
> public void processElement(StreamRecord<String> element) throws Exception {
> // we also don't care
> ValueState<String> state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> state.update(element.getValue());
> }
> @Override
> public void snapshotState(StateSnapshotContext context) throws Exception {
> // do nothing so that we don't block
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-01 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-5372:
-
Fix Version/s: 1.5.0

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.1
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
> extends AbstractStreamOperator<String>
> implements OneInputStreamOperator<String, String> {
> @Override
> public void open() throws Exception {
> super.open();
> // also get the state in open, this way we are sure that it was created before
> // we trigger the test checkpoint
> ValueState<String> state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> }
> @Override
> public void processElement(StreamRecord<String> element) throws Exception {
> // we also don't care
> ValueState<String> state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> state.update(element.getValue());
> }
> @Override
> public void snapshotState(StateSnapshotContext context) throws Exception {
> // do nothing so that we don't block
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5372) Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()

2018-02-01 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348593#comment-16348593
 ] 

Till Rohrmann commented on FLINK-5372:
--

Another instance: https://api.travis-ci.org/v3/job/335627318/log.txt

> Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints()
> --
>
> Key: FLINK-5372
> URL: https://issues.apache.org/jira/browse/FLINK-5372
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.1
>
>
> The test is currently {{@Ignored}}. We have to change 
> {{AsyncCheckpointOperator}} to make sure that we can run fully 
> asynchronously. Then, the test will still fail because the canceling 
> behaviour was changed in the meantime.
> {code}
> public static class AsyncCheckpointOperator
> extends AbstractStreamOperator<String>
> implements OneInputStreamOperator<String, String> {
> @Override
> public void open() throws Exception {
> super.open();
> // also get the state in open, this way we are sure that it was created before
> // we trigger the test checkpoint
> ValueState<String> state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> }
> @Override
> public void processElement(StreamRecord<String> element) throws Exception {
> // we also don't care
> ValueState<String> state = getPartitionedState(
> VoidNamespace.INSTANCE,
> VoidNamespaceSerializer.INSTANCE,
> new ValueStateDescriptor<>("count",
> StringSerializer.INSTANCE, "hello"));
> state.update(element.getValue());
> }
> @Override
> public void snapshotState(StateSnapshotContext context) throws Exception {
> // do nothing so that we don't block
> }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

