Re: Imported CSV file content isn't identical to the original file
This error message no longer appears now that I have upgraded to 1.6.0.

--
Cheers,
Todd Leo

On Tue, Feb 9, 2016 at 9:07 AM SLiZn Liu <sliznmail...@gmail.com> wrote:
> At least it works for me for now; I have temporarily disabled the Kryo
> serializer until I upgrade to 1.6.0. Thanks for your update. :)
>
> Luciano Resende <luckbr1...@gmail.com> wrote on Tue, Feb 9, 2016 at 02:37:
>> Sorry, same expected results with trunk and the Kryo serializer.
>>
>> On Mon, Feb 8, 2016 at 4:15 AM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>>> I’ve found the trigger of my issue: if I start spark-shell, or submit
>>> via spark-submit, with --conf
>>> spark.serializer=org.apache.spark.serializer.KryoSerializer, the
>>> DataFrame content goes wrong, as I described earlier.
Is this Task Scheduler Error normal?
Hi Spark Users,

I’m running Spark jobs on Mesos, and sometimes I get a vast number of task scheduler errors:

ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1161 because its task set is gone (this is likely the result of receiving duplicate task finished status updates)

It looks like just a warning message, despite beginning with ERROR, and it does no harm to the eventual result, but I don’t know whether it indicates that tasks are running slower than usual.

--
BR,
Todd Leo
Re: Imported CSV file content isn't identical to the original file
Thanks Luciano; now it looks like I’m the only one who has this issue. My
options have narrowed down to upgrading my Spark to 1.6.0, to see if this
issue goes away.

—
Cheers,
Todd Leo

On Mon, Feb 8, 2016 at 2:12 PM Luciano Resende <luckbr1...@gmail.com> wrote:
> I tried 1.5.0, 1.6.0 and the 2.0.0 trunk with
> com.databricks:spark-csv_2.10:1.3.0, all with the expected results, where
> the columns seem to be read properly:
>
> +--+--+
> |C0|C1|
> +--+--+
> |1446566430 | 2015-11-0400:00:30|
> |1446566430 | 2015-11-0400:00:30|
> |1446566430 | 2015-11-0400:00:30|
> |1446566430 | 2015-11-0400:00:30|
> |1446566430 | 2015-11-0400:00:30|
> |1446566431 | 2015-11-0400:00:31|
> |1446566431 | 2015-11-0400:00:31|
> |1446566431 | 2015-11-0400:00:31|
> |1446566431 | 2015-11-0400:00:31|
> |1446566431 | 2015-11-0400:00:31|
> +--+--+
>
> --
> Luciano Resende
> http://people.apache.org/~lresende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
Re: Imported CSV file content isn't identical to the original file
I’ve found the trigger of my issue: if I start spark-shell, or submit via
spark-submit, with --conf
spark.serializer=org.apache.spark.serializer.KryoSerializer, the DataFrame
content goes wrong, as I described earlier.

On Mon, Feb 8, 2016 at 5:42 PM SLiZn Liu <sliznmail...@gmail.com> wrote:
> Thanks Luciano; now it looks like I’m the only one who has this issue. My
> options have narrowed down to upgrading my Spark to 1.6.0, to see if this
> issue goes away.
>
> —
> Cheers,
> Todd Leo
Re: Imported CSV file content isn't identical to the original file
At least it works for me for now; I have temporarily disabled the Kryo
serializer until I upgrade to 1.6.0. Thanks for your update. :)

Luciano Resende <luckbr1...@gmail.com> wrote on Tue, Feb 9, 2016 at 02:37:
> Sorry, same expected results with trunk and the Kryo serializer.
>
> On Mon, Feb 8, 2016 at 4:15 AM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>> I’ve found the trigger of my issue: if I start spark-shell, or submit
>> via spark-submit, with --conf
>> spark.serializer=org.apache.spark.serializer.KryoSerializer, the
>> DataFrame content goes wrong, as I described earlier.
>
> --
> Luciano Resende
> http://people.apache.org/~lresende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
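The temporary workaround described above (run without Kryo until the 1.6.0 upgrade) amounts to either omitting the serializer conf or pinning the default Java serializer explicitly. A hedged sketch of such a launch; the spark-csv package coordinates are the ones mentioned in this thread, and the rest of the command line is an assumption, not the reporter's exact invocation:

```shell
# Workaround sketch: launch with the default Java serializer pinned
# explicitly, instead of Kryo, until the upgrade to Spark 1.6.0.
# Package coordinates are taken from this thread.
spark-shell \
  --packages com.databricks:spark-csv_2.10:1.3.0 \
  --conf spark.serializer=org.apache.spark.serializer.JavaSerializer
```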
Re: Imported CSV file content isn't identical to the original file
Plus, I’m using *Spark 1.5.2* with *spark-csv 1.3.0*. I also tried
HiveContext, but the result is exactly the same.

On Sun, Feb 7, 2016 at 3:44 PM SLiZn Liu <sliznmail...@gmail.com> wrote:
> Hi Spark Users Group,
>
> I have a CSV file to analyze with Spark, but I’m having trouble importing
> it as a DataFrame.
Re: Imported CSV file content isn't identical to the original file
Hi Igor,

In my case, it’s not a matter of *truncate*. As the show() function in the
Spark API doc reads:

truncate: Whether truncate long strings. If true, strings more than 20
characters will be truncated and all cells will be aligned right…

whereas in my case the leading characters of both columns are missing.
Still, good to know the way to show the whole content of a cell.

—
BR,
Todd Leo

On Sun, Feb 7, 2016 at 5:42 PM Igor Berman <igor.ber...@gmail.com> wrote:
> show has a truncate argument;
> pass false so it won’t truncate your results.
>
> On 7 February 2016 at 11:01, SLiZn Liu <sliznmail...@gmail.com> wrote:
>> Plus, I’m using *Spark 1.5.2* with *spark-csv 1.3.0*. I also tried
>> HiveContext, but the result is exactly the same.
Re: Imported CSV file content isn't identical to the original file
*Update*: in local mode (spark-shell --master "local[2]", no matter whether
it reads from the local file system or from HDFS), it works well. But that
doesn’t solve the issue, since my data scale requires hundreds of CPU cores
and hundreds of GB of RAM.

BTW, it’s the traditional Chinese New Year now; I wish you all a happy year
and great fortune in the Year of the Monkey!

—
BR,
Todd Leo

On Sun, Feb 7, 2016 at 6:09 PM SLiZn Liu <sliznmail...@gmail.com> wrote:
> Hi Igor,
>
> In my case, it’s not a matter of *truncate*; the leading characters of my
> two columns are missing. Good to know the way to show the whole content
> of a cell, though.
Imported CSV file content isn't identical to the original file
Hi Spark Users Group,

I have a CSV file to analyze with Spark, but I’m having trouble importing
it as a DataFrame.

Here’s a minimal reproducible example. Suppose I have a *10(rows) x 2(cols)*
*space-delimited csv* file, shown below:

1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566430 2015-11-0400:00:30
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31
1446566431 2015-11-0400:00:31

The special character in column 2 (between the date and the time) is a
sub-delimiter within that column. The file is stored on HDFS; let’s say the
path is hdfs:///tmp/1.csv.

I’m using *spark-csv* to import this file as a Spark *DataFrame*:

sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "false")      // the file has no header line
  .option("inferSchema", "false") // do not infer data types
  .option("delimiter", " ")
  .load("hdfs:///tmp/1.csv")
  .show

Oddly, the output shows only part of each column:

[image: Screenshot from 2016-02-07 15-27-51.png]

and even the boundary of the table isn’t shown correctly. I also tried
reading the CSV file another way, via sc.textFile(...).map(_.split(" "))
and sqlContext.createDataFrame, and the result is the same. Can someone
point out where I went wrong?

—
BR,
Todd Leo
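For readers trying to reproduce this, a sketch of how such a session is typically launched. The package coordinates match the spark-csv version mentioned later in the thread; the Kryo conf is the setting that the thread eventually identifies as the trigger of the corruption, so include or omit it to compare:

```shell
# Sketch: launch a shell with the spark-csv package on the classpath so
# the com.databricks.spark.csv format is available. The Kryo conf below
# is the flag later identified in this thread as the trigger of the bug.
spark-shell \
  --packages com.databricks:spark-csv_2.10:1.3.0 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
```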
Re: Save GraphX to disk
Hi Gaurav,

Your graph can be saved to a graph database such as Neo4j or Titan through
its driver, which eventually persists it to disk.

BR,
Todd

Gaurav Kumar <gauravkuma...@gmail.com> wrote on Fri, Nov 13, 2015 at 22:08:
> Hi,
>
> I was wondering how to save a graph to disk and load it back again. I know
> how to save vertices and edges to disk and construct the graph from them;
> I'm not sure if there's any method to save the graph itself to disk.
>
> Best Regards,
> Gaurav Kumar
> Big Data • Data Science • Photography • Music
> +91 9953294125
Re: Spark executor on Mesos - how to set effective user id?
Hi Jerry,

I think you are referring to --no-switch_user. =)

<chiling...@gmail.com> wrote on Mon, Oct 19, 2015 at 21:05:
> Can you try setting SPARK_USER at the driver? It is used to impersonate
> users at the executor. So if you have a user set up for launching Spark
> jobs on the executor machines, simply set SPARK_USER to that user name.
> There is another configuration that prevents jobs from being launched by
> any user except the one that is configured. I don't remember its name, but
> it is in the documentation.
>
> Sent from my iPhone
>
> On 19 Oct, 2015, at 8:14 am, Eugene Chepurniy wrote:
>> Hi everyone!
>> While using a Spark-on-Mesos cluster, we are facing an issue related to
>> the effective Linux user id used to start executors on the Mesos slaves:
>> all executors try to start on the Mesos slaves under the driver's Linux
>> user id.
>> Let me explain in detail: the Spark driver program (which is going to
>> spawn Spark on Mesos in coarse-grained mode) is started as an
>> unprivileged Linux user, for example 'user1'. We have a Spark
>> distribution unpacked and ready to use on every Mesos slave (placed at
>> /opt/spark; 'spark.mesos.executor.home' points to this folder). Every
>> executor then fails to start, with an error log telling us that user
>> 'user1' is not available. And that is true: there is no 'user1' present
>> on the Mesos slaves.
>> So my question is: how can I control the effective user id that will be
>> used to start executors on Mesos?
>> I actually tried setting SPARK_USER=nobody on every slave, but it didn't
>> help.
>> Thanks for any advice.
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-on-Mesos-how-to-set-effective-user-id-tp25118.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
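A hedged sketch of the two knobs discussed in this thread. Note that --no-switch_user is a Mesos slave/agent flag, not a Spark option, and the master hostname, port, and jar name below are made-up placeholders:

```shell
# Option 1 (Mesos side): start each slave/agent with user switching
# disabled, so executors run as the user the agent process itself runs as.
mesos-slave --master=mesos-master:5050 --no-switch_user

# Option 2 (Spark side): set SPARK_USER in the driver's environment so
# executors impersonate a user that exists on every slave.
export SPARK_USER=nobody
spark-submit --master mesos://mesos-master:5050 my-app.jar
```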
Re: OutOfMemoryError When Reading Many json Files
Yes, it goes wrong only when processing a large file. I removed the
transformations on the DataFrame and it worked just fine, but even a simple
filter operation on the DataFrame became the last straw that broke the
camel’s back. That’s confusing.

On Wed, Oct 14, 2015 at 2:11 PM Deenar Toraskar <deenar.toras...@gmail.com> wrote:
> Hi
>
> Why don't you check whether you can process the large file standalone
> first, and then do the outer loop next?
>
> sqlContext.read.json(jsonFile)
>   .select($"some", $"fields")
>   .withColumn("new_col", some_transformations($"col"))
>   .rdd.map( x: Row => (k, v) )
>   .combineByKey()
>
> Deenar
>
> On 14 October 2015 at 05:18, SLiZn Liu <sliznmail...@gmail.com> wrote:
>> Hey Spark Users,
>>
>> I kept getting java.lang.OutOfMemoryError: Java heap space as I read a
>> massive number of json files iteratively via read.json(). Even though
>> the resulting RDD is rather small, I still get the OOM error.
Re: Spark DataFrame GroupBy into List
Thanks, Michael and java8964! Does the HiveContext also provide a UDF for
combining existing lists into a flattened (not nested) list? (list -> list
of lists -[flatten]-> list)

On Thu, Oct 15, 2015 at 1:16 AM Michael Armbrust <mich...@databricks.com> wrote:
> That's correct. It is a Hive UDAF.
>
> On Wed, Oct 14, 2015 at 6:45 AM, java8964 <java8...@hotmail.com> wrote:
>> My guess is that it is the same as the Hive UDAF collect_set:
>>
>> https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
>>
>> Yong
Re: Spark DataFrame GroupBy into List
Hi Michael,

Can you be more specific on `collect_set`? Is it a built-in function or, if
it is a UDF, how is it defined?

BR,
Todd Leo

On Wed, Oct 14, 2015 at 2:12 AM Michael Armbrust <mich...@databricks.com> wrote:
> import org.apache.spark.sql.functions._
>
> df.groupBy("category")
>   .agg(callUDF("collect_set", df("id")).as("id_list"))
>
> On Mon, Oct 12, 2015 at 11:08 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>> Hey Spark users,
>>
>> I'm trying to group a dataframe by appending occurrences into a list
>> instead of counting them.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> |----------|:--:|
>> | A        | 1  |
>> | A        | 2  |
>> | B        | 3  |
>> | B        | 4  |
>> | C        | 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list |
>> |----------|---------|
>> | A        | 1,2     |
>> | B        | 3,4     |
>> | C        | 5       |
>>
>> any tricks to achieve that? The Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
OutOfMemoryError When Reading Many json Files
Hey Spark Users,

I kept getting java.lang.OutOfMemoryError: Java heap space as I read a
massive number of json files, iteratively via read.json(). Even though the
resulting RDD is rather small, I still get the OOM error. The brief
structure of my program reads as follows, in pseudo-code:

file_path_list.map{ jsonFile: String =>
  sqlContext.read.json(jsonFile)
    .select($"some", $"fields")
    .withColumn("new_col", some_transformations($"col"))
    .rdd.map( x: Row => (k, v) )
    .combineByKey() // groups a column into item lists, keyed by another column
}.reduce( (i, j) => i.union(j) )
 .combineByKey() // combines results from all json files

I confess some of the json files are gigabytes huge, yet the combined RDD
is only a few megabytes. I’m not familiar with the under-the-hood mechanism,
but my intuitive understanding of how the code executes is: read one file at
a time (where I can easily change map to foreach when iterating over
file_path_list, if that's what it takes), do the inner transformation on the
DataFrame and combine, then reduce and do the outer combine immediately,
which shouldn't require holding the RDDs generated from all files in memory.
Obviously, as my code raises the OOM error, I must have missed something
important.

From the debug log, I can tell the OOM error always happens when reading the
same file, which is a modest 2GB in size, while driver.memory is set to 13GB
and the available memory before execution is around 8GB, on my standalone
machine running as "local[8]".

To get around this, I also tried to initialize an empty universal RDD
variable, read one file at a time using foreach, and then, instead of
reduce, simply combine each RDD generated from the json files; the OOM
error remains.

Other configurations:

- set("spark.storage.memoryFraction", "0.1") // no RDD caching is used
- set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

Any suggestions other than scaling up/out the Spark cluster?

BR,
Todd Leo
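The settings scattered through the message above can be collected into a single launch command. A sketch only: the application jar name is a placeholder, and whether the settings were passed on the command line or via SparkConf.set is not stated in the original mail:

```shell
# Sketch of the configuration described above: 13 GB driver heap,
# minimal storage fraction (no RDD caching), Kryo serializer, local[8].
spark-submit \
  --master "local[8]" \
  --driver-memory 13g \
  --conf spark.storage.memoryFraction=0.1 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  my-json-aggregation.jar
```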
Re: Spark DataFrame GroupBy into List
Hi Rishitesh,

I did it with combineByKey, but your solution is clearer and more readable; at least it doesn't require 3 lambda functions to get confused with. Will definitely try it out tomorrow, thanks.

Plus, OutOfMemoryError keeps bothering me as I read a massive amount of JSON files, whereas the RDD yielded by combineByKey is rather small. Anyway, I'll send another mail to describe this.

BR,
Todd Leo

On Tue, Oct 13, 2015 at 19:05, Rishitesh Mishra <rmis...@snappydata.io> wrote:

> Hi Liu,
> I could not see any operator on DataFrame which will give the desired result. DataFrame APIs, as expected, work on the Row format with a fixed set of operators on them. However, you can achieve the desired result by accessing the internal RDD, as below:
>
> val s = Seq(Test("A",1), Test("A",2), Test("B",1), Test("B",2))
> val rdd = testSparkContext.parallelize(s)
> val df = snc.createDataFrame(rdd)
> val rdd1 = df.rdd.map(p => (Seq(p.getString(0)), Seq(p.getInt(1))))
>
> val reduceF = (p: Seq[Int], q: Seq[Int]) => { Seq(p.head, q.head) }
>
> val rdd3 = rdd1.reduceByKey(reduceF)
> rdd3.foreach(r => println(r))
>
> You can always reconvert the obtained RDD, after transformation and reduce, to a DataFrame.
>
> Regards,
> Rishitesh Mishra,
> SnappyData (http://www.snappydata.io/)
> https://www.linkedin.com/profile/view?id=AAIAAAIFdkMB_v-nolCrFH6_pKf9oH6tZD8Qlgo=nav_responsive_tab_profile
>
> On Tue, Oct 13, 2015 at 11:38 AM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>
>> Hey Spark users,
>>
>> I'm trying to group by a dataframe, appending occurrences into a list instead of counting them.
>>
>> Let's say we have a dataframe as shown below:
>>
>> | category | id |
>> |----------|:--:|
>> | A        | 1  |
>> | A        | 2  |
>> | B        | 3  |
>> | B        | 4  |
>> | C        | 5  |
>>
>> ideally, after some magic group by (reverse explode?):
>>
>> | category | id_list |
>> |----------|---------|
>> | A        | 1,2     |
>> | B        | 3,4     |
>> | C        | 5       |
>>
>> any tricks to achieve that? Scala Spark API is preferred. =D
>>
>> BR,
>> Todd Leo
Spark DataFrame GroupBy into List
Hey Spark users,

I'm trying to group by a dataframe, appending occurrences into a list instead of counting them.

Let's say we have a dataframe as shown below:

| category | id |
|----------|:--:|
| A        | 1  |
| A        | 2  |
| B        | 3  |
| B        | 4  |
| C        | 5  |

ideally, after some magic group by (reverse explode?):

| category | id_list |
|----------|---------|
| A        | 1,2     |
| B        | 3,4     |
| C        | 5       |

any tricks to achieve that? Scala Spark API is preferred. =D

BR,
Todd Leo
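The desired "reverse explode" can be sketched with plain Scala collections; the same shape maps onto the RDD API as something like `rdd.map(r => (r.category, Seq(r.id))).reduceByKey(_ ++ _)` (names here are illustrative, not from the thread):

```scala
// Rows of (category, id), matching the table above.
val rows = Seq(("A", 1), ("A", 2), ("B", 3), ("B", 4), ("C", 5))

// Group by category and keep only the ids, building the id_list column.
val grouped: Map[String, Seq[Int]] =
  rows.groupBy(_._1).map { case (cat, pairs) => cat -> pairs.map(_._2) }

// grouped("A") == Seq(1, 2); grouped("C") == Seq(5)
```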
Re: Streaming Receiver Imbalance Problem
The imbalance was caused by the stuck partition; after tens of hours the receiving rate went down. But the second ERROR log I mentioned in the first mail now occurs on most tasks (I didn't count, but it keeps flushing my terminal) and jeopardizes the job: every batch takes 2 min of execution time (it was 2-15 seconds before) and is delayed by as much as 2+ hours. The daunting ERROR log:

```
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414899 because its task set is gone (this is likely the result of receiving duplicate task finished status updates)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414896 because its task set is gone (...)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414898 because its task set is gone (...)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414893 because its task set is gone (...)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414900 because its task set is gone (...)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414895 because its task set is gone (...)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414897 because its task set is gone (...)
15/09/23 14:20:24 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414894 because its task set is gone (...)
[Stage 4:> (0 + 3) / 3][Stage 29809:> (0 + 176) / 180]
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414899 because its task set is gone (...)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414896 because its task set is gone (...)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414898 because its task set is gone (...)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414895 because its task set is gone (...)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414893 because its task set is gone (...)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414900 because its task set is gone (...)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414897 because its task set is gone (...)
15/09/23 14:20:54 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 1414894 because its task set is gone (...)
```

The source code where the errors were thrown: statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L355>. Any suggestions on where I should dig in?

BR,
Todd Leo

On Wed, Sep 23, 2015 at 1:53 PM Tathagata Das <t...@databricks.com> wrote:

> Also, you could switch to the Direct Kafka API, which was first released as experimental in 1.3. In 1.5 we graduated it from experimental, but it's quite usable in Spark 1.3.1.
>
> TD
>
> On Tue, Sep 22, 2015 at 7:45 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>
>> Cool, we are still sticking with 1.3.1, will upgrade to 1.5 ASAP. Thanks for the tips, Tathagata!
>>
>> On Wed, Sep 23, 2015 at 10:40 AM Tathagata Das <t...@databricks.com> wrote:
>>
>>> A lot of these imbalances were solved in spark 1.5. Could you give that a spin?
>>>
>>> https://issues.apache.org/jira/browse/SPARK-8882
>>>
>>> On Tue, Sep 22, 2015 at 12:17 AM, SLiZn Liu <sliz
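For reference, the Direct Kafka API that TD suggests looks roughly like the sketch below in the Spark 1.3-era spark-streaming-kafka (0.8) artifact; the broker addresses, app name, and topic name are placeholders, not values from the thread:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(
  new SparkConf().setAppName("direct-kafka"), Seconds(10))

// No receivers: each Kafka partition maps 1:1 onto an RDD partition,
// which sidesteps the receiver-rate imbalance described above.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))
```

Rate limiting for the direct API is configured with `spark.streaming.kafka.maxRatePerPartition` rather than `spark.streaming.receiver.maxRate`.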
Re: Streaming Receiver Imbalance Problem
Cool, we are still sticking with 1.3.1, will upgrade to 1.5 ASAP. Thanks for the tips, Tathagata!

On Wed, Sep 23, 2015 at 10:40 AM Tathagata Das <t...@databricks.com> wrote:

> A lot of these imbalances were solved in spark 1.5. Could you give that a spin?
>
> https://issues.apache.org/jira/browse/SPARK-8882
>
> On Tue, Sep 22, 2015 at 12:17 AM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>
>> Hi spark users,
>>
>> In our Spark Streaming app with Kafka integration on Mesos, we initialized 3 receivers to receive 3 Kafka partitions, and an imbalance in record receiving rates has been observed: with spark.streaming.receiver.maxRate set to 120, sometimes one receiver receives very close to the limit while the other two only reach roughly fifty records per second.
>>
>> This may be caused by a previous receiver failure, where one receiver's receiving rate dropped to 0. We restarted the Spark Streaming app, and the imbalance began. We suspect that the partition received by the failing receiver got jammed, and the other two receivers cannot take up its data.
>>
>> The 3-node cluster tends to run slowly; nearly all the tasks are registered at the node with the previous receiver failure (I used union to combine the 3 receivers' DStreams, so I expected the combined DStream to be well distributed across all nodes). It cannot be guaranteed to finish one batch within a single batch interval, stages pile up, and the digested log shows the following:
>>
>> ...
>> 5728.399: [GC (Allocation Failure) [PSYoungGen: 6954678K->17088K(6961152K)] 7074614K->138108K(20942336K), 0.0203877 secs] [Times: user=0.20 sys=0.00, real=0.02 secs]
>> ...
>> 15/09/22 13:33:35 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 77219 because its task set is gone (this is likely the result of receiving duplicate task finished status updates)
>> ...
>>
>> These two types of log lines were printed during the execution of some (not all) stages.
>>
>> My configuration:
>> # of cores on each node: 64
>> # of nodes: 3
>> batch time is set to 10 seconds
>>
>> spark.streaming.receiver.maxRate  120
>> spark.streaming.blockInterval     160  // set to the value that divides 10 seconds approx. by the total cores (64) to max out all the nodes: 10s * 1000 / 64
>> spark.storage.memoryFraction      0.1  // this one doesn't seem to work, since the young gen / old gen ratio is nearly 0.3 instead of 0.1
>>
>> Anyone got an idea? Appreciate your patience.
>>
>> BR,
>> Todd Leo
Streaming Receiver Imbalance Problem
Hi spark users,

In our Spark Streaming app with Kafka integration on Mesos, we initialized 3 receivers to receive 3 Kafka partitions, and an imbalance in record receiving rates has been observed: with spark.streaming.receiver.maxRate set to 120, sometimes one receiver receives very close to the limit while the other two only reach roughly fifty records per second.

This may be caused by a previous receiver failure, where one receiver's receiving rate dropped to 0. We restarted the Spark Streaming app, and the imbalance began. We suspect that the partition received by the failing receiver got jammed, and the other two receivers cannot take up its data.

The 3-node cluster tends to run slowly; nearly all the tasks are registered at the node with the previous receiver failure (I used union to combine the 3 receivers' DStreams, so I expected the combined DStream to be well distributed across all nodes). It cannot be guaranteed to finish one batch within a single batch interval, stages pile up, and the digested log shows the following:

```
...
5728.399: [GC (Allocation Failure) [PSYoungGen: 6954678K->17088K(6961152K)] 7074614K->138108K(20942336K), 0.0203877 secs] [Times: user=0.20 sys=0.00, real=0.02 secs]
...
15/09/22 13:33:35 ERROR TaskSchedulerImpl: Ignoring update with state FINISHED for TID 77219 because its task set is gone (this is likely the result of receiving duplicate task finished status updates)
...
```

These two types of log lines were printed during the execution of some (not all) stages.

My configuration:

```
# of cores on each node: 64
# of nodes: 3
batch time is set to 10 seconds

spark.streaming.receiver.maxRate  120
spark.streaming.blockInterval     160  // set to the value that divides 10 seconds approx. by the total cores (64) to max out all the nodes: 10s * 1000 / 64
spark.storage.memoryFraction      0.1  // this one doesn't seem to work, since the young gen / old gen ratio is nearly 0.3 instead of 0.1
```

Anyone got an idea? Appreciate your patience.

BR,
Todd Leo
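The receiver-based setup the mail describes (3 Kafka receivers combined with union) looks roughly like this sketch; the ZooKeeper address, group id, and topic name are placeholders, not values from the thread:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(
  new SparkConf().setAppName("receiver-kafka"), Seconds(10))

// One receiver per Kafka partition; each receiver permanently occupies
// a core on whichever executor it is scheduled on.
val streams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-group", Map("my-topic" -> 1))
}

// Union the three DStreams so downstream stages see a single stream;
// a repartition after the union can help spread blocks across all nodes
// instead of concentrating work on the receiver hosts.
val unioned = ssc.union(streams).repartition(192)
```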
Re: Can Dependencies Be Resolved on Spark Cluster?
Thanks for the enlightening solution!

On Wed, Jul 1, 2015 at 12:03 AM Burak Yavuz <brk...@gmail.com> wrote:

> Hi,
>
> In your build.sbt file, list all the dependencies you have (hopefully they're not too many; they just have a lot of transitive dependencies), for example:
>
> ```
> libraryDependencies += "org.apache.hbase" % "hbase" % "1.1.1"
> libraryDependencies += "junit" % "junit" % "x"
>
> resolvers += "Some other repo" at "http://some.other.repo"
> resolvers += "Some other repo2" at "http://some.other.repo2"
> ```
>
> call `sbt package`, and then run spark-submit as:
>
> $ bin/spark-submit --packages org.apache.hbase:hbase:1.1.1,junit:junit:x --repositories http://some.other.repo,http://some.other.repo2 $YOUR_JAR
>
> Best,
> Burak
>
> On Mon, Jun 29, 2015 at 11:33 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>
>> Hi Burak,
>>
>> Is the `--packages` flag only available for Maven, with no sbt support?
>>
>> On Tue, Jun 30, 2015 at 2:26 PM Burak Yavuz <brk...@gmail.com> wrote:
>>
>>> You can pass `--packages your:comma-separated:maven-dependencies` to spark-submit if you have Spark 1.3 or greater.
>>>
>>> Best regards,
>>> Burak
>>>
>>> On Mon, Jun 29, 2015 at 10:46 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>>>
>>>> Hey Spark Users,
>>>>
>>>> I'm writing a demo with Spark and HBase. What I've done is packaging a **fat jar**: place dependencies in `build.sbt`, and use `sbt assembly` to package **all dependencies** into one big jar. The remaining work is copying the fat jar to the Spark master node and launching it with `spark-submit`.
>>>>
>>>> The defect of the fat-jar fashion is obvious: all dependencies are packed, yielding a huge jar file. Even worse, in my case, a vast amount of conflicting package files in `~/.ivy/cache` fail when merging; I had to manually specify `MergeStrategy` as `rename` for all conflicting files to bypass this issue.
>>>>
>>>> Then I thought there should exist an easier way: submit a thin jar with a build.sbt-like file specifying the dependencies, and have the dependencies automatically resolved across the cluster before the actual job is launched. I googled, except nothing related was found.
>>>>
>>>> Is this plausible, or are there other better ways to achieve the same goal?
>>>>
>>>> BEST REGARDS,
>>>> Todd Leo
Re: Can Dependencies Be Resolved on Spark Cluster?
Hi Burak,

Is the `--packages` flag only available for Maven, with no sbt support?

On Tue, Jun 30, 2015 at 2:26 PM Burak Yavuz <brk...@gmail.com> wrote:

> You can pass `--packages your:comma-separated:maven-dependencies` to spark-submit if you have Spark 1.3 or greater.
>
> Best regards,
> Burak
>
> On Mon, Jun 29, 2015 at 10:46 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>
>> Hey Spark Users,
>>
>> I'm writing a demo with Spark and HBase. What I've done is packaging a **fat jar**: place dependencies in `build.sbt`, and use `sbt assembly` to package **all dependencies** into one big jar. The remaining work is copying the fat jar to the Spark master node and launching it with `spark-submit`.
>>
>> The defect of the fat-jar fashion is obvious: all dependencies are packed, yielding a huge jar file. Even worse, in my case, a vast amount of conflicting package files in `~/.ivy/cache` fail when merging; I had to manually specify `MergeStrategy` as `rename` for all conflicting files to bypass this issue.
>>
>> Then I thought there should exist an easier way: submit a thin jar with a build.sbt-like file specifying the dependencies, and have the dependencies automatically resolved across the cluster before the actual job is launched. I googled, except nothing related was found.
>>
>> Is this plausible, or are there other better ways to achieve the same goal?
>>
>> BEST REGARDS,
>> Todd Leo
Can Dependencies Be Resolved on Spark Cluster?
Hey Spark Users,

I'm writing a demo with Spark and HBase. What I've done is packaging a **fat jar**: place dependencies in `build.sbt`, and use `sbt assembly` to package **all dependencies** into one big jar. The remaining work is copying the fat jar to the Spark master node and launching it with `spark-submit`.

The defect of the fat-jar fashion is obvious: all dependencies are packed, yielding a huge jar file. Even worse, in my case, a vast amount of conflicting package files in `~/.ivy/cache` fail when merging; I had to manually specify `MergeStrategy` as `rename` for all conflicting files to bypass this issue.

Then I thought there should exist an easier way: submit a thin jar with a build.sbt-like file specifying the dependencies, and have the dependencies automatically resolved across the cluster before the actual job is launched. I googled, except nothing related was found.

Is this plausible, or are there other better ways to achieve the same goal?

BEST REGARDS,
Todd Leo
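For reference, the `rename` workaround mentioned above can be expressed in sbt-assembly's settings roughly like this build.sbt fragment (a sketch for the 2015-era sbt-assembly plugin; the matched path is illustrative):

```scala
// build.sbt fragment (sbt-assembly).
// Discard duplicate META-INF metadata; rename other conflicting files
// instead of failing the merge.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.rename
}
```

Blanket `rename` can silently shadow resources that a library expects at a fixed path, so per-path strategies are usually safer where the conflicts are known.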
Reading Really Big File Stream from HDFS
Hi Spark Users,

I'm trying to load a literally big file (50GB when compressed as a gzip file, stored in HDFS) by receiving a DStream using `ssc.textFileStream`, as this file cannot fit in my memory. However, it looks like no RDD will be received until I copy this big file to a prior-specified location on HDFS. Ideally, I'd like to read this file a small number of lines at a time, but receiving a file stream requires additional writing to HDFS. Any idea how to achieve this?

BEST REGARDS,
Todd Leo
Re: Reading Really Big File Stream from HDFS
Hmm, you have a good point. So should I load the file with `sc.textFile()` and specify a high number of partitions, so that the file is split into partitions in memory across the cluster?

On Thu, Jun 11, 2015 at 9:27 PM ayan guha <guha.a...@gmail.com> wrote:

> Why do you need to use a stream in this use case? 50GB need not be in memory. Give it a try with a high number of partitions.
>
> On 11 Jun 2015 23:09, SLiZn Liu <sliznmail...@gmail.com> wrote:
>
>> Hi Spark Users,
>>
>> I'm trying to load a literally big file (50GB when compressed as a gzip file, stored in HDFS) by receiving a DStream using `ssc.textFileStream`, as this file cannot fit in my memory. However, it looks like no RDD will be received until I copy this big file to a prior-specified location on HDFS. Ideally, I'd like to read this file a small number of lines at a time, but receiving a file stream requires additional writing to HDFS. Any idea how to achieve this?
>>
>> BEST REGARDS,
>> Todd Leo
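A sketch of the non-streaming approach discussed here (the path and partition count are placeholders). One caveat worth noting: a single gzip file is not splittable, so `sc.textFile`'s partition hint cannot split it; the whole file is decompressed in one task, and a repartition afterwards is what spreads the lines across the cluster:

```scala
// Load the compressed file; gzip forces a single initial partition.
val lines = sc.textFile("hdfs:///data/big.gz")

// Redistribute the decompressed lines so downstream work is parallel.
val spread = lines.repartition(512)
```

Storing the data uncompressed, in a splittable codec, or as many smaller gzip files would let the partition hint take effect at read time.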
Re: DataFrame Column Alias problem
However, this returns a single column of c, without showing the original col1.

On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha <sriharsha@gmail.com> wrote:

> df.groupBy($"col1").agg(count($"col1").as("c")).show
>
> On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>
>> Hi Spark Users Group,
>>
>> I'm doing groupBy operations on my DataFrame *df* as follows, to get a count for each value of col1:
>>
>> df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write it like this
>>
>> col1 COUNT(col1#347)
>> aaa  2
>> bbb  4
>> ccc  4
>> ... and more ...
>>
>> I'd like to sort by the resulting count with .sort("COUNT(col1#347)"), but the column name of the count result obviously cannot be known in advance. Intuitively one might consider acquiring the column name by column index, in the fashion of R's data.frame, except Spark doesn't support it. I have googled *spark agg alias* and so forth, and checked DataFrame.as in the Spark API; neither helped with this. Am I the only one who has ever got stuck on this issue, or is there anything I have missed?
>>
>> REGARDS,
>> Todd Leo
Re: DataFrame Column Alias problem
Despite the odd usage, it does the trick, thanks Reynold!

On Fri, May 22, 2015 at 2:47 PM Reynold Xin <r...@databricks.com> wrote:

> In 1.4 it actually shows col1 by default. In 1.3, you can add col1 to the output, i.e.
>
> df.groupBy($"col1").agg($"col1", count($"col1").as("c")).show()
>
> On Thu, May 21, 2015 at 11:22 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>
>> However, this returns a single column of c, without showing the original col1.
>>
>> On Thu, May 21, 2015 at 11:25 PM Ram Sriharsha <sriharsha@gmail.com> wrote:
>>
>>> df.groupBy($"col1").agg(count($"col1").as("c")).show
>>>
>>> On Thu, May 21, 2015 at 3:09 AM, SLiZn Liu <sliznmail...@gmail.com> wrote:
>>>
>>>> Hi Spark Users Group,
>>>>
>>>> I'm doing groupBy operations on my DataFrame *df* as follows, to get a count for each value of col1:
>>>>
>>>> df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write it like this
>>>>
>>>> col1 COUNT(col1#347)
>>>> aaa  2
>>>> bbb  4
>>>> ccc  4
>>>> ... and more ...
>>>>
>>>> I'd like to sort by the resulting count with .sort("COUNT(col1#347)"), but the column name of the count result obviously cannot be known in advance. Intuitively one might consider acquiring the column name by column index, in the fashion of R's data.frame, except Spark doesn't support it. I have googled *spark agg alias* and so forth, and checked DataFrame.as in the Spark API; neither helped with this. Am I the only one who has ever got stuck on this issue, or is there anything I have missed?
>>>>
>>>> REGARDS,
>>>> Todd Leo
DataFrame Column Alias problem
Hi Spark Users Group,

I'm doing groupBy operations on my DataFrame *df* as follows, to get a count for each value of col1:

```
df.groupBy("col1").agg("col1" -> "count").show // I don't know if I should write it like this
```

```
col1 COUNT(col1#347)
aaa  2
bbb  4
ccc  4
... and more ...
```

I'd like to sort by the resulting count with .sort("COUNT(col1#347)"), but the column name of the count result obviously cannot be known in advance. Intuitively one might consider acquiring the column name by column index, in the fashion of R's data.frame, except Spark doesn't support it. I have googled *spark agg alias* and so forth, and checked DataFrame.as in the Spark API; neither helped with this. Am I the only one who has ever got stuck on this issue, or is there anything I have missed?

REGARDS,
Todd Leo
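For what it's worth, the alias-then-sort pattern the thread converges on can be sketched like this (Spark 1.3+ DataFrame API; `df` and the column name come from the example above, the alias `c` follows the replies):

```scala
import org.apache.spark.sql.functions.count
import sqlContext.implicits._   // for the $"..." column syntax

// Name the aggregate column explicitly with .as, so it can be referenced
// later instead of the generated COUNT(col1#347) name.
val counted = df.groupBy("col1")
  .agg(count("col1").as("c"))
  .sort($"c".desc)

counted.show()
```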
Re: value toDF is not a member of RDD object
No, creating a DF using createDataFrame won't work:

```
val peopleDF = sqlContext.createDataFrame(people)
```

the code compiles but raises the same error as toDF at the line above.

On Wed, May 13, 2015 at 6:22 PM Sebastian Alfers <sebastian.alf...@googlemail.com> wrote:

> I use:
>
> val conf = new SparkConf()...
> val sc = new SparkContext(conf)
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> val rdd: RDD[...] = ...
> val schema: StructType = ...
>
> sqlContext.createDataFrame(rdd, schema)
>
> 2015-05-13 12:00 GMT+02:00 SLiZn Liu <sliznmail...@gmail.com>:
>
>> Additionally, after I successfully packaged the code and submitted it via spark-submit webcat_2.11-1.0.jar, the following error was thrown at the line where toDF() is called:
>>
>> Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
>> at WebcatApp$.main(webcat.scala:49)
>> at WebcatApp.main(webcat.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Unsurprisingly, if I remove toDF, no error occurs. I have moved the case class definition outside of main but inside the outer object scope, and removed the "provided" specification in build.sbt.
>>
>> However, when I tried *Dean Wampler*'s suggestion of using sc.createDataFrame(), the compiler says this function is not a member of sc, and I cannot find any reference in the latest documents. What else should I try?
>>
>> REGARDS,
>> Todd Leo
>>
>> On Wed, May 13, 2015 at 11:27 AM SLiZn Liu <sliznmail...@gmail.com> wrote:
>>
>>> Thanks folks, really appreciate all your replies! I tried each of your suggestions, and in particular *Animesh*'s second suggestion of *making the case class definition global* helped me get out of the trap. Plus, I should have pasted my entire code with this mail to help the diagnosis.
>>>
>>> REGARDS,
>>> Todd Leo
>>>
>>> On Wed, May 13, 2015 at 12:10 AM Dean Wampler <deanwamp...@gmail.com> wrote:
>>>
>>>> It's the import statement Olivier showed that makes the method available. Note that you can also use `sc.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach.
>>>>
>>>> Dean Wampler, Ph.D.
>>>> Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
>>>> Typesafe http://typesafe.com
>>>> @deanwampler http://twitter.com/deanwampler
>>>> http://polyglotprogramming.com
>>>>
>>>> On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot <ssab...@gmail.com> wrote:
>>>>
>>>>> you need to instantiate a SQLContext:
>>>>>
>>>>> val sc: SparkContext = ...
>>>>> val sqlContext = new SQLContext(sc)
>>>>> import sqlContext.implicits._
>>>>>
>>>>> On Tue, May 12, 2015 at 12:29, SLiZn Liu <sliznmail...@gmail.com> wrote:
>>>>>
>>>>>> I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?
>>>>>>
>>>>>> On Tue, May 12, 2015 at 5:56 PM Olivier Girardot <ssab...@gmail.com> wrote:
>>>>>>
>>>>>>> toDF is part of Spark SQL, so you need the Spark SQL dependency + import sqlContext.implicits._ to get the toDF method.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Olivier.
>>>>>>>
>>>>>>> On Tue, May 12, 2015 at 11:36, SLiZn Liu <sliznmail...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi User Group,
>>>>>>>>
>>>>>>>> I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt:
>>>>>>>>
>>>>>>>> [error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
>>>>>>>> [error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
>>>>>>>> [error] ^
>>>>>>>> [error] one error found
>>>>>>>> [error] (compile:compileIncremental) Compilation failed
>>>>>>>> [error] Total time: 3 s, completed May 12, 2015 4:11:53 PM
>>>>>>>>
>>>>>>>> I double-checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the spark mailing list, and even tried to use toDF("col1", "col2") suggested
Fwd: value toDF is not a member of RDD object
Are you sure that you are submitting it correctly? Can you post the entire command you are using to run the .jar file via spark-submit?

Ok, here it is:

```
/opt/spark-1.3.1-bin-hadoop2.6/bin/spark-submit target/scala-2.11/webcat_2.11-1.0.jar
```

However, on the server I somehow have to specify the main class, using spark-submit --class WebcatApp --verbose webcat_2.11-1.0.jar, to make spark recognize the main class. The error and stack trace remain the same.

---------- Forwarded message ----------
From: Animesh Baranawal <animeshbarana...@gmail.com>
Date: Wed, May 13, 2015 at 6:49 PM
Subject: Re: value toDF is not a member of RDD object
To: SLiZn Liu <sliznmail...@gmail.com>

Are you sure that you are submitting it correctly? Can you post the entire command you are using to run the .jar file via spark-submit?

On Wed, May 13, 2015 at 4:07 PM, SLiZn Liu <sliznmail...@gmail.com> wrote:

> No, creating a DF using createDataFrame won't work:
>
> val peopleDF = sqlContext.createDataFrame(people)
>
> the code compiles but raises the same error as toDF at the line above.
>
> On Wed, May 13, 2015 at 6:22 PM Sebastian Alfers <sebastian.alf...@googlemail.com> wrote:
>
>> I use:
>>
>> val conf = new SparkConf()...
>> val sc = new SparkContext(conf)
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>>
>> val rdd: RDD[...] = ...
>> val schema: StructType = ...
>>
>> sqlContext.createDataFrame(rdd, schema)
>>
>> 2015-05-13 12:00 GMT+02:00 SLiZn Liu <sliznmail...@gmail.com>:
>>
>>> Additionally, after I successfully packaged the code and submitted it via spark-submit webcat_2.11-1.0.jar, the following error was thrown at the line where toDF() is called:
>>>
>>> Exception in thread "main" java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror;
>>> at WebcatApp$.main(webcat.scala:49)
>>> at WebcatApp.main(webcat.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>>
>>> Unsurprisingly, if I remove toDF, no error occurs. I have moved the case class definition outside of main but inside the outer object scope, and removed the "provided" specification in build.sbt.
>>>
>>> However, when I tried *Dean Wampler*'s suggestion of using sc.createDataFrame(), the compiler says this function is not a member of sc, and I cannot find any reference in the latest documents. What else should I try?
>>>
>>> REGARDS,
>>> Todd Leo
>>>
>>> On Wed, May 13, 2015 at 11:27 AM SLiZn Liu <sliznmail...@gmail.com> wrote:
>>>
>>>> Thanks folks, really appreciate all your replies! I tried each of your suggestions, and in particular *Animesh*'s second suggestion of *making the case class definition global* helped me get out of the trap. Plus, I should have pasted my entire code with this mail to help the diagnosis.
>>>>
>>>> REGARDS,
>>>> Todd Leo
>>>>
>>>> On Wed, May 13, 2015 at 12:10 AM Dean Wampler <deanwamp...@gmail.com> wrote:
>>>>
>>>>> It's the import statement Olivier showed that makes the method available. Note that you can also use `sc.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach.
>>>>>
>>>>> Dean Wampler, Ph.D.
>>>>> Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
>>>>> Typesafe http://typesafe.com
>>>>> @deanwampler http://twitter.com/deanwampler
>>>>> http://polyglotprogramming.com
>>>>>
>>>>> On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot <ssab...@gmail.com> wrote:
>>>>>
>>>>>> you need to instantiate a SQLContext:
>>>>>>
>>>>>> val sc: SparkContext = ...
>>>>>> val sqlContext = new SQLContext(sc)
>>>>>> import sqlContext.implicits._
>>>>>>
>>>>>> On Tue, May 12, 2015 at 12:29, SLiZn Liu <sliznmail...@gmail.com> wrote:
>>>>>>
>>>>>>> I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?
>>>>>>>
>>>>>>> On Tue, May 12, 2015 at 5:56 PM Olivier Girardot <ssab...@gmail.com> wrote:
>>>>>>>
>>>>>>>> toDF is part of Spark SQL, so you need the Spark SQL dependency + import sqlContext.implicits._ to get the toDF method.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Olivier.
>>>>>>>>
>>>>>>>> On Tue, May 12, 2015 at 11:36, SLiZn Liu <sliznmail...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi User Group,
>>>>>>>>>
>>>>>>>>> I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql
Re: value toDF is not a member of RDD object
Additionally, after I successfully packaged the code, and submitted via spark-submit webcat_2.11-1.0.jar, the following error was thrown at the line where toDF() been called: Exception in thread main java.lang.NoSuchMethodError: scala.reflect.api.JavaUniverse.runtimeMirror(Ljava/lang/ClassLoader;)Lscala/reflect/api/JavaUniverse$JavaMirror; at WebcatApp$.main(webcat.scala:49) at WebcatApp.main(webcat.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Unsurprisingly, if I remove toDF, no error occurred. I have moved the case class definition outside of main but inside the outer object scope, and removed the provided specification in build.sbt. However, when I tried *Dean Wampler*‘s suggestion of using sc.createDataFrame() the compiler says this function is not a member of sc, and I cannot find any reference in the latest documents. What else should I try? REGARDS, Todd Leo On Wed, May 13, 2015 at 11:27 AM SLiZn Liu sliznmail...@gmail.com wrote: Thanks folks, really appreciate all your replies! I tried each of your suggestions and in particular, *Animesh*‘s second suggestion of *making case class definition global* helped me getting off the trap. Plus, I should have paste my entire code with this mail to help the diagnose. 
REGARDS, Todd Leo

On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com wrote:

It's the import statement Olivier showed that makes the method available. Note that you can also use `sc.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com wrote:

You need to instantiate a SQLContext:

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a écrit :

I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?

On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com wrote:

toDF is part of Spark SQL, so you need the Spark SQL dependency plus `import sqlContext.implicits._` to get the toDF method. Regards, Olivier.

Le mar. 12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a écrit :

Hi User Group,

I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt:

[error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
[error]                                                                                                                              ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

I double-checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the Spark mailing list, and even tried to use toDF("col1", "col2") as suggested by Xiangrui Meng in that post, but got the same error. The Spark version is specified in the build.sbt file as follows:

scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

Anyone have ideas about the cause of this error?

REGARDS, Todd Leo
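[Editor's note: for readers hitting the same scala.reflect NoSuchMethodError at runtime, that signature mismatch typically means the application was compiled against a different Scala major version than the one the Spark distribution was built with (the pre-built Spark 1.x downloads of this era shipped for Scala 2.10, while the build above compiles with 2.11). A minimal build.sbt sketch under that assumption; the exact versions are illustrative:]

```scala
// build.sbt sketch — keep the Scala binary version aligned with the Spark build.
// Compiling for 2.11 against a 2.10-built Spark triggers NoSuchMethodError
// in scala.reflect at runtime, as in the stack trace above.
scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // %% appends the matching _2.10 suffix automatically, so the artifact
  // can never drift out of sync with scalaVersion.
  "org.apache.spark" %% "spark-core" % "1.3.1" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.3.1" % "provided"
)
```

Using `%%` rather than hard-coding `spark-core_2.11` removes the mismatch risk entirely.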
value toDF is not a member of RDD object
Hi User Group,

I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt:

[error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
[error]                                                                                                                              ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

I double-checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the Spark mailing list, and even tried to use toDF("col1", "col2") as suggested by Xiangrui Meng in that post, but got the same error. The Spark version is specified in the build.sbt file as follows:

scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

Anyone have ideas about the cause of this error?

REGARDS, Todd Leo
Re: value toDF is not a member of RDD object
Thanks folks, really appreciate all your replies! I tried each of your suggestions, and in particular *Animesh*'s second suggestion of *making the case class definition global* helped me get out of the trap. Plus, I should have pasted my entire code with this mail to help the diagnosis.

REGARDS, Todd Leo

On Wed, May 13, 2015 at 12:10 AM Dean Wampler deanwamp...@gmail.com wrote:

It's the import statement Olivier showed that makes the method available. Note that you can also use `sc.createDataFrame(myRDD)`, without the need for the import statement. I personally prefer this approach.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Tue, May 12, 2015 at 9:33 AM, Olivier Girardot ssab...@gmail.com wrote:

You need to instantiate a SQLContext:

val sc: SparkContext = ...
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

Le mar. 12 mai 2015 à 12:29, SLiZn Liu sliznmail...@gmail.com a écrit :

I added `libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.3.1"` to `build.sbt` but the error remains. Do I need to import modules other than `import org.apache.spark.sql.{ Row, SQLContext }`?

On Tue, May 12, 2015 at 5:56 PM Olivier Girardot ssab...@gmail.com wrote:

toDF is part of Spark SQL, so you need the Spark SQL dependency plus `import sqlContext.implicits._` to get the toDF method. Regards, Olivier.

Le mar. 12 mai 2015 à 11:36, SLiZn Liu sliznmail...@gmail.com a écrit :

Hi User Group,

I'm trying to reproduce the example in the Spark SQL Programming Guide https://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection, and got a compile error when packaging with sbt:

[error] myfile.scala:30: value toDF is not a member of org.apache.spark.rdd.RDD[Person]
[error] val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
[error]                                                                                                                              ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 3 s, completed May 12, 2015 4:11:53 PM

I double-checked that my code includes import sqlContext.implicits._ after reading this post https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3c1426522113299-22083.p...@n3.nabble.com%3E on the Spark mailing list, and even tried to use toDF("col1", "col2") as suggested by Xiangrui Meng in that post, but got the same error. The Spark version is specified in the build.sbt file as follows:

scalaVersion := "2.11.6"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.3.1" % "provided"
libraryDependencies += "org.apache.spark" % "spark-mllib_2.11" % "1.3.1"

Anyone have ideas about the cause of this error?

REGARDS, Todd Leo
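[Editor's note: the resolution this thread converged on — case class defined at the top level, plus `import sqlContext.implicits._` — can be sketched as a minimal program against the Spark 1.3-era API (the object name is illustrative):]

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// The case class must live at the top level (outside main) so the compiler
// can derive the schema reflection needed by toDF; defining it inside a
// method is what triggers "value toDF is not a member of RDD[Person]".
case class Person(name: String, age: Int)

object PeopleApp {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PeopleApp"))
    val sqlContext = new SQLContext(sc)
    // This import brings the implicit conversion that adds toDF to RDDs.
    import sqlContext.implicits._

    val people = sc.textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(p => Person(p(0), p(1).trim.toInt))
      .toDF()
    people.show()

    sc.stop()
  }
}
```

Note that the import is on the sqlContext *instance*, so it must appear after the SQLContext is constructed.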
OutOfMemoryError when using DataFrame created by Spark SQL
Hi,

I am using *Spark SQL* to query my *Hive cluster*, following the Spark SQL and DataFrame Guide https://spark.apache.org/docs/latest/sql-programming-guide.html step by step. However, my HiveQL via sqlContext.sql() fails and a java.lang.OutOfMemoryError is raised. The expected result of the query should be small (a limit 1000 clause is added). My code is shown below:

scala> import sqlContext.implicits._
scala> val df = sqlContext.sql("select * from some_table where logdate='2015-03-24' limit 1000")

and the error message:

[ERROR] [03/25/2015 16:08:22.379] [sparkDriver-scheduler-27] [ActorSystem(sparkDriver)] Uncaught fatal error from thread [sparkDriver-scheduler-27] shutting down ActorSystem [sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded

The master heap memory is set by -Xms512m -Xmx512m, while the workers are set by -Xms4096M -Xmx4096M, which I presume is sufficient for this trivial query.

Additionally, after restarting the spark-shell and re-running the limit 5 query, the df object is returned and can be printed by df.show(), but other APIs fail with OutOfMemoryError, namely df.count(), df.select("some_field").show() and so forth.

I understand that the RDD can be collected to the master so that further transformations can be applied. Since DataFrame has "richer optimizations under the hood" and is familiar territory for an R/Julia user, I really hope this error can be tackled and that DataFrame is robust enough to depend on.

Thanks in advance!

REGARDS, Todd
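[Editor's note: the ActorSystem in the trace above lives in the driver, whose heap here is only 512 MB, while actions like df.count() and df.show() materialize results on the driver. The usual remedy is to raise the driver heap rather than the 4 GB worker heaps; a sketch of the relevant launch flags, with the 4g value purely illustrative:]

```shell
# Raise the driver-side heap when launching the shell; driver-side
# collection of query results is what hits "GC overhead limit exceeded".
spark-shell --driver-memory 4g

# Equivalent flag for a packaged application:
spark-submit --driver-memory 4g --class MyApp myapp.jar
```

The same setting can be made persistent via spark.driver.memory in spark-defaults.conf; note it must be set at launch time, not from code, since the driver JVM has already started by the time the SparkContext is created.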