[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336783#comment-17336783
 ] 

Flink Jira Bot commented on FLINK-6115:
---

This issue was labeled "stale-major" 7 days ago and has not received any updates, 
so it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Major
>  Labels: stale-major
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>   at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>   at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a 
> flatMap (but I have dozens of them in my code). Surely there's a way to pull 
> out the source file name and line number from the program DAG node when 
> errors like this occur?
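
For context, here is a minimal sketch of a job that fails this way (not taken from the ticket; the class name and data are illustrative, and it assumes the Flink 1.2 Java DataSet API): a tuple whose {{String[]}} field is null is rejected inside {{StringArraySerializer}} as soon as the record has to be serialized, with no user-code frame in the stack trace.

{code:java}
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class NullTupleFieldRepro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple2.of("ok", new String[] {"value"}),
                new Tuple2<String, String[]>("broken", null)) // null field is created here
           .filter(t -> t.f0.startsWith("b"))
           .print();

        // Fails with IllegalArgumentException: "The record must not be null."
        // thrown from StringArraySerializer, far from the line that built the tuple.
    }
}
{code}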



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17328808#comment-17328808
 ] 

Flink Jira Bot commented on FLINK-6115:
---

This major issue is unassigned, and neither it nor any of its Sub-Tasks has been 
updated for 30 days, so it has been labeled "stale-major". If this ticket is 
indeed "major", please either assign yourself or give an update, and afterwards 
remove the label. In 7 days the issue will be deprioritized.

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>Priority: Major
>  Labels: stale-major
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>   at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>   at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a 
> flatMap (but I have dozens of them in my code). Surely there's a way to pull 
> out the source file name and line number from the program DAG node when 
> errors like this occur?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-09-23 Thread Bin Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177627#comment-16177627
 ] 

Bin Wang commented on FLINK-6115:
-

I am a new user of Flink, and I would like to recommend supporting null 
serialization in Tuple. This is what I am doing: I am calculating some of the 
properties of a data instance, and before calculation the property is null. I load 
the data from a Cassandra DB, and first I need to filter out the data that hasn't 
been calculated yet. As you know, a null property cannot be filtered in Cassandra 
2.x without a secondary index, so I am facing this problem. I want to use a filter 
chain to find the data with the null property, but the filter does not work when 
the data has a null property. :(
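
A sketch of the filter described above (the field layout and names are assumed, not from the comment): the tuples carry a property that is still null before it has been calculated, and a plain filter is used to select those records, which is exactly where the current serializer rejects the null field.

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class FindUncalculatedRecords {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // (rowKey, calculatedProperty) -- the property stays null until it has been computed
        DataSet<Tuple2<String, Double>> records = env.fromElements(
                Tuple2.of("row-1", 0.42),
                new Tuple2<String, Double>("row-2", null));

        // Select the rows that still need to be calculated.
        // Today this fails as soon as the null-field tuple has to be serialized.
        records.filter(t -> t.f1 == null).print();
    }
}
{code}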

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>   at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>   at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a 
> flatMap (but I have dozens of them in my code). Surely there's a way to pull 
> out the source file name and line number from the program DAG node when 
> errors like this occur?



--
This message was sent by Atlassian JIRA

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-31 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950505#comment-15950505
 ] 

Luke Hutchison commented on FLINK-6115:
---

[~Zentol] In point (1), I wasn't referring to the overhead of serialization at 
all -- I was referring to the memory and time overhead of wrapping types in 
{{Optional}}, as a workaround for the lack of {{null}} value support in tuples. 
Specifically, using {{Optional}} types requires an additional heap allocation, 
which incurs a 16-byte overhead on a 64-bit VM, and comes at significant speed 
cost (memory allocations and garbage collection are the principal reason for 
inefficiency in JITted Java vs. native code). ( {{Optional}} also adds a layer 
of indirection, which adds a marginal additional performance cost.)

In other words, the point I was making in (1) is that it is circular reasoning 
to say "we decided not to support null values in tuples for efficiency reasons 
-- so use {{Optional}} instead if you need null values".

The other case to consider is the impact on code that never needs {{null}} 
values. That is what my point (2) addresses: that even for code that doesn't 
use {{null}} , the overhead of adding a bitfield to the serialized format will 
have an extremely minimal performance impact, relative to the amount of time 
actually doing anything with the serialized format -- because inevitably when 
you serialize an object, you're going to store it or transmit it, which takes 
several orders of magnitude more time to do than serializing or deserializing. 
And the memory overhead is one bit per field, rounded up to the nearest byte.

So -- to summarize those two points more succinctly:

# Not supporting serialization of {{null}} incurs a major overhead for code 
that needs to use {{null}} values, by requiring the use of {{Optional}} 
wrappers, which can incur a large performance and memory penalty.
# Supporting serialization of {{null}} would incur only a very minimal relative 
overhead on code that doesn't need to use {{null}} values.

In other words, so far the efficiency argument has been the strongest argument 
made in favor of the status quo -- however, the efficiency argument really 
doesn't hold water as a reason not to support {{null}} values in tuples.

Premature optimization is often a source of problems, which is why I asked if 
this has been benchmarked before. I have to imagine that somebody has benchmark 
numbers somewhere comparing different approaches.
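
A sketch of the null-bitmask wire format argued for above (plain {{java.io}}, not Flink's actual serializer API): one leading byte records which fields are null, so a record that never contains nulls pays a single extra byte for up to eight fields.

{code:java}
import java.io.DataOutputStream;
import java.io.IOException;

public final class NullMaskWriter {
    // Writes up to 8 nullable String fields, preceded by a one-byte null mask.
    public static void write(DataOutputStream out, String... fields) throws IOException {
        int mask = 0;
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                mask |= 1 << i;          // bit i set => field i is null
            }
        }
        out.writeByte(mask);             // one byte of overhead per record
        for (String field : fields) {
            if (field != null) {
                out.writeUTF(field);     // only non-null fields carry a payload
            }
        }
    }
}
{code}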

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> 

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-31 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950482#comment-15950482
 ] 

Chesnay Schepler commented on FLINK-6115:
-

[~lukehutch] Regarding 1), I believe you're once again assuming that we would 
use standard Java serialization. Given that we can provide custom serializers for 
basically any type, I can't see a reason why serializing Optional should be in 
any way more expensive than serializing null.
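
A sketch of that point (plain {{java.io}} rather than Flink's {{TypeSerializer}} API; the class name is assumed): a dedicated serializer can encode an absent {{Optional}} as a single presence byte, so on the wire it costs no more than a null flag would.

{code:java}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Optional;

public final class OptionalStringCodec {
    public static void write(DataOutputStream out, Optional<String> value) throws IOException {
        out.writeBoolean(value.isPresent());   // one presence byte, same cost as a null flag
        if (value.isPresent()) {
            out.writeUTF(value.get());
        }
    }

    public static Optional<String> read(DataInputStream in) throws IOException {
        return in.readBoolean() ? Optional.of(in.readUTF()) : Optional.empty();
    }
}
{code}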

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>   at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>   at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a 
> flatMap (but I have dozens of them in my code). Surely there's a way to pull 
> out the source file name and line number from the program DAG node when 
> errors like this occur?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-30 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950319#comment-15950319
 ] 

Luke Hutchison commented on FLINK-6115:
---

[~greghogan] You're assuming data with {{null}} values in it is "bad data", or 
that getting a {{null}} value is always going to be unexpected, or that it will 
trigger {{NullPointerException}}s downstream. Far from it, there is a huge 
range of valid and intentional uses for {{null}} values. Many programmers won't 
be able to use {{Optional}} for years, due to being stuck on Java 7 for 
reasons beyond their control. But even with {{Optional}} available, many Java 
8 programmers still use {{null}} for a wide range of purposes, and will until the 
language dies a very slow death. (It will always be impossible until the end of 
time, for example, to tell without an extra call if {{Map#get(key)}} returned 
{{null}} because there was no value corresponding to that key, or because 
{{null}} was mapped to the key -- but programmers know and expect this, for 
better or for worse.)
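
For reference, the {{Map#get(key)}} ambiguity mentioned above looks like this; the extra {{containsKey}} call is what distinguishes "no mapping" from "mapped to null".

{code:java}
import java.util.HashMap;
import java.util.Map;

public class MapGetAmbiguity {
    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("present-but-null", null);

        // Both lookups return null...
        System.out.println(map.get("present-but-null")); // null
        System.out.println(map.get("missing"));          // null

        // ...and only the extra call tells the two cases apart.
        System.out.println(map.containsKey("present-but-null")); // true
        System.out.println(map.containsKey("missing"));          // false
    }
}
{code}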

The most ironic thing about this whole conversation though is the claim that 
omitting {{null}} was done for performance reasons, followed by the 
recommendation to use {{Optional}}:

(1) both the performance overhead _and_ the memory overhead of wrapping types 
in {{Optional}} are _significantly higher_ than using a bitfield in the wire 
format of a serialized tuple to mark null fields;

(2) the overhead of actually _using_ a serialized tuple for any of the things 
you ever need to serialize one for (i.e. writing a serialized tuple to 
persistent storage, and/or sending it over the wire) takes orders of magnitude 
more time than the serialization and deserialization process, which makes the 
impact of adding a bitfield vanishingly small. Did you actually benchmark the 
impact of serializing {{null}} before concluding it would be too inefficient, 
and are those numbers available? Having hard numbers, particularly in a 
head-to-head comparison (using tuples without {{null}}s, using {{Record}}s 
with nulls, and using tuples with {{Optional}}) would be an important factor in 
the community decision process that you linked.

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-30 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949190#comment-15949190
 ] 

Greg Hogan commented on FLINK-6115:
---

[~zjureel] [~lukehutch] these are valid points. No one is saying that tuples 
should not support {{null}} serialization since that is a community decision. 
Before [proposing a 
change|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals]
 it is vital to understand the how and why of existing behavior as well as the 
effects of any change. Adding {{null}} serialization will likewise be a major 
point of surprise and pain for users of Flink.

As noted previously, serializing {{null}} does not prevent 
{{NullPointerException}} but rather forces all functions processing tuples to 
properly check for and handle {{null}}. I would suggest sanitizing data once 
when it is read (and using {{Optional}} when applicable) rather than passing bad 
data downstream, where an NPE would also force job termination and restart. This 
issue is unrelated to the size of a {{DataSet}}, and Flink has ample support for 
job recovery.
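
A sketch of the "sanitize once when read" approach (the data and field types are assumed, and {{Optional}} would fall back to Flink's generic/Kryo serialization, as noted elsewhere in this thread): nullable values are wrapped immediately after the source, so downstream tuples never carry raw nulls.

{code:java}
import java.util.Optional;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class SanitizeAtSource {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Raw input lines standing in for the external source; the second value is missing.
        DataSet<String> rawLines = env.fromElements("row-1,0.42", "row-2,");

        // Wrap the nullable field right away; everything downstream sees Optional, never null.
        DataSet<Tuple2<String, Optional<Double>>> sanitized = rawLines
                .map(line -> {
                    String[] parts = line.split(",", -1);
                    Optional<Double> value = parts[1].isEmpty()
                            ? Optional.<Double>empty()
                            : Optional.of(Double.valueOf(parts[1]));
                    return Tuple2.of(parts[0], value);
                })
                .returns(new TypeHint<Tuple2<String, Optional<Double>>>() {});

        sanitized.filter(t -> !t.f1.isPresent()).print();   // rows still to be calculated
    }
}
{code}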

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>   at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>   at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at 

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-29 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948412#comment-15948412
 ] 

Luke Hutchison commented on FLINK-6115:
---

[~greghogan]: fang yong's comment exactly illustrates the point: whether or not 
null values are efficient, and whether or not they were a good idea to add to 
Java in the first place, they are a fundamental part of the language _that is 
used, today, by millions of programmers, for reasons that go well beyond cases 
where something went wrong_ -- and not supporting this usage will only ever be 
a major point of surprise and pain for users of Flink. This is made worse by 
the fact that code that doesn't crash on small test datasets will start to 
crash on larger datasets, or in production, once things start to get 
serialized, as it sounds was experienced by fang yong.

It's already an error in Flink to use non-serializable types in parts of the 
computation graph where objects can be serialized. This is statically checked, 
for exactly the reason that Flink should not simply find out about the need to 
serialize a non-serializable object at runtime. Similarly, Flink should not be 
discovering non-serializable values (nulls) within a serializable object only 
at runtime.

If you strongly believe that tuples should never support nulls, then you should 
not allow tuple types to be used anywhere a serializable type is required, and 
enforce this in the computation graph builder/analyzer before graph execution 
even begins. Of course this will never happen, because tuples are too 
fundamental and useful -- but so is {{null}}. Ergo, tuples must support nulls. 
Sorry to belabor the point.

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>   at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> 

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-29 Thread fang yong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948396#comment-15948396
 ] 

fang yong commented on FLINK-6115:
--

OMG, null values not being supported in tuples really surprised me, and my Flink 
jobs always use tuples to define data. I think throwing an NPE is not enough; it 
means that when my running stream jobs crash, they will never recover by 
themselves. There may need to be a user-set flag that allows null values to be 
serialized as needed, and I strongly believe that this should be the default, even 
if it loses some serialization performance.

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>   at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>   at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a 
> flatMap (but I have dozens of them in my code). Surely there's a way to pull 
> out the source file name and line number from the program DAG node when 
> errors like this occur?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-26 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942344#comment-15942344
 ] 

Greg Hogan commented on FLINK-6115:
---

Java 8 has {{Optional}} but I'm not seeing support for this in Flink (so it would 
be serialized with Kryo). I suspect that users wanting these features are 
generally writing Flink programs in Scala.

I agree that left/right/full outer joins would be safer with {{Optional}} 
rather than {{null}} (this would preclude use of the shared {{JoinFunction}} 
interface, but I'm not sure how useful it is to reuse {{JoinFunction}} 
implementations).
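
As a sketch of the outer-join case referred to above (data and names assumed, Java DataSet API): with {{leftOuterJoin}} the right-hand record is {{null}} when there is no match, and the join function has to remember to guard for it; an {{Optional}}-typed parameter would make that contract explicit in the signature.

{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class OuterJoinNulls {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> left =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataSet<Tuple2<String, Integer>> right =
                env.fromElements(Tuple2.of("a", 10));

        left.leftOuterJoin(right)
            .where(0)
            .equalTo(0)
            // r is null when there is no matching right-side record -- an easy guard to forget
            .with((l, r) -> Tuple2.of(l.f0, r == null ? -1 : r.f1))
            .returns(new TypeHint<Tuple2<String, Integer>>() {})
            .print();
    }
}
{code}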

{{DataSet}} and {{DataStream}} elements are not stored as objects. It is 
straightforward to add new types with a custom serializer and comparator. 
Savepoints [can be 
restored|https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/upgrading.html]
 against new program topologies and framework versions.

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>   at 
> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>   at 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>   at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>   at 
> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>   at 
> org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>   at 
> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>   at 
> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>   at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>   at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a 
> flatMap (but I have dozens of them 

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-25 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941882#comment-15941882
 ] 

Luke Hutchison commented on FLINK-6115:
---

[~greghogan] versioning of durable binary formats is not only standard 
practice, it is important.

Yes, allowing null values in Java was a big mistake, but option types didn't 
exist back when nullable references were added to Java. And an {{Option}} or 
{{Maybe}} type was the first thing I went looking for in Flink to address the 
non-nullable tuple field issue. Flink definitely needs an {{Option}} type until 
Java supports its own (just as having its own {{Tuple}} types is critical to 
the usefulness of Flink). (For that matter, {{flatMap}} should work over 
{{Option}} types, treating them as lists of length 0 or 1; I went 
looking for that too...)

However, the fact that null pointers (and the general lack of strong nullability 
analysis) have caused all manner of pain to users of Java doesn't mean that they 
are not crucial to the way that the language works today, or to how programmers 
tend to use it. Even Flink uses null values the way they're generally used in 
Java in place of {{Option}} types -- e.g. giving you null values on an outer 
join when there is no corresponding key in one dataset.

Yes, throwing an NPE in tuple constructors misses the manual setting of field 
values; I mentioned that in an earlier comment. However, it actually 
surprised me when I noticed that the tuple fields were non-final, based on one 
of the first things I read in the Flink documentation: "Flink has the special 
classes {{DataSet}} and {{DataStream}} to represent data in a program. You can 
think of them as immutable collections of data that can contain duplicates." If 
the collections and streams that contain tuples are supposed to be thought of 
as immutable, why should the individual elements of those collections and 
streams be mutable? Perhaps tuple field values should be made final (which 
would of course be a breaking change for some users, and would probably 
especially require changes internally in the aggregation operator code).

Setting aside the issue of null fields in tuples, this will surely not be the 
last time that the serialization format will need to change! What if, for 
example, Flink needs to add support for some future new Java type, or needs to 
support another JVM language that requires some extra metadata of some form to 
be stored along with its serialized objects? I strongly recommend versioning 
the checkpoint files.

> Need more helpful error message when trying to serialize a tuple with a null 
> field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.2.0
>Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the 
> following, which has no information about where in the program the problem 
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" 
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>   at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>   at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>   at 
> org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>   at 
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>  

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-24 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940376#comment-15940376
 ] 

Greg Hogan commented on FLINK-6115:
---

[~lukehutch] and [~fhueske], I'm not sure that versioning is necessary, nor that 
the current behavior with regard to null values should be changed.

Allowing null values in tuples breaks the behavior of all user functions 
depending on the old, safe behavior. The creator of {{null}} has called it his 
[billion dollar 
mistake|https://en.wikipedia.org/wiki/Tony_Hoare#Apologies_and_retractions]. 
Just because we can serialize {{null}} does not mean that we should. I see 
Flink has an {{Either}} type but I don't see an {{Option}} type. Would this not 
be a cleaner way to process values explicitly?

As for versioning, schemas should generally be versioned but a tuple is 
literally a concatenation of values. Different representations (e.g. integer vs 
variable-length integer) should often be different types.

Throwing an NPE in tuple constructors misses that fields may be set between object 
instantiation and serialization. Flink was designed for object reuse which 
improves performance when billions to trillions of elements are serialized in 
short order.

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-24 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940101#comment-15940101
 ] 

Luke Hutchison commented on FLINK-6115:
---

Hi [~fhueske],

So the wire format of individual serialized objects cannot be touched, but is 
there no version code or magic number at the beginning of savepoint files? If 
there is currently no magic number or version code, there should be one, for 
exactly this reason!

You could add a magic number and a serialization format version to the beginning 
of future savepoint files. Looking for these numbers in the first couple of words 
of existing savepoint files would carry only a vanishingly small chance of a 
spurious collision with legitimate values, assuming you only need backwards 
compatibility, not also forwards compatibility. You would then be free to use 
different serializers depending on the wire-format version.
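
Purely as an illustration of that suggestion (this is not Flink's actual savepoint 
layout, and the class and constants are made up), a small header like the 
following would let a reader pick a serializer per format version:

{code:java}
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

// Hypothetical sketch: prepend a magic number and a format version so that a
// reader can choose the right deserializer; legacy files without the header
// are assumed to be version 1.
public class VersionedHeaderSketch {
    static final int MAGIC = 0xF11A57A7;  // made-up constant
    static final int VERSION = 2;         // made-up current format version

    static void writeHeader(File f) throws IOException {
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) {
            out.writeInt(MAGIC);
            out.writeInt(VERSION);
            // ... serialized records would follow
        }
    }

    static int readVersion(File f) throws IOException {
        try (DataInputStream in = new DataInputStream(new FileInputStream(f))) {
            int first = in.readInt();
            // A legacy file is vanishingly unlikely to start with MAGIC by accident.
            return (first == MAGIC) ? in.readInt() : 1;
        }
    }
}
{code}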

If this is not done, then the Tuple constructors should probably throw 
NullPointerExceptions for now, until this is fixed. That is clearly not ideal, 
since not all tuples will be serialized -- but my point was that the user can't 
even predict or control when tuples are serialized, so the check should probably 
be enforced at the point where the problem is introduced, and not later, when the 
Flink runtime decides to serialize a tuple and the context that caused the problem 
is no longer available.

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-23 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938866#comment-15938866
 ] 

Fabian Hueske commented on FLINK-6115:
--

Hi Luke, I agree this is not expected behavior. 

However, it cannot easily be changed at the moment, because we cannot touch the 
serialization format. 
We need to ensure that serialized data can be deserialized across versions due 
to Flink's savepoint feature. There is some work going on to support serializer 
upgrades.
In addition, tuples are used internally in many places because they are very 
efficient. 

Serializing POJOs is expensive because fields are read and written using Java 
reflection. Some time ago there was some work on code-generated POJO serializers, 
which would have improved the situation.
{{Row}} is an alternative to tuples. It supports null values (by serializing a 
bitmask) and is not restricted to 25 fields (or 22 in the case of Scala). 
However, {{Row}} is not typed with generics, so you have to manually define a 
{{RowTypeInfo}}.
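
A minimal sketch of the {{Row}} alternative in the DataSet API (assuming 
{{org.apache.flink.types.Row}} and {{RowTypeInfo}}; the field layout is made up): 
null fields are accepted, but the field types have to be declared by hand.

{code:java}
import java.util.Collections;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

// Hypothetical example: a two-field record whose second field may be null.
public class RowNullExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Row withNull = new Row(2);
        withNull.setField(0, "key");
        withNull.setField(1, null);  // legal for Row, unlike a Tuple2 field

        // The types cannot be inferred from generics, so declare them manually.
        RowTypeInfo rowType = new RowTypeInfo(
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.INT_TYPE_INFO);

        DataSet<Row> rows = env.fromCollection(Collections.singletonList(withNull), rowType);
        rows.print();
    }
}
{code}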

Regarding the exception, I agree that the error message could be improved by 
pointing out that a null String array cannot be serialized.
Apart from the message, the stack trace shows that the serialization of a 
String array in a tuple failed.

Best, Fabian

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-22 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15937185#comment-15937185
 ] 

Luke Hutchison commented on FLINK-6115:
---

bq. Why do you think there is no penalty to encoding null?

I didn't know that POJO serialization was slower than tuple serialization; I 
based my assumption on a tuple being a POJO.

However, it still seems like the time penalty of having a bit vector at the 
beginning of a tuple's serialized representation, indicating which fields are 
null, would be minimal. Setting or testing a bit for a specific field would cost 
a few cycles per field, which might add maybe 50% to the time needed to serialize 
or deserialize the bytes of an Integer, but would be negligible relative to the 
time needed to instantiate an object or to deserialize most strings.

What am I missing?
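
A rough sketch of that bit-vector idea (this is not Flink's actual 
{{TupleSerializer}}; the helper class and two-field layout are made up): one 
leading byte marks which fields are null, and only the non-null fields are 
written.

{code:java}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical null-mask encoding for a (String, Integer) pair: bit i of the
// leading byte is set when field i is null, and null fields are skipped.
public class NullMaskSketch {
    static void write(DataOutput out, String f0, Integer f1) throws IOException {
        int mask = 0;
        if (f0 == null) mask |= 1;
        if (f1 == null) mask |= 2;
        out.writeByte(mask);
        if (f0 != null) out.writeUTF(f0);
        if (f1 != null) out.writeInt(f1);
    }

    static Object[] read(DataInput in) throws IOException {
        int mask = in.readUnsignedByte();
        String f0 = (mask & 1) == 0 ? in.readUTF() : null;
        Integer f1 = (mask & 2) == 0 ? in.readInt() : null;
        return new Object[] {f0, f1};
    }
}
{code}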

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-22 Thread Greg Hogan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936558#comment-15936558
 ] 

Greg Hogan commented on FLINK-6115:
---

Qualifying the following by noting that we are discussing the Batch API: 
collected values are not buffered in memory but are serialized immediately 
(except for {{CollectionEnvironment}}, which is used for unit tests). Values are 
not serialized when operators are chained, such as a join followed by a map. 
Serializing tuples is faster than serializing POJOs. Why do you think there is 
no penalty to encoding {{null}}?

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-20 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934071#comment-15934071
 ] 

Luke Hutchison commented on FLINK-6115:
---

It's quite easy to end up with null values in tuples, though -- it's currently 
entirely valid to store null values in tuples, they just can't be serialized -- 
and you can't always predict when tuples will be serialized. For example (I 
suspect), code may work fine for a while in RAM on a single machine, but then 
suddenly break when you scale it up to run on a cluster, or even when the runtime 
decides to spill to disk. This is very poor behavior.

Even worse, though, is that it's exceedingly hard to tell where the problem was 
caused, as shown in the stack trace I posted. Not only is the location where the 
null value is set separated from the location where the problem is triggered on 
serialization, but the serialization trace doesn't tell you anything about where 
in the program the serializer was running, other than the type of operation it 
was contained within.

Another common scenario in which null values get set in tuples is an outer join. 
Basically, if the Flink policy is "we won't support nulls in tuples as valid, 
ever", then you should not be able to produce a tuple as the result of an outer 
join. More generally, you should simply throw an exception when the constructor 
of a tuple is called with a null parameter, so that the user is notified 
immediately of the invalid behavior, with the exception tied directly to where 
the null value was set. This would not be a perfect fix though, since the fields 
of a tuple are not final, so it is possible to simply set the field values to 
null directly.
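
A hypothetical illustration of that constructor-check idea (this is not Flink's 
behavior, and, as noted, it can be bypassed because tuple fields are public and 
mutable):

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;

// Made-up subclass: rejects nulls at construction time, but t.f0 = null
// afterwards would still slip through because the fields are public.
public class NonNullTuple2<T0, T1> extends Tuple2<T0, T1> {
    public NonNullTuple2() {
        super();
    }

    public NonNullTuple2(T0 value0, T1 value1) {
        super(value0, value1);
        if (value0 == null || value1 == null) {
            throw new NullPointerException("Tuple fields must not be null");
        }
    }
}
{code}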

I don't see any of these as good solutions to this issue. Really the best thing 
to do is find a way to efficiently serialize null values in tuples. Why exactly 
is it slower to support serializing null values in tuples than it is for a POJO 
or a {{Row}} object?

In theory, if you simply ran tuples through the POJO serializer, it should be 
able to serialize them fine, with the same efficiency that it can serialize 
regular POJOs (which are allowed to contain null values) -- so I don't see how 
or why this would incur a performance penalty.

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-20 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932716#comment-15932716
 ] 

Fabian Hueske commented on FLINK-6115:
--

Hi Luke, null values are not supported for performance reasons.
The {{getFieldNotNull()}} method was added at some later point in time.

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-20 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932558#comment-15932558
 ] 

Luke Hutchison commented on FLINK-6115:
---

Hi Fabian, is there a fundamental reason why the Tuple serializers do not 
support null? If it is because of type inference, the compiler will give a 
warning or error if the type cannot be inferred from a null value.

The Tuple class has a {{getFieldNotNull(int pos)}} method, with the description 
"Gets the field at the specified position, throws NullFieldException if the 
field is null" -- so it sounds like Tuples were actually designed to support 
storing nulls, just not serializing them? (But serializing them can happen 
implicitly as Flink passes things around...)
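
For reference, a small sketch of that behavior (assuming {{getFieldNotNull}} works 
as its Javadoc describes): storing the null is accepted, and only the accessor 
(or, later, the serializer) complains.

{code:java}
import org.apache.flink.api.java.tuple.Tuple2;

public class GetFieldNotNullExample {
    public static void main(String[] args) {
        // Storing a null in a tuple field is accepted at construction time...
        Tuple2<String, String[]> t = new Tuple2<>("key", null);

        String key = t.getFieldNotNull(0);       // returns "key"
        String[] values = t.getFieldNotNull(1);  // throws NullFieldException here
        System.out.println(key + ", " + (values == null));
    }
}
{code}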

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-20 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932353#comment-15932353
 ] 

Fabian Hueske commented on FLINK-6115:
--

Hi [~lukehutch], the Tuple serializers do not support null fields. 
You can use {{Row}} or POJOs if you need support for null fields. 
Another option is to use only fields whose types support null serialization 
themselves.

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-19 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932059#comment-15932059
 ] 

Luke Hutchison commented on FLINK-6115:
---

N.B. the tuple field in question was of type String[], so maybe it's only array 
types that cannot be null? I'm pretty sure I have successfully used null values 
for String and Integer tuple fields before.

[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field

2017-03-19 Thread Luke Hutchison (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931600#comment-15931600
 ] 

Luke Hutchison commented on FLINK-6115:
---

More generally, why is it not possible to serialize tuples with null fields? 
It's possible to serialize POJOs with null fields, right?
