Hi Shiti,

here is the issue [1].

Cheers,
Till

[1] https://issues.apache.org/jira/browse/FLINK-2203

On Thu, Jun 11, 2015 at 8:42 AM Shiti Saxena <ssaxena....@gmail.com> wrote:

> Hi Aljoscha,
>
> Could you please point me to the JIRA tickets? If you could provide some
> guidance on how to resolve these, I will work on them and raise a
> pull-request.
>
> Thanks,
> Shiti
>
> On Thu, Jun 11, 2015 at 11:31 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> yes, I think the problem is that the RowSerializer does not support
>> null-values. I think we can add support for this, I will open a Jira issue.
>>
>> Another problem I then see is that the aggregations can not properly deal
>> with null-values. This would need separate support.
>>
>> Regards,
>> Aljoscha
>>
>> On Thu, 11 Jun 2015 at 06:41 Shiti Saxena <ssaxena....@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> In our project, we are using the Flink Table API and are facing the
>>> following issues,
>>>
>>> We load data from a CSV file and create a DataSet[Row]. The CSV file can
>>> also have invalid entries in some of the fields which we replace with null
>>> when building the DataSet[Row].
>>>
>>> This DataSet[Row] is later on transformed to Table whenever required and
>>> specific operation such as select or aggregate, etc are performed.
>>>
>>> When a null value is encountered, we get a null pointer exception and
>>> the whole job fails. (We can see this by calling collect on the resulting
>>> DataSet).
>>>
>>> The error message is similar to,
>>>
>>> Job execution failed.
>>> org.apache.flink.runtime.client.JobExecutionException: Job execution
>>> failed.
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:315)
>>> at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>> at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>> at
>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>> at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43)
>>> at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>> at
>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>> at
>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:94)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> Caused by: java.lang.NullPointerException
>>> at
>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:63)
>>> at
>>> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:27)
>>> at
>>> org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:80)
>>> at
>>> org.apache.flink.api.table.typeinfo.RowSerializer.serialize(RowSerializer.scala:28)
>>> at
>>> org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>> at
>>> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>> at
>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:83)
>>> at
>>> org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
>>> at
>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>> at
>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:177)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>> at java.lang.Thread.run(Thread.java:724)
>>>
>>> Could this be because the RowSerializer does not support null values?
>>> (Similar to Flink-629 <https://issues.apache.org/jira/browse/FLINK-629>
>>>  )
>>>
>>> Currently, to overcome this issue, we are ignoring all the rows which
>>> may have null values. For example, we have a method cleanData defined as,
>>>
>>> def cleanData(table:Table, relevantColumns:Seq[String]):Table = {
>>>     val whereClause: String = relevantColumns.map{
>>>         cName=>
>>>             s"$cName.isNotNull"
>>>     }.mkString(" && ")
>>>
>>>     val result :Table =
>>> table.select(relevantColumns.mkString(",")).where(whereClause)
>>>     result
>>> }
>>>
>>> Before operating on any Table, we use this method and then continue with
>>> task.
>>>
>>> Is this the right way to handle this? If not please let me know how to
>>> go about it.
>>>
>>>
>>> Thanks,
>>> Shiti
>>>
>>>
>>>
>>>
>

Reply via email to