Re: [discuss] dropping Python 2.6 support

2016-01-09 Thread Sasha Kacanski
+1
Companies that use the stock Python 2.6 in Red Hat will need to upgrade or
install a fresh version, which takes a total of 3.5 minutes, so no issues ...

On Tue, Jan 5, 2016 at 2:17 AM, Reynold Xin  wrote:

> Does anybody here care about us dropping support for Python 2.6 in Spark
> 2.0?
>
> Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json
> parsing) when compared with Python 2.7. Some libraries that Spark depends on
> have stopped supporting 2.6. We can still convince the library maintainers to
> support 2.6, but it will be extra work. I'm curious if anybody still uses
> Python 2.6 to run Spark.
>
> Thanks.
>
>
>


-- 
Aleksandar Kacanski


Re: pyspark: conditionals inside functions

2016-01-09 Thread Maciej Szymkiewicz


On 01/09/2016 04:45 AM, Franc Carter wrote:
>
> Hi,
>
> I'm trying to write a short function that returns the last sunday of
> the week of a given date, code below
>
> def getSunday(day):
>     day = day.cast("date")
>     sun = next_day(day, "Sunday")
>     n = datediff(sun, day)
>     if (n == 7):
>         return day
>     else:
>         return sun
>
>
> this gives me
>
> ValueError: Cannot convert column into bool:
>
>
> Can someone point out what I am doing wrong?

You operate on column expressions, so the conditional should be an expression as well:

from pyspark.sql.functions import next_day, datediff, when

def getSunday(day):
    day = day.cast("date")
    sun = next_day(day, "Sunday")
    n = datediff(sun, day)
    return when(n == 7, day).otherwise(sun)
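
A quick usage sketch of the above (assuming a pyspark shell where sqlContext is available; the column name and dates are made up):

from pyspark.sql.functions import next_day, datediff, when, col

def getSunday(day):
    day = day.cast("date")
    sun = next_day(day, "Sunday")
    n = datediff(sun, day)
    return when(n == 7, day).otherwise(sun)

# Hypothetical frame with one date-string column "d"; 2016-01-10 is a Sunday
# and maps to itself, while 2016-01-04 maps to the following Sunday.
df = sqlContext.createDataFrame([("2016-01-04",), ("2016-01-10",)], ["d"])
df.withColumn("sunday", getSunday(col("d"))).show()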


>
> thanks
>
>
> -- 
> Franc


signature.asc
Description: OpenPGP digital signature


Re: [discuss] dropping Python 2.6 support

2016-01-09 Thread Sean Owen
Chiming in late, but my take on this line of argument is: these
companies are welcome to keep using Spark 1.x. If anything the
argument here is about how long to maintain 1.x, and indeed, it's
going to go dormant quite soon.

But using RHEL 6 (or any older version of any platform) and not
wanting to update already means you prefer stability over change.
I don't perceive an expectation that major releases of major things
support older major releases of other things.

Conversely: supporting something in Spark 2.x means making sure
nothing breaks compatibility with it for a couple of years. This is
effort that could be spent elsewhere; that trade-off has to be weighed.

(For similar reasons I personally don't favor supporting Java 7 or
Scala 2.10 in Spark 2.x.)

On Tue, Jan 5, 2016 at 7:07 PM, Koert Kuipers  wrote:
> rhel/centos 6 ships with python 2.6, doesn't it?
>
> if so, I still know plenty of large companies where python 2.6 is the only
> option. Asking them for python 2.7 is not going to work,
>
> so I think it's a bad idea
>
> On Tue, Jan 5, 2016 at 1:52 PM, Juliet Hougland 
> wrote:
>>
>> I don't see a reason Spark 2.0 would need to support Python 2.6. At this
>> point, Python 3 should be the default that is encouraged.
>> Most organizations acknowledge that 2.7 is common, but lagging behind the
>> version they should theoretically use. Dropping python 2.6
>> support sounds very reasonable to me.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



broadcast params to workers at the very beginning

2016-01-09 Thread octavian.ganea
Hi,

In my app, I have a Params Scala object that keeps all the specific
(hyper)parameters of my program. This object is read in each worker. I would
like to be able to pass specific values for the Params fields on the command
line. One way would be to simply update all the fields of the Params object
using the values in the command-line arguments. However, this will only
update the local copy of Params at the master node, while the worker nodes
will still use the default Params version that is broadcast at the very
beginning of the program.

Does anyone have an idea of how I can change, at runtime, the parameters of a
specific object for each of its copies located on the workers?
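
A minimal PySpark sketch of one way to do this (the original Params is a Scala object; the argument names and values below are hypothetical): parse the overrides on the driver, then ship them explicitly with sc.broadcast so every task reads the overridden values instead of a default copy.

import argparse
from pyspark import SparkContext

parser = argparse.ArgumentParser()
parser.add_argument("--learning-rate", type=float, default=0.01)
parser.add_argument("--num-iters", type=int, default=10)
args = parser.parse_args()

sc = SparkContext(appName="params-broadcast-demo")

# Broadcast the parsed parameters once; tasks read them via .value.
params = sc.broadcast({"learning_rate": args.learning_rate,
                       "num_iters": args.num_iters})

data = sc.parallelize(range(10))
scaled = data.map(lambda x: x * params.value["learning_rate"]).collect()
print(scaled)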

Thanks, 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/broadcast-params-to-workers-at-the-very-beginning-tp25927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to merge two large table and remove duplicates?

2016-01-09 Thread Ted Yu
See the first half of this wiki:

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+LZO
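
For reference, a minimal PySpark sketch of switching the Parquet codec before writing (Spark 1.5-era API; "snappy" and the paths are only illustrative assumptions, not from this thread):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="parquet-codec-demo")
sqlContext = SQLContext(sc)

# Accepted values in this era are uncompressed, snappy, gzip and lzo;
# gzip is the default.
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

df = sqlContext.read.json("/tmp/events.json")   # hypothetical input
df.write.parquet("/tmp/events_snappy.parquet")  # written with the codec above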

> On Jan 9, 2016, at 1:02 AM, Gavin Yue  wrote:
> 
> So I tried to set the parquet compression codec to lzo, but hadoop does not 
> have the lzo natives, while lz4 is included.
> But I could not set the codec to lz4; it only accepts lzo.
> 
> Any solution here?
> 
> Thanks,
> Gavin
> 
> 
> 
>> On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue  wrote:
>> I saw in the document the value is LZO. Is it LZO or LZ4?
>> 
>> https://github.com/Cyan4973/lz4
>> 
>> Based on this benchmark, they differ quite a lot. 
>> 
>> 
>> 
>>> On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu  wrote:
>>> gzip is relatively slow. It consumes much CPU.
>>> 
>>> snappy is faster.
>>> 
>>> LZ4 is faster than GZIP and smaller than Snappy.
>>> 
>>> Cheers
>>> 
 On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue  wrote:
 Thank you.

 And speaking of compression, is there big difference on performance
 between gzip and snappy? And why parquet is using gzip by default?

 Thanks.

> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu  wrote:
> Cycling old bits:
> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>
> Gavin:
> Which release of hbase did you play with ?
>
> HBase has been evolving and is getting more stable.
>
> Cheers
>
>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue  wrote:
>> I used to maintain a HBase cluster. The experience with it was not
>> happy.
>>
>> I just tried query the data from each day's first and dedup with
>> smaller set, the performance is acceptable. So I guess I will use this
>> method.
>>
>> Again, could anyone give advice about:
>> Automatically determine the number of reducers for joins and groupbys:
>> Currently in Spark SQL, you need to control the degree of parallelism
>> post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
>> Thanks.
>>
>> Gavin
>>
>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu  wrote:
>>> bq. in an noSQL db such as Hbase
>>>
>>> +1 :-)
>>>
 On Fri, Jan 8, 2016 at 6:25 PM, ayan guha  wrote:
 One option you may want to explore is writing event table in an noSQL
 db such as Hbase. One inherent problem in your approach is you always
 need to load either full data set or a defined number of partitions to
 see if the event has already come (and no guarantee it is foolproof,
 but lead to unnecessary loading in most cases).

> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue  wrote:
> Hey,
> Thank you for the answer. I checked the settings you mentioned; they
> are all correct. I noticed that in the job, there are always only
> 200 reducers for shuffle read, I believe it is set by the sql
> shuffle parallelism.
>
> In the doc, it mentions:
> Automatically determine the number of reducers for joins and
> groupbys: Currently in Spark SQL, you need to control the degree of
> parallelism post-shuffle using “SET
> spark.sql.shuffle.partitions=[num_tasks];”.
>
> What would be the ideal number for this setting? Is it based on the
> hardware of the cluster?
>
> Thanks,
>
> Gavin
>
>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang  wrote:
>> I assume your parquet files are compressed. Gzip or Snappy?
>> What spark version did you use? It seems at least 1.4. If you use
>> spark-sql and tungsten, you might have better performance, but spark
>> 1.5.2 gave me a wrong result when the data was about 300~400GB, just
>> for a simple group-by and aggregate.
>> Did you use kryo serialization?
>> You should have spark.shuffle.compress=true, verify it.
>> How many tasks did you use? spark.default.parallelism=?
>> What about this:
>> Read the data day by day
>> compute a bucket id from timestamp, e.g., the date and hour
>> Write into different buckets (you probably need a special writer to
>> write data efficiently without shuffling the data).
>> distinct for each bucket. Because each bucket is small, spark can
>> get it done faster than having everything in one run.
>> I think using groupBy (userId, timestamp) might be better than
>> distinct. I guess distinct() will compare every field.
>>
>>> On Fri, Jan 8, 2016 at 2:31 PM,
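
On the quoted question about spark.sql.shuffle.partitions: a rough rule of thumb (an assumption, not from this thread) is to start around 2-3x the total executor cores and increase it if shuffle stages spill or run out of memory. A minimal PySpark sketch of changing it from the default of 200 (the value 400 is only illustrative):

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="shuffle-partitions-demo")
sqlContext = SQLContext(sc)

# Raise the post-shuffle parallelism used by joins and aggregations.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")

# Equivalent SQL form, as in the quoted documentation snippet.
sqlContext.sql("SET spark.sql.shuffle.partitions=400")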

Re: [discuss] dropping Python 2.6 support

2016-01-09 Thread Jacek Laskowski
On Sat, Jan 9, 2016 at 1:48 PM, Sean Owen  wrote:

> (For similar reasons I personally don't favor supporting Java 7 or
> Scala 2.10 in Spark 2.x.)

That reflects my sentiments as well. Thanks Sean for bringing that up!

Jacek

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: pyspark: conditionals inside functions

2016-01-09 Thread Franc Carter
My Python is not particularly good, so I'm afraid I don't understand what
that means.

cheers


On 9 January 2016 at 14:45, Franc Carter  wrote:

>
> Hi,
>
> I'm trying to write a short function that returns the last sunday of the
> week of a given date, code below
>
> def getSunday(day):
>     day = day.cast("date")
>     sun = next_day(day, "Sunday")
>     n = datediff(sun, day)
>     if (n == 7):
>         return day
>     else:
>         return sun
>
>
> this gives me
>
> ValueError: Cannot convert column into bool:
>
>
> Can someone point out what I am doing wrong?
>
> thanks
>
>
> --
> Franc
>



-- 
Franc


pyspark: calculating row deltas

2016-01-09 Thread Franc Carter
Hi,

I have a DataFrame with the columns

 ID,Year,Value

I'd like to create a new column that is Value2 - Value1, where the
corresponding Year2 = Year - 1.

At the moment I am creating a new DataFrame with renamed columns and doing

   DF.join(DF2, . . . .)

This looks cumbersome to me; is there a better way?
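
A minimal sketch of the self-join described above (assuming Spark 1.4+, where joining on a list of column names is supported; the alias names are made up): shift Year by one in a second view, join back on ID and Year, and subtract.

from pyspark.sql.functions import col

# DF has columns ID, Year, Value as described above.
prev = DF.select(col("ID"),
                 (col("Year") + 1).alias("Year"),
                 col("Value").alias("PrevValue"))

delta = (DF.join(prev, ["ID", "Year"])
           .withColumn("Delta", col("Value") - col("PrevValue")))

A window function (lag over Year, partitioned by ID) would avoid the self-join where window functions are available.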

thanks


-- 
Franc


Best IDE Configuration

2016-01-09 Thread Jorge Machado
Hello everyone, 


I'm just wondering how you guys develop for Spark.

For example, I cannot find any decent documentation for connecting Spark to
Eclipse using maven or sbt.

Is there any link around ? 


Jorge
thanks 
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark access old version of Hadoop 2.1.0 and Hive version 0.11

2016-01-09 Thread Jade Liu
Hi, All:

I'm trying to read and write from the hdfs cluster using the SparkSQL hive context.
My current build of Spark is 1.5.2. The problem is that our company currently
has a very old version of HDFS (Hadoop 2.1.0) and Hive metastore (0.11) from the
Hortonworks bundle.

One possible solution is to set up a separate cluster running Hadoop 2.6.0 and
Hive > 0.12. But would it still be possible to read data from the old HDFS
cluster and write to the new one? Or is the only solution to upgrade my
Hortonworks bundle? Will that impact current Hive tables?

Here is the versions on the hdfs cluster:
Hue 2.2.0:67
HDP 2.0.5
Hadoop 2.1.0
HCatalog 0.11.0
Pig 0.11.2
Hive 0.11.0
Oozie 4.0.0


Thanks a lot,

Jade






Re: Best IDE Configuration

2016-01-09 Thread Ted Yu
Please take a look at:
https://cwiki.apache.org/confluence/display/SPARK/
Useful+Developer+Tools#UsefulDeveloperTools-IDESetup

On Sat, Jan 9, 2016 at 11:16 AM, Jorge Machado  wrote:

> Hello everyone,
>
>
> I'm just wondering how you guys develop for Spark.
>
> For example I cannot find any decent documentation for connecting Spark to
> Eclipse using maven or sbt.
>
> Is there any link around ?
>
>
> Jorge
> thanks
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: org.apache.spark.storage.BlockNotFoundException in Spark1.5.2+Tachyon0.7.1

2016-01-09 Thread Gene Pang
Yes, the tiered storage feature in Tachyon can address this issue. Here is
a link to more information:
http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html

Thanks,
Gene

On Wed, Jan 6, 2016 at 8:44 PM, Ted Yu  wrote:

> Have you seen this thread ?
>
> http://search-hadoop.com/m/q3RTtAiQta22XrCI
>
> On Wed, Jan 6, 2016 at 8:41 PM, Jia Zou  wrote:
>
>> Dear all,
>>
>> I am using Spark1.5.2 and Tachyon0.7.1 to run KMeans with
>> inputRDD.persist(StorageLevel.OFF_HEAP()).
>>
>> I've set tiered storage for Tachyon. It is all right when the working set is
>> smaller than available memory. However, when the working set exceeds available
>> memory, I keep getting errors like below:
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.1 in stage
>> 0.0 (TID 206) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 191.1 in stage
>> 0.0 (TID 207) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_191 not found
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.2 in stage
>> 0.0 (TID 208) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 191.2 in stage
>> 0.0 (TID 209) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_191 not found
>>
>> 16/01/07 04:18:53 INFO scheduler.TaskSetManager: Lost task 197.3 in stage
>> 0.0 (TID 210) on executor 10.149.11.81: java.lang.RuntimeException
>> (org.apache.spark.storage.BlockNotFoundException: Block rdd_1_197 not found
>>
>>
>> Can any one give me some suggestions? Thanks a lot!
>>
>>
>> Best Regards,
>> Jia
>>
>
>


java.lang.NoClassDefFoundError even when use sc.addJar

2016-01-09 Thread rayqiu
Code:

val sc = new SparkContext(sparkConf)
sc.addJar("/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar")

>spark-submit --class "GeoIP" target/scala-2.10/geoip-assembly-1.0.jar

The log shows the jar was added:
16/01/09 16:05:20 INFO SparkContext: Added JAR
/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-streaming-kafka-assembly_2.10-1.6.0.jar
at
http://192.168.8.107:59070/jars/spark-streaming-kafka-assembly_2.10-1.6.0.jar
with timestamp 1452384320186

But it still gave an error later:
Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/kafka/KafkaUtils$

While adding --jars as a command-line option, it ran fine without any problem:

> spark-submit --class "GeoIP" --jars
> spark-streaming-kafka-assembly_2.10-1.6.0.jar
> target/scala-2.10/geoip-assembly-1.0.jar

I understand that addJar does not help the spark shell, but this is not a
spark-shell case. Can someone please help to explain this? Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-NoClassDefFoundError-even-when-use-sc-addJar-tp25928.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



StandardScaler in spark.ml.feature requires vector input?

2016-01-09 Thread Kristina Rogale Plazonic
Hi,

The code below gives me an unexpected result. I expected that
StandardScaler (in ml, not mllib) would take a specified column of an input
dataframe, subtract the mean of the column, and divide the difference by
the standard deviation of that column.

However, Spark gives me the error that the input column must be of type
vector. This shouldn't be the case, as the StandardScaler should transform
a numeric column (not a vector column) to a numeric column, right? (The
offending line is in the Spark source code.)
Am I missing something?

Reproducing the error (following python's sklearn example):

import org.apache.spark.ml.feature.StandardScaler

val ccdf = sqlContext.createDataFrame( Seq(
  ( 1.0, -1.0,  2.0),
  ( 2.0,  0.0,  0.0),
  ( 0.0,  1.0, -1.0)
  )).toDF("c1", "c2", "c3")

val newccdf = new StandardScaler()
  .setInputCol("c1")
  .setOutputCol("c1_norm")
  .setWithMean(true)
  .setWithStd(true)
  .fit(ccdf)
  .transform(ccdf)

The error output: (spark-shell, Spark 1.5.2)

java.lang.IllegalArgumentException: requirement failed: Input column c1
must be a vector column
(.)

Thanks!
Kristina
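
One common workaround (an assumption, not from this thread; shown as a pyspark sketch of the Scala example above): ml's StandardScaler operates on a Vector column, so pack the numeric column into a length-1 vector with VectorAssembler first.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler, StandardScaler

sc = SparkContext(appName="standardscaler-demo")
sqlContext = SQLContext(sc)

ccdf = sqlContext.createDataFrame(
    [(1.0, -1.0, 2.0), (2.0, 0.0, 0.0), (0.0, 1.0, -1.0)],
    ["c1", "c2", "c3"])

# Pack the single numeric column into a vector column of length 1.
assembled = VectorAssembler(inputCols=["c1"], outputCol="c1_vec").transform(ccdf)

scaler = StandardScaler(inputCol="c1_vec", outputCol="c1_norm",
                        withMean=True, withStd=True)
scaled = scaler.fit(assembled).transform(assembled)
scaled.show()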


Re: pyspark: conditionals inside functions

2016-01-09 Thread Franc Carter
Got it, I needed to use the when/otherwise construct - code below

from pyspark.sql.functions import next_day, datediff, when

def getSunday(day):
    day = day.cast("date")
    sun = next_day(day, "Sunday")
    n = datediff(sun, day)
    x = when(n == 7, day).otherwise(sun)
    return x


On 10 January 2016 at 08:41, Franc Carter  wrote:

>
> My Python is not particularly good, so I'm afraid I don't understand what
> that means.
>
> cheers
>
>
> On 9 January 2016 at 14:45, Franc Carter  wrote:
>
>>
>> Hi,
>>
>> I'm trying to write a short function that returns the last sunday of the
>> week of a given date, code below
>>
>> def getSunday(day):
>>     day = day.cast("date")
>>     sun = next_day(day, "Sunday")
>>     n = datediff(sun, day)
>>     if (n == 7):
>>         return day
>>     else:
>>         return sun
>>
>>
>> this gives me
>>
>> ValueError: Cannot convert column into bool:
>>
>>
>> Can someone point out what I am doing wrong?
>>
>> thanks
>>
>>
>> --
>> Franc
>>
>
>
>
> --
> Franc
>



-- 
Franc


Re: How to merge two large table and remove duplicates?

2016-01-09 Thread Gavin Yue
I saw in the document the value is LZO. Is it LZO or LZ4?

https://github.com/Cyan4973/lz4

Based on this benchmark, they differ quite a lot.



On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu  wrote:

> gzip is relatively slow. It consumes much CPU.
>
> snappy is faster.
>
> LZ4 is faster than GZIP and smaller than Snappy.
>
> Cheers
>
> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue  wrote:
>
>> Thank you .
>>
>> And speaking of compression, is there big difference on performance
>> between gzip and snappy? And why parquet is using gzip by default?
>>
>> Thanks.
>>
>>
>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu  wrote:
>>
>>> Cycling old bits:
>>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>>
>>> Gavin:
>>> Which release of hbase did you play with ?
>>>
>>> HBase has been evolving and is getting more stable.
>>>
>>> Cheers
>>>
>>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue 
>>> wrote:
>>>
 I used to maintain a HBase cluster. The experience with it was not
 happy.

 I just tried query the data  from each day's first and dedup with
 smaller set, the performance is acceptable.  So I guess I will use this
 method.

 Again, could anyone give advice about:

- Automatically determine the number of reducers for joins and
groupbys: Currently in Spark SQL, you need to control the degree of
parallelism post-shuffle using “SET
spark.sql.shuffle.partitions=[num_tasks];”.

 Thanks.

 Gavin




 On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu  wrote:

> bq. in an noSQL db such as Hbase
>
> +1 :-)
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha  wrote:
>
>> One option you may want to explore is writing event table in an noSQL
>> db such as Hbase. One inherent problem in your approach is you always 
>> need
>> to load either full data set or a defined number of partitions to see if
>> the event has already come (and no gurantee it is full proof, but lead to
>> unnecessary loading in most cases).
>>
>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue 
>> wrote:
>>
>>> Hey,
>>> Thank you for the answer. I checked the setting you mentioend they
>>> are all correct.  I noticed that in the job, there are always only 200
>>> reducers for shuffle read, I believe it is setting in the sql shuffle
>>> parallism.
>>>
>>> In the doc, it mentions:
>>>
>>>- Automatically determine the number of reducers for joins and
>>>groupbys: Currently in Spark SQL, you need to control the degree of
>>>parallelism post-shuffle using “SET
>>>spark.sql.shuffle.partitions=[num_tasks];”.
>>>
>>>
>>> What would be the ideal number for this setting? Is it based on the
>>> hardware of cluster?
>>>
>>>
>>> Thanks,
>>>
>>> Gavin
>>>
>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang 
>>> wrote:
>>>

- I assume your parquet files are compressed. Gzip or Snappy?
- What spark version did you use? It seems at least 1.4. If you
use spark-sql and tungsten, you might have better performance. but 
 spark
1.5.2 gave me a wrong result when the data was about 300~400GB, 
 just for a
simple group-by and aggregate.
- Did you use kyro serialization?
- you should have spark.shuffle.compress=true, verify it.
- How many tasks did you use? spark.default.parallelism=?
- What about this:
   - Read the data day by day
   - compute a bucket id from timestamp, e.g., the date and hour
   - Write into different buckets (you probably need a special
   writer to write data efficiently without shuffling the data).
   - distinct for each bucket. Because each bucket is small,
   spark can get it done faster than having everything in one run.
   - I think using groupBy (userId, timestamp) might be better
   than distinct. I guess distinct() will compare every field.


 On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue 
 wrote:

> And the most frequent operation I am gonna do is find the UserID
> who have some events, then retrieve all the events associted with the
> UserID.
>
> In this case, how should I partition to speed up the process?
>
> Thanks.
>
> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue 
> wrote:
>
>> hey Ted,
>>
>> Event table is like this: UserID, EventType, EventKey, TimeStamp,
>> MetaData.  I just parse it from Json and 

Re: How to merge two large table and remove duplicates?

2016-01-09 Thread Gavin Yue
So I tried to set the parquet compression codec to lzo, but hadoop does not
have the lzo natives, while lz4 is included.
But I could not set the codec to lz4; it only accepts lzo.

Any solution here?

Thanks,
Gavin



On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue  wrote:

> I saw in the document the value is LZO. Is it LZO or LZ4?
>
> https://github.com/Cyan4973/lz4
>
> Based on this benchmark, they differ quite a lot.
>
>
>
> On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu  wrote:
>
>> gzip is relatively slow. It consumes much CPU.
>>
>> snappy is faster.
>>
>> LZ4 is faster than GZIP and smaller than Snappy.
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue  wrote:
>>
>>> Thank you .
>>>
>>> And speaking of compression, is there big difference on performance
>>> between gzip and snappy? And why parquet is using gzip by default?
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu  wrote:
>>>
 Cycling old bits:
 http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ

 Gavin:
 Which release of hbase did you play with ?

 HBase has been evolving and is getting more stable.

 Cheers

 On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue 
 wrote:

> I used to maintain a HBase cluster. The experience with it was not
> happy.
>
> I just tried query the data  from each day's first and dedup with
> smaller set, the performance is acceptable.  So I guess I will use this
> method.
>
> Again, could anyone give advice about:
>
>- Automatically determine the number of reducers for joins and
>groupbys: Currently in Spark SQL, you need to control the degree of
>parallelism post-shuffle using “SET
>spark.sql.shuffle.partitions=[num_tasks];”.
>
> Thanks.
>
> Gavin
>
>
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu  wrote:
>
>> bq. in an noSQL db such as Hbase
>>
>> +1 :-)
>>
>>
>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha 
>> wrote:
>>
>>> One option you may want to explore is writing event table in an
>>> noSQL db such as Hbase. One inherent problem in your approach is you 
>>> always
>>> need to load either full data set or a defined number of partitions to 
>>> see
>>> if the event has already come (and no gurantee it is full proof, but 
>>> lead
>>> to unnecessary loading in most cases).
>>>
>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue 
>>> wrote:
>>>
 Hey,
 Thank you for the answer. I checked the setting you mentioend they
 are all correct.  I noticed that in the job, there are always only 200
 reducers for shuffle read, I believe it is setting in the sql shuffle
 parallism.

 In the doc, it mentions:

- Automatically determine the number of reducers for joins and
groupbys: Currently in Spark SQL, you need to control the degree of
parallelism post-shuffle using “SET
spark.sql.shuffle.partitions=[num_tasks];”.


 What would be the ideal number for this setting? Is it based on the
 hardware of cluster?


 Thanks,

 Gavin

 On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang 
 wrote:

>
>- I assume your parquet files are compressed. Gzip or Snappy?
>- What spark version did you use? It seems at least 1.4. If
>you use spark-sql and tungsten, you might have better performance. 
> but
>spark 1.5.2 gave me a wrong result when the data was about 
> 300~400GB, just
>for a simple group-by and aggregate.
>- Did you use kyro serialization?
>- you should have spark.shuffle.compress=true, verify it.
>- How many tasks did you use? spark.default.parallelism=?
>- What about this:
>   - Read the data day by day
>   - compute a bucket id from timestamp, e.g., the date and
>   hour
>   - Write into different buckets (you probably need a special
>   writer to write data efficiently without shuffling the data).
>   - distinct for each bucket. Because each bucket is small,
>   spark can get it done faster than having everything in one run.
>   - I think using groupBy (userId, timestamp) might be better
>   than distinct. I guess distinct() will compare every field.
>
>
> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue 
> wrote:
>
>> And the most frequent operation I am gonna do is find the UserID
>> 

Re: How to merge two large table and remove duplicates?

2016-01-09 Thread Sayan Sanyal
Unsubscribe

Sent from Outlook Mobile

_
From: Gavin Yue 
Sent: Saturday, January 9, 2016 14:33
Subject: Re: How to merge two large table and remove duplicates?
To: Ted Yu 
Cc: Benyi Wang , user , ayan guha 



So I tried to set the parquet compression codec to lzo, but hadoop does not
have the lzo natives, while lz4 is included.
But I could not set the codec to lz4; it only accepts lzo.

Any solution here?

Thanks,
Gavin

On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue  wrote:
> I saw in the document the value is LZO. Is it LZO or LZ4?
>
> https://github.com/Cyan4973/lz4
>
> Based on this benchmark, they differ quite a lot.
>
> On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu  wrote:
>> gzip is relatively slow. It consumes much CPU.
>>
>> snappy is faster.
>>
>> LZ4 is faster than GZIP and smaller than Snappy.
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue  wrote:
>>> Thank you.
>>>
>>> And speaking of compression, is there big difference on performance
>>> between gzip and snappy? And why parquet is using gzip by default?
>>>
>>> Thanks.
>>>
>>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu  wrote:
>>>> Cycling old bits:
>>>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>>>
>>>> Gavin:
>>>> Which release of hbase did you play with ?
>>>>
>>>> HBase has been evolving and is getting more stable.
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue  wrote:
>>>>> I used to maintain a HBase cluster. The experience with it was not
>>>>> happy.
>>>>>
>>>>> I just tried query the data from each day's first and dedup with
>>>>> smaller set, the performance is acceptable. So I guess I will
>>>>> use this method.
>>>>>
>>>>> Again, could anyone give advice about:
>>>>> Automatically determine the number of reducers for joins and groupbys:
>>>>> Currently in Spark SQL, you need to control the degree of parallelism
>>>>> post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Gavin
>>>>>
>>>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu  wrote:
>>>>>> bq. in an noSQL db such as Hbase