Have you considered using PairRDDFunctions.aggregateByKey or
PairRDDFunctions.reduceByKey in place of the groupBy to achieve better
performance?
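
For example (an untested sketch, reusing the getRows helper and the input
path from the snippet below), aggregateByKey can build the per-key lists
with map-side combining instead of groupBy:

pairs = sc.textFile('/textFile.txt', 480).filter(lambda x: len(x) > 1000) \
        .map(lambda x: x.rsplit('\t', 1)) \
        .map(lambda x: (x[0].split('\t')[1], [x[0], getRows(x[1])]))

grouped = pairs.aggregateByKey(
    [],                               # start each key with an empty list
    lambda acc, v: acc + [v],         # add a value within a partition
    lambda acc1, acc2: acc1 + acc2    # merge lists from different partitions
).cache()

If the per-key work can be expressed as a true reduction (for example summing
the 8,192-float arrays), something like
pairs.mapValues(lambda v: v[1]).reduceByKey(lambda a, b: a + b)
would avoid materializing the big per-key lists at all.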

Cheers

On Sat, Apr 9, 2016 at 2:00 PM, SURAJ SHETH <shet...@gmail.com> wrote:

> Hi,
> I am using Spark 1.5.2
>
> The file contains 900K rows, each with twelve tab-separated fields:
> the first 11 fields are strings of at most 20 characters each, and the last
> field is a comma-separated array of floats with 8,192 values.
>
> It works perfectly if I change the groupBy key in the code below from
> "x[0].split('\t')[1]" to "x[0]".
> The reason seems to be a limit on the number of values per key in
> groupBy. In the code below, I expect about 500 keys, a few of which have
> tens of thousands of values. The largest key-value pair (from groupByKey)
> has 53K values, each a numpy array of 8,192 floats.
> In the changed version, i.e. "groupBy(lambda x : x[0]).mapValues(", we get
> 900K keys with one value each, and that works flawlessly.
>
> Is there any limit on the amount of data a single key can hold in groupBy?
>
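> A back-of-the-envelope estimate (assuming the numpy arrays are float64, i.e.
> 8 bytes per value) puts that largest group alone well past the signed 32-bit
> range:
>
> largest_group_bytes = 53000 * 8192 * 8  # values per key * floats * bytes per float
> print(largest_group_bytes)              # 3473408000, roughly 3.5 GB
> print(2**31 - 1)                        # 2147483647, the signed 32-bit maximum
>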
> The total file size is 16 GB.
>
> The snippet is:
>
> import hashlib, re, numpy as np
>
> def getRows(z):
>     return np.asfortranarray([float(g) for g in z.split(',')])
>
> text1 = sc.textFile('/textFile.txt', 480).filter(lambda x: len(x) > 1000) \
>         .map(lambda x: x.rsplit('\t', 1)) \
>         .map(lambda x: [x[0], getRows(x[1])]).cache() \
>         .groupBy(lambda x: x[0].split('\t')[1]) \
>         .mapValues(lambda x: list(x)).cache()
>
> text1.count()
>
> Thanks and Regards,
> Suraj Sheth
>
> On Sun, Apr 10, 2016 at 1:19 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> The value was out of the range of a 32-bit signed integer.
>>
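>> That limit can be reproduced directly with the struct call shown in the
>> traceback below (illustration only):
>>
>> import struct
>> struct.pack("!i", 2**31)   # raises the same struct.error as in your traceback
>>
>> write_int in pyspark/serializers.py frames each serialized object with that
>> 4-byte length, which would explain why a single object (for example one
>> key's grouped values) that pickles to more than 2 GB blows up there.
>>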
>> Which Spark release are you using?
>>
>> Can you post a snippet of code which can reproduce the error?
>>
>> Thanks
>>
>> On Sat, Apr 9, 2016 at 12:25 PM, SURAJ SHETH <shet...@gmail.com> wrote:
>>
>>> I am trying to perform some processing, then cache and count the RDD.
>>> Any solutions?
>>>
>>> I am seeing a weird error:
>>>
>>> File 
>>> "/mnt/yarn/usercache/hadoop/appcache/application_1456909219314_0014/container_1456909219314_0014_01_000004/pyspark.zip/pyspark/serializers.py",
>>>  line 550, in write_int
>>>     stream.write(struct.pack("!i", value))
>>> error: 'i' format requires -2147483648 <= number <= 2147483647
>>>
>>>     at 
>>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>>     at 
>>> org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>>>     at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>>     at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
>>>
>>>
>>> Thanks and Regards,
>>>
>>> Suraj Sheth
>>>
>>>
>>
>
