[jira] [Reopened] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou reopened FLINK-9373:
---

> Always call RocksIterator.status() to check the internal error of RocksDB
> -
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we 
> find that _iterator.isValid()_ may also return false because of an internal error. A safer 
> way to use the _RocksIterator_ is to always call _iterator.status()_ to 
> check for internal errors of _RocksDB_. There is a case from the user mailing list that seems 
> to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
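
A minimal sketch of the safer iteration pattern described above, assuming the RocksJava API where `RocksIterator.status()` throws a `RocksDBException` when RocksDB hit an internal error:

{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public final class SafeRocksIteration {

    /** Iterates over all entries and surfaces internal RocksDB errors instead of silently stopping. */
    public static void iterateSafely(RocksDB db) throws RocksDBException {
        try (RocksIterator iterator = db.newIterator()) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                // ... process iterator.key() / iterator.value() ...
            }
            // isValid() == false can mean "end of data" OR an internal error,
            // so always check status() after the loop; it throws on error.
            iterator.status();
        }
    }
}
{code}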



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9091) Failure while enforcing releasability in building flink-json module

2018-05-16 Thread Timo Walther (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476972#comment-16476972
 ] 

Timo Walther commented on FLINK-9091:
-

[~yew1eb] can we close this issue or do you still get this error?

> Failure while enforcing releasability in building flink-json module
> ---
>
> Key: FLINK-9091
> URL: https://issues.apache.org/jira/browse/FLINK-9091
> Project: Flink
>  Issue Type: Bug
>  Components: Build System
>Reporter: Ted Yu
>Assignee: Hai Zhou
>Priority: Major
> Attachments: f-json.out
>
>
> Got the following when building flink-json module:
> {code}
> [WARNING] Rule 0: org.apache.maven.plugins.enforcer.DependencyConvergence 
> failed with message:
> Failed while enforcing releasability. See above detailed error message.
> ...
> [ERROR] Failed to execute goal 
> org.apache.maven.plugins:maven-enforcer-plugin:3.0.0-M1:enforce 
> (dependency-convergence) on project flink-json: Some Enforcer rules have 
> failed.   Look above for specific messages explaining why the rule failed. -> 
> [Help 1]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6001: [FLINK-9299] ProcessWindowFunction documentation J...

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6001#discussion_r188529928
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -797,7 +797,7 @@ DataStream input = ...;
 
 input
   .keyBy()
-  .timeWindow()
+  .timeWindow()
--- End diff --

How about using "duration" instead of "time size". I think "time size" is 
not a commonly used term...


---


[jira] [Commented] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476984#comment-16476984
 ] 

ASF GitHub Bot commented on FLINK-9299:
---

Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/6001#discussion_r188529928
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -797,7 +797,7 @@ DataStream input = ...;
 
 input
   .keyBy()
-  .timeWindow()
+  .timeWindow()
--- End diff --

How about using "duration" instead of "time size". I think "time size" is 
not a commonly used term...


> ProcessWindowFunction documentation Java examples have errors
> -
>
> Key: FLINK-9299
> URL: https://issues.apache.org/jira/browse/FLINK-9299
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: Ken Krugler
>Assignee: vinoyang
>Priority: Minor
>
> In looking at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation],
>  I noticed a few errors...
>  * "This allows to incrementally compute windows" should be "This allows it 
> to incrementally compute windows"
>  * DataStream input = ...; should be 
> DataStream> input = ...;
>  * The getResult() method needs to cast one of the accumulator values to a 
> double, if that's what it is going to return.
>  * MyProcessWindowFunction needs to extend, not implement 
> ProcessWindowFunction
>  * MyProcessWindowFunction needs to implement a process() method, not an 
> apply() method.
>  * The call to .timeWindow takes a Time parameter, not a window assigner.
>  
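
For reference, a sketch of what the corrected incremental-aggregation example could look like with the fixes above applied (based on the Flink 1.4 DataStream API; imports are omitted and the class names follow the documentation page, everything else is illustrative):

{code:java}
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
        @Override
        public String getKey(Tuple2<String, Long> value) {
            return value.f0;
        }
    })
    .timeWindow(Time.minutes(5))                       // takes a Time parameter
    .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

/** Incrementally computes (sum, count) and emits the average as a Double. */
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> acc) {
        return new Tuple2<>(acc.f0 + value.f1, acc.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        // cast to double, otherwise this would be integer division
        return ((double) acc.f0) / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

/** Extends (not implements) ProcessWindowFunction and implements process(), not apply(). */
private static class MyProcessWindowFunction
        extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Double> averages,
                        Collector<Tuple2<String, Double>> out) {
        out.collect(new Tuple2<>(key, averages.iterator().next()));
    }
}
{code}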



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9182) async checkpoints for timer service

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9182?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477009#comment-16477009
 ] 

ASF GitHub Bot commented on FLINK-9182:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6019
  
I wonder whether you could introduce a `HeapState` scoped to the `key group` to 
support the timer service. That way the timer service would be backed by the keyed state 
backend, which looks like a beautiful thing.


> async checkpoints for timer service
> ---
>
> Key: FLINK-9182
> URL: https://issues.apache.org/jira/browse/FLINK-9182
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: makeyang
>Assignee: makeyang
>Priority: Minor
> Fix For: 1.4.3, 1.5.1
>
>
> # problem description:
>  ## as the number of 'InternalTimer' objects increases, the checkpoint becomes 
> slower and slower
>  # improvement design
>  ## maintain a stateTableVersion, which is exactly the same thing as in 
> CopyOnWriteStateTable, and snapshotVersions, which is also exactly the same 
> thing as in CopyOnWriteStateTable, in InternalTimeServiceManager. One more 
> thing: a read-write lock, which is used to protect snapshotVersions and 
> stateTableVersion
>  ## for each InternalTimer, add 2 more properties, create version and delete 
> version, besides the 3 existing properties timestamp, key and namespace. Each 
> time a timer is registered in the timer service, it is created with 
> stateTableVersion as its create version while its delete version is -1. Each 
> time a timer is deleted in the timer service, it is marked as deleted by 
> giving it a delete version equal to stateTableVersion, without physically 
> deleting it from the timer service.
>  ## each time we try to snapshot the timers, InternalTimeServiceManager 
> increases its stateTableVersion and adds this stateTableVersion to 
> snapshotVersions. These 2 operations are protected by the write lock of 
> InternalTimeServiceManager. The current stateTableVersion is taken as the 
> snapshot version of this snapshot
>  ## shallow copy the tuples
>  ## then use another thread to asynchronously snapshot the whole thing: key 
> serializer, namespace serializer and timers. Timers which are not deleted 
> (delete version is -1) and whose create version is less than the snapshot 
> version are serialized. Timers whose delete version is not -1 and is bigger 
> than or equal to the snapshot version are also serialized. Otherwise, the 
> timer is not serialized by this snapshot.
>  ## when everything is serialized, remove the snapshot version from 
> snapshotVersions; this still happens in the other thread and is guarded by 
> the write lock.
>  ## last thing: physical deletion of timers. There are 2 places to physically 
> delete timers: each time a timer is deleted in the timer service, it is 
> marked as deleted by giving it a delete version equal to stateTableVersion 
> without physically deleting it from the timer service. After this, check 
> whether the size of snapshotVersions is 0 (which means there is no running 
> snapshot) and, if true, delete the timer. The other place to delete is during 
> the snapshot's timer iteration: when a timer's delete version is less than 
> the min value of snapshotVersions, the timer is deleted and no running 
> snapshot needs to keep it.
>  ## some more additions: processingTimeTimers and eventTimeTimers for each 
> key group used to be a HashSet and are now changed to a ConcurrentHashMap 
> with key+namespace+timestamp as the hash key.
>  # related mail list thread
>  ## 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Slow-flink-checkpoint-td18946.html
>  # github pull request
>  ## //coming soon
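
To make the snapshot visibility rule above concrete, a rough illustrative sketch (all names are hypothetical and not part of Flink's current code):

{code:java}
// Hypothetical timer with the two extra version fields proposed above.
final class VersionedTimer {
    final long timestamp;
    final Object key;
    final Object namespace;
    final long createVersion;   // stateTableVersion at registration time
    long deleteVersion = -1L;   // -1 means "not deleted"

    VersionedTimer(long timestamp, Object key, Object namespace, long createVersion) {
        this.timestamp = timestamp;
        this.key = key;
        this.namespace = namespace;
        this.createVersion = createVersion;
    }

    /**
     * Visibility rule from the design above: a timer belongs to a snapshot if it
     * is still live and was created before the snapshot version, or if it was
     * deleted at or after the snapshot version.
     */
    boolean isVisibleToSnapshot(long snapshotVersion) {
        boolean liveAndOldEnough = deleteVersion == -1L && createVersion < snapshotVersion;
        boolean deletedAfterSnapshot = deleteVersion != -1L && deleteVersion >= snapshotVersion;
        return liveAndOldEnough || deletedAfterSnapshot;
    }
}
{code}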



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9337) Implement AvroDeserializationSchema

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477008#comment-16477008
 ] 

ASF GitHub Bot commented on FLINK-9337:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
I would actually keep the package name for now. It makes sense, because the 
connection to the registry is avro-specific at the moment...


> Implement AvroDeserializationSchema
> ---
>
> Key: FLINK-9337
> URL: https://issues.apache.org/jira/browse/FLINK-9337
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6019: [FLINK-9182]async checkpoints for timer service

2018-05-16 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6019
  
I wonder whether you could introduce a `HeapState` scoped to the `key group` to 
support the timer service. That way the timer service would be backed by the keyed state 
backend, which looks like a beautiful thing.


---


[GitHub] flink issue #5995: [FLINK-9337] Implemented AvroDeserializationSchema

2018-05-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5995
  
I would actually keep the package name for now. It makes sense, because the 
connection to the registry is avro-specific at the moment...


---


[jira] [Updated] (FLINK-9376) Allow upgrading to incompatible state serializers (state schema evolution)

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9376:
---
Description: 
Currently, users are able to upgrade state serializers on the restore run of 
a stateful job, as long as the upgraded new serializer remains backwards 
compatible with all previously written data in the savepoint (i.e. it can read 
all previous and current schemas of serialized state objects).

What is still lacking is the ability to upgrade to incompatible serializers. 
When an incompatible serializer is registered for existing restored state, 
that state needs to go through the following process:
 1. read the serialized state with the previous serializer
 2. pass each deserialized state object through a “migration map function”, 
and
 3. write the state back with the new serializer

The availability of this process should be strictly limited to state 
registrations that occur before the actual processing begins (e.g. in the 
{{open}} or {{initializeState}} methods), so that we avoid performing these 
operations during processing.

Procedure 2 would even allow state type migrations, but that is out of scope 
for this JIRA.
 This ticket focuses only on procedures 1 and 3, where we try to enable 
schema evolution without state type changes.

This is an umbrella JIRA ticket that tracks this feature, including a few 
preliminary tasks that work towards enabling it.

  was:
Currently, users are able to upgrade state serializers on the restore run of 
a stateful job, as long as the upgraded new serializer remains backwards 
compatible with all previously written data in the savepoint (i.e. it can read 
all previous and current schemas of serialized state objects).

What is still lacking is the ability to upgrade to incompatible serializers. 
When an incompatible serializer is registered for existing restored state, 
that state needs to go through the following process:
 1. read the serialized state with the previous serializer
 2. pass each deserialized state object through a “migration map function”, 
and
 3. write the state back with the new serializer

This should be strictly limited to state registrations that occur before the 
actual processing begins (e.g. in the {{open}} or {{initializeState}} methods), 
so that we avoid performing these operations during processing.

Procedure 2 would even allow state type migrations, but that is out of scope 
for this JIRA.
 This ticket focuses only on procedures 1 and 3, where we try to enable 
schema evolution without state type changes.

This is an umbrella JIRA ticket that tracks this feature, including a few 
preliminary tasks that work towards enabling it.


> Allow upgrading to incompatible state serializers (state schema evolution)
> --
>
> Key: FLINK-9376
> URL: https://issues.apache.org/jira/browse/FLINK-9376
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.6.0
>
>
> Currently, users are able to upgrade state serializers on the restore run 
> of a stateful job, as long as the upgraded new serializer remains backwards 
> compatible with all previously written data in the savepoint (i.e. it can read 
> all previous and current schemas of serialized state objects).
> What is still lacking is the ability to upgrade to incompatible serializers. 
> When an incompatible serializer is registered for existing restored state, 
> that state needs to go through the following process:
>  1. read the serialized state with the previous serializer
>  2. pass each deserialized state object through a “migration map 
> function”, and
>  3. write the state back with the new serializer
> The availability of this process should be strictly limited to state 
> registrations that occur before the actual processing begins (e.g. in the 
> {{open}} or {{initializeState}} methods), so that we avoid performing these 
> operations during processing.
> Procedure 2 would even allow state type migrations, but that is out of scope 
> for this JIRA.
>  This ticket focuses only on procedures 1 and 3, where we try to enable 
> schema evolution without state type changes.
> This is an umbrella JIRA ticket that tracks this feature, including a few 
> preliminary tasks that work towards enabling it.
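
For illustration only, a schematic sketch of steps 1 and 3 above using Flink's serialization utilities (the helper class and the commented-out migration hook are hypothetical, not existing Flink API):

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

/** Hypothetical helper: migrates one serialized state entry to a new schema. */
final class StateEntryMigration {

    static <T> byte[] migrate(
            byte[] oldBytes,
            TypeSerializer<T> previousSerializer,
            TypeSerializer<T> newSerializer) throws IOException {

        // 1. read the serialized state with the previous serializer
        T stateObject = previousSerializer.deserialize(new DataInputDeserializer(oldBytes));

        // 2. (out of scope for this ticket) pass the object through a "migration map function"

        // 3. write the state back with the new serializer
        DataOutputSerializer out = new DataOutputSerializer(Math.max(64, oldBytes.length));
        newSerializer.serialize(stateObject, out);
        return out.getCopyOfBuffer();
    }
}
{code}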



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9376) Allow upgrading to incompatible state serializers (state schema evolution)

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9376?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9376:
---
Description: 
Currently, users have access to upgrade state serializers on the restore run of 
a stateful job, as long as the upgraded new serializer remains backwards 
compatible with all previous written data in the savepoint (i.e. it can read 
all previous and current schema of serialized state objects).

What is still lacking is the ability to upgrade to incompatible serializers. 
Upon being registered an incompatible serializer for existing restored state, 
that state needs to go through the process of -
 1. read serialized state with the previous serializer
 2. passing each deserialized state object through a “migration map function”, 
and
 3. writing back the state with the new serializer

The availability of this process should be strictly limited to state 
registrations that occur before the actual processing begins (e.g. in the 
{{open}} or {{initializeState}} methods), so that we avoid performing these 
operations during processing.

How this procedure actually occurs, differs across different types of state 
backends.
For example, for state backends that eagerly deserialize / lazily serialize 
state (e.g. {{HeapStateBackend}}), the job execution itself can be seen as a 
"migration"; everything is deserialized to state objects on restore, and is 
only serialized again, with the new serializer, on checkpoints.
Therefore, for these state backends, the above process is irrelevant.

On the other hand, for state backends that lazily deserialize / eagerly 
serialize state (e.g. {{RocksDBStateBackend}}), the state evolution process 
needs to happen for every state with a newly registered incompatible serializer.

Procedure 2. will allow even state type migrations, but that is out-of-scope of 
this JIRA.
 This ticket focuses only on procedures 1. and 3., where we try to enable 
schema evolution without state type changes.

This is an umbrella JIRA ticket that overlooks this feature, including a few 
preliminary tasks that work towards enabling it.

  was:
Currently, users are able to upgrade state serializers on the restore run of 
a stateful job, as long as the upgraded new serializer remains backwards 
compatible with all previously written data in the savepoint (i.e. it can read 
all previous and current schemas of serialized state objects).

What is still lacking is the ability to upgrade to incompatible serializers. 
When an incompatible serializer is registered for existing restored state, 
that state needs to go through the following process:
 1. read the serialized state with the previous serializer
 2. pass each deserialized state object through a “migration map function”, 
and
 3. write the state back with the new serializer

The availability of this process should be strictly limited to state 
registrations that occur before the actual processing begins (e.g. in the 
{{open}} or {{initializeState}} methods), so that we avoid performing these 
operations during processing.

Procedure 2 would even allow state type migrations, but that is out of scope 
for this JIRA.
 This ticket focuses only on procedures 1 and 3, where we try to enable 
schema evolution without state type changes.

This is an umbrella JIRA ticket that tracks this feature, including a few 
preliminary tasks that work towards enabling it.


> Allow upgrading to incompatible state serializers (state schema evolution)
> --
>
> Key: FLINK-9376
> URL: https://issues.apache.org/jira/browse/FLINK-9376
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing, Type Serialization System
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Critical
> Fix For: 1.6.0
>
>
> Currently, users are able to upgrade state serializers on the restore run 
> of a stateful job, as long as the upgraded new serializer remains backwards 
> compatible with all previously written data in the savepoint (i.e. it can read 
> all previous and current schemas of serialized state objects).
> What is still lacking is the ability to upgrade to incompatible serializers. 
> When an incompatible serializer is registered for existing restored state, 
> that state needs to go through the following process:
>  1. read the serialized state with the previous serializer
>  2. pass each deserialized state object through a “migration map 
> function”, and
>  3. write the state back with the new serializer
> The availability of this process should be strictly limited to state 
> registrations that occur before the actual processing begins (e.g. in the 
> {{open}} or {{initializeState}} methods), so that we avoid performing these 
> operations during processing.
> How this procedure 

[jira] [Commented] (FLINK-9341) Oracle: "Type is not supported: Date"

2018-05-16 Thread Sergey Nuyanzin (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477191#comment-16477191
 ] 

Sergey Nuyanzin commented on FLINK-9341:


Issue created: FLINK-9378.
I will be able to do it in the next few days.

> Oracle: "Type is not supported: Date"
> -
>
> Key: FLINK-9341
> URL: https://issues.apache.org/jira/browse/FLINK-9341
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.4.2
>Reporter: Ken Geis
>Priority: Major
>
> When creating a Table from an Oracle JDBCInputFormat with a date column, I 
> get the error "Type is not supported: Date". This happens with as simple a 
> query as
> {code:java}
> SELECT SYSDATE FROM DUAL{code}
>  Stack trace:
> {noformat}
> Caused by: org.apache.flink.table.api.TableException: Type is not supported: 
> Date
>     at 
> org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) 
> ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:336)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:68)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:198)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:195)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  ~[scala-library-2.11.11.jar:na]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
> ~[scala-library-2.11.11.jar:na]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:195)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:499)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485) 
> ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.java.BatchTableEnvironment.fromDataSet(BatchTableEnvironment.scala:61)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.java.BatchTableEnvironment$fromDataSet$0.call(Unknown
>  Source) ~[na:na]
> (at my code...)
> {noformat}
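
A possible workaround sketch until this is addressed, assuming the failure comes from declaring the column as `java.util.Date` (`BasicTypeInfo.DATE_TYPE_INFO`) in the input format's row type; declaring it as a SQL time type is usually accepted by the Table API. Illustrative and untested:

{code:java}
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

// Declare the date column with a SQL time type instead of java.util.Date,
// since FlinkTypeFactory (1.4.x) only maps the java.sql.* time types.
RowTypeInfo rowTypeInfo = new RowTypeInfo(
        SqlTimeTypeInfo.TIMESTAMP);   // instead of BasicTypeInfo.DATE_TYPE_INFO

JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat()
        .setDrivername("oracle.jdbc.OracleDriver")
        .setDBUrl("jdbc:oracle:thin:@//host:1521/service")  // placeholder URL
        .setQuery("SELECT SYSDATE FROM DUAL")
        .setRowTypeInfo(rowTypeInfo)
        .finish();
{code}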



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9299) ProcessWindowFunction documentation Java examples have errors

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476927#comment-16476927
 ] 

ASF GitHub Bot commented on FLINK-9299:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6001
  
cc @tzulitai @fhueske 


> ProcessWindowFunction documentation Java examples have errors
> -
>
> Key: FLINK-9299
> URL: https://issues.apache.org/jira/browse/FLINK-9299
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.2
>Reporter: Ken Krugler
>Assignee: vinoyang
>Priority: Minor
>
> In looking at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation],
>  I noticed a few errors...
>  * "This allows to incrementally compute windows" should be "This allows it 
> to incrementally compute windows"
>  * DataStream input = ...; should be 
> DataStream> input = ...;
>  * The getResult() method needs to cast one of the accumulator values to a 
> double, if that's what it is going to return.
>  * MyProcessWindowFunction needs to extend, not implement 
> ProcessWindowFunction
>  * MyProcessWindowFunction needs to implement a process() method, not an 
> apply() method.
>  * The call to .timeWindow takes a Time parameter, not a window assigner.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...

2018-05-16 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6001
  
cc @tzulitai @fhueske 


---


[jira] [Commented] (FLINK-9325) generate the _meta file for checkpoint only when the writing is truly successful

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476945#comment-16476945
 ] 

ASF GitHub Bot commented on FLINK-9325:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen I guess this PR is ready for another look now...


> generate the _meta file for checkpoint only when the writing is truly 
> successful
> 
>
> Key: FLINK-9325
> URL: https://issues.apache.org/jira/browse/FLINK-9325
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Major
>
> We should generate the _meta file for a checkpoint only when the writing is 
> totally successful. We should write the metadata file to a temp file first 
> and then atomically rename it (with an equivalent workaround for S3). 
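
A minimal sketch of the temp-file-plus-atomic-rename idea, using plain java.nio for illustration (Flink's actual implementation would go through its FileSystem abstraction; the S3 workaround mentioned above is not covered here):

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Write the checkpoint metadata to a temporary file first and only move it to
// its final name once the write has fully succeeded, so readers never observe
// a partially written _metadata file.
static void writeMetadataAtomically(Path finalMetaFile, byte[] metadata) throws IOException {
    Path tempFile = finalMetaFile.resolveSibling(finalMetaFile.getFileName() + ".inprogress");
    Files.write(tempFile, metadata);
    Files.move(tempFile, finalMetaFile, StandardCopyOption.ATOMIC_MOVE);
}
{code}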



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5982: [FLINK-9325][checkpoint]generate the meta file for checkp...

2018-05-16 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5982
  
@StephanEwen I guess this PR is ready for another look now...


---


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476998#comment-16476998
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188532278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
 ---
@@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode {
 */
   def consumesRetractions: Boolean = false
 
+  /**
+* Whether the [[DataStreamRel]] produces retraction messages.
+*/
+  def producesRetractions: Boolean = false
--- End diff --

Thanks for the explanation.


> Implement stream-stream non-window left outer join
> --
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-05-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188532278
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamRel.scala
 ---
@@ -53,4 +53,8 @@ trait DataStreamRel extends FlinkRelNode {
 */
   def consumesRetractions: Boolean = false
 
+  /**
+* Whether the [[DataStreamRel]] produces retraction messages.
+*/
+  def producesRetractions: Boolean = false
--- End diff --

Thanks for the explanation.


---


[GitHub] flink issue #6010: [FLINK-9359][docs] Update quickstart docs to only mention...

2018-05-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6010
  
merging.


---


[GitHub] flink issue #6005: [FLINK-9176][tests] Remove category annotations

2018-05-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6005
  
merging.


---


[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477097#comment-16477097
 ] 

ASF GitHub Bot commented on FLINK-9354:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6008#discussion_r188550129
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -33,8 +33,8 @@ else
   NUM_SLOTS=$NEW_DOP
 fi
 
-STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file}
-STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true}
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
--- End diff --

will do  


> print execution times for end-to-end tests
> --
>
> Key: FLINK-9354
> URL: https://issues.apache.org/jira/browse/FLINK-9354
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> We need to modify the end-to-end scripts to include the time it takes for a 
> test to run.
> We currently don't have any clue how long a test actually runs for.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6008#discussion_r188550129
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -33,8 +33,8 @@ else
   NUM_SLOTS=$NEW_DOP
 fi
 
-STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file}
-STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true}
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
--- End diff --

will do 👍 


---


[jira] [Created] (FLINK-9376) Allow upgrading to incompatible state serializers (state schema evolution)

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-9376:
--

 Summary: Allow upgrading to incompatible state serializers (state 
schema evolution)
 Key: FLINK-9376
 URL: https://issues.apache.org/jira/browse/FLINK-9376
 Project: Flink
  Issue Type: New Feature
  Components: State Backends, Checkpointing, Type Serialization System
Reporter: Tzu-Li (Gordon) Tai
 Fix For: 1.6.0


Currently, users are able to upgrade state serializers on the restore run of 
a stateful job, as long as the upgraded new serializer remains backwards 
compatible with all previously written data in the savepoint (i.e. it can read 
all previous and current schemas of serialized state objects).

What is still lacking is the ability to upgrade to incompatible serializers. 
When an incompatible serializer is registered for existing restored state, 
that state needs to go through the following process:
1. read the serialized state with the previous serializer
2. pass each deserialized state object through a “migration map function”, 
and
3. write the state back with the new serializer

This should be strictly limited to state registrations that occur before the 
actual processing begins (e.g. in the `open` or `initializeState` methods), so 
that we avoid performing these operations during processing.

Procedure 2 would even allow state type migrations, but that is out of scope for 
this JIRA.
This ticket focuses only on procedures 1 and 3, where we try to enable schema 
evolution without state type changes.

This is an umbrella JIRA ticket that tracks this feature, including a few 
preliminary tasks that work towards enabling it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476909#comment-16476909
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512862
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
 

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476905#comment-16476905
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511081
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
 

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476908#comment-16476908
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511258
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection> parameters 
() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public 
StatefulJobWBroadcastStateMigrationITCase(Tuple2 
testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
 

[jira] [Commented] (FLINK-9292) Remove TypeInfoParser

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476930#comment-16476930
 ] 

ASF GitHub Bot commented on FLINK-9292:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5970
  
cc @tzulitai 


> Remove TypeInfoParser
> -
>
> Key: FLINK-9292
> URL: https://issues.apache.org/jira/browse/FLINK-9292
> Project: Flink
>  Issue Type: Task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: vinoyang
>Priority: Major
>
> The {{TypeInfoParser}} has been deprecated, in favor of the {{TypeHint}}.
> Because the TypeInfoParser is also not working correctly with respect to 
> classloading, we should remove it. Users still find the class, try to use it, 
> and run into problems.
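
For reference, a minimal sketch of the replacement pattern for code that still uses the parser (the tuple type is just an example):

{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

// Deprecated style (to be removed):
// TypeInformation<Tuple2<String, Long>> info =
//         TypeInfoParser.parse("Tuple2<String, Long>");

// Replacement using TypeHint, which also behaves correctly with user class loaders:
TypeInformation<Tuple2<String, Long>> info =
        TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {});
{code}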



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5970: [FLINK-9292] [core] Remove TypeInfoParser (part 2)

2018-05-16 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5970
  
cc @tzulitai 


---


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476994#comment-16476994
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188531785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -176,14 +179,34 @@ class DataStreamJoin(
   body,
   returnType)
 
-val coMapFun =
-  new NonWindowInnerJoin(
-leftSchema.typeInfo,
-rightSchema.typeInfo,
-CRowTypeInfo(returnType),
-genFunction.name,
-genFunction.code,
-queryConfig)
+val coMapFun = joinType match {
+  case JoinRelType.INNER =>
+new NonWindowInnerJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  queryConfig)
+  case JoinRelType.LEFT if joinInfo.isEqui =>
+new NonWindowLeftRightJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  joinType == JoinRelType.LEFT,
+  queryConfig)
+  case JoinRelType.LEFT =>
--- End diff --

We can also do it as part of FLINK-8429.


> Implement stream-stream non-window left outer join
> --
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8428) Implement stream-stream non-window left outer join

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476992#comment-16476992
 ] 

ASF GitHub Bot commented on FLINK-8428:
---

Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188531694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -60,6 +60,9 @@ class DataStreamJoin(
 
   override def needsUpdatesAsRetraction: Boolean = true
 
+  // outer join will generate retractions
+  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
--- End diff --

Thanks, now I understand the distinction between producing and just 
forwarding retractions.


> Implement stream-stream non-window left outer join
> --
>
> Key: FLINK-8428
> URL: https://issues.apache.org/jira/browse/FLINK-8428
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: Hequn Cheng
>Assignee: Hequn Cheng
>Priority: Major
>
> Implement stream-stream non-window left outer join for sql/table-api. A 
> simple design doc can be found 
> [here|https://docs.google.com/document/d/1u7bJHeEBP_hFhi8Jm4oT3FqQDOm2pJDqCtq1U1WMHDo/edit?usp=sharing]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-05-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188531785
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -176,14 +179,34 @@ class DataStreamJoin(
   body,
   returnType)
 
-val coMapFun =
-  new NonWindowInnerJoin(
-leftSchema.typeInfo,
-rightSchema.typeInfo,
-CRowTypeInfo(returnType),
-genFunction.name,
-genFunction.code,
-queryConfig)
+val coMapFun = joinType match {
+  case JoinRelType.INNER =>
+new NonWindowInnerJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  queryConfig)
+  case JoinRelType.LEFT if joinInfo.isEqui =>
+new NonWindowLeftRightJoin(
+  leftSchema.typeInfo,
+  rightSchema.typeInfo,
+  CRowTypeInfo(returnType),
+  genFunction.name,
+  genFunction.code,
+  joinType == JoinRelType.LEFT,
+  queryConfig)
+  case JoinRelType.LEFT =>
--- End diff --

We can also do it as part of FLINK-8429.


---


[jira] [Commented] (FLINK-9360) HA end-to-end nightly test takes more than 15 min in Travis CI

2018-05-16 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477040#comment-16477040
 ] 

Chesnay Schepler commented on FLINK-9360:
-

I will push a hotfix to reduce the sleep to 30 seconds.

> HA end-to-end nightly test takes more than 15 min in Travis CI
> --
>
> Key: FLINK-9360
> URL: https://issues.apache.org/jira/browse/FLINK-9360
> Project: Flink
>  Issue Type: Test
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Andrey Zagrebin
>Priority: Major
>  Labels: E2E, Nightly
> Fix For: 1.5.0
>
>
> We have not discussed how long the nightly tests should run. Currently, the 
> overall testing build time is around the limit of the free Travis CI VM (50 min).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r188548955
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -35,60 +35,62 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) for how to enable and
 configure checkpoints for your program.
 
-## Externalized Checkpoints
+## Retain The Checkpoints
--- End diff --

"Retained Checkpoints"


---


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r188548729
  
--- Diff: docs/_includes/generated/checkpointing_configuration.html ---
@@ -40,7 +40,7 @@
 
 state.checkpoints.dir
 (none)
-The default directory used for checkpoints. Used by the 
state backends that write checkpoints to file systems (MemoryStateBackend, 
FsStateBackend, RocksDBStateBackend).
+The default directory used for storing the data files and 
meta data of checkpoints in a Flink supported filesystem. Note: the storage 
path must be accessible from all participating processes/nodes(i.e. all 
TaskManagers and JobManagers).
--- End diff --

typo: "Note: The"


---


[GitHub] flink pull request #5928: [hotfix][doc] fix doc of externalized checkpoint

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5928#discussion_r188549219
  
--- Diff: docs/ops/state/checkpoints.md ---
@@ -35,60 +35,62 @@ the same semantics as a failure-free execution.
 See [Checkpointing]({{ site.baseurl 
}}/dev/stream/state/checkpointing.html) for how to enable and
 configure checkpoints for your program.
 
-## Externalized Checkpoints
+## Retain The Checkpoints
 
 Checkpoints are by default not persisted externally and are only used to
 resume a job from failures. They are deleted when a program is cancelled.
 You can, however, configure periodic checkpoints to be persisted externally
-similarly to [savepoints](savepoints.html). These *externalized 
checkpoints*
-write their meta data out to persistent storage and are *not* automatically
-cleaned up when the job fails. This way, you will have a checkpoint around
-to resume from if your job fails.
+similarly to [savepoints](savepoints.html). This way, you will have a 
persisted 
+checkpoint around to resume from if your job fails.
 
 {% highlight java %}
 CheckpointConfig config = env.getCheckpointConfig();
 
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 {% endhighlight %}
 
-The `ExternalizedCheckpointCleanup` mode configures what happens with 
externalized checkpoints when you cancel the job:
+The `ExternalizedCheckpointCleanup` mode configures what happens with 
checkpoints when you cancel the job:
 
-- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the 
externalized checkpoint when the job is cancelled. Note that you have to 
manually clean up the checkpoint state after cancellation in this case.
+- **`ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION`**: Retain the 
checkpoint when the job is cancelled. Note that you have to manually clean up 
the checkpoint state after cancellation in this case.
 
-- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the 
externalized checkpoint when the job is cancelled. The checkpoint state will 
only be available if the job fails.
+- **`ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION`**: Delete the 
checkpoint when the job is cancelled. The checkpoint state will only be 
available if the job fails.
 
 ### Directory Structure
 
-Similarly to [savepoints](savepoints.html), an externalized checkpoint 
consists
-of a meta data file and, depending on the state back-end, some additional 
data
-files. The **target directory** for the externalized checkpoint's meta 
data is
-determined from the configuration key `state.checkpoints.dir` which, 
currently,
-can only be set via the configuration files.
+Similarly to [savepoints](savepoints.html), an checkpoint consists
--- End diff --

typo: "a checkpoint"


---


[jira] [Updated] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-9377:
---
Description: 
When writing meta information of a state in savepoints, we currently write both 
the state serializer as well as the state serializer's configuration snapshot.

Writing both is actually redundant, as most of the time they have identical 
information.
 Moreover, the fact that we use Java serialization to write the serializer and 
rely on it to be re-readable on the restore run, already poses problems for 
serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) to 
perform even a compatible upgrade.

The proposal here is to leave only the config snapshot as meta information, and 
use that as the single source of truth of information about the schema of 
serialized state.
 The config snapshot should be treated as a factory (or provided to a factory) 
to re-create serializers capable of reading old, serialized state.

  was:
When writing meta information of a state in savepoints, we currently write both 
the state serializer as well as the state serializer's configuration snapshot.

Writing both is actually redundant, as most of the time they have identical 
information.
Moreover, the fact that we use Java serialization to write the serializer and 
rely on it to be re-readable on the restore run, already poses problems for 
serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202).

The proposal here is to leave only the config snapshot as meta information, and 
use that as the single source of truth of information about the schema of 
serialized state.
The config snapshot should be treated as a factory (or provided to a factory) 
to re-create serializers capable of reading old, serialized state.
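
To make the proposal concrete, here is a rough sketch of what "config snapshot as a factory" could look like (the interface and method names are purely illustrative, not a final API; {{DataInputView}}, {{DataOutputView}} and {{TypeSerializer}} are the existing Flink types):
{code:java}
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

// Illustrative sketch only: the config snapshot is the single piece of meta
// information written for a state, and it can re-create a serializer that is
// able to read state written with the old schema.
public interface StateSerializerConfigSnapshot<T> {

    // written to / read from the checkpoint meta information
    void write(DataOutputView out) throws IOException;
    void read(DataInputView in) throws IOException;

    // acts as a factory for a "restore" serializer that can read the old state
    TypeSerializer<T> restoreSerializer();
}
{code}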


> Remove writing serializers as part of the checkpoint meta information
> -
>
> Key: FLINK-9377
> URL: https://issues.apache.org/jira/browse/FLINK-9377
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> When writing meta information of a state in savepoints, we currently write 
> both the state serializer as well as the state serializer's configuration 
> snapshot.
> Writing both is actually redundant, as most of the time they have identical 
> information.
>  Moreover, the fact that we use Java serialization to write the serializer 
> and rely on it to be re-readable on the restore run, already poses problems 
> for serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202) 
> to perform even a compatible upgrade.
> The proposal here is to leave only the config snapshot as meta information, 
> and use that as the single source of truth of information about the schema of 
> serialized state.
>  The config snapshot should be treated as a factory (or provided to a 
> factory) to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9377) Remove writing serializers as part of the checkpoint meta information

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)
Tzu-Li (Gordon) Tai created FLINK-9377:
--

 Summary: Remove writing serializers as part of the checkpoint meta 
information
 Key: FLINK-9377
 URL: https://issues.apache.org/jira/browse/FLINK-9377
 Project: Flink
  Issue Type: Sub-task
  Components: State Backends, Checkpointing
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
 Fix For: 1.6.0


When writing meta information of a state in savepoints, we currently write both 
the state serializer as well as the state serializer's configuration snapshot.

Writing both is actually redundant, as most of the time they have identical 
information.
Moreover, the fact that we use Java serialization to write the serializer and 
rely on it to be re-readable on the restore run, already poses problems for 
serializers such as the {{AvroSerializer}} (see discussion in FLINK-9202).

The proposal here is to leave only the config snapshot as meta information, and 
use that as the single source of truth of information about the schema of 
serialized state.
The config snapshot should be treated as a factory (or provided to a factory) 
to re-create serializers capable of reading old, serialized state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-6944) Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for serializer compatibility checks

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477194#comment-16477194
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-6944:


I'll change this to be a subtask of FLINK-9376, so that all things related to 
state evolution is consolidated there for easier tracking.
Will also mark this as a blocker for 1.6.0.

> Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for 
> serializer compatibility checks
> --
>
> Key: FLINK-6944
> URL: https://issues.apache.org/jira/browse/FLINK-6944
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing, Type Serialization System
>Affects Versions: 1.3.0, 1.3.1
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, we store both the {{TypeSerializer}} and its corresponding 
> {{TypeSerializerConfigSnapshot}} in checkpoints of managed state. This, in 
> most cases, are actually duplicate information.
> This JIRA proposes to change this by only storing the 
> {{TypeSerializerConfigSnapshot}}, while at the same time, letting 
> {{TypeSerializer.snapshotConfiguration}} return a default 
> {{DefaultTypeSerializerConfigSnapshot}}.
> This default simply serializes the serializer instance using Java 
> serialization.
> The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serializer bytes, 
> the serialVersionUID of the serializer class, and the serializer class' 
> classname. The latter two will be used to check compatibility in the default 
> implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if 
> classname / serialVersionUID has changed, the default implementation of 
> {{TypeSerializer.ensureCompatibility}} will simply return 
> {{CompatibilityResult.requiresMigration}} with the deserialized serializer as 
> the convert deserializer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6944) Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for serializer compatibility checks

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6944:
---
Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-9376

> Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for 
> serializer compatibility checks
> --
>
> Key: FLINK-6944
> URL: https://issues.apache.org/jira/browse/FLINK-6944
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Affects Versions: 1.3.0, 1.3.1
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Currently, we store both the {{TypeSerializer}} and its corresponding 
> {{TypeSerializerConfigSnapshot}} in checkpoints of managed state. This, in 
> most cases, are actually duplicate information.
> This JIRA proposes to change this by only storing the 
> {{TypeSerializerConfigSnapshot}}, while at the same time, letting 
> {{TypeSerializer.snapshotConfiguration}} return a default 
> {{DefaultTypeSerializerConfigSnapshot}}.
> This default simply serializes the serializer instance using Java 
> serialization.
> The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serializer bytes, 
> the serialVersionUID of the serializer class, and the serializer class' 
> classname. The latter two will be used to check compatibility in the default 
> implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if 
> classname / serialVersionUID has changed, the default implementation of 
> {{TypeSerializer.ensureCompatibility}} will simply return 
> {{CompatibilityResult.requiresMigration}} with the deserialized serializer as 
> the convert deserializer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6944) Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for serializer compatibility checks

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6944:
---
Priority: Blocker  (was: Major)

> Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for 
> serializer compatibility checks
> --
>
> Key: FLINK-6944
> URL: https://issues.apache.org/jira/browse/FLINK-6944
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Affects Versions: 1.3.0, 1.3.1
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> Currently, we store both the {{TypeSerializer}} and its corresponding 
> {{TypeSerializerConfigSnapshot}} in checkpoints of managed state. This, in 
> most cases, are actually duplicate information.
> This JIRA proposes to change this by only storing the 
> {{TypeSerializerConfigSnapshot}}, while at the same time, letting 
> {{TypeSerializer.snapshotConfiguration}} return a default 
> {{DefaultTypeSerializerConfigSnapshot}}.
> This default simply serializes the serializer instance using Java 
> serialization.
> The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serializer bytes, 
> the serialVersionUID of the serializer class, and the serializer class' 
> classname. The latter two will be used to check compatibility in the default 
> implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if 
> classname / serialVersionUID has changed, the default implementation of 
> {{TypeSerializer.ensureCompatibility}} will simply return 
> {{CompatibilityResult.requiresMigration}} with the deserialized serializer as 
> the convert deserializer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9373:
--
Description: Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
check whether we have reached the end of the iterator. But that is not enough: if we 
refer to RocksDB's wiki 
https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we find 
that even if _iterator.isValid()_ is true, there may still be an internal 
error. A safer way to use the _RocksIterator_ is to always call 
_iterator.status()_ to check for an internal error of _RocksDB_. There is a case 
from a user email that seems to have lost data because of this: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
  (was: Currently, when using RocksIterator we only use the 
_iterator.isValid()_ to check whether we have reached the end of the iterator. 
But that is not enough, if we refer to RocksDB's wiki 
https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we should find 
that _iterator.isValid()_ may also cause by a internal error. A safer way to 
use the _RocksIterator_ is to always call the _iterator.status()_ to check the 
internal error of _RocksDB_. There is a case from user email seems to lost data 
because of this 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html)
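
For reference, the safer pattern described above boils down to something like this (a minimal sketch; _RocksIterator#isValid()_, _#status()_ and _RocksDBException_ are the real RocksJava API, the surrounding method is illustrative):
{code:java}
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

void iterateSafely(RocksIterator iterator) throws RocksDBException {
    for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
        byte[] key = iterator.key();
        byte[] value = iterator.value();
        // ... process the entry ...
    }
    // isValid() == false can mean "end of data" OR an internal error;
    // status() throws a RocksDBException in the error case instead of
    // silently dropping the remaining entries.
    iterator.status();
}
{code}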

> Always call RocksIterator.status() to check the internal error of RocksDB
> -
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we 
> find that even if _iterator.isValid()_ is true, there may still be an 
> internal error. A safer way to use the _RocksIterator_ is to always call 
> _iterator.status()_ to check for an internal error of _RocksDB_. There is a case 
> from a user email that seems to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-05-16 Thread Franz Thoma (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Franz Thoma updated FLINK-9374:
---
Description: 
The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
{{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
internally holds an unbounded queue of records that have not yet been sent.

Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow 
indefinitely if Flink sends records faster than the KPL can forward them to 
Kinesis.

One way to circumvent this problem is to set a record TTL, so that queued 
records are dropped after a certain amount of time, but this will lead to data 
loss under high loads.

Currently the only time the queue is flushed is during checkpointing: 
{{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
checkpoint is reached (and will wait until the queue is flushed), or until 
out-of-memory, whichever is reached first. (This gets worse due to the fact 
that the Java KPL is only a thin wrapper around a C++ process, so it is not 
even the Java process that runs out of memory, but the C++ process.) The 
implicit rate-limit due to checkpointing leads to a ragged throughput graph 
like this (the periods with zero throughput are the wait times before a 
checkpoint):

!before.png! Throughput limited by checkpointing only

My proposed solution is to add a config option {{queueLimit}} to set a maximum 
number of records that may be waiting in the KPL queue. If this limit is 
reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait 
(blocking) until the queue length is below the limit again. This automatically 
leads to backpressuring, since the {{FlinkKinesisProducer}} cannot accept 
records while waiting. For compatibility, {{queueLimit}} is set to 
{{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client 
explicitly sets the value. Setting a »sane« default value is not possible 
unfortunately, since sensible values for the limit depend on the record size 
(the limit should be chosen so that about 10–100MB of records per shard are 
accumulated before flushing, otherwise the maximum Kinesis throughput may not 
be reached).

!after.png! Throughput with a queue limit of 10 records (the spikes are 
checkpoints, where the queue is still flushed completely)

  was:
The {{FlinkKinesisProducer}} just accepts records and forwards it to a 
{{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
internally holds an unbounded queue of records that have not yet been sent.

Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow 
indefinitely if Flink sends records faster than the KPL can forward them to 
Kinesis.

One way to circumvent this problem is to set a record TTL, so that queued 
records are dropped after a certain amount of time, but this will lead to data 
loss under high loads.

Currently the only time the queue is flushed is during checkpointing: 
{{FlinkKinesisProducer}} consumes records at arbitrary rate, either until a 
checkpoint is reached (and will wait until the queue is flushed), or until 
out-of-memory, whichever is reached first. (This gets worse due to the fact 
that the Java KPL is only a thin wrapper around a C++ process, so it is not 
even the Java process that runs out of memory, but the C++ process.) The 
implicit rate-limit due to checkpointing leads to a ragged throughput graph 
like this (the periods with zero throughput are the wait times before a 
checkpoint):

!file:///home/fthoma/projects/flink/before.png!!before.png! Throughput limited 
by checkpointing only

My proposed solution is to add a config option {{queueLimit}} to set a maximum 
number of records that may be waiting in the KPL queue. If this limit is 
reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait 
(blocking) until the queue length is below the limit again. This automatically 
leads to backpressuring, since the {{FlinkKinesisProducer}} cannot accept 
records while waiting. For compatibility, {{queueLimit}} is set to 
{{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client 
explicitly sets the value. Setting a »sane« default value is not possible 
unfortunately, since sensible values for the limit depend on the record size 
(the limit should be chosen so that about 10–100MB of records per shard are 
accumulated before flushing, otherwise the maximum Kinesis throughput may not 
be reached).

!after.png! Throughput with a queue limit of 10 records (the spikes are 
checkpoints, where the queue is still flushed completely)


> Flink Kinesis Producer does not backpressure
> 
>
> Key: FLINK-9374
> URL: https://issues.apache.org/jira/browse/FLINK-9374
>  

[GitHub] flink pull request #6021: [FLINK-9374] [kinesis] Enable FlinkKinesisProducer...

2018-05-16 Thread fmthoma
GitHub user fmthoma opened a pull request:

https://github.com/apache/flink/pull/6021

[FLINK-9374] [kinesis] Enable FlinkKinesisProducer Backpressuring

## What is the purpose of the change

The `FlinkKinesisProducer` just accepts records and forwards them to a 
`KinesisProducer` from the Amazon Kinesis Producer Library (KPL). The KPL 
internally holds an unbounded queue of records that have not yet been sent.

Since Kinesis is rate-limited to 1MB per second per shard, this queue may 
grow indefinitely if Flink sends records faster than the KPL can forward them 
to Kinesis.

One way to circumvent this problem is to set a record TTL, so that queued 
records are dropped after a certain amount of time, but this will lead to data 
loss under high loads.

Currently the only time the queue is flushed is during checkpointing: 
`FlinkKinesisProducer` consumes records at arbitrary rate, either until a 
checkpoint is reached (and will wait until the queue is flushed), or until 
out-of-memory, whichever is reached first. (This gets worse due to the fact 
that the Java KPL is only a thin wrapper around a C++ process, so it is not 
even the Java process that runs out of memory, but the C++ process.)

My proposed solution is to add a config option `queueLimit` to set a 
maximum number of records that may be waiting in the KPL queue. If this limit 
is reached, the `FlinkKinesisProducer` should trigger a `flush()` and wait 
(blocking) until the queue length is below the limit again. This automatically 
leads to backpressuring, since the `FlinkKinesisProducer` cannot accept records 
while waiting. For compatibility, `queueLimit` is set to `Integer.MAX_VALUE` by 
default, so the behavior is unchanged unless a client explicitly sets the 
value. Setting a »sane« default value is not possible unfortunately, since 
sensible values for the limit depend on the record size (the limit should be 
chosen so that about 10–100MB of records per shard are accumulated before 
flushing, otherwise the maximum Kinesis throughput may not be reached).
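
    As a rough sketch (not the actual diff of this PR; the method, the back-off interval and the way `queueLimit` is passed are illustrative), the blocking behaviour can be pictured like this, using the KPL's `getOutstandingRecordsCount()` and `flush()`:

```java
import com.amazonaws.services.kinesis.producer.KinesisProducer;

// Sketch only: block until the number of not-yet-sent records drops below the
// limit. Calling this before handing each record to the KPL is what
// backpressures the Flink pipeline.
static void enforceQueueLimit(KinesisProducer producer, int queueLimit) throws InterruptedException {
    while (producer.getOutstandingRecordsCount() >= queueLimit) {
        producer.flush();   // ask the KPL to drain its internal queue
        Thread.sleep(500);  // illustrative back-off; the actual PR may use a different strategy
    }
}
```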

## Brief change log

* Add a `queueLimit` setting to `FlinkKinesisProducer` to limit the number 
of in-flight records in the Kinesis Producer Library, and enable backpressuring 
if the limit is exceeded

## Verifying this change

This change added tests and can be verified as follows:

* Added unit test
* Manually verified the change by running a job that produces to a 2-shard 
Kinesis stream. The input rate is limited by Kinesis (verified that the Kinesis 
stream is indeed at maximum capacity).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes, but backwards compatible (option was added)
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): don't know
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: don't know
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fmthoma/flink queueLimit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6021.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6021


commit 9a2930cbbec4cd6979e6bfacb741da820cdbb284
Author: Franz Thoma 
Date:   2018-05-09T06:27:47Z

[FLINK-9374] [kinesis] Add hardcoded queue size limit of 10 records

commit e41037eb5e07efb73ded7f945111d0d5f6e9b18b
Author: Franz Thoma 
Date:   2018-05-09T06:56:53Z

[FLINK-9374] [kinesis] Expose queueLimit option

commit 9222849869da0018718072c33b32d8d935f3dec4
Author: Franz Thoma 
Date:   2018-05-09T07:08:11Z

[FLINK-9374] [kinesis] Refactor test: Mock implementation of flush() only 
flushes *some*, not *all* records

commit f062c5b9cd2e572da9fef0cdb5c8ea89af2a228c
Author: Franz Thoma 
Date:   2018-05-09T11:59:05Z

[FLINK-9374] [kinesis] adapt tests




---


[GitHub] flink issue #6022: [FLINK-9283] Update cluster execution docs

2018-05-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6022
  
merging.


---


[jira] [Created] (FLINK-9375) Introduce AbortCheckpoint message from JM to TMs

2018-05-16 Thread Stefan Richter (JIRA)
Stefan Richter created FLINK-9375:
-

 Summary: Introduce AbortCheckpoint message from JM to TMs
 Key: FLINK-9375
 URL: https://issues.apache.org/jira/browse/FLINK-9375
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stefan Richter


We should introduce an {{AbortCheckpoint}} message that a jobmanager can send 
to taskmanagers if a checkpoint is canceled so that the operators can eagerly 
stop their alignment phase and continue to normal processing. This can reduce 
some backpressure issues in the context of canceled and restarted checkpoints.
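
A minimal sketch of what such a message could look like (class and field names are assumptions, not an actual design):
{code:java}
import java.io.Serializable;
import org.apache.flink.api.common.JobID;

// Illustrative only: an RPC message the JobManager could send to TaskManagers
// so that tasks stop aligning barriers for a checkpoint that was canceled.
public class AbortCheckpoint implements Serializable {

    private static final long serialVersionUID = 1L;

    private final JobID jobId;
    private final long checkpointId;

    public AbortCheckpoint(JobID jobId, long checkpointId) {
        this.jobId = jobId;
        this.checkpointId = checkpointId;
    }

    public JobID getJobId() {
        return jobId;
    }

    public long getCheckpointId() {
        return checkpointId;
    }
}
{code}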



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8918) Introduce Runtime Filter Join

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8918:
--
Description: 
In general, stream join is one of the most performance-costly tasks. For every 
record from either side, we need to query the state of the other side, which 
leads to poor performance when the state size is huge. So, in production, 
we always need to spend a lot of slots to handle stream joins. But we can 
indeed improve this somewhat, because there is a phenomenon of stream joins 
that can be observed in production: the `joined ratio` of a stream join is 
often very low, for example:
 - stream join in promotion analysis: the job needs to join the promotion log with 
the action (click, view, payment, collection, retweet) log on the 
`promotion_id` to analyze the effect of the promotion.
 - stream join in AD (advertising) attribution: the job needs to join the AD click 
log with the item payment log on the `click_id` to find which click of which AD 
brought the payment, in order to do attribution.
 - stream join in click log analysis of docs: the job needs to join the viewed log (docs 
viewed by users) with the click log (docs clicked by users) to analyze the 
reason for the click and the properties of the users.
 - ... and so on

All these cases have one common property: the _joined ratio_ is very 
low. Here is an example to describe it: imagine that we have 10000 records from 
the left stream and 10000 records from the right stream, and we execute 
_select * from leftStream l join rightStream r on l.id = r.id_, but we only get 
100 records in the result. That is a case of a low _joined ratio_. This is an 
example for an inner join, but it also applies to left & right joins.

There are more examples of a low _joined ratio_ I could come up with, but the most 
important point I want to express is that a low _joined ratio_ for stream 
joins in production is a very common phenomenon (maybe the most common 
phenomenon in some companies; at least in our company that is the case).

*Then how to improve it?*

We can see from the above case that joining 10000 records with 10000 records only 
gives us 100 results. That means we query the state 20000 times (10000 for the left 
stream and 10000 for the right stream), but only 100 of these lookups are 
meaningful! If we could reduce the number of useless lookups, we could definitely 
improve the performance of stream joins.

The way we propose to improve this is to introduce the _Runtime Filter Join_. The 
main idea is that we build a _filter_ for the state on each side (left 
stream & right stream). When we need to query the state on one side, we first 
check the corresponding _filter_ to see whether the _key_ can possibly be in the state: if 
the _filter_ says "no, it cannot be in the state", we skip querying the 
state; if it says "hmm, it may be in the state", we query the state. As 
you can see, the best choice for the _filter_ is a _Bloom Filter_: it has all the 
features we expect, namely _extremely good performance_ and _no false 
negatives_.

 


*The simplest pseudo code for the _Runtime Filter Join_ (the inline comments assume 
the RocksDB backend):*
{code:java}
void performJoinNormally(Record recordFromLeftStream) {
    // performs a `seek()` on RocksDB and iterates the entries one by one;
    // this is an expensive operation, especially when the key can't be found in RocksDB.
    Iterator<Record> rightIterator = rightStreamState.iterator();
    while (rightIterator.hasNext()) {
        Record recordFromRightState = rightIterator.next();
        // ... join recordFromLeftStream with recordFromRightState ...
    }
}

void performRuntimeFilterJoin(Record recordFromLeftStream) {
    Iterator<Record> rightIterator = Collections.emptyIterator();
    if (rightStreamFilter.containsCurrentKey()) {
        rightIterator = rightStreamState.iterator();
    }
    // the `seek()` is performed only when filter.containsCurrentKey() returns true
    while (rightIterator.hasNext()) {
        Record recordFromRightState = rightIterator.next();
        // ... join recordFromLeftStream with recordFromRightState ...
    }

    // add the current key to the filter of the left stream.
    leftStreamFilter.addCurrentKey();
}
{code}

A description of runtime filtering for batch joins can be found 
[here|https://www.cloudera.com/documentation/enterprise/5-9-x/topics/impala_runtime_filtering.html]
 (even though it was not originally written for stream joins, the idea carries over to 
`stream join` easily).
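
As a side note, the "no false negatives" property of a Bloom filter can be illustrated with Guava's implementation (this is only to illustrate the filter choice, not the proposed integration into the state backend; the keys and sizing are made up):
{code:java}
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;

BloomFilter<CharSequence> filter =
    BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

filter.put("key-42");

// A key that was added is always reported as possibly present (no false negatives),
// so skipping the state lookup when mightContain() returns false is always safe.
boolean maybePresent = filter.mightContain("key-42");  // true
boolean maybeAbsent  = filter.mightContain("key-43");  // usually false; a rare "true" only costs an extra lookup
{code}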

  was:
In general, stream join is one of the most performance cost task. For every 
record from both side, we need to query the state from the other side, this 
will lead to poor performance when the state size if huge. So, in production, 
we always need to spend a lot slots to handle stream join. But, indeed, we can 
improve this in somehow, there a phenomenon of stream join can be found in 
production. That's the `joined ratio` of the stream join is often very low, for 
example.
 - stream join in promotion analysis: Job need to join the promotion log with 
the action(click, view, payment, collection, retweet) log with the 
`promotion_id` to analysis the effect of the promotion.

 

[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

2018-05-16 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477127#comment-16477127
 ] 

Fabian Hueske commented on FLINK-8866:
--

Regarding (3), originally, {{TableSink}} was designed to be flexible with 
regard to the output schema. A query that should be emitted was planned by the 
optimizer. Based on the resulting type, the TableSink was internally configured 
for the result schema. The configuration method produces a configured copy of 
the sink that is used to emit the result. So, the TableSink was not known to 
Calcite and only handled by the TableEnvironment.

When we added support for {{INSERT INTO}} this didn't work anymore because 
Calcite validates that the schema of the target table is compatible with the 
result schema of the SELECT query. Hence, we added the field names and types to 
the registration, configure the TableSink, and register the newly configured 
TableSink in Calcite's catalog. By doing it this way, we did not have to change 
the interface of the TableSink which did not only mean backwards compatibility 
but also that all TableSinks can be used in either way.

> Create unified interfaces to configure and instatiate TableSinks
> 
>
> Key: FLINK-8866
> URL: https://issues.apache.org/jira/browse/FLINK-8866
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API  SQL
>Reporter: Timo Walther
>Assignee: Shuyi Chen
>Priority: Major
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure 
> and instantiate TableSinks. Among other applications, this is necessary in 
> order to declare table sinks in an environment file of the SQL client. Such 
> that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to 
> TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) 
> for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to 
> identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9373) Fix potential data losing for RocksDBBackend

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9373:
--
Summary: Fix potential data losing for RocksDBBackend  (was: Always call 
RocksIterator.status() to check the internal error of RocksDB)

> Fix potential data losing for RocksDBBackend
> 
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: if we refer to RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling we 
> find that even if _iterator.isValid()_ is true, there may still be an 
> internal error. A safer way to use the _RocksIterator_ is to always call the 
> _iterator.status()_ to check for an internal error of _RocksDB_. There is a case 
> from a user email that seems to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9303) Unassign partitions from Kafka client if partitions become unavailable

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477171#comment-16477171
 ] 

ASF GitHub Bot commented on FLINK-9303:
---

Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r188564095
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
getLong(
checkNotNull(props, "props"),

KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
-   !getBoolean(props, KEY_DISABLE_METRICS, false));
+   !getBoolean(props, KEY_DISABLE_METRICS, false),
+   getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, 
false));
--- End diff --

You're right, I'll change it


> Unassign partitions from Kafka client if partitions become unavailable
> --
>
> Key: FLINK-9303
> URL: https://issues.apache.org/jira/browse/FLINK-9303
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Tzu-Li (Gordon) Tai
>Priority: Major
> Fix For: 1.6.0
>
>
> Originally reported in ML:
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Dynamically-deleting-kafka-topics-does-not-remove-partitions-from-kafkaConsumer-td19946.html]
> The problem is that the Kafka consumer has no notion of "closed" partitions 
> at the moment, so partitions statically assigned to the Kafka client are never 
> removed and are continuously requested for records.
> This causes log noise, as reported in the mail thread above.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5991: [FLINK-9303] [kafka] Adding support for unassign d...

2018-05-16 Thread EAlexRojas
Github user EAlexRojas commented on a diff in the pull request:

https://github.com/apache/flink/pull/5991#discussion_r188564095
  
--- Diff: 
flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
 ---
@@ -221,7 +221,8 @@ private FlinkKafkaConsumer08(
getLong(
checkNotNull(props, "props"),

KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, PARTITION_DISCOVERY_DISABLED),
-   !getBoolean(props, KEY_DISABLE_METRICS, false));
+   !getBoolean(props, KEY_DISABLE_METRICS, false),
+   getBoolean(props, KEY_CHECK_UNAVAILABLE_TOPICS, 
false));
--- End diff --

You're right, I'll change it


---


[GitHub] flink pull request #6001: [FLINK-9299] ProcessWindowFunction documentation J...

2018-05-16 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/6001#discussion_r188530473
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -797,7 +797,7 @@ DataStream input = ...;
 
 input
   .keyBy()
-  .timeWindow()
+  .timeWindow()
--- End diff --

accept, hold on...


---


[jira] [Assigned] (FLINK-9375) Introduce AbortCheckpoint message from JM to TMs

2018-05-16 Thread vinoyang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

vinoyang reassigned FLINK-9375:
---

Assignee: vinoyang

> Introduce AbortCheckpoint message from JM to TMs
> 
>
> Key: FLINK-9375
> URL: https://issues.apache.org/jira/browse/FLINK-9375
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: vinoyang
>Priority: Major
>
> We should introduce an {{AbortCheckpoint}} message that a jobmanager can send 
> to taskmanagers if a checkpoint is canceled so that the operators can eagerly 
> stop their alignment phase and continue to normal processing. This can reduce 
> some backpressure issues in the context of canceled and restarted checkpoints.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9378) Improve TableException message with TypeName usage

2018-05-16 Thread Sergey Nuyanzin (JIRA)
Sergey Nuyanzin created FLINK-9378:
--

 Summary: Improve TableException message with TypeName usage
 Key: FLINK-9378
 URL: https://issues.apache.org/jira/browse/FLINK-9378
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Sergey Nuyanzin
Assignee: Sergey Nuyanzin


Currently, TableException uses the simple class name. It is not clear what the 
issue is when the error message looks like {noformat}
Exception in thread "main" org.apache.flink.table.api.TableException: Result 
field does not match requested type. Requested: Date; Actual: Date
at 
org.apache.flink.table.api.TableEnvironment$$anonfun$generateRowConverterFunction$1.apply(TableEnvironment.scala:953)
{noformat}
or
{noformat}Caused by: org.apache.flink.table.api.TableException: Type is not 
supported: Date
at 
org.apache.flink.table.api.TableException$.apply(exceptions.scala:53){noformat}
For more details, have a look at FLINK-9341.
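
A sketch of the improvement (variable names are illustrative; {{requestedType}} and {{actualType}} stand for the {{TypeInformation}} of the requested and actual field types): include the fully qualified class name, e.g. via {{TypeInformation#getTypeClass()}}, so that {{java.sql.Date}} and {{java.util.Date}} become distinguishable in the message.
{code:java}
// Illustrative sketch only: build the message from the fully qualified class name.
throw new TableException(
    String.format(
        "Result field does not match requested type. Requested: %s; Actual: %s",
        requestedType.getTypeClass().getName(),
        actualType.getTypeClass().getName()));
{code}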



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511081
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476906#comment-16476906
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511280
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
 

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512949
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476910#comment-16476910
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512949
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
 

[jira] [Commented] (FLINK-8659) Add migration tests for Broadcast state.

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476907#comment-16476907
 ] 

ASF GitHub Bot commented on FLINK-8659:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512804
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
 

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512862
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188512804
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511280
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   

[GitHub] flink pull request #5955: [FLINK-8659] Add migration itcases for broadcast s...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5955#discussion_r188511258
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/StatefulJobWBroadcastStateMigrationITCase.java
 ---
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing.utils;
+
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.BroadcastStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.migration.MigrationVersion;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Migration ITCases for a stateful job with broadcast state. The tests 
are parameterized to (potentially)
+ * cover migrating for multiple previous Flink versions, as well as for 
different state backends.
+ */
+@RunWith(Parameterized.class)
+public class StatefulJobWBroadcastStateMigrationITCase extends 
SavepointMigrationTestBase {
+
+   private static final int NUM_SOURCE_ELEMENTS = 4;
+
+   // TODO change this to PERFORM_SAVEPOINT to regenerate binary savepoints
+   private final StatefulJobSavepointMigrationITCase.ExecutionMode 
executionMode =
+   
StatefulJobSavepointMigrationITCase.ExecutionMode.VERIFY_SAVEPOINT;
+
+   @Parameterized.Parameters(name = "Migrate Savepoint / Backend: {0}")
+   public static Collection<Tuple2<MigrationVersion, String>> parameters() {
+   return Arrays.asList(
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.MEMORY_STATE_BACKEND_NAME),
+   Tuple2.of(MigrationVersion.v1_5, 
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME));
+   }
+
+   private final MigrationVersion testMigrateVersion;
+   private final String testStateBackend;
+
+   public StatefulJobWBroadcastStateMigrationITCase(Tuple2<MigrationVersion, String> testMigrateVersionAndBackend) throws Exception {
+   this.testMigrateVersion = testMigrateVersionAndBackend.f0;
+   this.testStateBackend = testMigrateVersionAndBackend.f1;
+   }
+
+   @Test
+   public void testSavepoint() throws Exception {
+
+   final int parallelism = 4;
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.setRestartStrategy(RestartStrategies.noRestart());
+   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+   switch (testStateBackend) {
+   case StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME:
+   

[jira] [Created] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-16 Thread Sihua Zhou (JIRA)
Sihua Zhou created FLINK-9373:
-

 Summary: Always call RocksIterator.status() to check the internal 
error of RocksDB
 Key: FLINK-9373
 URL: https://issues.apache.org/jira/browse/FLINK-9373
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.5.0
Reporter: Sihua Zhou
Assignee: Sihua Zhou


Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
check whether we have reached the end of the iterator. But that is not enough: 
as RocksDB's wiki 
https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, a 
false result from _iterator.isValid()_ may also be caused by an internal error. 
A safer way to use the _RocksIterator_ is to always call _iterator.status()_ 
to check for internal errors in _RocksDB_. There is a case from the user 
mailing list that seems to have lost data because of this: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
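For illustration, a minimal sketch of the safer pattern described above, using only the public RocksDB Java API. The surrounding class and method are made up for the example; this is not the Flink-internal code.

{code:java}
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public final class SafeRocksIteration {

    /** Iterates over all entries and surfaces internal RocksDB errors. */
    public static void iterateAll(RocksDB db) throws RocksDBException {
        try (RocksIterator iterator = db.newIterator()) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                byte[] key = iterator.key();
                byte[] value = iterator.value();
                // ... process the key/value pair ...
            }
            // isValid() == false can mean "end of data" OR "internal error".
            // status() throws a RocksDBException in the error case, so the
            // failure is no longer silently swallowed.
            iterator.status();
        }
    }
}
{code}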



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16476989#comment-16476989
 ] 

ASF GitHub Bot commented on FLINK-9373:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6020

[FLINK-9373][state] Always call RocksIterator.status() to check the 
internal error of RocksDB

## What is the purpose of the change

Currently, when using a RocksIterator we only use `iterator.isValid()` to 
check whether we have reached the end of the iterator. But that is not enough: 
as RocksDB's wiki 
https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, 
even if `iterator.isValid()` returns true, there may still be internal 
errors. A safer way to use the `RocksIterator` is to always call 
`iterator.status()` to check for internal errors in RocksDB. There is one case 
from the user mailing list that seems to have lost data because of this 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
 (I'm not so sure yet)

## Brief change log

  - *Always call RocksIterator.status() to check the internal error of 
RocksDB*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9373

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6020


commit b1d531949d521bb7d72cd389dfe72a4a5cf1bfc9
Author: sihuazhou 
Date:   2018-05-16T07:47:05Z

Always call RocksIterator.status() to check the internal error of RocksDB




> Always call RocksIterator.status() to check the internal error of RocksDB
> -
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: as RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, 
> even if _iterator.isValid()=true_, there may still be an internal error. A 
> safer way to use the _RocksIterator_ is to always call 
> _iterator.status()_ to check for internal errors in _RocksDB_. There is a 
> case from the user mailing list that seems to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5327: [FLINK-8428] [table] Implement stream-stream non-w...

2018-05-16 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5327#discussion_r188531694
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
 ---
@@ -60,6 +60,9 @@ class DataStreamJoin(
 
   override def needsUpdatesAsRetraction: Boolean = true
 
+  // outer join will generate retractions
+  override def producesRetractions: Boolean = joinType != JoinRelType.INNER
--- End diff --

Thanks, now I understand the distinction between producing and just 
forwarding retractions.


---


[jira] [Commented] (FLINK-9283) Update cluster execution docs

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477017#comment-16477017
 ] 

ASF GitHub Bot commented on FLINK-9283:
---

GitHub user yuqi1129 opened a pull request:

https://github.com/apache/flink/pull/6022

[FLINK-9283] Update cluster execution docs

## What is the purpose of the change

This pull request fixes the wrong port in the cluster execution example that uses `RemoteStreamEnvironment`.


## Brief change log

  - Change port from 6123 to 8081 in example

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yuqi1129/flink FLINK-9283

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6022


commit 7adf511f21e14797610fccd8b6e4dd9ba72410e4
Author: hzyuqi1 
Date:   2018-05-16T08:06:33Z

[FLINK-9283] Update cluster execution docs

This closes #9283.




> Update cluster execution docs
> -
>
> Key: FLINK-9283
> URL: https://issues.apache.org/jira/browse/FLINK-9283
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: yuqi
>Priority: Major
> Fix For: 1.5.0
>
>
> The [Cluster 
> Execution|https://ci.apache.org/projects/flink/flink-docs-master/dev/cluster_execution.html#cluster-execution]
>  page must be updated for 1.5.
> The [RemoteEnvironment 
> example|https://ci.apache.org/projects/flink/flink-docs-master/dev/cluster_execution.html#example]
>  should use the port {{8081}}.
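For reference, a small sketch of what the corrected example would look like; the host name and jar path are placeholders, and the important part is that the REST port 8081 is used instead of the legacy JobManager RPC port 6123.

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RemoteEnvironmentExample {

    public static void main(String[] args) throws Exception {
        // Connects to the cluster's REST endpoint (port 8081 by default) and
        // ships the given user-code jar(s) with the job.
        StreamExecutionEnvironment env = StreamExecutionEnvironment
            .createRemoteEnvironment("flink-master", 8081, "/path/to/usercode.jar");

        env.fromElements(1, 2, 3).print();

        env.execute("Remote example");
    }
}
{code}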



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6020: [FLINK-9373][state] Fix potential data losing for RocksDB...

2018-05-16 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6020
  
cc @StefanRRichter 


---


[jira] [Commented] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477035#comment-16477035
 ] 

ASF GitHub Bot commented on FLINK-9373:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/6020
  
cc @StefanRRichter 


> Always call RocksIterator.status() to check the internal error of RocksDB
> -
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: as RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, 
> even if _iterator.isValid()=true_, there may still be an internal error. A 
> safer way to use the _RocksIterator_ is to always call 
> _iterator.status()_ to check for internal errors in _RocksDB_. There is a 
> case from the user mailing list that seems to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9341) Oracle: "Type is not supported: Date"

2018-05-16 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477066#comment-16477066
 ] 

Fabian Hueske commented on FLINK-9341:
--

Thanks for investigating the issue [~kgeis] and [~Sergey Nuyanzin].
I completely agree that we should improve the error message.
Would one of you mind creating a corresponding JIRA issue and maybe also 
contributing the improvement?

Thanks, Fabian

> Oracle: "Type is not supported: Date"
> -
>
> Key: FLINK-9341
> URL: https://issues.apache.org/jira/browse/FLINK-9341
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.4.2
>Reporter: Ken Geis
>Priority: Major
>
> When creating a Table from an Oracle JDBCInputFormat with a date column, I 
> get the error "Type is not supported: Date". This happens with as simple a 
> query as
> {code:java}
> SELECT SYSDATE FROM DUAL{code}
>  Stack trace:
> {noformat}
> Caused by: org.apache.flink.table.api.TableException: Type is not supported: 
> Date
>     at 
> org.apache.flink.table.api.TableException$.apply(exceptions.scala:53) 
> ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:336)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:68)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:198)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory$$anonfun$buildLogicalRowType$1.apply(FlinkTypeFactory.scala:195)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  ~[scala-library-2.11.11.jar:na]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
> ~[scala-library-2.11.11.jar:na]
>     at 
> org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:195)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.plan.schema.InlineTable.getRowType(InlineTable.scala:105)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.TableEnvironment.scanInternal(TableEnvironment.scala:499)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.TableEnvironment.scan(TableEnvironment.scala:485) 
> ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.java.BatchTableEnvironment.fromDataSet(BatchTableEnvironment.scala:61)
>  ~[flink-table_2.11-1.4.2.jar:1.4.2]
>     at 
> org.apache.flink.table.api.java.BatchTableEnvironment$fromDataSet$0.call(Unknown
>  Source) ~[na:na]
> (at my code...)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188546038
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -470,6 +488,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must 
not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying 
a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

same as above


---


[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545921
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must 
not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
--- End diff --

we only have to check for non-null.


---


[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545761
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must 
not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying 
a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

"default keyspace"


---


[jira] [Updated] (FLINK-9373) Fix potential data losing for RocksDBBackend

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-9373:
--
Fix Version/s: 1.5.0

> Fix potential data losing for RocksDBBackend
> 
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently, when using a RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: as RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling explains, 
> even if _iterator.isValid()=true_, there may still be an internal error. A 
> safer way to use the _RocksIterator_ is to always call 
> _iterator.status()_ to check for internal errors in _RocksDB_. There is a 
> case from the user mailing list that seems to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6008: [FLINK-9354][travis] Print execution times for nightly E2...

2018-05-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6008
  
nice catch @tzulitai, will rework this into a run_test method that takes a 
description and command to execute.


---


[jira] [Created] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-05-16 Thread Franz Thoma (JIRA)
Franz Thoma created FLINK-9374:
--

 Summary: Flink Kinesis Producer does not backpressure
 Key: FLINK-9374
 URL: https://issues.apache.org/jira/browse/FLINK-9374
 Project: Flink
  Issue Type: Bug
Reporter: Franz Thoma
 Attachments: after.png, before.png

The {{FlinkKinesisProducer}} just accepts records and forwards them to a 
{{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
internally holds an unbounded queue of records that have not yet been sent.

Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow 
indefinitely if Flink sends records faster than the KPL can forward them to 
Kinesis.

One way to circumvent this problem is to set a record TTL, so that queued 
records are dropped after a certain amount of time, but this will lead to data 
loss under high loads.

Currently the only time the queue is flushed is during checkpointing: 
{{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a 
checkpoint is reached (and will wait until the queue is flushed), or until 
out-of-memory, whichever is reached first. (This gets worse due to the fact 
that the Java KPL is only a thin wrapper around a C++ process, so it is not 
even the Java process that runs out of memory, but the C++ process.) The 
implicit rate-limit due to checkpointing leads to a ragged throughput graph 
like this (the periods with zero throughput are the wait times before a 
checkpoint):

!before.png! Throughput limited by checkpointing only

My proposed solution is to add a config option {{queueLimit}} to set a maximum 
number of records that may be waiting in the KPL queue. If this limit is 
reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait 
(blocking) until the queue length is below the limit again. This automatically 
leads to backpressure, since the {{FlinkKinesisProducer}} cannot accept 
records while waiting. For compatibility, {{queueLimit}} is set to 
{{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client 
explicitly sets the value. Setting a »sane« default value is not possible 
unfortunately, since sensible values for the limit depend on the record size 
(the limit should be chosen so that about 10–100MB of records per shard are 
accumulated before flushing, otherwise the maximum Kinesis throughput may not 
be reached).

!after.png! Throughput with a queue limit of 10 records (the spikes are 
checkpoints, where the queue is still flushed completely)
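To make the proposal concrete, a rough sketch of the flush-and-wait loop, assuming the KPL's public {{KinesisProducer}} API ({{addUserRecord}}, {{getOutstandingRecordsCount}}, {{flush}}). The class, field, and method names below are illustrative only, not the actual {{FlinkKinesisProducer}} code.

{code:java}
import java.nio.ByteBuffer;

import com.amazonaws.services.kinesis.producer.KinesisProducer;

final class QueueLimitSketch {

    private final KinesisProducer producer;
    private final long queueLimit; // Integer.MAX_VALUE by default, i.e. unchanged behavior

    QueueLimitSketch(KinesisProducer producer, long queueLimit) {
        this.producer = producer;
        this.queueLimit = queueLimit;
    }

    /** Called for every record the sink receives. */
    void invoke(String stream, String partitionKey, ByteBuffer data) throws InterruptedException {
        enforceQueueLimit();
        producer.addUserRecord(stream, partitionKey, data);
    }

    /** Blocks until the KPL's internal queue is below the limit, creating backpressure. */
    private void enforceQueueLimit() throws InterruptedException {
        while (producer.getOutstandingRecordsCount() >= queueLimit) {
            producer.flush();
            Thread.sleep(10); // wait for the KPL to drain some records
        }
    }
}
{code}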



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477003#comment-16477003
 ] 

ASF GitHub Bot commented on FLINK-9354:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6008
  
nice catch @tzulitai, will rework this into a run_test method that takes a 
description and command to execute.


> print execution times for end-to-end tests
> --
>
> Key: FLINK-9354
> URL: https://issues.apache.org/jira/browse/FLINK-9354
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> We need to modify the end-to-end scripts to include the time it takes for a 
> test to run.
> We currently don't have any clue how long a test actually runs for.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6009: [FLINK-9357][tests][yarn] Add margins to exception excerp...

2018-05-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6009
  
merging.


---


[jira] [Commented] (FLINK-9357) Add margins to yarn exception excerpts

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477022#comment-16477022
 ] 

ASF GitHub Bot commented on FLINK-9357:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6009
  
merging.


> Add margins to yarn exception excerpts
> --
>
> Key: FLINK-9357
> URL: https://issues.apache.org/jira/browse/FLINK-9357
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, YARN
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> The yarn tests check the log files for exceptions to detect test failures. If 
> one is detected, the test fails and an excerpt from the logs is printed.
> The excerpt content is currently the stack of the detected exception. This 
> only works correctly if the stacktrace follows a specific formatting style; 
> for example, if an exception message contains an empty line the output will 
> be cut off.
> I propose including the 10 lines before/after the found exception to make 
> this a bit more reliable. As a side-effect we also get a little contextual 
> information.
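The "margin" idea can be illustrated with a small hypothetical helper; the actual check lives in the bash test scripts, so this is only a sketch of the intended behaviour, not the implementation.

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public final class LogExcerpt {

    /** Returns the first matching line plus {@code margin} lines of context before and after it. */
    public static String excerptAround(String logFile, String needle, int margin) throws IOException {
        List<String> lines = Files.readAllLines(Paths.get(logFile));
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < lines.size(); i++) {
            if (lines.get(i).contains(needle)) {
                int from = Math.max(0, i - margin);
                int to = Math.min(lines.size(), i + margin + 1);
                for (int j = from; j < to; j++) {
                    out.append(lines.get(j)).append('\n');
                }
                break;
            }
        }
        return out.toString();
    }
}
{code}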



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9333) QuickStart Docs Spelling fix and some info regarding IntelliJ JVM Options

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477043#comment-16477043
 ] 

ASF GitHub Bot commented on FLINK-9333:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5989
  
merging.


> QuickStart Docs Spelling fix and some info regarding IntelliJ JVM Options
> -
>
> Key: FLINK-9333
> URL: https://issues.apache.org/jira/browse/FLINK-9333
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.1, 1.4.2
>Reporter: Yazdan Shirvany
>Assignee: Yazdan Shirvany
>Priority: Trivial
>  Labels: document, spelling
>
> - Spelling fix for QuickStart Project Template for Java 
> - Adding more details regarding changing JVM options in IntelliJ IDEA



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5989: [FLINK-9333] [Docs] QuickStart Docs Spelling fix and some...

2018-05-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5989
  
merging.


---


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477074#comment-16477074
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545761
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must 
not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying 
a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

"default keyspace"


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
> This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names. I propose adding a 
> `defaultKeyspace(String keyspace)` option to the ClusterBuilder. POJOs 
> without an explicitly annotated keyspace would then be written to the 
> provided default.
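A hypothetical usage sketch of the proposed option. The {{setDefaultKeyspace}} builder method is the one under discussion in the linked PR and is not part of a released API; the stream's element type is assumed to be a datastax-annotated POJO whose @Table annotation leaves the keyspace empty.

{code:java}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;

public final class DefaultKeyspaceExample {

    public static <T> void attachSink(DataStream<T> pojoStream) throws Exception {
        CassandraSink.addSink(pojoStream)
            .setHost("127.0.0.1")
            // proposed builder method; only meaningful for the POJO sink,
            // which is why the builders for other input types reject it
            .setDefaultKeyspace("flink_test")
            .build();
    }
}
{code}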



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5946: [FLINK-9285][REST][docs] Update REST API docs

2018-05-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5946
  
merging.


---


[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188546432
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -429,6 +429,25 @@ public void testCassandraPojoAtLeastOnceSink() throws 
Exception {
Assert.assertEquals(20, rs.all().size());
}
 
+   @Test
+   public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() 
throws Exception {
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"test2"));
--- End diff --

let's use a more descriptive table name, to avoid conflicts in the future.


---


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477070#comment-16477070
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188546432
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -429,6 +429,25 @@ public void testCassandraPojoAtLeastOnceSink() throws 
Exception {
Assert.assertEquals(20, rs.all().size());
}
 
+   @Test
+   public void testCassandraPojoNoAnnotatedKeyspaceAtLeastOnceSink() 
throws Exception {
+   session.execute(CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, 
"test2"));
--- End diff --

let's use a more descriptive table name, to avoid conflicts in the future.


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
> This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names. I propose adding a 
> `defaultKeyspace(String keyspace)` option to the ClusterBuilder. POJOs 
> without an explicitly annotated keyspace would then be written to the 
> provided default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545817
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -410,6 +425,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must 
not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying 
a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

same as above


---


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477069#comment-16477069
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545721
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -258,6 +259,17 @@ public CassandraSinkBuilder(DataStream input, 
TypeInformation typeInfo,
return this;
}
 
+   /**
+* Sets the keyspace to be used.
+*
+* @param keyspace keyspace to use
+* @return this builder
+*/
+   public CassandraSinkBuilder setKeyspace(String keyspace) {
--- End diff --

rename to `setDefaultKeyspace`


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
> This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names. I propose adding a 
> `defaultKeyspace(String keyspace)` option to the ClusterBuilder. POJOs 
> without an explicitly annotated keyspace would then be written to the 
> provided default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5964: [FLINK-8655] [Cassandra Connector] add keyspace in...

2018-05-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545721
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -258,6 +259,17 @@ public CassandraSinkBuilder(DataStream input, 
TypeInformation typeInfo,
return this;
}
 
+   /**
+* Sets the keyspace to be used.
+*
+* @param keyspace keyspace to use
+* @return this builder
+*/
+   public CassandraSinkBuilder setKeyspace(String keyspace) {
--- End diff --

rename to `setDefaultKeyspace`


---


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477072#comment-16477072
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545817
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -410,6 +425,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must 
not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying 
a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

same as above


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
> This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names. I propose adding a 
> `defaultKeyspace(String keyspace)` option to the ClusterBuilder. POJOs 
> without an explicitly annotated keyspace would then be written to the 
> provided default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477073#comment-16477073
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188546038
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -470,6 +488,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must 
not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
+   throw new IllegalArgumentException("Specifying 
a keyspace is only allowed when using a Pojo-Stream as input.");
--- End diff --

same as above


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
> This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names. I propose adding a 
> `defaultKeyspace(String keyspace)` option to the ClusterBuilder. POJOs 
> without an explicitly annotated keyspace would then be written to the 
> provided default.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8655) Add a default keyspace to CassandraSink

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477071#comment-16477071
 ] 

ASF GitHub Bot commented on FLINK-8655:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5964#discussion_r188545921
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSink.java
 ---
@@ -381,6 +393,9 @@ protected void sanityCheck() {
if (query == null || query.length() == 0) {
throw new IllegalArgumentException("Query must 
not be null or empty.");
}
+   if (keyspace != null && keyspace.length() != 0) {
--- End diff --

we only have to check for non-null.
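
For readers following the discussion, here is a minimal sketch of the check 
being talked about, assuming the keyspace is already validated when it is set; 
the class, field and method names are illustrative and not the actual 
CassandraSink.Builder code:

{code}
import java.io.Serializable;

/**
 * Illustration only: a simplified stand-in for the builder discussed above.
 */
public class KeyspaceAwareBuilderSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private String query;
    private String keyspace;

    /** Rejects null/empty values up front, so later checks can stay simple. */
    public KeyspaceAwareBuilderSketch setDefaultKeyspace(String keyspace) {
        if (keyspace == null || keyspace.isEmpty()) {
            throw new IllegalArgumentException("Keyspace must not be null or empty.");
        }
        this.keyspace = keyspace;
        return this;
    }

    public KeyspaceAwareBuilderSketch setQuery(String query) {
        this.query = query;
        return this;
    }

    /** Sanity check for query-based (non-POJO) sinks, where a keyspace is not allowed. */
    protected void sanityCheck() {
        if (query == null || query.isEmpty()) {
            throw new IllegalArgumentException("Query must not be null or empty.");
        }
        // setDefaultKeyspace() never stores an empty string, so a plain
        // non-null check is sufficient here.
        if (keyspace != null) {
            throw new IllegalArgumentException(
                "Specifying a keyspace is only allowed when using a POJO stream as input.");
        }
    }
}
{code}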


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>  Components: Cassandra Connector
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.6.0
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
>  This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance, differentiated by keyspace names.  I propose adding a 
> `defaultKeyspace(String keyspace)` method to the ClusterBuilder.  POJOs lacking 
> a defined keyspace would then be written to the provided default keyspace.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6008: [FLINK-9354][travis] Print execution times for nig...

2018-05-16 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6008#discussion_r188548653
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -33,8 +33,8 @@ else
   NUM_SLOTS=$NEW_DOP
 fi
 
-STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file}
-STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true}
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
--- End diff --

We should also update the usage message at the beginning of this file to 
reflect these extra parameters.


---


[jira] [Commented] (FLINK-9354) print execution times for end-to-end tests

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477086#comment-16477086
 ] 

ASF GitHub Bot commented on FLINK-9354:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6008#discussion_r188548653
  
--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -33,8 +33,8 @@ else
   NUM_SLOTS=$NEW_DOP
 fi
 
-STATE_BACKEND_TYPE=${STATE_BACKEND_TYPE:-file}
-STATE_BACKEND_FILE_ASYNC=${STATE_BACKEND_FILE_ASYNC:-true}
+STATE_BACKEND_TYPE=${3:-file}
+STATE_BACKEND_FILE_ASYNC=${4:-true}
--- End diff --

We should also update the usage message at the beginning of this file to 
reflect these extra parameters.


> print execution times for end-to-end tests
> --
>
> Key: FLINK-9354
> URL: https://issues.apache.org/jira/browse/FLINK-9354
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests, Travis
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> We need to modify the end-to-end scripts to include the time it takes for a 
> test to run.
> We currently don't have any clue how long a test actually runs for.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9373) Always call RocksIterator.status() to check the internal error of RocksDB

2018-05-16 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou closed FLINK-9373.
-
Resolution: Invalid

> Always call RocksIterator.status() to check the internal error of RocksDB
> -
>
> Key: FLINK-9373
> URL: https://issues.apache.org/jira/browse/FLINK-9373
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
>
> Currently, when using RocksIterator we only use _iterator.isValid()_ to 
> check whether we have reached the end of the iterator. But that is not 
> enough: as RocksDB's wiki 
> https://github.com/facebook/rocksdb/wiki/Iterator#error-handling points out, 
> _iterator.isValid()_ returning false may also be caused by an internal error. 
> A safer way to use the _RocksIterator_ is to always call _iterator.status()_ 
> to check for internal errors of _RocksDB_. There is a case from a user email 
> that seems to have lost data because of this: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #6020: [FLINK-9373][state] Always call RocksIterator.stat...

2018-05-16 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/6020

[FLINK-9373][state] Always call RocksIterator.status() to check the 
internal error of RocksDB

## What is the purpose of the change

Currently, when using RocksIterator we only call `iterator.isValid()` to check 
whether we have reached the end of the iterator. But that is not enough: as 
RocksDB's wiki 
https://github.com/facebook/rocksdb/wiki/Iterator#error-handling points out, an 
invalid iterator may be caused by an internal error rather than by reaching the 
end of the data. A safer way to use the `RocksIterator` is to always call 
`iterator.status()` to check for internal errors of RocksDB. There is one case 
from the user mailing list that seems to have lost data because of this: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Missing-MapState-when-Timer-fires-after-restored-state-td20134.html
 (I'm not so sure yet)
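
To make the intended pattern concrete, below is a minimal sketch of the 
iteration style this change aims for (illustration only; the class and method 
names are assumptions, not the actual state backend code):

```java
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

/** Illustration of iterating a RocksDB instance and surfacing internal errors. */
public final class SafeRocksIteration {

    private SafeRocksIteration() {
    }

    /** Counts all entries, failing loudly instead of silently stopping early. */
    public static long countEntries(RocksDB db) throws RocksDBException {
        long count = 0;
        try (RocksIterator iterator = db.newIterator()) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                count++;
            }
            // isValid() == false can mean "end of data" OR "internal error".
            // status() throws a RocksDBException in the error case, so the
            // caller never mistakes a broken iteration for a complete one.
            iterator.status();
        }
        return count;
    }
}
```

The important part is the `status()` call after the loop: without it, an 
iteration that stops early because of an internal RocksDB error looks exactly 
like one that simply reached the end of the data.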

## Brief change log

  - *Always call RocksIterator.status() to check the internal error of 
RocksDB*


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink FLINK-9373

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6020


commit b1d531949d521bb7d72cd389dfe72a4a5cf1bfc9
Author: sihuazhou 
Date:   2018-05-16T07:47:05Z

Always call RocksIterator.status() to check the internal error of RocksDB




---


[jira] [Commented] (FLINK-9359) Update quickstart docs to only mention Java 8

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477037#comment-16477037
 ] 

ASF GitHub Bot commented on FLINK-9359:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6010
  
merging.


> Update quickstart docs to only mention Java 8
> -
>
> Key: FLINK-9359
> URL: https://issues.apache.org/jira/browse/FLINK-9359
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.0, 1.4.2, 1.6.0
>Reporter: David Anderson
>Assignee: David Anderson
>Priority: Major
> Fix For: 1.5.0, 1.4.2, 1.6.0
>
>
> Java 7 support was dropped from Flink 1.4, and Java 9 and 10 aren't yet 
> supported, but the quickstart docs still say "the only requirement is to have 
> a working *Java 7.x* (or higher) installation". 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9176) Category annotations are unused

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9176?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477038#comment-16477038
 ] 

ASF GitHub Bot commented on FLINK-9176:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6005
  
merging.


> Category annotations are unused
> ---
>
> Key: FLINK-9176
> URL: https://issues.apache.org/jira/browse/FLINK-9176
> Project: Flink
>  Issue Type: Bug
>  Components: Build System, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: mingleizhang
>Priority: Critical
> Fix For: 1.5.0
>
>
> The {{LegacyAndNew}} and {{New}} annotations, that were previously used to 
> disable tests based on whether the {{legacyCode}} profile is active, are 
> effectively unused.
> While several tests are annotated with them they are never used in the actual 
> {{surefire}} configuration.
> We should either re-introduce them into the {{surefire}} configuration, or 
> remove them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9285) Update REST API page

2018-05-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477075#comment-16477075
 ] 

ASF GitHub Bot commented on FLINK-9285:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5946
  
merging.


> Update REST API page
> 
>
> Key: FLINK-9285
> URL: https://issues.apache.org/jira/browse/FLINK-9285
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, REST
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Critical
> Fix For: 1.5.0
>
>
> The [REST 
> API|https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html]
>  must be updated for 1.5.
> The [Available 
> requests|https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#available-requests]
>  section still predominantly lists legacy calls. These should be either 
> removed or moved to the bottom, and explicitly marked as legacy.
> The [developing 
> section|https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#developing]
>  must be updated.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...

2018-05-16 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/6001
  
@StephanEwen fixed based on your suggestion


---


[GitHub] flink issue #5961: [FLINK-8255][DataSet API, DataStream API] key expressions...

2018-05-16 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5961
  
Thanks for the update @snuyanzin. I'll try to have a look at the changes in 
the next few days.
Best, Fabian


---


[jira] [Commented] (FLINK-766) Web interface progress monitoring for DataSources and DataSinks with splits

2018-05-16 Thread Flavio Pompermaier (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16477115#comment-16477115
 ] 

Flavio Pompermaier commented on FLINK-766:
--

+1 for this feature!

> Web interface progress monitoring for DataSources and DataSinks with splits
> ---
>
> Key: FLINK-766
> URL: https://issues.apache.org/jira/browse/FLINK-766
> Project: Flink
>  Issue Type: Sub-task
>  Components: Metrics
>Reporter: GitHub Import
>Priority: Minor
>  Labels: github-import
> Fix For: pre-apache
>
>
> The progress monitoring for DataSources and DataSinks can be improved by 
>  including the number of processed vs. total splits in the reported progress.
>  Imported from GitHub 
> Url: https://github.com/stratosphere/stratosphere/issues/766
> Created by: [rmetzger|https://github.com/rmetzger]
> Labels: enhancement, gui, simple-issue, 
> Milestone: Release 0.6 (unplanned)
> Created at: Wed May 07 12:05:54 CEST 2014
> State: open



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-6944) Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for serializer compatibility checks

2018-05-16 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-6944:
---
Description: 
FLINK-9377 proposes to remove writing serializers as part of the checkpoint 
meta info, and to only write their configuration snapshots.

Since serializer config snapshots will then be the single source of truth for 
the previous serializer schema, this JIRA proposes to follow up the change in 
FLINK-9377 with a base default implementation: 
{{TypeSerializer.snapshotConfiguration}} returns a 
{{DefaultTypeSerializerConfigSnapshot}} by default.

The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serialVersionUID of 
the serializer class and the serializer class' classname. Both will be used to 
check compatibility in the default implementation of 
{{TypeSerializer.ensureCompatibility}}. Specifically, if the classname or 
serialVersionUID has changed, the default implementation of 
{{TypeSerializer.ensureCompatibility}} should return 
{{CompatibilityResult.requiresMigration}}.

  was:
Currently, we store both the {{TypeSerializer}} and its corresponding 
{{TypeSerializerConfigSnapshot}} in checkpoints of managed state. This, in most 
cases, is actually duplicate information.

This JIRA proposes to change this by only storing the 
{{TypeSerializerConfigSnapshot}}, while at the same time, letting 
{{TypeSerializer.snapshotConfiguration}} return a default 
{{DefaultTypeSerializerConfigSnapshot}}.
This default simply serializes the serializer instance using Java serialization.

The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serializer bytes, 
the serialVersionUID of the serializer class, and the serializer class' 
classname. The latter two will be used to check compatibility in the default 
implementation of {{TypeSerializer.ensureCompatibility}}. Specifically, if 
classname / serialVersionUID has changed, the default implementation of 
{{TypeSerializer.ensureCompatibility}} will simply return 
{{CompatibilityResult.requiresMigration}} with the deserialized serializer as 
the convert deserializer.


> Introduce DefaultTypeSerializerConfigSnapshot as a base implementation for 
> serializer compatibility checks
> --
>
> Key: FLINK-6944
> URL: https://issues.apache.org/jira/browse/FLINK-6944
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing, Type Serialization System
>Affects Versions: 1.3.0, 1.3.1
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Blocker
> Fix For: 1.6.0
>
>
> FLINK-9377 proposes to remove writing serializers as part of the checkpoint 
> meta info, and to only write their configuration snapshots.
> Since serializer config snapshots will then be the single source of truth for 
> the previous serializer schema, this JIRA proposes to follow up the change in 
> FLINK-9377 with a base default implementation: 
> {{TypeSerializer.snapshotConfiguration}} returns a 
> {{DefaultTypeSerializerConfigSnapshot}} by default.
> The {{DefaultTypeSerializerConfigSnapshot}} should wrap the serialVersionUID 
> of the serializer class and the serializer class' classname. Both will be 
> used to check compatibility in the default implementation of 
> {{TypeSerializer.ensureCompatibility}}. Specifically, if the classname or 
> serialVersionUID has changed, the default implementation of 
> {{TypeSerializer.ensureCompatibility}} should return 
> {{CompatibilityResult.requiresMigration}}.
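
As an illustration of the proposal (the class below does not exist in Flink; 
its name and structure are assumptions derived from the description above), the 
default snapshot would essentially capture two fields and compare them on 
restore:

{code}
import java.io.Serializable;
import java.util.Objects;

/**
 * Illustration only: a simplified stand-in for the proposed
 * DefaultTypeSerializerConfigSnapshot, showing which information it would
 * capture and how the default compatibility check could use it.
 */
public class DefaultSnapshotSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    private final String serializerClassName;
    private final long serializerSerialVersionUID;

    public DefaultSnapshotSketch(String serializerClassName, long serializerSerialVersionUID) {
        this.serializerClassName = Objects.requireNonNull(serializerClassName);
        this.serializerSerialVersionUID = serializerSerialVersionUID;
    }

    /**
     * Default compatibility rule: if either the classname or the
     * serialVersionUID changed, the restored state requires migration.
     */
    public boolean isCompatibleWith(Class<?> currentSerializerClass, long currentSerialVersionUID) {
        return serializerClassName.equals(currentSerializerClass.getName())
            && serializerSerialVersionUID == currentSerialVersionUID;
    }
}
{code}

Any mismatch in either field would then make the default 
{{TypeSerializer.ensureCompatibility}} implementation report 
{{CompatibilityResult.requiresMigration}}.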



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

