[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17336783#comment-17336783 ]

Flink Jira Bot commented on FLINK-6115:
---

This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> Need more helpful error message when trying to serialize a tuple with a null field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
> Priority: Major
> Labels: stale-major
>
> When Flink tries to serialize a tuple with a null field, you get the following, which has no information about where in the program the problem occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a flatMap (but I have dozens of them in my code). Surely there's a way to pull out the source file name and line number from the program DAG node when errors like this occur?

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
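The request in the quoted description, attaching the user's source file and line number to the failing operator, can be illustrated in plain Java. This is only a sketch under stated assumptions: `CallSite` and `TracedOperator` are hypothetical names, not Flink classes. The idea is to record the caller's stack frame when an operator is defined and to include it in any exception raised while the operator runs.

```java
import java.util.function.Function;

/** Hypothetical helper: remembers where in user code an operator was defined. */
final class CallSite {
    final String file;
    final int line;

    private CallSite(String file, int line) {
        this.file = file;
        this.line = line;
    }

    /** Captures the frame of the code that called TracedOperator.of(). */
    static CallSite capture() {
        StackTraceElement[] frames = new Throwable().getStackTrace();
        // frames[0] = capture(), frames[1] = TracedOperator.of(), frames[2] = user code
        StackTraceElement user = frames[Math.min(2, frames.length - 1)];
        return new CallSite(user.getFileName(), user.getLineNumber());
    }

    @Override
    public String toString() {
        return file + ":" + line;
    }
}

/** Hypothetical operator wrapper (not a Flink API) that reports its definition site on failure. */
final class TracedOperator<I, O> {
    private final Function<I, O> fn;
    final CallSite definedAt;

    private TracedOperator(Function<I, O> fn, CallSite definedAt) {
        this.fn = fn;
        this.definedAt = definedAt;
    }

    static <I, O> TracedOperator<I, O> of(Function<I, O> fn) {
        return new TracedOperator<>(fn, CallSite.capture());
    }

    O apply(I input) {
        try {
            O out = fn.apply(input);
            if (out == null) {
                throw new IllegalArgumentException("The record must not be null.");
            }
            return out;
        } catch (RuntimeException e) {
            // Re-throw with the user-code location so the failure can be traced back.
            throw new IllegalStateException(
                    "Operator defined at " + definedAt + " failed: " + e.getMessage(), e);
        }
    }
}
```

With such a wrapper, the exception message would name the file and line where the operator was created, which is the piece of information the stack trace above lacks.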
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17328808#comment-17328808 ]

Flink Jira Bot commented on FLINK-6115:
---

This major issue is unassigned and itself and all of its Sub-Tasks have not been updated for 30 days. So, it has been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16177627#comment-16177627 ]

Bin Wang commented on FLINK-6115:
-

I am a new user of Flink, and I would like to recommend supporting null serialization in Tuple.

This is what I am doing: I am calculating some properties of a data instance. Before the calculation, the property is null. I load the data from Cassandra DB. First, I need to filter out the data that hasn't been calculated yet. As you know, a null property cannot be filtered in Cassandra 2.x without a secondary index. Then I ran into this problem: I want to use a filter chain to find the data with a null property, but the filter does not work on data with such a property. :(

> Need more helpful error message when trying to serialize a tuple with a null field
> --
>
> Key: FLINK-6115
> URL: https://issues.apache.org/jira/browse/FLINK-6115
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950505#comment-15950505 ]

Luke Hutchison commented on FLINK-6115:
---

[~Zentol] In point (1), I wasn't referring to the overhead of serialization at all -- I was referring to the memory and time overhead of wrapping types in {{Optional}}, as a workaround for the lack of {{null}} value support in tuples. Specifically, using {{Optional}} types requires an additional heap allocation, which incurs a 16-byte overhead on a 64-bit VM, and comes at significant speed cost (memory allocations and garbage collection are the principal reason for inefficiency in JITted Java vs. native code). ({{Optional}} also adds a layer of indirection, which adds a marginal additional performance cost.)

In other words, the point I was making in (1) is that it is circular reasoning to say "we decided not to support null values in tuples for efficiency reasons -- so use {{Optional}} instead if you need null values".

The other case to consider is the impact on code that never needs {{null}} values. That is what my point (2) addresses: that even for code that doesn't use {{null}}, the overhead of adding a bitfield to the serialized format will have an extremely minimal performance impact, relative to the amount of time actually doing anything with the serialized format -- because inevitably when you serialize an object, you're going to store it or transmit it, which takes several orders of magnitude more time to do than serializing or deserializing. And the memory overhead is one bit per field, rounded up to the nearest byte.

So -- to summarize those two points more succinctly:
# Not supporting serialization of {{null}} incurs a major overhead for code that needs to use {{null}} values, by requiring the use of {{Optional}} wrappers, which can incur a large performance and memory penalty.
# Supporting serialization of {{null}} would incur only a very minimal relative overhead on code that doesn't need to use {{null}} values.

In other words, so far the efficiency argument has been the strongest argument made in favor of the status quo -- however, the efficiency argument really doesn't hold water as a reason not to support {{null}} values in tuples.

Premature optimization is often a source of problems, which is why I asked if this has been benchmarked before. I have to imagine that somebody has benchmark numbers somewhere comparing different approaches.
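The wire format discussed in this comment, a null mask costing one bit per field rounded up to the nearest byte, can be sketched as follows. This is a minimal illustration and not Flink's serializer: `NullMaskCodec` and its methods are hypothetical names, fields are restricted to strings for brevity, and only JDK streams are used.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical codec: prefixes the fields with a null mask, one bit per field. */
final class NullMaskCodec {

    /** Writes the mask (rounded up to whole bytes) followed by the non-null fields only. */
    static byte[] write(String[] fields) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            byte[] mask = new byte[(fields.length + 7) / 8];
            for (int i = 0; i < fields.length; i++) {
                if (fields[i] == null) {
                    mask[i / 8] |= (byte) (1 << (i % 8)); // set bit i: field i is null
                }
            }
            out.write(mask);
            for (String field : fields) {
                if (field != null) {
                    out.writeUTF(field); // null fields contribute no payload bytes
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads back a tuple of the given arity, restoring nulls from the mask. */
    static String[] read(byte[] data, int arity) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            byte[] mask = new byte[(arity + 7) / 8];
            in.readFully(mask);
            String[] fields = new String[arity];
            for (int i = 0; i < arity; i++) {
                boolean isNull = (mask[i / 8] & (1 << (i % 8))) != 0;
                fields[i] = isNull ? null : in.readUTF();
            }
            return fields;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a three-field tuple the mask occupies a single byte. Whether that byte matters in practice is exactly the benchmarking question raised in the comment; the sketch makes no claim either way.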
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950482#comment-15950482 ]

Chesnay Schepler commented on FLINK-6115:
-

[~lukehutch] Regarding 1), I believe you're once again assuming that we would use standard Java serialization. Given that we can provide custom serializers for basically any type, I can't see a reason why serializing {{Optional}} should be in any way more expensive than {{null}}.
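The claim in this comment, that a custom serializer can make an {{Optional}} no more expensive on the wire than a nullable value, can be illustrated with a small sketch. The names here (`OptionalStringCodec`) are hypothetical and this is not Flink's serializer API. It writes a one-byte presence flag followed by the payload only when a value is present. Note that it covers the serialized size only; it says nothing about the in-memory cost of the wrapper object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

/** Hypothetical codec: a presence flag, then the value only if present. */
final class OptionalStringCodec {

    static byte[] write(Optional<String> value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeBoolean(value.isPresent()); // one byte: present or empty
            if (value.isPresent()) {
                out.writeUTF(value.get());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Optional<String> read(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            if (in.readBoolean()) {
                return Optional.of(in.readUTF());
            }
            return Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

An empty value serializes to the flag byte alone, the same size a per-field null marker of one byte would need.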
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15950319#comment-15950319 ]

Luke Hutchison commented on FLINK-6115:
---

[~greghogan] You're assuming data with {{null}} values in it is "bad data", or that getting a {{null}} value is always going to be unexpected, or that it will trigger {{NullPointerException}}s downstream. Far from it, there is a huge range of valid and intentional uses for {{null}} values. (Many programmers won't be able to use {{Optional}} for years, due to being stuck on Java 7 for reasons beyond their control.) But even with {{Optional}} available, many Java 8 programmers still use {{null}} for a wide range of uses, and will until the language dies a very slow death. (It will always be impossible until the end of time, for example, to tell without an extra call if {{Map#get(key)}} returned {{null}} because there was no value corresponding to that key, or because {{null}} was mapped to the key -- but programmers know and expect this, for better or for worse.)

The most ironic thing about this whole conversation though is the claim that omitting {{null}} was done for performance reasons, followed by the recommendation to use {{Optional}}: (1) both the performance overhead _and_ the memory overhead of wrapping types in {{Optional}} is _significantly higher_ than using a bitfield in the wire format of a serialized tuple to mark null fields; (2) the overhead of actually _using_ a serialized tuple for any of the things you ever need to serialize one for (i.e. writing a serialized tuple to persistent storage, and/or sending it over the wire) takes orders of magnitude more time than the serialization and deserialization process, which makes the impact of adding a bitfield vanishingly small.

Did you actually benchmark the impact of serializing {{null}} before concluding it would be too inefficient, and are those numbers available? Having hard numbers, particularly in a head-to-head comparison (using tuples without {{null}}s, using {{Record}}s with {{null}}s, and using tuples with {{Optional}}), would be an important factor in the community decision process that you linked.
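The {{Map#get(key)}} ambiguity mentioned in this comment can be shown with a few lines of plain Java. `MapNullDemo` and `describe` are illustrative names only; the sketch relies solely on `java.util.Map#get` and `java.util.Map#containsKey`.

```java
import java.util.HashMap;
import java.util.Map;

final class MapNullDemo {

    /** Distinguishes "no mapping" from "mapped to null", which get() alone cannot do. */
    static String describe(Map<String, String> map, String key) {
        String value = map.get(key);
        if (value != null) {
            return "present: " + value;
        }
        // get() returned null: the extra containsKey() call resolves the ambiguity.
        return map.containsKey(key) ? "mapped to null" : "absent";
    }

    public static void main(String[] args) {
        Map<String, String> map = new HashMap<>();
        map.put("a", null);
        map.put("b", "x");
        System.out.println(describe(map, "a"));
        System.out.println(describe(map, "b"));
        System.out.println(describe(map, "c"));
    }
}
```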
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15949190#comment-15949190 ]

Greg Hogan commented on FLINK-6115:
---

[~zjureel] [~lukehutch] these are valid points. No one is saying that tuples should not support {{null}} serialization since that is a community decision. Before [proposing a change|https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals] it is vital to understand the how and why of existing behavior as well as the effects of any change.

Adding {{null}} serialization will likewise be a major point of surprise and pain for users of Flink. As noted previously, serializing {{null}} does not prevent {{NullPointerException}} but rather forces all functions processing tuples to properly check for and handle {{null}}. I would suggest sanitizing data once when read (and using {{Optional}} when applicable) rather than passing bad data downstream, where an NPE would also force job termination and restart. This issue is unrelated to the size of a {{DataSet}} and Flink has ample support for job recovery.
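The suggestion in this comment, sanitizing data once when it is read and using {{Optional}} where a value may legitimately be missing, might look like the following sketch. All names (`Sanitizer`, `RawRow`, `CleanRow`) are hypothetical, and the policy of dropping rows without an id is an assumption made for illustration, not something the thread prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class Sanitizer {

    /** Hypothetical row as read from the source; both fields may be null. */
    static final class RawRow {
        final String id;
        final Double score;

        RawRow(String id, Double score) {
            this.id = id;
            this.score = score;
        }
    }

    /** Hypothetical row passed downstream; no field is ever null. */
    static final class CleanRow {
        final String id;
        final Optional<Double> score;

        CleanRow(String id, Optional<Double> score) {
            this.id = id;
            this.score = score;
        }
    }

    /** Cleans the input once, at the point where it enters the job. */
    static List<CleanRow> sanitize(List<RawRow> rows) {
        List<CleanRow> clean = new ArrayList<>();
        for (RawRow row : rows) {
            if (row.id == null) {
                continue; // assumed policy: a row without an id is unusable and is dropped
            }
            // A missing score is a legitimate state, so it becomes Optional.empty().
            clean.add(new CleanRow(row.id, Optional.ofNullable(row.score)));
        }
        return clean;
    }
}
```

Downstream functions can then test `score.isPresent()` instead of checking for {{null}}, at the cost of the wrapper allocation debated elsewhere in this thread.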
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948412#comment-15948412 ]

Luke Hutchison commented on FLINK-6115:
---------------------------------------

[~greghogan]: fang yong's comment exactly illustrates the point: whether or not null values are efficient, and whether or not they were a good idea to add to Java in the first place, they are a fundamental part of the language _that is used, today, by millions of programmers, for reasons that go well beyond cases where something went wrong_ -- and not supporting this usage will only ever be a major point of surprise and pain for users of Flink. This is made worse by the fact that code that doesn't crash on small test datasets will start to crash on larger datasets, or in production, once things start to get serialized, as fang yong seems to have experienced.

It's already an error in Flink to use non-serializable types in parts of the computation graph where objects can be serialized. This is statically checked, precisely so that Flink does not discover the need to serialize a non-serializable object only at runtime. Similarly, Flink should not be discovering non-serializable values (nulls) within a serializable object only at runtime.

If you strongly believe that tuples should never support nulls, then you should not allow tuple types to be used anywhere a serializable type is required, and enforce this in the computation graph builder/analyzer before graph execution even begins. Of course this will never happen, because tuples are too fundamental and useful -- but so is {{null}}. Ergo, tuples must support nulls.

Sorry to belabor the point.
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15948396#comment-15948396 ]

fang yong commented on FLINK-6115:
----------------------------------

OMG, the fact that null values are not supported in tuples really surprised me; my Flink jobs always use tuples to define data. I think throwing an NPE is not enough: it means that when my running streaming jobs crash, they will never recover by themselves. There may need to be a user-settable flag that allows null values to be serialized as needed, and I strongly believe that this should be the default, even if it costs some serialization performance.

> Need more helpful error message when trying to serialize a tuple with a null field
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-6115
>                 URL: https://issues.apache.org/jira/browse/FLINK-6115
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a
> flatMap (but I have dozens of them in my code). Surely there's a way to pull
> out the source file name and line number from the program DAG node when
> errors like this occur?
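To illustrate the kind of message the issue description asks for, here is a minimal sketch in plain Java. It is not Flink's `TupleSerializer`: the `FieldSerializer` interface, the `serializeTuple` helper, and the `operatorName` argument are all hypothetical names introduced for this example. It assumes the runtime could pass some description of the emitting operator down to the point where a field fails to serialize.

```java
final class FieldAwareSerializerSketch {
    /** Hypothetical stand-in for a per-field serializer. */
    interface FieldSerializer {
        void serialize(Object value, StringBuilder out);
    }

    /** Mimics a serializer that rejects null, like the one in the stack trace. */
    static final FieldSerializer NON_NULL = (value, out) -> {
        if (value == null) {
            throw new IllegalArgumentException("The record must not be null.");
        }
        out.append(value).append(';');
    };

    /**
     * Serializes each field and, on failure, rethrows with the field index
     * and a caller-supplied operator description (an assumption of this sketch).
     */
    static String serializeTuple(Object[] fields, FieldSerializer[] serializers, String operatorName) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            try {
                serializers[i].serialize(fields[i], out);
            } catch (RuntimeException e) {
                throw new IllegalStateException(
                        "Cannot serialize field " + i + " of " + fields.length
                                + " in tuple emitted by '" + operatorName + "': " + e.getMessage(), e);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        FieldSerializer[] serializers = {NON_NULL, NON_NULL};
        try {
            serializeTuple(new Object[] {"key", null}, serializers, "flatMap at MyJob.java:42");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The original exception is kept as the cause, so the Flink-internal frames remain available while the top-level message names the field and the operator.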
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15942344#comment-15942344 ]

Greg Hogan commented on FLINK-6115:
-----------------------------------

Java 8 has {{Optional}} but I'm not seeing support for this in Flink (so it would be serialized with Kryo). I suspect that users wanting these features are generally writing Flink programs in Scala.

I agree that left/right/full outer joins would be safer with {{Optional}} rather than {{null}} (this would preclude use of the shared {{JoinFunction}} interface but I'm not sure how useful it is to reuse {{JoinFunction}} implementations).

{{DataSet}} and {{DataStream}} elements are not stored as objects. It is straightforward to add new types with a custom serializer and comparator. Savepoints [can be restored|https://ci.apache.org/projects/flink/flink-docs-release-1.2/ops/upgrading.html] against new program topologies and framework versions.
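As a rough illustration of the outer-join point, the following plain-Java sketch shows a left outer join whose callback receives `Optional<R>` instead of a possibly-null right side. It does not use Flink's `JoinFunction`; the `LeftOuterJoinFunction` interface and the `leftOuterJoin` helper are hypothetical names for this example only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class OptionalOuterJoinSketch {
    /** Hypothetical join callback: the right side is explicit about absence. */
    interface LeftOuterJoinFunction<L, R, O> {
        O join(L left, Optional<R> right);
    }

    /** Joins every left entry with the matching right entry, if any. */
    static <K, L, R, O> List<O> leftOuterJoin(
            Map<K, L> left, Map<K, R> right, LeftOuterJoinFunction<L, R, O> fn) {
        List<O> out = new ArrayList<>();
        for (Map.Entry<K, L> entry : left.entrySet()) {
            // A missing key becomes Optional.empty() rather than null.
            out.add(fn.join(entry.getValue(), Optional.ofNullable(right.get(entry.getKey()))));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> orders = new LinkedHashMap<>();
        orders.put("alice", 3);
        orders.put("bob", 1);
        Map<String, String> cities = new HashMap<>();
        cities.put("alice", "Oslo");

        List<String> joined = leftOuterJoin(orders, cities,
                (count, city) -> count + "@" + city.orElse("unknown"));
        System.out.println(joined);
    }
}
```

Because the callback signature differs from a plain two-argument join, an inner-join implementation cannot be reused as-is, which is the trade-off mentioned in the comment.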
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15941882#comment-15941882 ]

Luke Hutchison commented on FLINK-6115:
---------------------------------------

[~greghogan] versioning of durable binary formats is not only standard practice, it is important.

Yes, allowing null values in Java was a big mistake, but option types didn't exist back when nullable references were added to Java. And an {{Option}} or {{Maybe}} type was the first thing I went looking for in Flink to address the non-nullable tuple field issue. Flink definitely needs an {{Option}} type until Java supports its own (just as having its own {{Tuple}} types is critical to the usefulness of Flink). (For that matter, {{flatMap}} should work over {{Option}} types, treating them as collected lists of length 0 or 1; I went looking for that too...)

However, the fact that null pointers (and the general lack of strong nullability analysis) have caused no end of pain to users of Java doesn't mean that they are not crucial to the way that the language works today, or to how programmers tend to use it. Even Flink uses null values the way they're generally used in Java in place of {{Option}} types -- e.g. giving you null values on an outer join when there is no corresponding key in one dataset.

Yes, throwing an NPE in tuple constructors misses the manual setting of field values; I mentioned that in an earlier comment. However, I was actually quite surprised when I noticed that the tuple fields were non-final, given one of the first things I read in the Flink documentation: "Flink has the special classes {{DataSet}} and {{DataStream}} to represent data in a program. You can think of them as immutable collections of data that can contain duplicates." If the collections and streams that contain tuples are supposed to be thought of as immutable, why should the individual elements of those collections and streams be mutable? Perhaps tuple field values should be made final (which would of course be a breaking change for some users, and would probably especially require changes internally in the aggregation operator code).

Setting aside the issue of null fields in tuples, this will surely not be the last time that the serialization format will need to change! What if, for example, Flink needs to add support for some future new Java type, or needs to support another JVM language that requires some extra metadata of some form to be stored along with its serialized objects? I strongly recommend versioning the checkpoint files.
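The idea of a flatMap that treats an option value as a list of length 0 or 1 can be sketched with the JDK alone. This example uses `java.util.Optional` and `java.util.stream.Stream` rather than any Flink API; `asStream`, `parse`, and `parseAll` are hypothetical helper names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class OptionalFlatMapSketch {
    /** Views an Optional as a stream of zero or one element (Java 8 compatible). */
    static <T> Stream<T> asStream(Optional<T> value) {
        return value.map(Stream::of).orElseGet(Stream::empty);
    }

    /** Hypothetical parser: returns empty instead of null for non-integer input. */
    static Optional<Integer> parse(String s) {
        try {
            return Optional.of(Integer.parseInt(s.trim()));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    /** flatMap drops the empty results, so no null ever enters the output. */
    static List<Integer> parseAll(List<String> inputs) {
        return inputs.stream()
                .flatMap(s -> asStream(parse(s)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseAll(Arrays.asList("1", "oops", " 2")));
    }
}
```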
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940376#comment-15940376 ]

Greg Hogan commented on FLINK-6115:
-----------------------------------

[~lukehutch] and [~fhueske], I'm not sure that versioning is necessary nor that the current behavior with regard to null values should be changed.

Allowing null values in tuples breaks the behavior of all user functions depending on the old, safe behavior. The creator of {{null}} has called it his [billion dollar mistake|https://en.wikipedia.org/wiki/Tony_Hoare#Apologies_and_retractions]. Just because we can serialize {{null}} does not mean that we should. I see Flink has an {{Either}} type but I don't see an {{Option}} type. Would this not be a cleaner way to process values explicitly?

As for versioning, schemas should generally be versioned but a tuple is literally a concatenation of values. Different representations (e.g. integer vs variable-length integer) should often be different types.

Throwing an NPE in tuple constructors misses that fields may be set between object instantiation and serialization. Flink was designed for object reuse, which improves performance when billions to trillions of elements are serialized in short order.
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15940101#comment-15940101 ]

Luke Hutchison commented on FLINK-6115:
---------------------------------------

Hi [~fhueske],

So the wire format of individual serialized objects cannot be touched, but is there no version code or magic number at the beginning of savepoint files?

If there is currently no magic number or version code, there should be one, for exactly this reason! You could add a magic number and serialization version number to the beginning of future savepoint files, with only a vanishingly small chance of spurious collision with legitimate values if these numbers are looked for in the first couple of words of existing savepoint files, assuming you only need backwards compatibility, not also forwards compatibility. Then you would be free to use different serializers depending on the wire format version.

If this is not done, then the Tuple constructors should probably throw NullPointerExceptions for now, until this is fixed. This is clearly not desirable, as not all tuples will be serialized -- but my point was that the user can't even predict or control when tuples are serialized, so this should probably be enforced at the point where the problem is introduced, and not later when the Flink runtime decides to serialize a tuple without the context that caused the problem being available.
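The magic-number proposal can be sketched in a few lines of plain Java. This is not Flink's savepoint format: the `MAGIC` value, the version constants, and the method names are assumptions made up for this example. Files that do not start with the magic number are treated as the legacy, pre-header format.

```java
import java.nio.ByteBuffer;

final class VersionedHeaderSketch {
    /** Hypothetical magic number marking files that carry a version header. */
    static final int MAGIC = 0x464C4B53;
    /** Version assumed for files written before the header existed. */
    static final int LEGACY_VERSION = 1;
    static final int CURRENT_VERSION = 2;

    /** Prepends magic number and format version to the payload. */
    static byte[] withHeader(int version, byte[] payload) {
        return ByteBuffer.allocate(8 + payload.length)
                .putInt(MAGIC)
                .putInt(version)
                .put(payload)
                .array();
    }

    /** Returns the declared version, or LEGACY_VERSION if no header is found. */
    static int detectVersion(byte[] file) {
        if (file.length < 8) {
            return LEGACY_VERSION;
        }
        ByteBuffer in = ByteBuffer.wrap(file);
        return in.getInt() == MAGIC ? in.getInt() : LEGACY_VERSION;
    }

    public static void main(String[] args) {
        byte[] fresh = withHeader(CURRENT_VERSION, new byte[] {1, 2, 3});
        byte[] old = {0, 0, 0, 5, 1, 2, 3, 4, 9};
        System.out.println(detectVersion(fresh) + " " + detectVersion(old));
    }
}
```

A reader would then pick the serializer that matches the detected version. As the comment notes, a legacy file whose first word happens to equal the magic number would be misread, so the value should be chosen to make that collision very unlikely.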
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15938866#comment-15938866 ]

Fabian Hueske commented on FLINK-6115:
--------------------------------------

Hi Luke,

I agree this is not expected behavior. However, it cannot be easily changed at the moment, because we cannot touch the serialization format. We need to ensure that serialized data can be deserialized across versions due to Flink's savepoint feature. There is some work going on to support serializer upgrades. In addition, tuples are internally used in many places because they are very efficient.

Serializing POJOs is expensive because fields are read and written using Java Reflection. Some time ago there was some work on code-generated POJO serializers, which would have improved the situation.

{{Row}} is an alternative to tuples. It supports null values (by serializing a bitmask) and is not restricted to 25 fields (or 22 in the case of Scala). However, rows are not typed with generics, so you have to manually define a {{RowTypeInfo}}.

Regarding the exception, I agree that the error message could be improved by pointing out that a null String array cannot be serialized. Apart from the message, the stack trace shows that the serialization of a String array in a tuple failed.

Best, Fabian
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15937185#comment-15937185 ]

Luke Hutchison commented on FLINK-6115:
---------------------------------------

bq. Why do you think there is no penalty to encoding null?

I didn't know that POJO serialization was slower than tuple serialization; I based my assumption on a tuple being a POJO. However, it still seems that the time penalty would be minimal for a bit vector at the beginning of a tuple's serialized representation indicating which fields are null. Setting or testing a bit for a specific field would cost a few cycles per field, which would add maybe 50% to the time needed to serialize or deserialize the bytes of an Integer, but would be negligible relative to the time needed to instantiate an object or to deserialize most strings. What am I missing?
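The bit-vector idea from the comment above can be sketched in plain Java. This is only an illustration under stated assumptions, not Flink's TupleSerializer: the class name NullMaskCodec and its methods are hypothetical, and the fields are restricted to strings to keep the example short.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical codec (not part of Flink): a null bit vector followed by the non-null fields.
final class NullMaskCodec {

    // Writes one mask bit per field (1 = null), then only the non-null fields.
    static byte[] write(String[] fields) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            byte[] mask = new byte[(fields.length + 7) / 8];
            for (int i = 0; i < fields.length; i++) {
                if (fields[i] == null) {
                    mask[i / 8] |= (byte) (1 << (i % 8));
                }
            }
            out.write(mask);
            for (String field : fields) {
                if (field != null) {
                    out.writeUTF(field);
                }
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the mask first, then restores null for every field whose bit is set.
    static String[] read(byte[] data, int arity) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            byte[] mask = new byte[(arity + 7) / 8];
            in.readFully(mask);
            String[] fields = new String[arity];
            for (int i = 0; i < arity; i++) {
                boolean isNull = (mask[i / 8] & (1 << (i % 8))) != 0;
                fields[i] = isNull ? null : in.readUTF();
            }
            return fields;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The per-field cost in this sketch is one bit test on read and one conditional bit set on write, plus one mask byte per eight fields in the serialized form. Whether that overhead is acceptable inside Flink's serializers is exactly the open question in this thread.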
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15936558#comment-15936558 ]

Greg Hogan commented on FLINK-6115:
-----------------------------------

Qualifying the following by noting that we are discussing the Batch API.

Collected values are not collected in memory but are serialized immediately (except for {{CollectionEnvironment}}, which is used for unit tests). Values are not serialized when chaining operators, such as a join followed by a map.

Serializing tuples is faster than serializing POJOs. Why do you think there is no penalty to encoding {{null}}?

> Need more helpful error message when trying to serialize a tuple with a null field
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-6115
>                 URL: https://issues.apache.org/jira/browse/FLINK-6115
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere in a
> flatMap (but I have dozens of them in my code). Surely there's a way to pull
> out the source file name and line number from the program DAG node when
> errors like this occur?

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15934071#comment-15934071 ]

Luke Hutchison commented on FLINK-6115:
---------------------------------------

It's quite easy to end up with null values in tuples though -- it's currently entirely valid to store null values in tuples, they just can't be serialized -- and you can't always predict when tuples will be serialized. For example (I suspect), code may work fine for a while in RAM on a single machine, but then break suddenly when you scale it up to run on a cluster, or even when it decides to spill to disk. This is very poor behavior.

Even worse, it's exceedingly hard to tell where the problem is caused, as shown in the stack trace I posted. Not only is the location where the null value is set separated from the location where the problem is triggered on serialization, but the serialization trace doesn't tell you anything about where in the program the serializer was running, other than what operation type it was contained within.

Another common scenario in which null values get set in tuples is an outer join. Basically, if the Flink policy is "we won't support nulls in tuples as valid, ever", then you should not be able to produce a tuple as the result of an outer join.

More generally, you should simply throw an exception when the constructor of a tuple is called with a null parameter, so that the user is notified immediately of the invalid behavior, with the exception tied directly to where the null value was set. This would not be a perfect fix though, since the fields of a tuple are not final, so it is possible to simply set the field values to null directly.

I don't see any of these as good solutions to this issue. Really the best thing to do is find a way to efficiently serialize null values in tuples. Why exactly is it slower to support serializing null values in tuples than it is for a POJO or a {{Row}} object? In theory, if you simply ran tuples through the POJO serializer, it should be able to serialize them fine, with the same efficiency with which it serializes regular POJOs (which are allowed to contain null values) -- so I don't see how or why this would incur a performance penalty.
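The fail-fast constructor check proposed in the comment above might look like the following. This is a minimal sketch with a hypothetical StrictPair class, not Flink's Tuple2. It assumes private fields with checked setters, which is what closes the "fields are not final" loophole the comment mentions. Flink's tuple fields are public, so the same guarantee would not hold there.

```java
import java.util.Objects;

// Hypothetical two-field tuple (not Flink's Tuple2) that rejects null at the point of assignment.
final class StrictPair<A, B> {
    private A first;
    private B second;

    StrictPair(A first, B second) {
        // The exception is thrown in the caller's frame, so the stack trace points at user code.
        this.first = Objects.requireNonNull(first, "field 0 must not be null");
        this.second = Objects.requireNonNull(second, "field 1 must not be null");
    }

    A getFirst() {
        return first;
    }

    B getSecond() {
        return second;
    }

    // Setters repeat the check so a null cannot be introduced after construction.
    void setFirst(A value) {
        this.first = Objects.requireNonNull(value, "field 0 must not be null");
    }

    void setSecond(B value) {
        this.second = Objects.requireNonNull(value, "field 1 must not be null");
    }
}
```

With this design the failure surfaces where the null is assigned rather than where the record is later serialized, at the cost of a null check on every construction and update.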
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932716#comment-15932716 ]

Fabian Hueske commented on FLINK-6115:
--------------------------------------

Hi Luke, null values are not supported for performance reasons. The {{getFieldNotNull()}} method was added at some later point in time.
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932558#comment-15932558 ]

Luke Hutchison commented on FLINK-6115:
---------------------------------------

Hi Fabian, is there a fundamental reason why the Tuple serializers do not support null? If it is because of type inference, the compiler will give a warning or error if the type cannot be inferred from a null value.

The Tuple class has a {{getFieldNotNull(int pos)}} method, with the description "Gets the field at the specified position, throws NullFieldException if the field is null" -- so it sounds like Tuples were actually designed to support storing nulls, just not serializing them? (But serialization can happen implicitly as Flink passes things around...)
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932353#comment-15932353 ]

Fabian Hueske commented on FLINK-6115:
--------------------------------------

Hi [~lukehutch], the Tuple serializers do not support null fields. You can use {{Row}} or POJOs if you need support for null fields. Another option is to only use fields with types that support null serialization by themselves.
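As an illustration of the POJO alternative suggested above, a record with a nullable String[] field could be declared as follows. This is a sketch only: the names PojoAlternative, TaggedWord and tagCount are hypothetical, and whether Flink actually treats a given class as a POJO depends on its type extraction rules (as I understand them: a public class, a public no-argument constructor, and fields that are public or have getters and setters).

```java
// Hypothetical holder class, used only to keep the example in one self-contained unit.
final class PojoAlternative {

    // POJO-style record: public class, public no-arg constructor, public non-final fields.
    public static class TaggedWord {
        public String word;
        public String[] tags; // may be null, unlike the String[] tuple field in this issue

        public TaggedWord() {
        }

        public TaggedWord(String word, String[] tags) {
            this.word = word;
            this.tags = tags;
        }
    }

    // Consumers of the record have to handle the null case explicitly.
    static int tagCount(TaggedWord record) {
        return record.tags == null ? 0 : record.tags.length;
    }
}
```

The trade-off raised elsewhere in the thread still applies: POJO serialization is reported to be slower than tuple serialization.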
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15932059#comment-15932059 ]

Luke Hutchison commented on FLINK-6115:
---------------------------------------

N.B. the tuple field in question was of type String[], so maybe it's only array types that cannot be null? I'm pretty sure I have successfully used null values for String and Integer tuple fields before.
[jira] [Commented] (FLINK-6115) Need more helpful error message when trying to serialize a tuple with a null field
[ https://issues.apache.org/jira/browse/FLINK-6115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15931600#comment-15931600 ]

Luke Hutchison commented on FLINK-6115:
---------------------------------------

More generally, why is it not possible to serialize tuples with null fields? It's possible to serialize POJOs with null fields, right?

> Need more helpful error message when trying to serialize a tuple with a null field
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-6115
>                 URL: https://issues.apache.org/jira/browse/FLINK-6115
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>
> When Flink tries to serialize a tuple with a null field, you get the
> following, which has no information about where in the program the problem
> occurred (all the stack trace lines are in Flink, not in user code).
> {noformat}
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.IllegalArgumentException: The record must not be null.
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:73)
>     at org.apache.flink.api.common.typeutils.base.array.StringArraySerializer.serialize(StringArraySerializer.java:33)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:77)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:113)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
>     at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:79)
>     at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
>     at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:108)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only thing I can tell from this is that it happened somewhere
in a > flatMap (but I have dozens of them in my code). Surely there's a way to pull > out the source file name and line number from the program DAG node when > errors like this occur? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
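
Until the error message carries user-code context, one possible stopgap is to check tuple fields in user code before a record reaches the serializer, so that the failure names the operator that produced the null. The following is only a sketch in plain Java with no Flink dependency; `NullFieldGuard`, `firstNullField`, `requireNoNullFields`, and the operator labels are hypothetical names invented for illustration and are not part of Flink's API.

```java
// Hypothetical user-side helper (not a Flink class): fails fast with a
// message that names the operator and the position of the null field.
final class NullFieldGuard {

    // Returns the index of the first null field, or -1 if none is null.
    public static int firstNullField(Object... fields) {
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                return i;
            }
        }
        return -1;
    }

    // Throws if any field is null. The operatorLabel is a free-form string
    // chosen by the caller (an assumption of this sketch), e.g. the name of
    // the flatMap that is about to emit the tuple.
    public static void requireNoNullFields(String operatorLabel, Object... fields) {
        int index = firstNullField(fields);
        if (index >= 0) {
            throw new IllegalArgumentException(
                "Operator '" + operatorLabel
                    + "' emitted a tuple with a null field at position " + index);
        }
    }

    public static void main(String[] args) {
        // A tuple without null fields passes silently.
        requireNoNullFields("parse-lines", "a", 1);

        // A tuple with a null field fails with the operator label attached.
        try {
            requireNoNullFields("split-words", "a", null);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Calling such a check just before emitting a record in each flatMap would make the resulting exception originate in user code, which is the information the stack trace above lacks. It does not change how the tuple serializer itself treats null fields.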