[jira] [Updated] (FLINK-4640) Serialization of the initialValue of a Fold on WindowedStream fails

2016-09-20 Thread Fabian Hueske (JIRA)

 [ https://issues.apache.org/jira/browse/FLINK-4640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske updated FLINK-4640:
-
Description: 
The following program

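{code}
import com.google.common.collect.TreeMultimap;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// env is a StreamExecutionEnvironment
DataStream<Tuple2<String, Long>> src = env.fromElements(new Tuple2<String, Long>("a", 1L));

src
  .keyBy(1)
  .timeWindow(Time.minutes(5))
  // the initial value is the empty multimap (count -> items with that count)
  .fold(TreeMultimap.<Long, String>create(), new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
    @Override
    public TreeMultimap<Long, String> fold(
        TreeMultimap<Long, String> topKSoFar,
        Tuple2<String, Long> itemCount) throws Exception {
      String item = itemCount.f0;
      Long count = itemCount.f1;
      topKSoFar.put(count, item);
      // keep at most 10 distinct counts by evicting the smallest one
      if (topKSoFar.keySet().size() > 10) {
        topKSoFar.removeAll(topKSoFar.keySet().first());
      }
      return topKSoFar;
    }
  });
{code}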

throws this exception

{quote}
Caused by: java.lang.RuntimeException: Could not add value to folding state.
	at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:91)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:382)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:192)
	at com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:121)
	at com.google.common.collect.TreeMultimap.put(TreeMultimap.java:78)
	at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:115)
	at org.apache.flink.streaming.examples.wordcount.WordCount$1.fold(WordCount.java:109)
	at org.apache.flink.runtime.state.memory.MemFoldingState.add(MemFoldingState.java:85)
	... 6 more
{quote}

The exception occurs because the initial value was not deserialized correctly 
and is {{null}}.

The user reporting this issue said that using the same {{FoldFunction}} on a 
{{KeyedStream}} (without a window) works fine.

I tracked the problem down to the serialization of the {{StateDescriptor}}, 
i.e., its {{writeObject()}} and {{readObject()}} methods. These methods use 
Flink's {{TypeSerializer}}s to serialize the default value. For a 
{{TreeMultimap}}, this is the {{KryoSerializer}}, which for some reason fails 
to read the serialized data.

A quick workaround would be to check whether the default value implements 
{{Serializable}} and, if it does, to use Java serialization. However, it would 
still be good to track down the root cause of this problem.
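The workaround could look roughly like the following sketch, assuming the default value is written into the same {{ObjectOutputStream}} that {{writeObject()}} receives. {{DefaultValueIO}}, {{FallbackCodec}} and the tag constants are hypothetical names, not Flink API; {{FallbackCodec}} stands in for whatever type serializer (e.g. Kryo) is used today. A tag byte records which path the writer took so the reader can mirror it.

{code}
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class DefaultValueIO {

    /** Hypothetical stand-in for the type serializer currently used for the default value. */
    interface FallbackCodec {
        void write(Object value, ObjectOutputStream out) throws IOException;
        Object read(ObjectInputStream in) throws IOException;
    }

    // Tag byte written first so the reader takes the same path as the writer.
    static final byte NULL_VALUE = 0;
    static final byte JAVA_SERIALIZED = 1;
    static final byte FALLBACK = 2;

    static void writeDefaultValue(Object value, ObjectOutputStream out, FallbackCodec fallback)
            throws IOException {
        if (value == null) {
            out.writeByte(NULL_VALUE);
        } else if (value instanceof Serializable) {
            out.writeByte(JAVA_SERIALIZED);
            out.writeObject(value);          // plain Java serialization
        } else {
            out.writeByte(FALLBACK);
            fallback.write(value, out);      // previous behaviour for non-Serializable values
        }
    }

    static Object readDefaultValue(ObjectInputStream in, FallbackCodec fallback)
            throws IOException, ClassNotFoundException {
        byte tag = in.readByte();
        switch (tag) {
            case NULL_VALUE:
                return null;
            case JAVA_SERIALIZED:
                return in.readObject();
            case FALLBACK:
                return fallback.read(in);
            default:
                throw new IOException("Unknown default-value tag: " + tag);
        }
    }
}
{code}

Note that {{instanceof Serializable}} only inspects the top-level class: a {{Serializable}} container holding non-serializable elements still fails with a {{NotSerializableException}} when written. Also, in Flink the reading side may need an input stream that resolves classes against the user-code class loader; this sketch uses a plain {{ObjectInputStream}}.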

[jira] [Updated] (FLINK-4640) Serialization of the initialValue of a Fold on WindowedStream fails

2016-09-20 Thread Aljoscha Krettek (JIRA)

 [ https://issues.apache.org/jira/browse/FLINK-4640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-4640:

Priority: Blocker  (was: Critical)

> Serialization of the initialValue of a Fold on WindowedStream fails
> ---
>
> Key: FLINK-4640
> URL: https://issues.apache.org/jira/browse/FLINK-4640
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.2.0, 1.1.2
>Reporter: Fabian Hueske
>Priority: Blocker
> Fix For: 1.2.0, 1.1.3
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

