[jira] [Updated] (FLINK-7833) Flink's Literal(value, SqlTimeTypeInfo) fails to convert Calcite's RexNode

2017-10-12 Thread John Fang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Fang updated FLINK-7833:
-
Description: 
{code:java}
Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be 
cast to java.util.Date
  at 
org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:116)
  at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:84)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at 
org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:60)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}

{{RexProgramExtractor.visitLiteral}} invokes Calcite's {{RexLiteral.getValue}} 
to generate a {{Literal(value, type)}}. If the type is TIME, DATE, or 
TIMESTAMP, {{RexLiteral.getValue}} returns a {{Calendar}}. When that 
{{Literal(value, type)}} is later converted back to a {{RexNode}} via 
{{Literal.toRexNode}}, it invokes {{dateToCalendar}}, which casts the value to 
{{java.util.Date}} and therefore throws the exception above.
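Below is a minimal, self-contained sketch of the failing cast and a safe 
alternative (the class and variable names are illustrative, not Flink's actual 
code; only the {{java.util}} types from the stack trace are assumed):
{code:java}
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;

public class CalendarCastSketch {
    public static void main(String[] args) {
        // Calcite's RexLiteral.getValue() hands back a Calendar for
        // TIME/DATE/TIMESTAMP literals, so the runtime type is Calendar.
        Object value = new GregorianCalendar();

        // What the blind cast in dateToCalendar effectively does; this
        // throws the ClassCastException from the stack trace above:
        // Date broken = (Date) value;

        // A safe conversion checks the runtime type first:
        Date date = value instanceof Calendar
                ? ((Calendar) value).getTime()
                : (Date) value;
        System.out.println(date);
    }
}
{code}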

  was:

{code:java}
Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be 
cast to java.util.Date
  at 
org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:116)
  at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:84)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at 
org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:60)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}

[jira] [Updated] (FLINK-7833) Flink's Literal(value, SqlTimeTypeInfo) fails to convert Calcite's RexNode

2017-10-12 Thread John Fang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7833?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Fang updated FLINK-7833:
-
Description: 

{code:java}
Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be 
cast to java.util.Date
  at 
org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:116)
  at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:84)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at 
org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:60)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}

{{RexProgramExtractor.visitLiteral}} invokes Calcite's {{RexLiteral.getValue}} 
to generate a {{Literal(value, type)}}. If the type is TIME, DATE, or 
TIMESTAMP, {{RexLiteral.getValue}} returns a {{Calendar}}. When that 
{{Literal(value, type)}} is later converted back to a {{RexNode}} via 
{{Literal.toRexNode}}, it invokes {{dateToCalendar}}, which casts the value to 
{{java.util.Date}} and therefore throws the exception above.

  was:
{code:java}
Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be 
cast to java.util.Date
  at 
org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:116)
  at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:84)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at 
org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:60)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}

[jira] [Commented] (FLINK-7502) PrometheusReporter improvements

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203067#comment-16203067
 ] 

ASF GitHub Bot commented on FLINK-7502:
---

Github user mbode commented on the issue:

https://github.com/apache/flink/pull/4586
  
@zentol *ping*


> PrometheusReporter improvements
> ---
>
> Key: FLINK-7502
> URL: https://issues.apache.org/jira/browse/FLINK-7502
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Maximilian Bode
>Assignee: Maximilian Bode
>Priority: Minor
>
> * do not throw exceptions on metrics being registered for second time
> * allow port ranges for setups where multiple reporters are on same host 
> (e.g. one TaskManager and one JobManager)
> * do not use nanohttpd anymore, there is now a minimal http server included 
> in [Prometheus JVM client|https://github.com/prometheus/client_java]
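A hedged sketch of what a port-range setup could look like in 
{{flink-conf.yaml}} once this lands (the key names follow the existing 
reporter configuration scheme; the exact range syntax is an assumption based 
on the proposal above, not the final implementation):
{code}
metrics.reporters: prom
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
# A port range lets a TaskManager and a JobManager on the same host each
# bind their own port instead of colliding on a single fixed one.
metrics.reporter.prom.port: 9250-9260
{code}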



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7833) Flink's Literal(value, SqlTimeTypeInfo) fails to convert Calcite's RexNode

2017-10-12 Thread John Fang (JIRA)
John Fang created FLINK-7833:


 Summary: Flink's Literal(value, SqlTimeTypeInfo) fails to convert 
Calcite's RexNode
 Key: FLINK-7833
 URL: https://issues.apache.org/jira/browse/FLINK-7833
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: John Fang
 Fix For: 1.4.0


{code:java}
Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be 
cast to java.util.Date
  at 
org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:116)
  at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:84)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:296)
  at 
org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:97)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:97)
  at 
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:60)
  at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
{code}
{{RexProgramExtractor.visitLiteral}} invokes Calcite's {{RexLiteral.getValue}} 
to generate a {{Literal(value, type)}}. If the type is TIME, DATE, or 
TIMESTAMP, {{RexLiteral.getValue}} returns a {{Calendar}}. When that 
{{Literal(value, type)}} is later converted back to a {{RexNode}} via 
{{Literal.toRexNode}}, it invokes {{dateToCalendar}}, which casts the value to 
{{java.util.Date}} and therefore throws the exception above.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4586: [FLINK-7502] [metrics] Improve PrometheusReporter

2017-10-12 Thread mbode
Github user mbode commented on the issue:

https://github.com/apache/flink/pull/4586
  
@zentol *ping*


---


[jira] [Commented] (FLINK-7764) FlinkKafkaProducer010 does not accept name, uid, or parallelism

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203018#comment-16203018
 ] 

ASF GitHub Bot commented on FLINK-7764:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4790
  
Hi @aljoscha, I've created another PR #4819 for the `release-1.3` branch. 


> FlinkKafkaProducer010 does not accept name, uid, or parallelism
> ---
>
> Key: FLINK-7764
> URL: https://issues.apache.org/jira/browse/FLINK-7764
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> As [reported on the user 
> list|https://lists.apache.org/thread.html/1e97b79cb5611a942beb609a61b5fba6d62a49c9c86336718a9a0004@%3Cuser.flink.apache.org%3E]:
> When I try to use KafkaProducer with timestamps it fails to set name, uid or 
> parallelism. It uses default values.
> {code}
> FlinkKafkaProducer010.FlinkKafkaProducer010Configuration producer = 
> FlinkKafkaProducer010
> .writeToKafkaWithTimestamps(stream, topicName, schema, props, 
> partitioner);
> producer.setFlushOnCheckpoint(flushOnCheckpoint);
> producer.name("foo")
> .uid("bar")
> .setParallelism(5);
> return producer;
> {code}
> The operator name is shown as "FlinKafkaProducer 0.10.x" with the typo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4790: [FLINK-7764] [kafka] Enable the operator settings for Fli...

2017-10-12 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/4790
  
Hi @aljoscha, I've created another PR #4819 for the `release-1.3` branch. 


---


[jira] [Commented] (FLINK-7764) FlinkKafkaProducer010 does not accept name, uid, or parallelism

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16203017#comment-16203017
 ] 

ASF GitHub Bot commented on FLINK-7764:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4819

[FLINK-7764] [kafka] Fix the operator setting problem of 
FlinkKafkaProducer010(1.3)

## What is the purpose of the change

This PR fixes the operator setting (name, uid, parallelism, etc.) problem 
of FlinkKafkaProducer010 for `release-1.3` branch.


## Brief change log

  - Override all the DataStreamSink public methods in 
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration
  - Fix the typo in the name: "FlinKafkaProducer" ==> "FlinkKafkaProducer".
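For context, a generic sketch of the delegation pattern such overrides amount 
to (class names here are illustrative stand-ins, not the actual Flink types; 
the wrapper previously inherited setters that mutated itself rather than the 
wrapped sink):
{code:java}
// Stand-in for DataStreamSink: a fluent handle with operator settings.
class SinkHandle {
    private String name = "default";
    private int parallelism = 1;

    SinkHandle name(String name) { this.name = name; return this; }
    SinkHandle setParallelism(int p) { this.parallelism = p; return this; }

    @Override
    public String toString() { return name + " (parallelism=" + parallelism + ")"; }
}

// Stand-in for FlinkKafkaProducer010Configuration: overrides every public
// setter so calls reach the real sink instead of the wrapper itself.
class ConfigurationWrapper extends SinkHandle {
    private final SinkHandle wrapped;

    ConfigurationWrapper(SinkHandle wrapped) { this.wrapped = wrapped; }

    @Override
    SinkHandle name(String name) { wrapped.name(name); return this; }

    @Override
    SinkHandle setParallelism(int p) { wrapped.setParallelism(p); return this; }
}

public class DelegationSketch {
    public static void main(String[] args) {
        SinkHandle realSink = new SinkHandle();
        SinkHandle producer = new ConfigurationWrapper(realSink);
        producer.name("foo").setParallelism(5);
        System.out.println(realSink); // prints: foo (parallelism=5)
    }
}
{code}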


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-7764-1.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4819


commit d2fd6cc1c7b074f7810c7034a4a87e8d8a1fe074
Author: Xingcan Cui 
Date:   2017-10-11T15:42:29Z

[FLINK-7764] [kafka] Enable the operator settings for FlinkKafkaProducer010

[hotfix] [kafka] Fix the config parameter names in KafkaTestBase.




> FlinkKafkaProducer010 does not accept name, uid, or parallelism
> ---
>
> Key: FLINK-7764
> URL: https://issues.apache.org/jira/browse/FLINK-7764
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Fabian Hueske
>Assignee: Xingcan Cui
>
> As [reported on the user 
> list|https://lists.apache.org/thread.html/1e97b79cb5611a942beb609a61b5fba6d62a49c9c86336718a9a0004@%3Cuser.flink.apache.org%3E]:
> When I try to use KafkaProducer with timestamps it fails to set name, uid or 
> parallelism. It uses default values.
> {code}
> FlinkKafkaProducer010.FlinkKafkaProducer010Configuration producer = 
> FlinkKafkaProducer010
> .writeToKafkaWithTimestamps(stream, topicName, schema, props, 
> partitioner);
> producer.setFlushOnCheckpoint(flushOnCheckpoint);
> producer.name("foo")
> .uid("bar")
> .setParallelism(5);
> return producer;
> {code}
> The operator name is shown as "FlinKafkaProducer 0.10.x" with the typo.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4819: [FLINK-7764] [kafka] Fix the operator setting prob...

2017-10-12 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/4819

[FLINK-7764] [kafka] Fix the operator setting problem of 
FlinkKafkaProducer010(1.3)

## What is the purpose of the change

This PR fixes the operator setting (name, uid, parallelism, etc.) problem 
of FlinkKafkaProducer010 for `release-1.3` branch.


## Brief change log

  - Override all the DataStreamSink public methods in 
FlinkKafkaProducer010.FlinkKafkaProducer010Configuration
  - Fix the typo in the name: "FlinKafkaProducer" ==> "FlinkKafkaProducer".


## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-7764-1.3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4819.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4819


commit d2fd6cc1c7b074f7810c7034a4a87e8d8a1fe074
Author: Xingcan Cui 
Date:   2017-10-11T15:42:29Z

[FLINK-7764] [kafka] Enable the operator settings for FlinkKafkaProducer010

[hotfix] [kafka] Fix the config parameter names in KafkaTestBase.




---


[jira] [Updated] (FLINK-7827) creating and starting a NetCat in test cases programmatically

2017-10-12 Thread bluejoe (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7827?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

bluejoe updated FLINK-7827:
---
Description: 
Hi all,
I have written a MockNetCat class, which helps developers start a netcat 
programmatically instead of manually launching the
{{nc -lk }}
command for tests.
Developers can start a mock NetCat server in test cases and send data to the 
server (simulating a user typing text into the nc shell).

Is it OK to create a PR to Flink for this feature?

Using MockNetCat is very simple:
{{var nc: MockNetCat = MockNetCat.start();}}
This starts a NetCat server, and data can be generated using the following code:
{{nc.writeData("hello\r\nworld\r\nbye\r\nworld\r\n");}}
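For readers skimming the archive, here is a minimal, hypothetical sketch of 
the same idea in plain Java (the real MockNetCat API shown above is Scala and 
its internals may differ):
{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// A stand-in for `nc -lk <port>`: listens on a port, accepts one client
// (e.g. a socketTextStream source), and lets the test push lines to it
// as if a user had typed them into the nc shell.
public class MiniNetCat implements AutoCloseable {
    private final ServerSocket server;
    private Socket client;

    public MiniNetCat(int port) throws IOException {
        server = new ServerSocket(port);
    }

    public void writeData(String data) throws IOException {
        if (client == null) {
            client = server.accept(); // blocks until the job connects
        }
        OutputStream out = client.getOutputStream();
        out.write(data.getBytes(StandardCharsets.UTF_8));
        out.flush();
    }

    @Override
    public void close() throws IOException {
        if (client != null) {
            client.close();
        }
        server.close();
    }
}
{code}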

  was:
Hi all,
I have written a MockNetCat class, which helps developers start a netcat 
programmatically instead of manually launching the
{{nc -lk }}
command for tests.
Developers can start a mock NetCat server in test cases and send data to the 
server (simulating a user typing text into the nc shell).

Using MockNetCat is very simple:
{{var nc: MockNetCat = MockNetCat.start();}}
This starts a NetCat server, and data can be generated using the following code:
{{nc.writeData("hello\r\nworld\r\nbye\r\nworld\r\n");}}


> creating and starting a NetCat in test cases programmatically
> -
>
> Key: FLINK-7827
> URL: https://issues.apache.org/jira/browse/FLINK-7827
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: bluejoe
>  Labels: httpstreaming, netcat, streaming
>
> Hi all,
> I have written a MockNetCat class, which helps developers start a netcat 
> programmatically instead of manually launching the
> {{nc -lk }}
> command for tests.
> Developers can start a mock NetCat server in test cases and send data to the 
> server (simulating a user typing text into the nc shell).
> Is it OK to create a PR to Flink for this feature?
> Using MockNetCat is very simple:
> {{var nc: MockNetCat = MockNetCat.start();}}
> This starts a NetCat server, and data can be generated using the following code:
> {{nc.writeData("hello\r\nworld\r\nbye\r\nworld\r\n");}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7820) remove text related to FoldingState and FoldingStateDescriptor from doc

2017-10-12 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202918#comment-16202918
 ] 

Bowen Li commented on FLINK-7820:
-

[~aljoscha] I was thinking that keeping examples of deprecated APIs in our docs 
would still misleadingly encourage users to use them. What're the principles to 
handle deprecated APIs and their docs?

> remove text related to FoldingState and FoldingStateDescriptor from doc
> ---
>
> Key: FLINK-7820
> URL: https://issues.apache.org/jira/browse/FLINK-7820
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Minor
> Fix For: 1.4.0
>
>
> {{FoldState}} and {{FoldStateDescriptor}} have been deprecated. We should 
> remove docs related to the two classes



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7660) Support sideOutput in ProcessAllWindowFunction

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202829#comment-16202829
 ] 

ASF GitHub Bot commented on FLINK-7660:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4740
  
@aljoscha  Thank you!


> Support sideOutput in ProcessAllWindowFunction
> --
>
> Key: FLINK-7660
> URL: https://issues.apache.org/jira/browse/FLINK-7660
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> Add side output support to ProcessAllWindow functions.
> This is a sibling ticket for FLINK-7635



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7660) Support sideOutput in ProcessAllWindowFunction

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202830#comment-16202830
 ] 

ASF GitHub Bot commented on FLINK-7660:
---

Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4740


> Support sideOutput in ProcessAllWindowFunction
> --
>
> Key: FLINK-7660
> URL: https://issues.apache.org/jira/browse/FLINK-7660
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API, Scala API
>Reporter: Bowen Li
>Assignee: Bowen Li
> Fix For: 1.4.0
>
>
> Add side output support to ProcessAllWindow functions.
> This is a sibling ticket for FLINK-7635



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4740: [FLINK-7660][DataStream API/Scala API] Support sideOutput...

2017-10-12 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4740
  
@aljoscha  Thank you!


---


[GitHub] flink pull request #4740: [FLINK-7660][DataStream API/Scala API] Support sid...

2017-10-12 Thread bowenli86
Github user bowenli86 closed the pull request at:

https://github.com/apache/flink/pull/4740


---


[jira] [Comment Edited] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos edited comment on FLINK-7771 at 10/12/17 10:44 PM:
---

[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (the 
client could push changes to a distributed log for recovery) in order to 
correlate state with (operator_id, task_id) and time. This way any query about 
state could always point to the correct task. Is this feasible or adds too much 
overhead?


was (Author: skonto):
We could overcome some problems by allowing Flink to inform an external system 
about state changes. If re-assignment is done the client who issues the queries 
should know. It could subscribe to that event channel (the client could push 
changes to a distributed log for recovery) in order to correlate state with 
(operator_id, task_id) and time. This way any query about state could always 
point to the correct task. Is this feasible or adds too much overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos edited comment on FLINK-7771 at 10/12/17 10:44 PM:
---

We could overcome some problems by allowing Flink to inform an external system 
about state changes. If re-assignment is done the client who issues the queries 
should know. It could subscribe to that event channel (the client could push 
changes to a distributed log for recovery) in order to correlate state with 
(operator_id, task_id) and time. This way any query about state could always 
point to the correct task. Is this feasible or adds too much overhead?


was (Author: skonto):
We could overcome some problems by allowing Flink to inform an external system 
about state changes. If re-assignment is done the client who issues the queries 
should know. It could subscribe to that event channel (or persisted log) in 
order to bind together state with (operator_id, task_id) and time. This way any 
query about state could always point to the correct task. Is this feasible or 
adds too much overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos edited comment on FLINK-7771 at 10/12/17 10:52 PM:
---

[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (plus 
we should checkpoint state changes for recovery and when the client wants to 
replay the event sequence or we could write to a distributed log directly) in 
order to correlate state with (operator_id, task_id) and time. This way any 
query about state could always point to the correct task. Is this feasible or 
adds too much overhead?


was (Author: skonto):
[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (plus 
we should checkpoint state changes for recovery and when client wants to reply 
the event sequence) in order to correlate state with (operator_id, task_id) and 
time. This way any query about state could always point to the correct task. Is 
this feasible or adds too much overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos edited comment on FLINK-7771 at 10/12/17 10:47 PM:
---

[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (or 
push changes to a distributed log for recovery) in order to correlate state 
with (operator_id, task_id) and time. This way any query about state could 
always point to the correct task. Is this feasible or adds too much overhead?


was (Author: skonto):
[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (the 
client could push changes to a distributed log for recovery) in order to 
correlate state with (operator_id, task_id) and time. This way any query about 
state could always point to the correct task. Is this feasible or adds too much 
overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos edited comment on FLINK-7771 at 10/12/17 10:48 PM:
---

[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (plus 
checkpoint state changes for recovery and when client want to reply the event 
sequence) in order to correlate state with (operator_id, task_id) and time. 
This way any query about state could always point to the correct task. Is this 
feasible or adds too much overhead?


was (Author: skonto):
[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (plus 
checkpoint state changes for recovery) in order to correlate state with 
(operator_id, task_id) and time. This way any query about state could always 
point to the correct task. Is this feasible or adds too much overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos edited comment on FLINK-7771 at 10/12/17 10:50 PM:
---

[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (plus 
we should checkpoint state changes for recovery and when client wants to reply 
the event sequence) in order to correlate state with (operator_id, task_id) and 
time. This way any query about state could always point to the correct task. Is 
this feasible or adds too much overhead?


was (Author: skonto):
[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (plus 
checkpoint state changes for recovery and when client want to reply the event 
sequence) in order to correlate state with (operator_id, task_id) and time. 
This way any query about state could always point to the correct task. Is this 
feasible or adds too much overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos edited comment on FLINK-7771 at 10/12/17 10:48 PM:
---

[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (plus 
checkpoint state changes for recovery) in order to correlate state with 
(operator_id, task_id) and time. This way any query about state could always 
point to the correct task. Is this feasible or adds too much overhead?


was (Author: skonto):
[~kkl0u] We could overcome some problems by allowing Flink to inform an 
external system about state changes. If re-assignment is done the client who 
issues the queries should know. It could subscribe to that event channel (or 
push changes to a distributed log for recovery) in order to correlate state 
with (operator_id, task_id) and time. This way any query about state could 
always point to the correct task. Is this feasible or adds too much overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos edited comment on FLINK-7771 at 10/12/17 10:40 PM:
---

We could overcome some problems by allowing Flink to inform an external system 
about state changes. If re-assignment is done the client who issues the queries 
should know. It could subscribe to that event channel (or persisted log) in 
order to bind together state with (operator_id, task_id) and time. This way any 
query about state could always point to the correct task. Is this feasible or 
adds too much overhead?


was (Author: skonto):
We could overcome some problems by allowing Flink to inform an external system 
about state changes. If re-assignment is done the client who issues the queries 
should know. It could subscribe to that event channel (or persisted log) in 
order to bind together state with (operator_id, task_id) and time. This way any 
query about state could always point to the correct task. Is this feasible or 
too adds too much overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7737) On HCFS systems, FSDataOutputStream does not issue hsync only hflush which leads to data loss

2017-10-12 Thread Ryan Hobbs (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202700#comment-16202700
 ] 

Ryan Hobbs commented on FLINK-7737:
---

In our case we are using a Hadoop-compatible file system (HCFS), which behaves 
similarly to an S3FileSystem or an Azure Data Lake file system. So in the 
traditional sense we do not have multiple data nodes; instead data is written 
to a node and distributed later on. It is for this reason that we cannot 
guarantee that hflush will work. I believe the flag SYNC_BLOCK on create() 
would be sufficient, because my understanding is that if that flag is set, 
hflush() would trigger hsync() upon close, which would work in our case.
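A sketch of what passing that flag through the standard Hadoop {{FileSystem}} 
API could look like (the path and buffer/replication parameters are 
illustrative, not Flink's actual values):
{code:java}
import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class SyncBlockSketch {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        Path path = new Path("/tmp/part-0-0"); // illustrative path

        // SYNC_BLOCK asks the file system to sync each completed block to
        // durable storage, so data survives even where hflush() is a no-op.
        FSDataOutputStream out = fs.create(
                path,
                FsPermission.getFileDefault(),
                EnumSet.of(CreateFlag.CREATE, CreateFlag.SYNC_BLOCK),
                4096,                              // buffer size
                fs.getDefaultReplication(path),
                fs.getDefaultBlockSize(path),
                null);                             // no progress callback

        out.write("payload".getBytes());
        out.hsync(); // FSDataOutputStream implements Syncable, so this exists
        out.close();
    }
}
{code}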

> On HCFS systems, FSDataOutputStream does not issue hsync only hflush which 
> leads to data loss
> -
>
> Key: FLINK-7737
> URL: https://issues.apache.org/jira/browse/FLINK-7737
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.3.2
> Environment: Dev
>Reporter: Ryan Hobbs
>
> During several tests where we simulated failure conditions, we have observed 
> that on HCFS systems where the data stream is of type FSDataOutputStream, 
> Flink will issue hflush() and not hsync() which results in data loss.
> In the class *StreamWriterBase.java* the code below will execute hsync if the 
> output stream is of type *HdfsDataOutputStream* but not for streams of type 
> *FSDataOutputStream*.  Is this by design?
> {code}
> protected void hflushOrSync(FSDataOutputStream os) throws IOException {
> try {
> // At this point the refHflushOrSync cannot be null,
> // since register method would have thrown if it was.
> this.refHflushOrSync.invoke(os);
> if (os instanceof HdfsDataOutputStream) {
>   ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
>   }
>   } catch (InvocationTargetException e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e.getCause());
> Throwable cause = e.getCause();
> if (cause != null && cause instanceof IOException) {
> throw (IOException) cause;
>   }
> throw new RuntimeException(msg, e);
>   } catch (Exception e) {
> String msg = "Error while trying to hflushOrSync!";
> LOG.error(msg + " " + e);
> throw new RuntimeException(msg, e);
>   }
>   }
> {code}
> Could a potential fix me to perform a sync even on streams of type 
> *FSDataOutputStream*?
> {code}
>  if (os instanceof HdfsDataOutputStream) {
> ((HdfsDataOutputStream) 
> os).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
> } else if (os instanceof FSDataOutputStream) {
> os.hsync();
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7771) Make the operator state queryable

2017-10-12 Thread Stavros Kontopoulos (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7771?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202692#comment-16202692
 ] 

Stavros Kontopoulos commented on FLINK-7771:


We could overcome some problems by allowing Flink to inform an external system 
about state changes. If re-assignment is done the client who issues the queries 
should know. It could subscribe to that event channel (or persisted log) in 
order to bind together state with (operator_id, task_id) and time. This way any 
query about state could always point to the correct task. Is this feasible or 
too adds too much overhead?

> Make the operator state queryable
> -
>
> Key: FLINK-7771
> URL: https://issues.apache.org/jira/browse/FLINK-7771
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> There seem to be some requests for making the operator (non-keyed) state 
> queryable. This means that the user will specify the *uuid* of the operator 
> and the *taskId*, and he will be able to access the state that corresponds to 
> that operator and for that specific task.
> This issue will serve to document the discussion on the topic, so that 
> everybody can participate.
> I also link [~till.rohrmann] and [~skonto] as he also mentioned that this 
> feature could be helpful.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202661#comment-16202661
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4767
  
I don't much like the use of `RequestBody` and `ResponseBody` here, or even 
that the WebSocket distinguishes between client and server messages.  Honestly 
a `MessageBody` marker interface may suffice.   WDYT?
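A tiny sketch of the suggested alternative (all type names hypothetical, just 
to make the idea concrete):
{code:java}
// One marker interface tags anything that may travel over the WebSocket,
// in either direction, instead of separate request/response hierarchies.
interface MessageBody {}

// Hypothetical client-to-server message.
class SubscribeToJobUpdates implements MessageBody {
    final String jobId;
    SubscribeToJobUpdates(String jobId) { this.jobId = jobId; }
}

// Hypothetical server-to-client message; both share the same marker.
class JobStatusChanged implements MessageBody {
    final String jobId;
    final String newStatus;
    JobStatusChanged(String jobId, String newStatus) {
        this.jobId = jobId;
        this.newStatus = newStatus;
    }
}
{code}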


> Create WebSocket handler (server)
> -
>
> Key: FLINK-7738
> URL: https://issues.apache.org/jira/browse/FLINK-7738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> An abstract handler is needed to support websocket communication.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...

2017-10-12 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4767
  
I don't much like the use of `RequestBody` and `ResponseBody` here, or even 
that the WebSocket distinguishes between client and server messages.  Honestly 
a `MessageBody` marker interface may suffice.   WDYT?


---


[jira] [Commented] (FLINK-7830) Problematic interaction of CEP and asynchronous snapshots

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202513#comment-16202513
 ] 

Aljoscha Krettek commented on FLINK-7830:
-

I think [~kkl0u]'s discovery in the {{NFASerializer}} is one part of the puzzle 
and another one is this (which I also posted in the other issues): 
https://github.com/apache/flink/pull/1134

The {{CaseClassSerializer}} no longer does a proper deep copy of the child 
serialisers and the {{KryoSerializer}} is not thread-safe, meaning that this 
breaks when used in combination with asynchronous snapshots.
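A generic sketch of why a {{duplicate()}} that hands out a shared stateful 
instance breaks under asynchronous snapshots (plain Java, not Flink's actual 
serializer classes):
{code:java}
import java.util.concurrent.CompletableFuture;

// A serializer with mutable internal state (think Kryo's buffers) that
// is not safe to share across threads.
class StatefulSerializer {
    private final StringBuilder scratch = new StringBuilder();

    String serialize(String value) {
        scratch.setLength(0);       // both threads mutate this buffer...
        scratch.append(value);
        return scratch.toString();  // ...so concurrent calls can interleave
    }

    // The broken "deep copy": the async snapshot thread gets the very
    // same stateful instance instead of a fresh one.
    StatefulSerializer duplicate() {
        return this; // a proper deep copy would return new StatefulSerializer()
    }
}

public class AsyncSnapshotRace {
    public static void main(String[] args) {
        StatefulSerializer processing = new StatefulSerializer();
        StatefulSerializer snapshot = processing.duplicate(); // same object!

        // Processing thread and async snapshot thread now race on `scratch`,
        // which is how shared serializers corrupt checkpointed data.
        CompletableFuture.runAsync(() -> snapshot.serialize("checkpoint-data"));
        System.out.println(processing.serialize("live-record"));
    }
}
{code}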

> Problematic interaction of CEP and asynchronous snapshots
> -
>
> Key: FLINK-7830
> URL: https://issues.apache.org/jira/browse/FLINK-7830
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Reporter: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Just so we collect all the (possibly duplicate) issue reports.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202510#comment-16202510
 ] 

Aljoscha Krettek commented on FLINK-7756:
-

I think the symptom could be the same as for the other issues: The problem is 
this change: https://github.com/apache/flink/pull/1134

The {{CaseClassSerializer}} no longer does a proper deep copy of the child 
serialisers and the {{KryoSerializer}} is not thread-safe, meaning that this 
breaks when used in combination with asynchronous snapshots.

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> When i try to use RocksDBStateBackend on my staging cluster (which is using 
> HDFS as file system) it crashes. But When i use FsStateBackend on staging 
> (which is using HDFS as file system) it is working fine.
> On local with local file system it's working fine in both cases.
> Please check attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at 

[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202509#comment-16202509
 ] 

Aljoscha Krettek commented on FLINK-6321:
-

It's probably the same underlying problem as for the other issues: the problem 
is this change: https://github.com/apache/flink/pull/1134

The {{CaseClassSerializer}} no longer does a proper deep copy of the child 
serialisers and the {{KryoSerializer}} is not thread-safe, meaning that this 
breaks when used in combination with asynchronous snapshots.

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It's 
> working fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202503#comment-16202503
 ] 

Aljoscha Krettek commented on FLINK-7435:
-

In addition to the {{NFASerializer}}, there is a bug introduced in this change: 
https://github.com/apache/flink/pull/1134

The {{CaseClassSerializer}} no longer does a proper deep copy of the child 
serializers, and the {{KryoSerializer}} is not thread-safe, meaning that this 
breaks when used in combination with asynchronous snapshots.

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, using CEP with the pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end). State is stored with 
> FsStateBackend with the incremental option enabled.
>Reporter: daiqing
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202501#comment-16202501
 ] 

Aljoscha Krettek commented on FLINK-7484:
-

The problem is this change: https://github.com/apache/flink/pull/1134

The {{CaseClassSerializer}} no longer does a proper deep copy of the child 
serializers, and the {{KryoSerializer}} is not thread-safe, meaning that this 
breaks when used in combination with asynchronous snapshots.

> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> ---
>
> Key: FLINK-7484
> URL: https://issues.apache.org/jira/browse/FLINK-7484
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, DataStream API, Scala API
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> I am using many CEP patterns and a global window. I am getting the following 
> error sometimes and the application crashes. I have checked that, logically, 
> there is no flaw in the program. Here ItemPojo is a POJO class and we are using 
> java.util.List[ItemPojo]. We are using the Scala DataStream API; please find 
> the attached logs.
> {code}
> 2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched 
> from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> Serialization trace:
> category (co.thirdwatch.pojo.ItemPojo)
> underlying (scala.collection.convert.Wrappers$SeqWrapper)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>   ... 22 more
> 2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for 

[GitHub] flink pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (se...

2017-10-12 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4767#discussion_r144381682
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractWebSocketHandler.java
 ---
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.util.HandlerUtils;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.WebSocketSpecification;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A channel handler for WebSocket resources.
+ *
+ * This handler handles handshaking and ongoing messaging with a 
WebSocket client,
+ * based on a {@link WebSocketSpecification} that describes the REST 
resource location,
+ * parameter type, and message inbound/outbound types.  Messages are 
automatically
+ * encoded from (and decoded to) JSON text.
+ *
+ * Subclasses should override the following methods to extend the 
respective phases.
+ * 
+ * {@code handshakeInitiated} - occurs upon receipt of a handshake 
request from an HTTP client.  Useful for parameter validation.
+ * {@code handshakeCompleted} - occurs upon successful completion; 
WebSocket is ready for I/O.
+ * {@code messageReceived}: occurs when a WebSocket message is 
received on the channel.
+ * 
+ *
+ * The handler supports gateway availability announcements.
+ *
+ * @param  The gateway type.
+ * @param  The REST parameter type.
+ * @param  The outbound message type.
+ * @param  The inbound message type.
+ */
+public abstract class AbstractWebSocketHandler 
extends ChannelInboundHandlerAdapter {
+
+   protected final Logger log = LoggerFactory.getLogger(getClass());
+
+   private final RedirectHandler redirectHandler;
+
+   private final AttributeKey gatewayAttr;
+
+   private final WebSocketSpecification specification;
+
+   private final ChannelHandler messageCodec;
+
+   private final AttributeKey parametersAttr;
+
+   /**
+* Creates a new handler.
+*/
+   public AbstractWebSocketHandler(
+   @Nonnull CompletableFuture localAddressFuture,
+   @Nonnull GatewayRetriever leaderRetriever,
+   @Nonnull Time timeout,
+   @Nonnull WebSocketSpecification specification) {
+   this.redirectHandler = new 
RedirectHandler<>(localAddressFuture, leaderRetriever, timeout);
+   

[jira] [Commented] (FLINK-5706) Implement Flink's own S3 filesystem

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202425#comment-16202425
 ] 

ASF GitHub Bot commented on FLINK-5706:
---

Github user steveloughran commented on the issue:

https://github.com/apache/flink/pull/4818
  
1. I hope you pick up Hadoop 2.8.1 for this, as it's got a lot of the 
optimisations.
2. And, equally importantly, a later SDK.
3. Though not one of the more recent 1.11 SDKs, where support is yet to 
ship. That's a big single shaded aws-sdk JAR, so things like joda-time, jackson, 
guava, etc. don't cause problems; the only cost is 50+MB more of .jar on the 
classpath.

Test-wise, see how well your client works with a V4 endpoint like 
Frankfurt, as there you also have to change the endpoint used.
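
For illustration, a hedged sketch of that endpoint change: "fs.s3a.endpoint" is 
the standard s3a property, while the wiring around it is illustrative only.

{code:java}
import org.apache.hadoop.conf.Configuration;

public class V4EndpointExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // eu-central-1 (Frankfurt) accepts only AWS signature V4,
        // so the client must be pointed at the regional endpoint.
        conf.set("fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com");
        System.out.println(conf.get("fs.s3a.endpoint"));
    }
}
{code}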

Otherwise, nothing obvious I'd flag up as dangerous


> Implement Flink's own S3 filesystem
> ---
>
> Key: FLINK-5706
> URL: https://issues.apache.org/jira/browse/FLINK-5706
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink 
> needs its own S3 filesystem implementation. Currently Flink relies on 
> Hadoop's S3a and S3n file systems.
> An own S3 file system can be implemented using the AWS SDK. As the basis of 
> the implementation, the Hadoop File System can be used (Apache Licensed, 
> should be okay to reuse some code as long as we do a proper attribution).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-5706) Implement Flink's own S3 filesystem

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202403#comment-16202403
 ] 

ASF GitHub Bot commented on FLINK-5706:
---

Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/flink/pull/4818#discussion_r144372829
  
--- Diff: 
flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3hadoop;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.flink.runtime.util.HadoopUtils;
+
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Simple factory for the S3 file system.
+ */
+public class S3FileSystemFactory implements FileSystemFactory {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(S3FileSystemFactory.class);
+
+   /** The prefixes that Flink adds to the Hadoop config under 'fs.s3a.'. 
*/
+   private static final String[] CONFIG_PREFIXES = { "s3.", "s3a.", 
"fs.s3a." };
+
+   /** Keys that are replaced (after prefix replacement) to give a more 
+* uniform experience across different file system implementations. */
+   private static final String[][] MIRRORED_CONFIG_KEYS = {
+   { "fs.s3a.access-key", "fs.s3a.access.key" },
+   { "fs.s3a.secret-key", "fs.s3a.secret.key" }
--- End diff --

Think about the session key; it lets you support temporary credentials.
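
For illustration, a hedged sketch of that suggestion: mirror a session-token 
key as well so temporary (STS) credentials work. "fs.s3a.session.token" is the 
standard s3a property; the hyphenated alias on the left is hypothetical, 
following the pattern of the two keys above.

{code:java}
public class MirroredKeysSketch {
    // Two-element rows: { flink-style alias, actual fs.s3a key }
    private static final String[][] MIRRORED_CONFIG_KEYS = {
            { "fs.s3a.access-key", "fs.s3a.access.key" },
            { "fs.s3a.secret-key", "fs.s3a.secret.key" },
            { "fs.s3a.session-key", "fs.s3a.session.token" }   // hypothetical alias
    };
}
{code}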


> Implement Flink's own S3 filesystem
> ---
>
> Key: FLINK-5706
> URL: https://issues.apache.org/jira/browse/FLINK-5706
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink 
> needs its own S3 filesystem implementation. Currently Flink relies on 
> Hadoop's S3a and S3n file systems.
> An own S3 file system can be implemented using the AWS SDK. As the basis of 
> the implementation, the Hadoop File System can be used (Apache Licensed, 
> should be okay to reuse some code as long as we do a proper attribution).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[GitHub] flink pull request #4818: [FLINK-5706] [file systems] Add S3 file systems wi...

2017-10-12 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/4818

[FLINK-5706] [file systems] Add S3 file systems without Hadoop dependencies

## What is the purpose of the change

This adds two implementations of a file system that write to S3 so that 
users can use Flink with S3 without depending on Hadoop and have an alternative 
to Hadoop's S3 connectors.

Both are not actual re-implementations but wrap other implementations and 
shade dependencies.

1. The first is a wrapper around Hadoop's s3a file system. By pulling a 
smaller dependency tree and shading all dependencies away, this keeps the 
appearance of Flink being Hadoop-free, from a dependency perspective. We can 
also bump the shaded Hadoop dependency here to get improvements to s3a in (as 
in Hadoop 3.0) without causing dependency conflicts.

2. The second S3 file system is from the Presto Project. Initial simple 
tests seem to indicate that it responds slightly faster and in a bit more 
lightweight manner to write/read/list requests, compared to the Hadoop s3a FS, 
but it has some semantic differences. For example, creating a directory does 
not mean the file system recognizes that the directory is there. The directory 
is only recognized as existing once files are inserted. For checkpointing, that 
could even be preferable.

Both file systems register themselves under `s3` so as not to overload the `s3n` 
and `s3a` schemes used by Hadoop.

## Brief change log

  - Adds `flink-filesystems/flink-s3-fs-hadoop`
  - Adds `flink-filesystems/flink-s3-fs-presto`

## Verifying this change

This adds some initial integration tests, which do depend on S3 
credentials. These credentials are not in the code, but only encrypted on 
Travis, which is why the tests can only run in a meaningful way either on the 
`apache/flink` master branch, or in a committer repository when the committer 
enabled Travis uploads to S3 (for logs) - the tests here use the same secret 
credentials.

Since this does not implement the actual S3 communication, we have no tests 
for that. The tests only test instantiation and whether S3 communication can be 
established (simple reads/writes to a bucket, listing, etc).

Change can also be verified by building Flink, pulling the respective S3 FS 
jar from `/opt` into `/lib` and running a workload that checkpoints or writes 
to S3.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (**yes** / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)

Proper behavior of the File Systems is important, otherwise checkpointing 
may fail. In some sense we are already relying on proper tests of HDFS and S3 
connectors by the Hadoop project. This adds a similar dependency.

## Documentation

  - Does this pull request introduce a new feature? (**yes** / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / **not documented**)

Will add documentation once the details of this feature are agreed upon.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink fs_s3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4818


commit a0b89ad02d4cb67c4d5e1e28efcb6872af0540e6
Author: Stephan Ewen 
Date:   2017-10-06T15:41:00Z

[FLINK-5706] [file systems] Add S3 file systems without Hadoop dependencies

This adds two implementations of a file system that write to S3.
Both are not actual re-implementations but wrap other implementations and 
shade dependencies.

(1) A wrapper around Hadoop's s3a file system. By pulling a smaller 
dependency tree and
shading all dependencies away, this keeps the appearance of Flink being 
Hadoop-free,
from a dependency perspective.

(2) The second S3 file system is from the Presto Project.
Initial simple tests seem to indicate that it responds slightly faster
and in a bit more lightweight manner to write/read/list requests, 
compared
to the Hadoop s3a FS, but it has some semantic differences.




---




[jira] [Commented] (FLINK-7803) Update savepoint Documentation

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202327#comment-16202327
 ] 

ASF GitHub Bot commented on FLINK-7803:
---

Github user razvan100 commented on the issue:

https://github.com/apache/flink/pull/4809
  
Okay, so if you account for it in the review then I don't need to make 
another empty commit. Thanks!


> Update savepoint Documentation
> --
>
> Key: FLINK-7803
> URL: https://issues.apache.org/jira/browse/FLINK-7803
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Razvan
>Assignee: Razvan
>
> Can you please update 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>  to specify that the savepoint location *MUST* always be a location accessible 
> by all hosts?
> I spent quite some time believing it's a bug and trying to find solutions; see 
> https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the 
> current documentation, and others might also waste time believing it's an 
> actual issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-5706) Implement Flink's own S3 filesystem

2017-10-12 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reassigned FLINK-5706:
---

Assignee: Stephan Ewen

> Implement Flink's own S3 filesystem
> ---
>
> Key: FLINK-5706
> URL: https://issues.apache.org/jira/browse/FLINK-5706
> Project: Flink
>  Issue Type: New Feature
>  Components: filesystem-connector
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>
> As part of the effort to make Flink completely independent from Hadoop, Flink 
> needs its own S3 filesystem implementation. Currently Flink relies on 
> Hadoop's S3a and S3n file systems.
> An own S3 file system can be implemented using the AWS SDK. As the basis of 
> the implementation, the Hadoop File System can be used (Apache Licensed, 
> should be okay to reuse some code as long as we do a proper attribution).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202238#comment-16202238
 ] 

Kostas Kloudas commented on FLINK-7760:
---

No [~shashank734], it shouldn't. I am just saying this as a side note.

> Restore failing from external checkpointing metadata.
> -
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS,  FsStateBackend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> My job failed due to a failure of Cassandra. I have enabled 
> externalized checkpoints, but when the job tries to restore from that 
> checkpoint it fails continuously with the following error.
> {code:java}
> 2017-10-04 09:39:20,611 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - KeyedCEPPatternOperator -> Map (1/2) 
> (8ff7913f820ead571c8b54ccc6b16045) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>   at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>   ... 6 more
> {code}
> I have also tried to start a new job after the failure with the parameter 
> {code:java}-s [checkpoint metadata path]{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7832) SlotManager should return number of registered slots

2017-10-12 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7832:


 Summary: SlotManager should return number of registered slots
 Key: FLINK-7832
 URL: https://issues.apache.org/jira/browse/FLINK-7832
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.4.0


The {{SlotManager}} should provide information about the number of registered 
slots for a {{TaskExecutor}} and how many of these slots are still free.
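
For illustration, a hypothetical sketch of such accessors; the names are 
illustrative, not the actual {{SlotManager}} API, and a {{TaskExecutor}} is 
identified by an opaque string ID to keep the sketch self-contained.

{code:java}
public interface SlotManagerSlotInfo {
    int getNumberRegisteredSlots();                        // across all TaskExecutors
    int getNumberRegisteredSlotsOf(String taskExecutorId); // for one TaskExecutor
    int getNumberFreeSlots();
    int getNumberFreeSlotsOf(String taskExecutorId);
}
{code}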



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7653) Properly implement DispatcherGateway methods on the Dispatcher

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202233#comment-16202233
 ] 

ASF GitHub Bot commented on FLINK-7653:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4793


> Properly implement DispatcherGateway methods on the Dispatcher
> --
>
> Key: FLINK-7653
> URL: https://issues.apache.org/jira/browse/FLINK-7653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
> Fix For: 1.4.0
>
>
> Currently, {{DispatcherGateway}} methods such as {{listJobs}}, 
> {{requestStatusOverview}}, and probably other new methods that will be added 
> as we port more existing REST handlers to the new endpoint, have only dummy 
> placeholder implementations in the {{Dispatcher}} marked with TODOs.
> This ticket is to track that they are all eventually properly implemented.
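
For illustration, a hypothetical sketch of the shape of those gateway methods; 
{{listJobs}} and {{requestStatusOverview}} are named in the ticket, while the 
signatures and types here are illustrative only.

{code:java}
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not the actual DispatcherGateway interface: job IDs and
// the overview are modeled as plain strings to keep the sketch self-contained.
interface DispatcherGatewaySketch {

    /** Asynchronously lists the IDs of all currently submitted jobs. */
    CompletableFuture<Collection<String>> listJobs();

    /** Asynchronously requests a coarse status overview of the cluster. */
    CompletableFuture<String> requestStatusOverview();
}
{code}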



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4793: [FLINK-7653] Properly implement Dispatcher#request...

2017-10-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4793


---


[jira] [Resolved] (FLINK-7653) Properly implement DispatcherGateway methods on the Dispatcher

2017-10-12 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7653.
--
   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 660a45ca1a67ade234652482e8e41c79ab674d3d

> Properly implement DispatcherGateway methods on the Dispatcher
> --
>
> Key: FLINK-7653
> URL: https://issues.apache.org/jira/browse/FLINK-7653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
> Fix For: 1.4.0
>
>
> Currently, {{DispatcherGateway}} methods such as {{listJobs}}, 
> {{requestStatusOverview}}, and probably other new methods that will be added 
> as we port more existing REST handlers to the new endpoint, have only dummy 
> placeholder implementations in the {{Dispatcher}} marked with TODOs.
> This ticket is to track that they are all eventually properly implemented.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4808) Allow skipping failed checkpoints

2017-10-12 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202221#comment-16202221
 ] 

ramkrishna.s.vasudevan commented on FLINK-4808:
---

I can try working on it next week. I raised a PR for FLINK-4810, so I will try 
to refresh that against the latest code. Is that fine? If the issue is very 
critical and can't wait till next week, then I am fine if someone wants to fix it.

> Allow skipping failed checkpoints
> -
>
> Key: FLINK-4808
> URL: https://issues.apache.org/jira/browse/FLINK-4808
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2, 1.1.3
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, if Flink cannot complete a checkpoint, it results in a failure and 
> recovery.
> To make the impact of less stable storage infrastructure on the performance 
> of Flink less severe, Flink should be able to tolerate a certain number of 
> failed checkpoints and simply keep executing.
> This should be controllable via a parameter, for example:
> {code}
> env.getCheckpointConfig().setAllowedFailedCheckpoints(3);
> {code}
> A value of {{-1}} could indicate an infinite number of checkpoint failures 
> tolerated by Flink.
> The default value should still be {{0}}, to keep compatibility with the 
> existing behavior.
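
For illustration, a minimal sketch of the semantics proposed above (hypothetical, 
not Flink's checkpoint coordinator): tolerate up to the configured number of 
consecutive failures, with {{-1}} meaning failures are tolerated indefinitely.

{code:java}
final class CheckpointFailureTracker {
    private final int allowedFailedCheckpoints;  // 0 = current behavior, -1 = unlimited
    private int consecutiveFailures;

    CheckpointFailureTracker(int allowedFailedCheckpoints) {
        this.allowedFailedCheckpoints = allowedFailedCheckpoints;
    }

    /** Returns true if the job should fail over after this checkpoint failure. */
    boolean onCheckpointFailure() {
        consecutiveFailures++;
        return allowedFailedCheckpoints >= 0
                && consecutiveFailures > allowedFailedCheckpoints;
    }

    /** A successful checkpoint resets the failure counter. */
    void onCheckpointSuccess() {
        consecutiveFailures = 0;
    }
}
{code}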



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4808) Allow skipping failed checkpoints

2017-10-12 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202223#comment-16202223
 ] 

ramkrishna.s.vasudevan commented on FLINK-4808:
---

Thanks once again.

> Allow skipping failed checkpoints
> -
>
> Key: FLINK-4808
> URL: https://issues.apache.org/jira/browse/FLINK-4808
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2, 1.1.3
>Reporter: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, if Flink cannot complete a checkpoint, it results in a failure and 
> recovery.
> To make the impact of less stable storage infrastructure on the performance 
> of Flink less severe, Flink should be able to tolerate a certain number of 
> failed checkpoints and simply keep executing.
> This should be controllable via a parameter, for example:
> {code}
> env.getCheckpointConfig().setAllowedFailedCheckpoints(3);
> {code}
> A value of {{-1}} could indicate an infinite number of checkpoint failures 
> tolerated by Flink.
> The default value should still be {{0}}, to keep compatibility with the 
> existing behavior.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7831) Retrieve last received heartbeat from HeartbeatManager

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202217#comment-16202217
 ] 

ASF GitHub Bot commented on FLINK-7831:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4817

[FLINK-7831] Make last received heartbeat retrievable

## What is the purpose of the change

This commit adds functionality to retrieve the last received heartbeat from
the HeartbeatManager.

## Verifying this change

- `HeartbeatManagerTest#testLastHeartbeatFrom` and 
`HeartbeatManagerTest#testLastHeartbeatFromUnregisteredTarget`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink addLastHeartbeat

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4817.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4817


commit 9f86e7393d7825a16faf2cf91cb7ac9e6f511bff
Author: Till 
Date:   2017-10-12T16:24:02Z

[FLINK-7831] Make last received heartbeat retrievable

This commit adds functionality to retrieve the last received heartbeat from
the HeartbeatManager.




> Retrieve last received heartbeat from HeartbeatManager
> --
>
> Key: FLINK-7831
> URL: https://issues.apache.org/jira/browse/FLINK-7831
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> In order to serve the information needed for the Web UI the 
> {{HeartbeatManager}} should expose when it received the last heartbeat from 
> the individual {{TaskExecutors}}.
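
For illustration, a minimal hypothetical sketch of that bookkeeping, not the 
actual {{HeartbeatManager}} code: record a timestamp per monitored target on 
every heartbeat and expose the last one.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class LastHeartbeatTracker<ID> {
    private final Map<ID, Long> lastHeartbeats = new ConcurrentHashMap<>();

    /** Called whenever a heartbeat from {@code target} is received. */
    void reportHeartbeat(ID target) {
        lastHeartbeats.put(target, System.currentTimeMillis());
    }

    /** Returns the last heartbeat timestamp, or -1 if the target is unknown. */
    long getLastHeartbeatFrom(ID target) {
        return lastHeartbeats.getOrDefault(target, -1L);
    }
}
{code}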



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)



[jira] [Commented] (FLINK-7830) Problematic interaction of CEP and asynchronous snapshots

2017-10-12 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202187#comment-16202187
 ] 

Shashank Agarwal commented on FLINK-7830:
-

[~aljoscha] I think you have covered everything reported by me.

> Problematic interaction of CEP and asynchronous snapshots
> -
>
> Key: FLINK-7830
> URL: https://issues.apache.org/jira/browse/FLINK-7830
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Reporter: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Just so we collect all the (possibly duplicate) issue reports.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7830) Problematic interaction of CEP and asynchronous snapshots

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202172#comment-16202172
 ] 

Kostas Kloudas commented on FLINK-7830:
---

Thanks [~aljoscha] for creating this. As for the serializer duplication problem, 
the solution is posted here: https://issues.apache.org/jira/browse/FLINK-7435

> Problematic interaction of CEP and asynchronous snapshots
> -
>
> Key: FLINK-7830
> URL: https://issues.apache.org/jira/browse/FLINK-7830
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Reporter: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Just so we collect all the (possibly duplicate) issue reports.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.

2017-10-12 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202173#comment-16202173
 ] 

Shashank Agarwal commented on FLINK-7760:
-

[~kkl0u] Yes, that is redundant. Actually, I am using this stream in some other 
places as well. Is this a problem? Logically it shouldn't be.

[~aljoscha] The restart strategy was also tried automatically and failed. I also 
tried using the same jar, and it failed again and again. I have tried a 
different jar as well, but it failed.

> Restore failing from external checkpointing metadata.
> -
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS,  FsStateBackend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> My job failed due to a failure of Cassandra. I have enabled 
> ExternalizedCheckpoints. But when the job tries to restore from that 
> checkpoint, it fails continuously with the following error.
> {code:java}
> 2017-10-04 09:39:20,611 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - KeyedCEPPatternOperator -> Map (1/2) 
> (8ff7913f820ead571c8b54ccc6b16045) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>   at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>   ... 6 more
> {code}
> I have also tried to start a new job after the failure with the parameter 
> {code:java} -s [checkpoint meta data path]{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7831) Retrieve last received heartbeat from HeartbeatManager

2017-10-12 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7831:


 Summary: Retrieve last received heartbeat from HeartbeatManager
 Key: FLINK-7831
 URL: https://issues.apache.org/jira/browse/FLINK-7831
 Project: Flink
  Issue Type: Improvement
  Components: Distributed Coordination
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


In order to serve the information needed for the Web UI, the 
{{HeartbeatManager}} should expose when it received the last heartbeat from the 
individual {{TaskExecutors}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2017-10-12 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202162#comment-16202162
 ] 

Shashank Agarwal commented on FLINK-6321:
-

[~kkl0u] I am using 4-5 CEPs and around 20-22 map, flat-map, and co-flat-map 
operators in my code, which have simple conditions like the following.

{code}
// Match a type-1 event followed by a type-2 credit-card payment event.
val successOrderPattern = Pattern.begin[RawSignal]("event1")
  .where(_._type.getOrElse("0").toInt == 1)  // default avoids an NPE on a missing type
  .followedBy("event2")
  .where(signal => signal._type.getOrElse("0").toInt == 2 &&
    "_creditCard".equalsIgnoreCase(signal._paymentType.getOrElse("")))

// Key by a composite key, then apply the pattern.
val successOrderPatternStream = CEP.pattern(
  stream.keyBy((x) => (x._someKey1.getOrElse(0), x._someSubKey2.getOrElse(0))),
  successOrderPattern)

val ordersStream: DataStream[OrderSignal] =
  successOrderPatternStream.select(new TransPatternFlatMap).uid("order CEP")

{code}

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It's 
> working fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7830) Problematic interaction of CEP and asynchronous snapshots

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202159#comment-16202159
 ] 

Aljoscha Krettek commented on FLINK-7830:
-

I suspect that there is some issue with asynchronicity (there is also this 
other ML thread: 
https://lists.apache.org/thread.html/73978cdfcf11185fd25b71a1351dcc5265afc7ea353f8d19b294ab59@%3Cuser.flink.apache.org%3E)
 and that the NFA serialiser is not correctly duplicated (as [~kkl0u] already 
mentioned somewhere).
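
To make the duplication point concrete: with asynchronous snapshots, state can 
be serialized on a background thread while the task thread keeps processing, so 
a serializer that holds mutable state must hand each thread its own copy. The 
following is a simplified sketch of that contract, assuming a hypothetical 
serializer with a non-thread-safe buffer; it illustrates the duplication idea 
only and is not Flink's NFA serializer.

{code:java}
// Simplified sketch of the serializer duplication contract under
// asynchronous snapshots. Hypothetical class, not Flink's NFA serializer.
import java.io.ByteArrayOutputStream;

class StatefulSerializer {

    // Non-thread-safe member: sharing it between the task thread and the
    // async snapshot thread can interleave writes and corrupt the stream.
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    byte[] serialize(byte[] record) {
        buffer.reset();                        // mutates shared state
        buffer.write(record, 0, record.length);
        return buffer.toByteArray();
    }

    // A correct duplicate() returns a fresh instance with its own buffer,
    // so each thread serializes against independent state. Returning
    // 'this' is only safe for completely stateless serializers.
    StatefulSerializer duplicate() {
        return new StatefulSerializer();
    }
}
{code}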

> Problematic interaction of CEP and asynchronous snapshots
> -
>
> Key: FLINK-7830
> URL: https://issues.apache.org/jira/browse/FLINK-7830
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Reporter: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Just so we collect all the (possibly duplicate) issue reports.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6318) NFA serialisation doesn't work with TypeSerializers that load classes

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-6318:

Issue Type: Sub-task  (was: Improvement)
Parent: FLINK-7830

> NFA serialisation doesn't work with TypeSerializers that load classes
> -
>
> Key: FLINK-6318
> URL: https://issues.apache.org/jira/browse/FLINK-6318
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> This is from a user-reported stack trace:
> {code}
> 04/12/2017 10:05:04   Job execution switched to status FAILING.
> java.lang.RuntimeException: Could not deserialize NFA.
>   at org.apache.flink.cep.nfa.NFA$Serializer.deserialize(NFA.java:538)
>   at org.apache.flink.cep.nfa.NFA$Serializer.deserialize(NFA.java:469)
>   at 
> org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:81)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:124)
>   at 
> org.apache.flink.cep.operator.AbstractCEPBasePatternOperator.processElement(AbstractCEPBasePatternOperator.java:72)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processElement(AbstractKeyedCEPPatternOperator.java:162)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: 
> co.ronak.nto.Job$$anon$18$$anon$21$$anon$3
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>   at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
>   at 
> org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:53)
>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>   at 
> java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
>   at 
> org.apache.flink.cep.NonDuplicatingTypeSerializer.readObject(NonDuplicatingTypeSerializer.java:190)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> 

[jira] [Commented] (FLINK-7830) Problematic interaction of CEP and asynchronous snapshots

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202154#comment-16202154
 ] 

Aljoscha Krettek commented on FLINK-7830:
-

[~shashank734] and [~lidaiqing1993]: I created this as an umbrella issue. Could 
you please make sure that I actually got all the issues around this topic?

FYI: [~kkl0u]

> Problematic interaction of CEP and asynchronous snapshots
> -
>
> Key: FLINK-7830
> URL: https://issues.apache.org/jira/browse/FLINK-7830
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Reporter: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Just so we collect all the (possibly duplicate) issue reports.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-6321:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-7830

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It's 
> working fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7756:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-7830

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses HDFS 
> as its file system), it crashes. But when I use FsStateBackend on staging 
> (which uses HDFS as its file system), it works fine.
> Locally, with the local file system, it works fine in both cases.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> 

[jira] [Updated] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7484?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7484:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-7830

> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> ---
>
> Key: FLINK-7484
> URL: https://issues.apache.org/jira/browse/FLINK-7484
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, DataStream API, Scala API
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> I am using many CEPs and a Global Window. I am sometimes getting the following 
> error and the application crashes. I have checked that logically there is no 
> flaw in the program. Here ItemPojo is a POJO class and we are using 
> java.util.List[ItemPojo]. We are using the Scala DataStream API; please find 
> the attached logs.
> {code}
> 2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched 
> from RUNNING to FAILED.
> com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
> Index: 7, Size: 5
> Serialization trace:
> category (co.thirdwatch.pojo.ItemPojo)
> underlying (scala.collection.convert.Wrappers$SeqWrapper)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
>   at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>   at 
> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>   at 
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>   at 
> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
>   at 
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
>   at 
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
>   at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>   at java.util.ArrayList.get(ArrayList.java:429)
>   at 
> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>   at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>   at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
>   at 
> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
>   ... 22 more
> 2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Freeing task resources for TriggerWindow(GlobalWindows(), 
> ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
>  co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
> WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
> 

[jira] [Updated] (FLINK-7760) Restore failing from external checkpointing metadata.

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7760:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-7830

> Restore failing from external checkpointing metadata.
> -
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS,  FsStateBackend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> My job failed due to a failure of Cassandra. I have enabled 
> ExternalizedCheckpoints. But when the job tries to restore from that 
> checkpoint, it fails continuously with the following error.
> {code:java}
> 2017-10-04 09:39:20,611 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - KeyedCEPPatternOperator -> Map (1/2) 
> (8ff7913f820ead571c8b54ccc6b16045) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>   at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>   ... 6 more
> {code}
> I have also tried to start a new job after the failure with the parameter 
> {code:java} -s [checkpoint meta data path]{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7435:

Issue Type: Sub-task  (was: Bug)
Parent: FLINK-7830

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, using CEP with pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end), stored with FsStateBackend 
> with the incremental option enabled. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7830) Problematic interaction of CEP and asynchronous snapshots

2017-10-12 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7830:
---

 Summary: Problematic interaction of CEP and asynchronous snapshots
 Key: FLINK-7830
 URL: https://issues.apache.org/jira/browse/FLINK-7830
 Project: Flink
  Issue Type: Bug
  Components: CEP, State Backends, Checkpointing
Reporter: Aljoscha Krettek
 Fix For: 1.4.0


Just so we collect all the (possibly duplicate) issue reports.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202139#comment-16202139
 ] 

Aljoscha Krettek commented on FLINK-7435:
-

[~lidaiqing1993] When you say incremental, did you mean asynchronous snapshots?
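
For readers following along: asynchronous snapshots and incremental checkpoints 
are two separate switches in the 1.3.x state backend APIs. A rough sketch of 
how each is enabled follows; the constructor flags shown are my reading of the 
1.3.x signatures and may differ between versions.

{code:java}
// Sketch: the two separate knobs in the Flink 1.3.x state backend APIs.
// Flag positions are an assumption and may differ across versions.
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

public class BackendConfigSketch {
    public static void main(String[] args) throws Exception {
        // FsStateBackend: the boolean toggles *asynchronous* snapshots.
        FsStateBackend fsBackend =
                new FsStateBackend("hdfs:///flink/checkpoints", true);

        // RocksDBStateBackend: the boolean enables *incremental* checkpoints.
        RocksDBStateBackend rocksBackend =
                new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
    }
}
{code}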

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, using CEP with pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end), stored with FsStateBackend 
> with the incremental option enabled. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7435:

Priority: Blocker  (was: Major)

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, using CEP with pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end), stored with FsStateBackend 
> with the incremental option enabled. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7435:

Fix Version/s: 1.4.0

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, using CEP with pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end), stored with FsStateBackend 
> with the incremental option enabled. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7756:

Fix Version/s: 1.4.0

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses HDFS 
> as its file system), it crashes. But when I use FsStateBackend on staging 
> (which uses HDFS as its file system), it works fine.
> Locally, with the local file system, it works fine in both cases.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> 

[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2017-10-12 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7756:

Priority: Blocker  (was: Major)

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses HDFS 
> as its file system), it crashes. But when I use FsStateBackend on staging 
> (which uses HDFS as its file system), it works fine.
> Locally, with the local file system, it works fine in both cases.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> 

[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.

2017-10-12 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202112#comment-16202112
 ] 

Aljoscha Krettek commented on FLINK-7760:
-

Are you using exactly the same jar when trying to recover the job?

> Restore failing from external checkpointing metadata.
> -
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS,  FsStateBackend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> My job failed due to a failure of Cassandra. I have enabled 
> ExternalizedCheckpoints. But when the job tried to restore from that checkpoint 
> it failed continuously with the following error.
> {code:java}
> 2017-10-04 09:39:20,611 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - KeyedCEPPatternOperator -> Map (1/2) 
> (8ff7913f820ead571c8b54ccc6b16045) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>   at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>   ... 6 more
> {code}
> I have also tried to start a new job after the failure with the parameter {code:java} 
> -s [checkpoint metadata path]{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7810) Switch from custom Flakka to Akka 2.4.x

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202111#comment-16202111
 ] 

ASF GitHub Bot commented on FLINK-7810:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4807#discussion_r144326685
  
--- Diff: docs/start/building.md ---
@@ -115,19 +115,7 @@ The `-Pvendor-repos` activates a Maven [build 
profile](http://maven.apache.org/g
 
 Flink has APIs, libraries, and runtime modules written in 
[Scala](http://scala-lang.org). Users of the Scala API and libraries may have 
to match the Scala version of Flink with the Scala version of their projects 
(because Scala is not strictly backwards compatible).
 
-**By default, Flink is built with the Scala 2.11**. To build Flink with 
Scala *2.10*, you can change the default Scala *binary version* by using 
*scala-2.10* build profile:
-
-~~~bash
-# Build with Scala version 2.10
-mvn clean install -DskipTests -Pscala-2.10
-~~~
-
-To build against custom Scala versions, you need to define new custom 
build profile that will override *scala.version* and *scala.binary.version* 
values.
-
-Flink is developed against Scala *2.11* and tested additionally against 
Scala *2.10*. These two versions are known to be compatible. Earlier versions 
(like Scala *2.9*) are *not* compatible.
--- End diff --

Yes, I don't explicitly say what we're compatible with now. In fact we only 
support 2.11 now; supporting 2.12 is a bit harder than I thought.


> Switch from custom Flakka to Akka 2.4.x
> ---
>
> Key: FLINK-7810
> URL: https://issues.apache.org/jira/browse/FLINK-7810
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4807: [FLINK-7810] Switch from custom Flakka to Akka 2.4...

2017-10-12 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4807#discussion_r144326685
  
--- Diff: docs/start/building.md ---
@@ -115,19 +115,7 @@ The `-Pvendor-repos` activates a Maven [build 
profile](http://maven.apache.org/g
 
 Flink has APIs, libraries, and runtime modules written in 
[Scala](http://scala-lang.org). Users of the Scala API and libraries may have 
to match the Scala version of Flink with the Scala version of their projects 
(because Scala is not strictly backwards compatible).
 
-**By default, Flink is built with the Scala 2.11**. To build Flink with 
Scala *2.10*, you can change the default Scala *binary version* by using 
*scala-2.10* build profile:
-
-~~~bash
-# Build with Scala version 2.10
-mvn clean install -DskipTests -Pscala-2.10
-~~~
-
-To build against custom Scala versions, you need to define new custom 
build profile that will override *scala.version* and *scala.binary.version* 
values.
-
-Flink is developed against Scala *2.11* and tested additionally against 
Scala *2.10*. These two versions are known to be compatible. Earlier versions 
(like Scala *2.9*) are *not* compatible.
--- End diff --

Yes, I don't explicitly say what we're compatible with now. In fact we only 
support 2.11 now; supporting 2.12 is a bit harder than I thought.


---


[jira] [Commented] (FLINK-7810) Switch from custom Flakka to Akka 2.4.x

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202107#comment-16202107
 ] 

ASF GitHub Bot commented on FLINK-7810:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4807#discussion_r144325824
  
--- Diff: docs/start/building.md ---
@@ -115,19 +115,7 @@ The `-Pvendor-repos` activates a Maven [build 
profile](http://maven.apache.org/g
 
 Flink has APIs, libraries, and runtime modules written in 
[Scala](http://scala-lang.org). Users of the Scala API and libraries may have 
to match the Scala version of Flink with the Scala version of their projects 
(because Scala is not strictly backwards compatible).
 
-**By default, Flink is built with the Scala 2.11**. To build Flink with 
Scala *2.10*, you can change the default Scala *binary version* by using 
*scala-2.10* build profile:
-
-~~~bash
-# Build with Scala version 2.10
-mvn clean install -DskipTests -Pscala-2.10
-~~~
-
-To build against custom Scala versions, you need to define new custom 
build profile that will override *scala.version* and *scala.binary.version* 
values.
-
-Flink is developed against Scala *2.11* and tested additionally against 
Scala *2.10*. These two versions are known to be compatible. Earlier versions 
(like Scala *2.9*) are *not* compatible.
-
-Newer versions may be compatible, depending on breaking changes in the 
language features used by Flink, and the availability of Flink's dependencies 
in those Scala versions. The dependencies written in Scala include for example 
*Kafka*, *Akka*, *Scalatest*, and *scopt*.
-
+**By default, Flink is built with the Scala 2.11**.
--- End diff --

fixing


> Switch from custom Flakka to Akka 2.4.x
> ---
>
> Key: FLINK-7810
> URL: https://issues.apache.org/jira/browse/FLINK-7810
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4807: [FLINK-7810] Switch from custom Flakka to Akka 2.4...

2017-10-12 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/4807#discussion_r144325824
  
--- Diff: docs/start/building.md ---
@@ -115,19 +115,7 @@ The `-Pvendor-repos` activates a Maven [build 
profile](http://maven.apache.org/g
 
 Flink has APIs, libraries, and runtime modules written in 
[Scala](http://scala-lang.org). Users of the Scala API and libraries may have 
to match the Scala version of Flink with the Scala version of their projects 
(because Scala is not strictly backwards compatible).
 
-**By default, Flink is built with the Scala 2.11**. To build Flink with 
Scala *2.10*, you can change the default Scala *binary version* by using 
*scala-2.10* build profile:
-
-~~~bash
-# Build with Scala version 2.10
-mvn clean install -DskipTests -Pscala-2.10
-~~~
-
-To build against custom Scala versions, you need to define new custom 
build profile that will override *scala.version* and *scala.binary.version* 
values.
-
-Flink is developed against Scala *2.11* and tested additionally against 
Scala *2.10*. These two versions are known to be compatible. Earlier versions 
(like Scala *2.9*) are *not* compatible.
-
-Newer versions may be compatible, depending on breaking changes in the 
language features used by Flink, and the availability of Flink's dependencies 
in those Scala versions. The dependencies written in Scala include for example 
*Kafka*, *Akka*, *Scalatest*, and *scopt*.
-
+**By default, Flink is built with the Scala 2.11**.
--- End diff --

fixing


---


[jira] [Commented] (FLINK-7810) Switch from custom Flakka to Akka 2.4.x

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202086#comment-16202086
 ] 

ASF GitHub Bot commented on FLINK-7810:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4807
  
Thanks for doing this, we need Akka 2.4 badly for improved SSL support.


> Switch from custom Flakka to Akka 2.4.x
> ---
>
> Key: FLINK-7810
> URL: https://issues.apache.org/jira/browse/FLINK-7810
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4807: [FLINK-7810] Switch from custom Flakka to Akka 2.4.x

2017-10-12 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4807
  
Thanks for doing this, we need Akka 2.4 badly for improved SSL support.


---


[GitHub] flink pull request #4807: [FLINK-7810] Switch from custom Flakka to Akka 2.4...

2017-10-12 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4807#discussion_r144320740
  
--- Diff: docs/start/building.md ---
@@ -115,19 +115,7 @@ The `-Pvendor-repos` activates a Maven [build 
profile](http://maven.apache.org/g
 
 Flink has APIs, libraries, and runtime modules written in 
[Scala](http://scala-lang.org). Users of the Scala API and libraries may have 
to match the Scala version of Flink with the Scala version of their projects 
(because Scala is not strictly backwards compatible).
 
-**By default, Flink is built with the Scala 2.11**. To build Flink with 
Scala *2.10*, you can change the default Scala *binary version* by using 
*scala-2.10* build profile:
-
-~~~bash
-# Build with Scala version 2.10
-mvn clean install -DskipTests -Pscala-2.10
-~~~
-
-To build against custom Scala versions, you need to define new custom 
build profile that will override *scala.version* and *scala.binary.version* 
values.
-
-Flink is developed against Scala *2.11* and tested additionally against 
Scala *2.10*. These two versions are known to be compatible. Earlier versions 
(like Scala *2.9*) are *not* compatible.
--- End diff --

I read this paragraph to imply that Flink is still compatible with 2.10, 
but is that accurate given the use of Akka 2.4?


---


[jira] [Commented] (FLINK-7810) Switch from custom Flakka to Akka 2.4.x

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202084#comment-16202084
 ] 

ASF GitHub Bot commented on FLINK-7810:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/4807#discussion_r144320740
  
--- Diff: docs/start/building.md ---
@@ -115,19 +115,7 @@ The `-Pvendor-repos` activates a Maven [build 
profile](http://maven.apache.org/g
 
 Flink has APIs, libraries, and runtime modules written in 
[Scala](http://scala-lang.org). Users of the Scala API and libraries may have 
to match the Scala version of Flink with the Scala version of their projects 
(because Scala is not strictly backwards compatible).
 
-**By default, Flink is built with the Scala 2.11**. To build Flink with 
Scala *2.10*, you can change the default Scala *binary version* by using 
*scala-2.10* build profile:
-
-~~~bash
-# Build with Scala version 2.10
-mvn clean install -DskipTests -Pscala-2.10
-~~~
-
-To build against custom Scala versions, you need to define new custom 
build profile that will override *scala.version* and *scala.binary.version* 
values.
-
-Flink is developed against Scala *2.11* and tested additionally against 
Scala *2.10*. These two versions are known to be compatible. Earlier versions 
(like Scala *2.9*) are *not* compatible.
--- End diff --

I read this paragraph to imply that Flink is still compatible with 2.10, 
but is that accurate given the use of Akka 2.4?


> Switch from custom Flakka to Akka 2.4.x
> ---
>
> Key: FLINK-7810
> URL: https://issues.apache.org/jira/browse/FLINK-7810
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202083#comment-16202083
 ] 

Kostas Kloudas commented on FLINK-6321:
---

[~shashank734] You may also be able to run it locally from your IDE if you 
have access to the Flink source code and make the change yourself.

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It's 
> working fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202073#comment-16202073
 ] 

Kostas Kloudas commented on FLINK-7760:
---

Btw, although this may not be a problem, you call {{keyBy}} twice in your code, 
which is redundant: 

{code}
val stream = env.addSource(kafka10).keyBy(_._someKey.getOrElse(0)) // ONCE

val successOrderPatternStream = CEP.pattern(stream.keyBy((x) =>
  (x._someKey.getOrElse(0), x._someSubKey.getOrElse(0))), successOrderPattern) // TWICE
{code}
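
For illustration, a minimal sketch of the single-keyBy variant, assuming the 
composite key (someKey, someSubKey) is the one the CEP pattern actually needs:

{code}
// Key the source stream once by the composite key and hand that keyed
// stream straight to CEP.pattern.
val keyedStream = env.addSource(kafka10)
  .keyBy((x) => (x._someKey.getOrElse(0), x._someSubKey.getOrElse(0)))

val successOrderPatternStream = CEP.pattern(keyedStream, successOrderPattern)
{code}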

> Restore failing from external checkpointing metadata.
> -
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS,  FsStateBackend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> My job failed due to failure of cassandra. I have enabled 
> ExternalizedCheckpoints. But when job tried to restore from that checkpoint 
> it's failing continuously with following error.
> {code:java}
> 2017-10-04 09:39:20,611 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - KeyedCEPPatternOperator -> Map (1/2) 
> (8ff7913f820ead571c8b54ccc6b16045) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.readBlockHeader(ObjectInputStream.java:2519)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.refill(ObjectInputStream.java:2553)
>   at 
> java.io.ObjectInputStream$BlockDataInputStream.skipBlockData(ObjectInputStream.java:2455)
>   at java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1951)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1621)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders$StateTableByKeyGroupReaderV2V3.readMappingsInKeyGroup(StateTableByKeyGroupReaders.java:132)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:518)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:397)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:772)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:311)
>   ... 6 more
> {code}
> I have tried to start new job also after failure with parameter {code:java} 
> -s [checkpoint meta data path]{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202054#comment-16202054
 ] 

Kostas Kloudas commented on FLINK-6321:
---

Thanks, in the meantime I will try to see what is happening ;)
This is the same pattern you posted in JIRA 
https://issues.apache.org/jira/browse/FLINK-7760, right? 
The pattern has no conditions.
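
For reference, a "condition" here would be a {{where}} clause on the pattern; 
a minimal, hypothetical example:

{code}
// Hypothetical field, only to illustrate what a condition looks like:
val patternWithCondition = Pattern.begin[RawSignal]("someEvent")
  .where(_.name.isDefined)
  .followedBy("otherEvent")
{code}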

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It's 
> working fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2017-10-12 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202035#comment-16202035
 ] 

Shashank Agarwal commented on FLINK-6321:
-

I will try to run that next week.

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It's 
> working fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202029#comment-16202029
 ] 

Kostas Kloudas commented on FLINK-6321:
---

Well, the failure message is similar to the one in 
https://issues.apache.org/jira/browse/FLINK-7435. 
Could you try what I describe in the last message there? Just to see if there 
is any difference.

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It's 
> working fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202030#comment-16202030
 ] 

ASF GitHub Bot commented on FLINK-7818:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4811


> Synchronize MetricStore access in the TaskManagersHandler
> -
>
> Key: FLINK-7818
> URL: https://issues.apache.org/jira/browse/FLINK-7818
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, REST
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.4.0
>
>
> The {{TaskManagersHandler}} accesses the {{MetricStore}} when details for a 
> single {{TaskManager}} are requested. The access is not synchronized, which 
> can be problematic because the {{MetricStore}} is not thread-safe.
> I propose to add synchronization.
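
A minimal sketch of the kind of guard this proposes, using hypothetical class 
shapes rather than Flink's actual API: every read of the non-thread-safe store 
happens while holding the store's monitor.

{code}
import scala.collection.mutable

// Hypothetical stand-in for a non-thread-safe metric store.
class Store {
  val metricsByTaskManager: mutable.Map[String, mutable.Map[String, String]] =
    mutable.Map.empty
}

class Handler(store: Store) {
  def metricsFor(taskManagerId: String): Map[String, String] =
    store.synchronized {
      // Copy out under the lock so callers never observe a map mid-mutation.
      store.metricsByTaskManager.getOrElse(taskManagerId, mutable.Map.empty).toMap
    }
}
{code}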



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4811: [FLINK-7818] Synchronize MetricStore access in Tas...

2017-10-12 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4811


---


[jira] [Closed] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler

2017-10-12 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-7818.

   Resolution: Fixed
Fix Version/s: 1.4.0

Fixed via 742e4a0ffd2fb244654e97098d0b23100789d4e9

> Synchronize MetricStore access in the TaskManagersHandler
> -
>
> Key: FLINK-7818
> URL: https://issues.apache.org/jira/browse/FLINK-7818
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, REST
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
> Fix For: 1.4.0
>
>
> The {{TaskManagersHandler}} accesses the {{MetricStore}} when details for a 
> single {{TaskManager}} are requested. The access is not synchronized, which 
> can be problematic because the {{MetricStore}} is not thread-safe.
> I propose to add synchronization.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202024#comment-16202024
 ] 

ASF GitHub Bot commented on FLINK-7818:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4811
  
Thanks for the review @zentol. Merging this PR.


> Synchronize MetricStore access in the TaskManagersHandler
> -
>
> Key: FLINK-7818
> URL: https://issues.apache.org/jira/browse/FLINK-7818
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, REST
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{TaskManagersHandler}} accesses the {{MetricStore}} when details for a 
> single {{TaskManager}} are requested. The access is not synchronized, which 
> can be problematic because the {{MetricStore}} is not thread-safe.
> I propose to add synchronization.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4811: [FLINK-7818] Synchronize MetricStore access in TaskManage...

2017-10-12 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4811
  
Thanks for the review @zentol. Merging this PR.


---


[jira] [Commented] (FLINK-7806) Move CurrentJobsOverviewHandler to jobs/overview

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202019#comment-16202019
 ] 

ASF GitHub Bot commented on FLINK-7806:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4805
  
I'm not entirely sure, but I think we cannot put the `JobsOverviewHandler` 
under `/jobs/overview`. The problem is that it collides with `/jobs/:jobid`. If 
I access `http://localhost:8081/jobs/overview` I get a 
`java.lang.IllegalArgumentException: contains illegal character for hexBinary: 
overview`. Not sure why it works when the call comes from the web UI.

So either we don't make this change or we register the handler under `/jobs`. 
Then we would simply return a detailed view when accessing jobs. In the future 
we could add an optional filter statement to not include all information. What 
do you think @zentol?
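
For illustration only, a hypothetical sketch of why the collision occurs: a 
router that binds `/jobs/:jobid` captures "overview" as the jobid path 
parameter and then fails to parse it as hex.

{code}
// Hedged sketch, not Flink's router: the literal segment loses to the
// path-parameter route, so "overview" lands in the jobid slot.
def route(path: String): String = path.split('/').filter(_.nonEmpty) match {
  case Array("jobs", jobid) =>
    // JobIDs are hex; "overview" is not, hence the hexBinary error.
    if (jobid.matches("[0-9a-fA-F]+")) s"details for job $jobid"
    else throw new IllegalArgumentException(
      s"contains illegal character for hexBinary: $jobid")
  case Array("jobs") => "overview of all jobs"
  case _ => "404"
}
{code}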


> Move CurrentJobsOverviewHandler to jobs/overview
> 
>
> Key: FLINK-7806
> URL: https://issues.apache.org/jira/browse/FLINK-7806
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{CurrentJobsOverviewHandler}} is currently registered under 
> {{/joboverview}}. I think it would be more idiomatic to register it under 
> {{/jobs/overview}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7760) Restore failing from external checkpointing metadata.

2017-10-12 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202018#comment-16202018
 ] 

Shashank Agarwal commented on FLINK-7760:
-

Hi [~kkl0u],

Actually it's quite complicated with the Kafka streams and the custom serializer. 
The steps above are correct, but I will still try to include some code. I have 
modified parameter names and some things in the code. If you find any issue, let 
me know.

{code}
object Job {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val propertiesFile =
      getClass.getClassLoader.getResource("xyz.properties").getPath
    val parameter = ParameterTool.fromPropertiesFile(propertiesFile)

    env.getConfig.setGlobalJobParameters(parameter)
    env.setStateBackend(new FsStateBackend(parameter.get("hdfsSnapshotPath")))

    // enable fault-tolerance
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // enable restarts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L))

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", parameter.get("kafkaUrl"))
    properties.setProperty("group.id", parameter.get("kafkaGroupId"))
    val kafka10 = new FlinkKafkaConsumer010[RawSignal](
      parameter.get("kafkaBundleName"),
      new SignalDeserializationSchema(),
      properties)

    val stream = env.addSource(kafka10).keyBy(_._someKey.getOrElse(0))

    // Creating a pattern for a successful event
    val successOrderPattern = Pattern.begin[RawSignal]("someEvent")
      .followedBy("otherEvent")

    val successOrderPatternStream = CEP.pattern(stream.keyBy((x) =>
      (x._someKey.getOrElse(0), x._someSubKey.getOrElse(0))), successOrderPattern)

    val ordersStream: DataStream[TransactionSignal] =
      successOrderPatternStream.select(new TransactionPatternFlatMap)

    // Put the IP count in the stream while maintaining state
    val ipStateStream = ordersStream.keyBy((x) => (x._someKey, x._deviceIp))
      .mapWithState((in: TransactionSignal, ipState: Option[Int]) => {
        if (!in._deviceIp.equalsIgnoreCase(parameter.get("defaultIp"))) {
          val newCount = ipState.getOrElse(0) + 1
          val output = in.copy(_numOfOrderSameIp = newCount)
          (output, Some(newCount))
        } else {
          (in, Some(0))
        }
      })
    ipStateStream.print
    env.execute("Thirdwatch Mitra")
  }
}
{code}

Here is the Kafka deserializer I am using, SignalDeserializationSchema:

{code}
import RawSignal
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

/**
  * Created by shashank on 13/01/17.
  *
  * Deserializes a raw JSON string from Kafka into a RawSignal object.
  */
class SignalDeserializationSchema extends AbstractDeserializationSchema[RawSignal] {

  implicit lazy val formats = DefaultFormats

  override def deserialize(message: Array[Byte]): RawSignal = {
    parse(new String(message)).extract[RawSignal]
  }

  override def isEndOfStream(nextElement: RawSignal): Boolean = false
}
{code}

and RawSignal Example class...

{code}
case class RawSignal(name: Option[String], email: Option[String], UserId: Option[String])
{code}
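
A hedged usage sketch of the deserializer, assuming the JSON field names match 
the RawSignal fields:

{code}
// Hypothetical message; with json4s DefaultFormats, absent fields simply
// become None for the Option-typed case class fields.
val schema = new SignalDeserializationSchema()
val bytes = """{"name":"alice","email":"a@example.com","UserId":"u-1"}""".getBytes("UTF-8")
val signal: RawSignal = schema.deserialize(bytes)
// signal == RawSignal(Some("alice"), Some("a@example.com"), Some("u-1"))
{code}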

> Restore failing from external checkpointing metadata.
> -
>
> Key: FLINK-7760
> URL: https://issues.apache.org/jira/browse/FLINK-7760
> Project: Flink
>  Issue Type: Bug
>  Components: CEP, State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, Flink 1.3.2, HDFS,  FsStateBackend
>Reporter: Shashank Agarwal
>Priority: Blocker
> Fix For: 1.4.0
>
>
> My job failed due to failure of cassandra. I have enabled 
> ExternalizedCheckpoints. But when job tried to restore from that checkpoint 
> it's failing continuously with following error.
> {code:java}
> 2017-10-04 09:39:20,611 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - KeyedCEPPatternOperator -> Map (1/2) 
> (8ff7913f820ead571c8b54ccc6b16045) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize keyed state backend.
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:321)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:217)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:676)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:663)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:252)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: 

[jira] [Comment Edited] (FLINK-7760) Restore failing from external checkpointing metadata.

2017-10-12 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7760?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202018#comment-16202018
 ] 

Shashank Agarwal edited comment on FLINK-7760 at 10/12/17 2:31 PM:
---

Hi [~kkl0u],

Actually it's quite complicated with the Kafka streams and the custom serializer. 
The steps above are correct, but I will still try to include some code. I have 
modified parameter names and some things in the code. If you find any issue, let 
me know.

{code}
object Job {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val propertiesFile =
      getClass.getClassLoader.getResource("xyz.properties").getPath
    val parameter = ParameterTool.fromPropertiesFile(propertiesFile)

    env.getConfig.setGlobalJobParameters(parameter)
    env.setStateBackend(new FsStateBackend(parameter.get("hdfsSnapshotPath")))

    // enable fault-tolerance
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // enable restarts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L))

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", parameter.get("kafkaUrl"))
    properties.setProperty("group.id", parameter.get("kafkaGroupId"))
    val kafka10 = new FlinkKafkaConsumer010[RawSignal](
      parameter.get("kafkaBundleName"),
      new SignalDeserializationSchema(),
      properties)

    val stream = env.addSource(kafka10).keyBy(_._someKey.getOrElse(0))

    // Creating a pattern for a successful event
    val successOrderPattern = Pattern.begin[RawSignal]("someEvent")
      .followedBy("otherEvent")

    val successOrderPatternStream = CEP.pattern(stream.keyBy((x) =>
      (x._someKey.getOrElse(0), x._someSubKey.getOrElse(0))), successOrderPattern)

    val ordersStream: DataStream[TransactionSignal] =
      successOrderPatternStream.select(new TransactionPatternFlatMap)

    // Put the IP count in the stream while maintaining state
    val ipStateStream = ordersStream.keyBy((x) => (x._someKey, x._deviceIp))
      .mapWithState((in: TransactionSignal, ipState: Option[Int]) => {
        if (!in._deviceIp.equalsIgnoreCase(parameter.get("defaultIp"))) {
          val newCount = ipState.getOrElse(0) + 1
          val output = in.copy(_numOfOrderSameIp = newCount)
          (output, Some(newCount))
        } else {
          (in, Some(0))
        }
      })
    ipStateStream.print
    env.execute("Thirdwatch Mitra")
  }
}
{code}

Here is the Kafka deserializer I am using, SignalDeserializationSchema:

{code}
import RawSignal
import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

/**
  * Created by shashank on 13/01/17.
  *
  * Deserializes a raw JSON string from Kafka into a RawSignal object.
  */
class SignalDeserializationSchema extends AbstractDeserializationSchema[RawSignal] {

  implicit lazy val formats = DefaultFormats

  override def deserialize(message: Array[Byte]): RawSignal = {
    parse(new String(message)).extract[RawSignal]
  }

  override def isEndOfStream(nextElement: RawSignal): Boolean = false
}
{code}

and RawSignal Example class...

{code}
case class RawSignal(name: Option[String], email: Option[String], UserId: Option[String])
{code}


was (Author: shashank734):
Ho [~kkl0u],

Hi [~kkl0u] ,

Actually it's quite complicated with the Kafka streams and the custom serializer. 
The steps above are correct, but I will still try to include some code. I have 
modified parameter names and some things in the code. If you find any issue, let 
me know.

{code}
object Job {
  def main(args: Array[String]) {
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val propertiesFile = 
getClass.getClassLoader.getResource("xyz.properties").getPath
val parameter = ParameterTool.fromPropertiesFile(propertiesFile)

env.getConfig.setGlobalJobParameters(parameter)
env.setStateBackend(new FsStateBackend(parameter.get("hdfsSnapshotPath")))

// enable fault-tolerance
env.enableCheckpointing(1000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

// enable restarts
  env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L))

val properties = new Properties()
properties.setProperty("bootstrap.servers", parameter.get("kafkaUrl"))
properties.setProperty("group.id", parameter.get("kafkaGroupId"))
val kafka10 = new 
FlinkKafkaConsumer010[RawSignal](parameter.get("kafkaBundleName"), new 
SignalDeserializationSchema(), properties)

val stream = env.addSource(kafka10).keyBy(_._someKey.getOrElse(0))


//Creating a pattern for successful event

val successOrderPattern = Pattern.begin[RawSignal]("someEvent").
  .followedBy("otherEvent")

val successOrderPatternStream = CEP.pattern(stream.keyBy((x) => 
(x._someKey.getOrElse(0), x._someSubKey.getOrElse(0))), successOrderPattern)

val ordersStream: DataStream[TransactionSignal] =
  

[GitHub] flink issue #4805: [FLINK-7806] [flip6] Register CurrentJobsOverviewHandler ...

2017-10-12 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4805
  
I'm not entirely sure, but I think we cannot put the `JobsOverviewHandler` 
under `/jobs/overview`. The problem is that it collides with `/jobs/:jobid`. If 
I access `http://localhost:8081/jobs/overview` I get a 
`java.lang.IllegalArgumentException: contains illegal character for hexBinary: 
overview`. Not sure why it works when the call comes from the web UI.

So either we don't make this change or we register the handler under `/jobs`. 
Then we would simply return a detailed view when accessing jobs. In the future 
we could add an optional filter statement to not include all information. What 
do you think @zentol?


---


[jira] [Comment Edited] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202007#comment-16202007
 ] 

Kostas Kloudas edited comment on FLINK-7435 at 10/12/17 2:28 PM:
-

Hi [~lidaiqing]. I am looking into this issue, and the problem is encountered 
when FlinkCEP tries to deserialize the conditions of your pattern 
({{NFASerializer.deserializeCondition}}). 

Could you please share more details about your job, and even some code? For 
example, the code of your pattern and the state backend you are using. 

Also, as I said in the previous comment, there is definitely a problem with the 
{{duplicate()}} method of the serializer. Could you please try this branch 
https://github.com/kl0u/flink/tree/cep-nfa-serializer-bug to see if it solves 
the problem? The main change is that the {{NFASerializer.duplicate()}} method 
now becomes:

{code}
@Override
public TypeSerializer duplicate() {
    return new NFASerializer<>(eventSerializer.duplicate());
}
{code}

So if you have Flink's source code, then you can make this change yourself and 
try it out.
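
To make the failure mode concrete, a hypothetical sketch (invented names, not 
Flink's classes) of why a stateful serializer must not return {{this}} from 
{{duplicate()}}: two threads sharing one instance also share its scratch state.

{code}
// Per-instance mutable state means duplicate() has to hand each thread its
// own copy, and any nested serializer has to be duplicated as well.
class ScratchSerializer(nested: Option[ScratchSerializer]) {
  private val scratch = new StringBuilder // mutable per-instance state

  def serialize(v: String): String = {
    scratch.setLength(0)
    scratch.append(v).append('!')
    scratch.toString
  }

  def duplicateBroken(): ScratchSerializer = this // threads would share `scratch`

  def duplicate(): ScratchSerializer =
    new ScratchSerializer(nested.map(_.duplicate())) // fresh state per copy
}
{code}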


was (Author: kkl0u):
Hi [~lidaiqing]. I am looking into this issue, and the problem is encountered 
when FlinkCEP tries to deserialize the conditions of your pattern 
(`NFASerializer.deserializeCondition`). 

Could you please share more details about your job, and even some code? For 
example, the code of your pattern and the state backend you are using. 

Also, as I said in the previous comment, there is definitely a problem with the 
`duplicate()` method of the serializer. Could you please try this branch 
https://github.com/kl0u/flink/tree/cep-nfa-serializer-bug to see if it solves 
the problem? The main change is that the `NFASerializer.duplicate()` method 
now becomes:

```
@Override
public TypeSerializer duplicate() {
return new NFASerializer<>(eventSerializer.duplicate());
}
```

So if you have Flink's source code, then you can make this change yourself and 
try it out.

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, use CEP with pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end). Store it with FsStateBackend 
> with the Incremental option enabled. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> 

[jira] [Commented] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202007#comment-16202007
 ] 

Kostas Kloudas commented on FLINK-7435:
---

Hi [~lidaiqing]. I am looking into this issue, and the problem is encountered 
when FlinkCEP tries to deserialize the conditions of your pattern 
(`NFASerializer.deserializeCondition`). 

Could you please share more details about your job, and even some code? For 
example, the code of your pattern and the state backend you are using. 

Also, as I said in the previous comment, there is definitely a problem with the 
`duplicate()` method of the serializer. Could you please try this branch 
https://github.com/kl0u/flink/tree/cep-nfa-serializer-bug to see if it solves 
the problem? The main change is that the `NFASerializer.duplicate()` method 
now becomes:

```
@Override
public TypeSerializer duplicate() {
    return new NFASerializer<>(eventSerializer.duplicate());
}
```

So if you have Flink's source code, then you can make this change yourself and 
try it out.

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR YARN, use CEP with pattern start -> next 
> (oneOrMore.Optional.Consecutive) -> next(end). Store it with FsStateBackend 
> with the Incremental option enabled. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at 

[GitHub] flink issue #4805: [FLINK-7806] [flip6] Register CurrentJobsOverviewHandler ...

2017-10-12 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4805
  
I reverted the changes to `vendor.css` and `vendor.js` and updated the 
`rest_api.md`.


---


[jira] [Commented] (FLINK-7806) Move CurrentJobsOverviewHandler to jobs/overview

2017-10-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16202001#comment-16202001
 ] 

ASF GitHub Bot commented on FLINK-7806:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4805
  
I reverted the changes to `vendor.css` and `vendor.js` and updated the 
`rest_api.md`.


> Move CurrentJobsOverviewHandler to jobs/overview
> 
>
> Key: FLINK-7806
> URL: https://issues.apache.org/jira/browse/FLINK-7806
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{CurrentJobsOverviewHandler}} is currently registered under 
> {{/joboverview}}. I think it would be more idiomatic to register it under 
> {{/jobs/overview}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6321) RocksDB state backend Checkpointing is not working with KeyedCEP.

2017-10-12 Thread Shashank Agarwal (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201999#comment-16201999
 ] 

Shashank Agarwal commented on FLINK-6321:
-

Hi [~kkl0u],

My program is in Scala, but before sinking to Cassandra I am converting the 
stream to a Java stream. There are some Java collection objects used. I am 
using the FS backend for that.

> RocksDB state backend Checkpointing is not working with KeyedCEP.
> -
>
> Key: FLINK-6321
> URL: https://issues.apache.org/jira/browse/FLINK-6321
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.2.0
> Environment: yarn-cluster, RocksDB State backend, Checkpointing every 
> 1000 ms
>Reporter: Shashank Agarwal
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Checkpointing is not working with RocksDBStateBackend when using CEP. It's 
> working fine with FsStateBackend and MemoryStateBackend. The application fails 
> every time.
> {code}
> 04/18/2017 21:53:20   Job execution switched to status FAILING.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 
> 46 for operator KeyedCEPPatternOperator -> Map (1/4).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:980)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 46 for 
> operator KeyedCEPPatternOperator -> Map (1/4).
>   ... 6 more
> Caused by: java.util.concurrent.CancellationException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:121)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:915)
>   ... 5 more
> {code}
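
For reference, a minimal sketch of the setup described in the Environment line 
above (the checkpoint URI is a placeholder): the RocksDB state backend with 
checkpointing every 1000 ms:

{code}
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object CheckpointSetup extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // RocksDB state backend; the checkpoint URI is a placeholder.
  env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
  // Checkpoint every 1000 ms, matching the report.
  env.enableCheckpointing(1000)
}
{code}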



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP

2017-10-12 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124875#comment-16124875
 ] 

Kostas Kloudas edited comment on FLINK-7435 at 10/12/17 2:15 PM:
-

The problem here seems to be in the {{duplicate()}} method of the 
{{NFASerializer}}, which currently returns {{this}} rather than an actual copy.


was (Author: kkl0u):
The problem here seems to be in the {{copy()}} method of the {{NFA}}, which 
currently returns {{this}} rather than an actual copy.
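
For illustration, a self-contained sketch of the pitfall described above. 
{{Duplicable}} is a hypothetical stand-in for the serializer's duplication 
contract, not Flink's actual {{TypeSerializer}}:

{code}
trait Duplicable {
  def duplicate(): Duplicable
}

// Bug pattern: returning `this` means every caller shares one instance, so
// internal mutable state (such as a reused scratch buffer) can be corrupted
// by concurrent use, surfacing as errors like the StreamCorruptedException
// in the trace below.
class BrokenSerializerLike extends Duplicable {
  val scratch = new Array[Byte](4096)         // shared by all "duplicates"
  override def duplicate(): Duplicable = this // BUG: no real copy
}

// Fix: hand every caller a fresh instance with its own state.
class FixedSerializerLike(bufferSize: Int) extends Duplicable {
  val scratch = new Array[Byte](bufferSize)   // per-instance scratch space
  override def duplicate(): Duplicable = new FixedSerializerLike(bufferSize)
}
{code}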

> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --
>
> Key: FLINK-7435
> URL: https://issues.apache.org/jira/browse/FLINK-7435
> Project: Flink
>  Issue Type: Bug
>  Components: CEP
>Affects Versions: 1.3.1, 1.3.2
> Environment: AWS EMR on YARN, using CEP with the pattern start -> next 
> (oneOrMore.optional.consecutive) -> next(end), stored with the FsStateBackend 
> with the incremental option enabled. 
>Reporter: daiqing
>Assignee: Kostas Kloudas
>
> java.lang.RuntimeException: Exception occurred while processing valve output 
> watermark: 
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>   at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>   at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>   at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>   at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>   at 
> org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>   at 
> org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>   at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>   at 
> org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>   ... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>   at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>   at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>   at 
> java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>   at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>   at 
> org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>   at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>   ... 17 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

