Re: NullPointerException error in repartition
Hi, I know that, but my purpose is to transform the JSON strings in a Dataset<String> into a Dataset<Row>, while spark.readStream can only read JSON files from a specified path. https://stackoverflow.com/questions/48617474/how-to-convert-json-dataset-to-dataframe-in-spark-structured-streaming gives an essential method, but my JSON records do not all share the same format. Also, the Spark Java API does not seem to support syntax like .select(from_json($"value", colourSchema)).

Regards,
Junfeng Chen

On Fri, Apr 13, 2018 at 7:09 AM, Tathagata Das wrote:
> [...]
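For reference, the Java API does expose from_json through static imports from org.apache.spark.sql.functions, so the Scala one-liner has a direct Java equivalent. A minimal sketch, assuming an invented two-field schema (the "appname" and "ts" field names are illustrative, not from the thread):

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class FromJsonSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("from_json-sketch").getOrCreate();

        // Hypothetical schema; "appname" and "ts" stand in for the real fields.
        StructType schema = new StructType()
                .add("appname", DataTypes.StringType)
                .add("ts", DataTypes.LongType);

        // A Dataset<String> of JSON; its single column is named "value" by default.
        Dataset<String> json = spark.createDataset(
                Arrays.asList("{\"appname\":\"app1\",\"ts\":1}"),
                Encoders.STRING());

        // Java equivalent of .select(from_json($"value", schema)):
        Dataset<Row> parsed = json
                .select(from_json(col("value"), schema).alias("data"))
                .select("data.*");

        parsed.show();
        spark.stop();
    }
}
```

Note that from_json is lenient: records whose fields do not match the schema produce null columns rather than a failure, which is relevant when the JSON formats vary from record to record.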
Re: NullPointerException error in repartition
Have you read through the documentation of Structured Streaming?
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

One of the basic mistakes you are making is defining the dataset with `spark.read()`. You define a streaming Dataset with `spark.readStream()`.

On Thu, Apr 12, 2018 at 3:02 AM, Junfeng Chen wrote:
> [...]
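The read()/readStream() distinction above can be sketched for a Kafka source as follows. This is an assumption-laden sketch: the broker address and topic name are placeholders, and the kafka format requires the spark-sql-kafka-0-10 connector on the classpath.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadStreamSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("readstream-sketch").getOrCreate();

        // readStream(), not read(): this defines an unbounded, streaming Dataset.
        // "localhost:9092" and "mytopic" are placeholders; the kafka format
        // needs the spark-sql-kafka-0-10 connector on the classpath.
        Dataset<Row> kafka = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "mytopic")
                .load();

        // The Kafka value column is binary; cast it to string before parsing JSON.
        Dataset<Row> lines = kafka.selectExpr("CAST(value AS STRING)");

        System.out.println(lines.isStreaming());
    }
}
```

From here, from_json can be applied to the string column exactly as in a batch query; the streaming/batch split is made at the source, not in the transformations.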
Re: NullPointerException error in repartition
Hi, Tathagata

I have tried Structured Streaming, but the line

> Dataset<Row> rowDataset = spark.read().json(jsondataset);

always throws

> Queries with streaming sources must be executed with writeStream.start()

But all I need to do in this step is transform the JSON string data into a Dataset<Row>. How can I fix it?

Thanks!

Regards,
Junfeng Chen

On Thu, Apr 12, 2018 at 3:08 PM, Tathagata Das wrote:
> [...]
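The error quoted above is Spark's way of saying that a streaming Dataset can only be materialized by writeStream().start(); every transformation before that point, including JSON parsing, stays lazy. A self-contained sketch, using the built-in rate source as a stand-in for Kafka (the output and checkpoint paths are placeholders):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class WriteStreamSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]").appName("writestream-sketch").getOrCreate();

        // Placeholder source; in the real job this would be the Kafka stream.
        Dataset<Row> stream = spark.readStream().format("rate").load();

        // select/from_json/etc. would be chained here; nothing executes yet.
        StreamingQuery query = stream.writeStream()
                .format("parquet")
                .outputMode("append")
                .option("path", "/tmp/writestream-sketch/out")
                .option("checkpointLocation", "/tmp/writestream-sketch/chk")
                .start();   // this is what actually runs the query

        query.processAllAvailable();  // sketch only; a real job calls awaitTermination()
        query.stop();
        spark.stop();
    }
}
```

DataStreamWriter also supports partitionBy("appname"), so the partitioned-Parquet layout from the original foreachRDD job carries over to this style unchanged.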
Re: NullPointerException error in repartition
It's not very surprising that doing this sort of RDD-to-DataFrame conversion inside DStream.foreachRDD has weird corner cases like this. In fact, you are going to have additional problems with partial Parquet files (when there are failures) in this approach. I strongly suggest that you use Structured Streaming, which is designed to do this sort of processing. It will take care of tracking the written Parquet files correctly.

TD

On Wed, Apr 11, 2018 at 6:58 PM, Junfeng Chen wrote:

> I wrote a program to read some JSON data from Kafka, with the purpose of saving it to Parquet files on HDFS.
> Here is my code:
>
>     JavaInputDStream stream = ...
>     JavaDStream rdd = stream.map...
>     rdd.repartition(taskNum).foreachRDD(stringjavardd -> {
>         Dataset<Row> df = spark.read().json(stringjavardd);  // convert json to df
>         JavaRDD<Row> rowJavaRDD = df.javaRDD().map...        // add some new fields
>         StructType type = df.schema()...;                    // construct a new type for the added fields
>         Dataset<Row> newdf = ...                             // create new dataframe
>         newdf.repartition(taskNum).write().mode(SaveMode.Append)
>              .partitionBy("appname").parquet(savepath);      // save to parquet
>     });
>
> However, if I remove the repartition call on newdf in the Parquet-writing stage, the program always throws a NullPointerException in the json-conversion line:
>
>     java.lang.NullPointerException
>         at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1783)
>         ...
>
> This makes no sense to me: the Parquet-writing operation should be in a different stage from the JSON-transforming operation.
> So how can I solve it? Thanks!
>
> Regards,
> Junfeng Chen