Re: Replacing groupByKey() with reduceByKey()

2018-08-08 Thread Biplob Biswas
Hi Santhosh,

My name is not Bipin, it's Biplob, as is clear from my signature.

Regarding your question, I have no idea what your map operation is doing on
the grouped data, so I can only suggest something like:

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
(x[0],x)).reduceByKey(build_edges, 25)

Depending on the return type, though, you would have to modify your build_edges
function.
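
For illustration only, a minimal PySpark sketch of the shape reduceByKey
expects: the function passed to reduceByKey must take two partial results of
the same type and merge them (associatively and commutatively, since Spark
also applies it map-side), so the per-row work moves into the preceding map.
The helpers below (row_to_edges, merge_edges) are hypothetical stand-ins for
whatever build_edges actually does, not the poster's code.

def row_to_edges(row):
    # Hypothetical: extract the "edge" information carried by a single row.
    return [(row.hash_fn, row.value)]

def merge_edges(edges_a, edges_b):
    # Associative merge of two partial per-key results.
    return edges_a + edges_b

dd = (hive_context.read.orc(orcfile_dir).rdd
      .map(lambda x: (x[0], row_to_edges(x)))   # key -> partial result
      .reduceByKey(merge_edges, 25))            # merge partial results per key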

Thanks & Regards
Biplob Biswas


On Mon, Aug 6, 2018 at 6:28 PM Bathi CCDB  wrote:

> Hey Bipin,
> Thanks for the reply, I am actually aggregating after the groupByKey() 
> operation,
> I have posted the wrong code snippet in my first email. Here is what I am
> doing
>
> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
> (x[0],x)).groupByKey(25).map(build_edges)
>
> Can we replace reduceByKey() in this context ?
>
> Santhosh
>
>
> On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas 
> wrote:
>
>> Hi Santhosh,
>>
>> If you are not performing any aggregation, then I don't think you can
>> replace your groupbykey with a reducebykey, and as I see you are only
>> grouping and taking 2 values of the result, thus I believe you can't just
>> replace your groupbykey with that.
>>
>> Thanks & Regards
>> Biplob Biswas
>>
>>
>> On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB  wrote:
>>
>>> I am trying to replace groupByKey() with reduceByKey(). I am a PySpark
>>> and Python newbie, and I am having a hard time figuring out the lambda
>>> function for the reduceByKey() operation.
>>>
>>> Here is the code
>>>
>>> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
>>> (x[0],x)).groupByKey(25).take(2)
>>>
>>> Here is the return value
>>>
>>> >>> dd
>>> [(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]
>>>
>>> and Here are the iterable contents dd[0][1]
>>>
>>> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', 
>>> value=u'e7dc1f2a')Row(key=u'KEY_1', 
>>> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', 
>>> value=u'fb0bc953')...Row(key=u'KEY_1', 
>>> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', 
>>> value=u'd39714d3')Row(key=u'KEY_1', 
>>> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>>>
>>> My question is how do replace with reduceByKey() and get the same
>>> output as above?
>>>
>>> Santhosh
>>>
>>
>


Re: Replacing groupByKey() with reduceByKey()

2018-08-06 Thread Bathi CCDB
Hey Bipin,
Thanks for the reply. I am actually aggregating after the groupByKey() operation;
I posted the wrong code snippet in my first email. Here is what I am doing:

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
(x[0],x)).groupByKey(25).map(build_edges)

Can we replace groupByKey() with reduceByKey() in this context?

Santhosh


On Mon, Aug 6, 2018 at 1:20 AM, Biplob Biswas 
wrote:

> Hi Santhosh,
>
> If you are not performing any aggregation, then I don't think you can
> replace your groupbykey with a reducebykey, and as I see you are only
> grouping and taking 2 values of the result, thus I believe you can't just
> replace your groupbykey with that.
>
> Thanks & Regards
> Biplob Biswas
>
>
> On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB  wrote:
>
>> I am trying to replace groupByKey() with reduceByKey(). I am a PySpark
>> and Python newbie, and I am having a hard time figuring out the lambda
>> function for the reduceByKey() operation.
>>
>> Here is the code
>>
>> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
>> (x[0],x)).groupByKey(25).take(2)
>>
>> Here is the return value
>>
>> >>> dd
>> [(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]
>>
>> and Here are the iterable contents dd[0][1]
>>
>> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', 
>> value=u'e7dc1f2a')Row(key=u'KEY_1', 
>> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', 
>> value=u'fb0bc953')...Row(key=u'KEY_1', 
>> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', 
>> value=u'd39714d3')Row(key=u'KEY_1', 
>> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>>
>> My question is how do replace with reduceByKey() and get the same output
>> as above?
>>
>> Santhosh
>>
>


Re: Replacing groupByKey() with reduceByKey()

2018-08-06 Thread Biplob Biswas
Hi Santhosh,

If you are not performing any aggregation, then I don't think you can
replace your groupByKey with a reduceByKey. As far as I can see, you are only
grouping and taking 2 values of the result, so I believe you can't simply
swap your groupByKey for it.

Thanks & Regards
Biplob Biswas


On Sat, Aug 4, 2018 at 12:05 AM Bathi CCDB  wrote:

> I am trying to replace groupByKey() with reduceByKey(). I am a PySpark
> and Python newbie, and I am having a hard time figuring out the lambda
> function for the reduceByKey() operation.
>
> Here is the code
>
> dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x: 
> (x[0],x)).groupByKey(25).take(2)
>
> Here is the return value
>
> >>> dd
> [(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]
>
> and Here are the iterable contents dd[0][1]
>
> Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', 
> value=u'e7dc1f2a')Row(key=u'KEY_1', 
> hash_fn=u'f8891048a9ef8331227b4af080ecd28a', 
> value=u'fb0bc953')...Row(key=u'KEY_1', 
> hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', 
> value=u'd39714d3')Row(key=u'KEY_1', 
> hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')
>
> My question is how do replace with reduceByKey() and get the same output
> as above?
>
> Santhosh
>


Replacing groupByKey() with reduceByKey()

2018-08-03 Thread Bathi CCDB
I am trying to replace groupByKey() with reduceByKey(). I am a PySpark and
Python newbie, and I am having a hard time figuring out the lambda function
for the reduceByKey() operation.

Here is the code

dd = hive_context.read.orc(orcfile_dir).rdd.map(lambda x:
(x[0],x)).groupByKey(25).take(2)

Here is the return value

>>> dd
[(u'KEY_1', <pyspark.resultiterable.ResultIterable object at 0x107be0c50>), (u'KEY_2', <pyspark.resultiterable.ResultIterable object at 0x107be0c10>)]

and here are the iterable contents of dd[0][1]:

Row(key=u'KEY_1', hash_fn=u'deec95d65ca6b3b4f2e1ef259040aa79', value=u'e7dc1f2a')
Row(key=u'KEY_1', hash_fn=u'f8891048a9ef8331227b4af080ecd28a', value=u'fb0bc953')
...
Row(key=u'KEY_1', hash_fn=u'1b9d2bb2db28603ff21052efcd13f242', value=u'd39714d3')
Row(key=u'KEY_1', hash_fn=u'c41b0269706ac423732a6bab24bf8a6a', value=u'ab58db92')

My question is: how do I replace this with reduceByKey() and get the same output
as above?

Santhosh
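
For what it's worth, one way to get the same grouped output with reduceByKey is
to accumulate lists, sketched below under the assumption that the goal is
literally to collect every row per key (names reused from the snippet above).
Note that without a real aggregation this shuffles roughly the same amount of
data as groupByKey, so it is not inherently faster.

dd = (hive_context.read.orc(orcfile_dir).rdd
      .map(lambda x: (x[0], [x]))             # wrap each row in a one-element list
      .reduceByKey(lambda a, b: a + b, 25)    # concatenate the per-key lists
      .take(2))

# dd[0][1] is then a plain Python list of Rows instead of a ResultIterable,
# but iterating over it yields the same rows as the groupByKey version.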


Re: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-27 Thread Dinesh Dharme
Yeah, you are right. I ran the experiments locally not on YARN.

On Fri, Jul 27, 2018 at 11:54 PM, Vadim Semenov  wrote:

> `spark.worker.cleanup.enabled=true` doesn't work for YARN.
> On Fri, Jul 27, 2018 at 8:52 AM dineshdharme 
> wrote:
> >
> > I am trying to do few (union + reduceByKey) operations on a hiearchical
> > dataset in a iterative fashion in rdd. The first few loops run fine but
> on
> > the subsequent loops, the operations ends up using the whole scratch
> space
> > provided to it.
> >
> > I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one
> > having 100 GB space.
> > The heirarchical dataset, whose size is (< 400kB), remains constant
> > throughout the iterations.
> > I have tried the worker cleanup flag but it has no effect i.e.
> > "spark.worker.cleanup.enabled=true"
> >
> >
> >
> > Error :
> > Caused by: java.io.IOException: No space left on device
> > at java.io.FileOutputStream.writeBytes(Native Method)
> > at java.io.FileOutputStream.write(FileOutputStream.java:326)
> > at java.io.BufferedOutputStream.flushBuffer(
> BufferedOutputStream.java:82)
> > at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> > at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> > at
> > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$
> writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(
> IndexShuffleBlockResolver.scala:151)
> > at
> > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$
> writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(
> IndexShuffleBlockResolver.scala:149)
> > at
> > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$
> writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(
> IndexShuffleBlockResolver.scala:149)
> > at
> > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.
> scala:33)
> > at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> > at
> > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$
> writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.
> scala:149)
> > at
> > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$
> writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> > at
> > org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$
> writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> > at
> > org.apache.spark.shuffle.IndexShuffleBlockResolver.
> writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> > at
> > org.apache.spark.shuffle.sort.SortShuffleWriter.write(
> SortShuffleWriter.scala:73)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:96)
> > at
> > org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:53)
> > at org.apache.spark.scheduler.Task.run(Task.scala:109)
> > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> > at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> > at java.lang.Thread.run(Thread.java:748)
> >
> >
> > What I am trying to do (High Level):
> >
> > I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21,
> > Child22 ) which are related in a hierarchical fashion as shown below.
> >
> > Parent-> Child1 -> Child2  -> Child21
> >
> > Parent-> Child1 -> Child2  -> Child22
> >
> > Each element in the tree has 14 columns (elementid, parentelement_id,
> cat1,
> > cat2, num1, num2,., num10)
> >
> > I am trying to aggregate the values of one column of Child21 into Child1
> > (i.e. 2 levels up). I am doing the same for another column value of
> Child22
> > into Child1. Then I am merging these aggregated values at the same Child1
> > level.
> >
> > This is present in the code at location :
> >
> > spark.rddexample.dummyrdd.tree.child1.events.Function1
> >
> >
> > Code which replicates the issue:
> >
> > 1] https://github.com/dineshdharme/SparkRddShuffleIssue
> >
> >
> >
> > Steps to reproduce the issue :
> >
> > 1] Clone the above repository.
> >
> > 2] Put the csvs in the "issue-data" folder in the above repository at a
> > hadoop location "hdfs:///tree/dummy/data/"

Re: Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-27 Thread Vadim Semenov
`spark.worker.cleanup.enabled=true` doesn't work for YARN.
On Fri, Jul 27, 2018 at 8:52 AM dineshdharme  wrote:
>
> I am trying to do few (union + reduceByKey) operations on a hiearchical
> dataset in a iterative fashion in rdd. The first few loops run fine but on
> the subsequent loops, the operations ends up using the whole scratch space
> provided to it.
>
> I have set the spark scratch directory, i.e. SPARK_LOCAL_DIRS , to be one
> having 100 GB space.
> The heirarchical dataset, whose size is (< 400kB), remains constant
> throughout the iterations.
> I have tried the worker cleanup flag but it has no effect i.e.
> "spark.worker.cleanup.enabled=true"
>
>
>
> Error :
> Caused by: java.io.IOException: No space left on device
> at java.io.FileOutputStream.writeBytes(Native Method)
> at java.io.FileOutputStream.write(FileOutputStream.java:326)
> at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
> at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
> at
> org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
> What I am trying to do (High Level):
>
> I have a dataset of 5 different csv ( Parent, Child1, Child2, Child21,
> Child22 ) which are related in a hierarchical fashion as shown below.
>
> Parent-> Child1 -> Child2  -> Child21
>
> Parent-> Child1 -> Child2  -> Child22
>
> Each element in the tree has 14 columns (elementid, parentelement_id, cat1,
> cat2, num1, num2,., num10)
>
> I am trying to aggregate the values of one column of Child21 into Child1
> (i.e. 2 levels up). I am doing the same for another column value of Child22
> into Child1. Then I am merging these aggregated values at the same Child1
> level.
>
> This is present in the code at location :
>
> spark.rddexample.dummyrdd.tree.child1.events.Function1
>
>
> Code which replicates the issue:
>
> 1] https://github.com/dineshdharme/SparkRddShuffleIssue
>
>
>
> Steps to reproduce the issue :
>
> 1] Clone the above repository.
>
> 2] Put the csvs in the "issue-data" folder in the above repository at a
> hadoop location "hdfs:///tree/dummy/data/"
>
> 3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has
> large space. (> 100 GB)
>
> 4] Run "sbt assembly"
>
> 5] Run the following command at the project location
>
> /path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
> --class spark.rddexample.dummyrdd.FunctionExecutor \
> --master local[2] \
> --deploy-mode client \
> --executor-memory 2G \
> --driver-memory 2G \
> target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
> 20 \
> hdfs:///tree/dummy/data/ \
> hdfs:///tree/dummy/results/
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


-- 
Sent from my iPhone

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Iterative rdd union + reduceByKey operations on small dataset leads to "No space left on device" error on account of lot of shuffle spill.

2018-07-27 Thread dineshdharme
I am trying to do a few (union + reduceByKey) operations on a hierarchical
dataset in an iterative fashion with RDDs. The first few loops run fine, but on
the subsequent loops the operations end up using the whole scratch space
provided to them.

I have set the Spark scratch directory, i.e. SPARK_LOCAL_DIRS, to one
having 100 GB of space.
The hierarchical dataset, whose size is small (< 400 kB), remains constant
throughout the iterations.
I have tried the worker cleanup flag, but it has no effect, i.e.
"spark.worker.cleanup.enabled=true".

 

Error : 
Caused by: java.io.IOException: No space left on device
at java.io.FileOutputStream.writeBytes(Native Method)
at java.io.FileOutputStream.write(FileOutputStream.java:326)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
at java.io.DataOutputStream.writeLong(DataOutputStream.java:224)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply$mcVJ$sp(IndexShuffleBlockResolver.scala:151)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1$$anonfun$apply$mcV$sp$1.apply(IndexShuffleBlockResolver.scala:149)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:246)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply$mcV$sp(IndexShuffleBlockResolver.scala:149)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver$$anonfun$writeIndexFileAndCommit$1.apply(IndexShuffleBlockResolver.scala:145)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at
org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:153)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
 

What I am trying to do (High Level):

I have a dataset of 5 different CSVs (Parent, Child1, Child2, Child21,
Child22) which are related in a hierarchical fashion as shown below.

Parent-> Child1 -> Child2  -> Child21 

Parent-> Child1 -> Child2  -> Child22 

Each element in the tree has 14 columns (elementid, parentelement_id, cat1,
cat2, num1, num2, ..., num10).

I am trying to aggregate the values of one column of Child21 into Child1
(i.e. 2 levels up). I am doing the same for another column value of Child22
into Child1. Then I am merging these aggregated values at the same Child1
level.

This is present in the code at location : 

spark.rddexample.dummyrdd.tree.child1.events.Function1
 

Code which replicates the issue: 

1] https://github.com/dineshdharme/SparkRddShuffleIssue

 

Steps to reproduce the issue : 

1] Clone the above repository.

2] Put the csvs in the "issue-data" folder in the above repository at a
hadoop location "hdfs:///tree/dummy/data/"

3] Set the spark scratch directory (SPARK_LOCAL_DIRS) to a folder which has
large space. (> 100 GB)

4] Run "sbt assembly"

5] Run the following command at the project location 

/path/to/spark-2.3.0-bin-hadoop2.7/bin/spark-submit \
--class spark.rddexample.dummyrdd.FunctionExecutor \
--master local[2] \
--deploy-mode client \
--executor-memory 2G \
--driver-memory 2G \
target/scala-2.11/rdd-shuffle-assembly-0.1.0.jar \
20 \
hdfs:///tree/dummy/data/ \
hdfs:///tree/dummy/results/   
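
For readers without the repository at hand, below is a rough PySpark sketch of
the iterative union + reduceByKey pattern described above; the parsing, merge
logic, and checkpoint path are placeholders and this is not the repository's
Scala code. Every iteration adds another shuffle stage, whose files live under
SPARK_LOCAL_DIRS for as long as the lineage stays referenced.

from pyspark import SparkContext

sc = SparkContext(appName="iterative-union-reduce")
sc.setCheckpointDir("hdfs:///tree/dummy/checkpoints/")   # hypothetical path

acc = None
for i in range(20):                                      # 20 mirrors the argument above
    batch = (sc.textFile("hdfs:///tree/dummy/data/")     # placeholder load
               .map(lambda line: (line.split(",")[0], 1.0)))   # placeholder parse
    acc = batch if acc is None else acc.union(batch)
    acc = acc.reduceByKey(lambda a, b: a + b)            # a new shuffle every iteration
    if (i + 1) % 5 == 0:
        # Periodically truncating the lineage is a common way to let old shuffle
        # files be cleaned up; noted here only as a general technique, not as a
        # confirmed fix for this particular report.
        acc.checkpoint()
        acc.count()                                      # materialize the checkpoint

print(acc.count())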



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Structured Stream equivalent of reduceByKey

2017-11-06 Thread Michael Armbrust
Hmmm, I see.  You could output the delta using flatMapGroupsWithState
<https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/sql/KeyValueGroupedDataset.html#flatMapGroupsWithState-org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction-org.apache.spark.sql.streaming.OutputMode-org.apache.spark.sql.Encoder-org.apache.spark.sql.Encoder-org.apache.spark.sql.streaming.GroupStateTimeout->
probably.

On Thu, Oct 26, 2017 at 10:11 PM, Piyush Mukati <piyush.muk...@gmail.com>
wrote:

> Thanks, Michael
> I have explored Aggregator
> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>
>  with
> update mode. The problem is it will give the overall aggregated value for
> the changed. while I only want the delta change in the group as the
> aggregation we are doing at sink level too.
>
> Below is the plan generated with count Aggregator.
>
> *HashAggregate
> StateStoreSave
> *HashAggregate,
> StateStoreRestore
> *HashAggregate,
> Exchange
> *HashAggregate,
> *Project
> StreamingRelation
>
> we are looking for some aggregation which will avoid state
> store interactions.
>
> Also anyone aware of any design doc or some example about how we can add
> new operation on dataSet and corresponding physical plan.
>
>
>
> On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> - dev
>>
>> I think you should be able to write an Aggregator
>> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
>> You probably want to run in update mode if you are looking for it to output
>> any group that has changed in the batch.
>>
>> On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> we are migrating some jobs from Dstream to Structured Stream.
>>>
>>> Currently to handle aggregations we call map and reducebyKey on each RDD
>>> like
>>> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>>>
>>> The final output of each RDD is merged to the sink with support for
>>> aggregation at the sink( Like co-processor at HBase ).
>>>
>>> In the new DataSet API, I am not finding any suitable API to aggregate
>>> over the micro-batch.
>>> Most of the aggregation API uses state-store and provide global
>>> aggregations. ( with append mode it does not give the change in existing
>>> buckets )
>>> Problems we are suspecting are :
>>>  1) state-store is tightly linked to the job definitions. while in our
>>> case we want may edit the job while keeping the older calculated aggregate
>>> as it is.
>>>
>>> The desired result can be achieved with below dataset APIs.
>>> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) =>
>>> merge(valueItr))
>>> while on observing the physical plan it does not call any merge before
>>> sort.
>>>
>>>  Anyone aware of API or other workarounds to get the desired result?
>>>
>>
>>
>


Re: Structured Stream equivalent of reduceByKey

2017-10-26 Thread Piyush Mukati
Thanks, Michael.
I have explored Aggregator
<https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>
with update mode. The problem is that it gives the overall aggregated value for
the changed groups, while I only want the delta change in the group, as we are
doing the aggregation at the sink level too.

Below is the plan generated with count Aggregator.

*HashAggregate
StateStoreSave
*HashAggregate,
StateStoreRestore
*HashAggregate,
Exchange
*HashAggregate,
*Project
StreamingRelation

We are looking for some kind of aggregation which will avoid state-store
interactions.

Also, is anyone aware of any design doc or example of how to add a new
operation on a Dataset and a corresponding physical plan?



On Thu, Oct 26, 2017 at 5:54 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> - dev
>
> I think you should be able to write an Aggregator
> <https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
> You probably want to run in update mode if you are looking for it to output
> any group that has changed in the batch.
>
> On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com>
> wrote:
>
>> Hi,
>> we are migrating some jobs from Dstream to Structured Stream.
>>
>> Currently to handle aggregations we call map and reducebyKey on each RDD
>> like
>> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>>
>> The final output of each RDD is merged to the sink with support for
>> aggregation at the sink( Like co-processor at HBase ).
>>
>> In the new DataSet API, I am not finding any suitable API to aggregate
>> over the micro-batch.
>> Most of the aggregation API uses state-store and provide global
>> aggregations. ( with append mode it does not give the change in existing
>> buckets )
>> Problems we are suspecting are :
>>  1) state-store is tightly linked to the job definitions. while in our
>> case we want may edit the job while keeping the older calculated aggregate
>> as it is.
>>
>> The desired result can be achieved with below dataset APIs.
>> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) =>
>> merge(valueItr))
>> while on observing the physical plan it does not call any merge before
>> sort.
>>
>>  Anyone aware of API or other workarounds to get the desired result?
>>
>
>


Re: Structured Stream equivalent of reduceByKey

2017-10-26 Thread Michael Armbrust
- dev

I think you should be able to write an Aggregator
<https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.Aggregator>.
You probably want to run in update mode if you are looking for it to output
any group that has changed in the batch.

On Wed, Oct 25, 2017 at 5:52 PM, Piyush Mukati <piyush.muk...@gmail.com>
wrote:

> Hi,
> we are migrating some jobs from Dstream to Structured Stream.
>
> Currently to handle aggregations we call map and reducebyKey on each RDD
> like
> rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))
>
> The final output of each RDD is merged to the sink with support for
> aggregation at the sink( Like co-processor at HBase ).
>
> In the new DataSet API, I am not finding any suitable API to aggregate
> over the micro-batch.
> Most of the aggregation API uses state-store and provide global
> aggregations. ( with append mode it does not give the change in existing
> buckets )
> Problems we are suspecting are :
>  1) state-store is tightly linked to the job definitions. while in our
> case we want may edit the job while keeping the older calculated aggregate
> as it is.
>
> The desired result can be achieved with below dataset APIs.
> dataset.groupByKey(a=>a._1).mapGroups( (key, valueItr) => merge(valueItr))
> while on observing the physical plan it does not call any merge before
> sort.
>
>  Anyone aware of API or other workarounds to get the desired result?
>


Structured Stream equivalent of reduceByKey

2017-10-25 Thread Piyush Mukati
Hi,
we are migrating some jobs from DStreams to Structured Streaming.

Currently, to handle aggregations we call map and reduceByKey on each RDD,
like:
rdd.map(event => (event._1, event)).reduceByKey((a, b) => merge(a, b))

The final output of each RDD is merged into the sink, which supports
aggregation (like a coprocessor in HBase).

In the new Dataset API, I am not finding any suitable API to aggregate over
the micro-batch.
Most of the aggregation APIs use the state store and provide global
aggregations (with append mode they do not give the change in existing
buckets).
Problems we are suspecting are:
 1) The state store is tightly linked to the job definition, while in our case
we may want to edit the job while keeping the previously calculated aggregates
as they are.

The desired result can be achieved with the Dataset APIs below:
dataset.groupByKey(a => a._1).mapGroups((key, valueItr) => merge(valueItr))
However, on inspecting the physical plan, it does not call any merge before the sort.

Is anyone aware of an API or other workaround to get the desired result?


Re: reducebykey

2017-04-07 Thread Ankur Srivastava
Hi Stephen,

If you use aggregate functions or reduceGroups on KeyValueGroupedDataset, it
behaves like reduceByKey on an RDD.

Only if you use flatMapGroups or mapGroups does it behave like groupByKey on an
RDD, and if you read the API documentation it warns about using those APIs.

Hope this helps.

Thanks
Ankur

On Fri, Apr 7, 2017 at 7:26 AM, Stephen Fletcher <stephen.fletc...@gmail.com
> wrote:

> Are there plans to add reduceByKey to dataframes, Since switching over to
> spark 2 I find myself increasing dissatisfied with the idea of converting
> dataframes to RDD to do procedural programming on grouped data(both from a
> ease of programming stance and performance stance). So I've been using
> Dataframe's experimental groupByKey and flatMapGroups which perform
> extremely well, I'm guessing because of the encoders, but the amount of
> data being transfers is a little excessive. Is there any plans to port
> reduceByKey ( and additionally a reduceByKeyleft and right)?
>


reducebykey

2017-04-07 Thread Stephen Fletcher
Are there plans to add reduceByKey to DataFrames? Since switching over to
Spark 2 I find myself increasingly dissatisfied with the idea of converting
DataFrames to RDDs to do procedural programming on grouped data (both from an
ease-of-programming stance and a performance stance). So I've been using the
DataFrame's experimental groupByKey and flatMapGroups, which perform
extremely well, I'm guessing because of the encoders, but the amount of
data being transferred is a little excessive. Are there any plans to port
reduceByKey (and additionally a reduceByKey left and right)?


[Spark Core]: flatMap/reduceByKey seems to be quite slow with Long keys on some distributions

2017-04-01 Thread Richard Tsai
Hi all,

I'm using Spark to process some corpora, and I need to count the occurrences
of each 2-gram. I started by counting tuples (wordID1, wordID2), and it
worked fine except for the large memory usage and GC overhead due to the
substantial number of small tuple objects. Then I tried to pack each pair of
Ints into a Long; the GC overhead did drop greatly, but the run time
also increased several times.

I ran some small experiments with random data on different distributions.
It seems that the performance issue only occurs on exponential distributed
data. The example code is attached.

The job is split into two stages, flatMap() and count(). When counting
Tuples, flatMap() takes about 6s and count() takes about 2s, while when
counting Longs, flatMap() takes 18s and count() takes 10s.

I haven't looked into Spark's implementation of flatMap/reduceByKey, but I
guess Spark has some specializations for Long keys which happen to perform
poorly on some specific distributions. Does anyone have ideas about
this?

Best wishes,
Richard

// lines of word IDs
val data = (1 to 5000).par.map({ _ =>
  (1 to 1000) map { _ => (-1000 * Math.log(Random.nextDouble)).toInt }
}).seq

// count Tuples, fast
sc parallelize(data) flatMap { line =>
  val first = line.iterator
  val second = line.iterator.drop(1)
  for (pair <- first zip(second))
yield (pair, 1L)
} reduceByKey { _ + _ } count()

// count Long, slow
sc parallelize(data) flatMap { line =>
  val first = line.iterator
  val second = line.iterator.drop(1)
  for ((a, b) <- first zip(second))
yield ((a.toLong << 32) | b, 1L)
} reduceByKey { _ + _ } count()


groupByKey vs reduceByKey

2016-12-09 Thread Appu K
Hi,

I read somewhere that

groupByKey() on an RDD disables map-side aggregation, as the aggregation
function (appending to a list) does not save any space.


However, from my understanding, using something like reduceByKey (or
combineByKey with a combiner function) we could reduce the data shuffled
around.

I am wondering why map-side aggregation is disabled for groupByKey() and why it
wouldn't save space at the executor where the data is received after the
shuffle.


cheers
Appu
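
For concreteness, a small PySpark sketch of the contrast being asked about
(an illustrative example, not from the original mail): reduceByKey merges
values inside each map task before the shuffle, while groupByKey ships every
individual record and combines only on the reduce side.

from operator import add
from pyspark import SparkContext

sc = SparkContext(appName="groupByKey-vs-reduceByKey")
pairs = sc.parallelize([("a", 1), ("a", 1), ("b", 1), ("a", 1)], 2)

# reduceByKey: each partition first folds its own ("a", 1) records into a
# partial sum, so at most one record per key per partition crosses the shuffle.
sums = pairs.reduceByKey(add).collect()            # e.g. [('a', 3), ('b', 1)]

# groupByKey: every individual ("a", 1) record is shuffled; the values are
# only combined afterwards, on the reduce side.
also_sums = pairs.groupByKey().mapValues(sum).collect()   # same totals

The usual rationale for groupByKey skipping the map-side combine is that its
"combined" value is just a list of all the original values, so nothing gets
smaller before the shuffle, while the list-building itself costs memory and
serialization; the savings only appear when the merge function genuinely
shrinks the data, as in the reduceByKey case.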


Access broadcast variable from within function passed to reduceByKey

2016-11-15 Thread coolgar
For example:

rows.reduceByKey(reduceKeyMapFunction)

def reduceKeyMapFunction(log1: Map[String, Long], log2: Map[String, Long]): Map[String, Long] = {
  val bcast = broadcastv.value
  val countFields = dbh.getCountFields
  var aggs: Map[String, Long] = Map()
  countFields.foreach { f =>
    val valueSum = log1(f) + log2(f)
    aggs = aggs ++ Map(f -> valueSum)
  }
  aggs
}

I can't pass the broadcast variable to reduceKeyMapFunction. I create the
broadcast variable (broadcastv) in the driver, but I fear it will not be
initialized on the workers where reduceKeyMapFunction runs. I've tried this,
but when accessing broadcastv an NPE is thrown.

I can't pass it to reduceKeyMapFunction as an argument because the function can
only accept two params (log1, log2).
Any ideas?
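
For comparison, a minimal PySpark sketch (not the Scala code above; all names
here are made up) of the usual pattern: rather than being passed as a third
argument, the Broadcast handle is captured by the reduce function's closure,
and .value is read inside the function on the executors.

from pyspark import SparkContext

sc = SparkContext(appName="broadcast-in-reduceByKey")
count_fields = sc.broadcast(["clicks", "views"])     # hypothetical field list

rows = sc.parallelize([
    ("k1", {"clicks": 1, "views": 2}),
    ("k1", {"clicks": 3, "views": 4}),
])

def merge_logs(log1, log2):
    # Only the two value arguments are passed in; the broadcast variable is
    # captured from the enclosing scope and dereferenced here, on the executor.
    return {f: log1.get(f, 0) + log2.get(f, 0) for f in count_fields.value}

merged = rows.reduceByKey(merge_logs).collect()      # [('k1', {'clicks': 4, 'views': 6})]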






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Access-broadcast-variable-from-within-function-passed-to-reduceByKey-tp28082.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread dev loper
I haven't tried rdd.distinct. Since reduceByKey itself is not helping me even
with a sliding window here, I thought rdd.distinct might not help either. I
will write minimal code for reproducing the issue and share it with you.
Another point I want to bring up is that I am unable to reproduce the issue
when I am running on my local box, but when I deploy the code on the YARN
cluster with 34 executors the problem is easily reproduced. Similarly, when I
am using Spark createStream with one partition the issue is not reproduced,
and when I am using Spark DirectStream to consume Kafka with 100 partitions
the issue can be easily reproduced. The duplicates are not happening on the
same executor as per the log prints; they happen on different executors. I
don't know whether the last point helps.

On Sun, Nov 13, 2016 at 5:22 AM, ayan guha <guha.a...@gmail.com> wrote:

> Have you tried rdd.distinc?
>
> On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Can you come up with a minimal reproducible example?
>>
>> Probably unrelated, but why are you doing a union of 3 streams?
>>
>> On Sat, Nov 12, 2016 at 10:29 AM, dev loper <spark...@gmail.com> wrote:
>> > There are no failures or errors.  Irrespective of that I am seeing
>> > duplicates. The steps and stages are all successful and even the
>> speculation
>> > is turned off .
>> >
>> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>> >>
>> >> Are you certain you aren't getting any failed tasks or other errors?
>> >> Output actions like foreach aren't exactly once and will be retried on
>> >> failures.
>> >>
>> >>
>> >> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:
>> >>>
>> >>> Dear fellow Spark Users,
>> >>>
>> >>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster)
>> >>> listens to Campaigns based on live stock feeds and the batch duration
>> is 5
>> >>> seconds. The applications uses Kafka DirectStream and based on the
>> feed
>> >>> source there are three streams. As given in the code snippet I am
>> doing a
>> >>> union of three streams and I am trying to remove the duplicate
>> campaigns
>> >>> received using reduceByKey based on the customer and campaignId. I
>> could see
>> >>> lot of duplicate email being send out for the same key in the same
>> batch.I
>> >>> was expecting reduceByKey to remove the duplicate campaigns in a
>> batch based
>> >>> on customer and campaignId. In logs I am even printing the the
>> key,batch
>> >>> time before sending the email and I could clearly see duplicates. I
>> could
>> >>> see some duplicates getting removed after adding log in reduceByKey
>> >>> Function, but its not eliminating completely .
>> >>>
>> >>> JavaDStream matchedCampaigns =
>> >>> stream1.transform(CmpManager::getMatchedCampaigns)
>> >>> .union(stream2).union(stream3).cache();
>> >>>
>> >>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>> >>> matchedCampaigns.mapToPair(campaign->{
>> >>> String key=campaign.getCustomer()+"_"+campaign.getId();
>> >>> return new Tuple2<String, Campaigns>(key, campaign);
>> >>> })
>> >>> .reduceByKey((campaign1, campaign2)->{return campaign1;});
>> >>>
>> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>> >>>
>> >>> I am not able to figure out where I am going wrong here . Please help
>> me
>> >>> here to get rid of this weird problem. Previously we were using
>> createStream
>> >>> for listening to Kafka Queue (number of partitions 1) , there we
>> didn't face
>> >>> this issue. But when we moved to directStream (number of partitions
>> 100) we
>> >>> could easily reproduce this issue on high load .
>> >>>
>> >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds
>> >>> instead of reduceByKey Operation, But even that didn't
>> >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1,
>> Durations.Seconds(5),
>> >>> Durations.Seconds(5))
>> >>>
>> >>> I have even requested for help on Stackoverflow , But I haven't
>> received
>> >>> any solutions to this issue.
>> >>>
>> >>> Stack Overflow Link
>> >>> 
>> >>>
>> >>> https://stackoverflow.com/questions/40559858/spark-streaming
>> -reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>> >>>
>> >>>
>> >>> Thanks and Regards
>> >>> Dev
>> >
>> >
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread ayan guha
Have you tried rdd.distinct?

On Sun, Nov 13, 2016 at 8:28 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Can you come up with a minimal reproducible example?
>
> Probably unrelated, but why are you doing a union of 3 streams?
>
> On Sat, Nov 12, 2016 at 10:29 AM, dev loper <spark...@gmail.com> wrote:
> > There are no failures or errors.  Irrespective of that I am seeing
> > duplicates. The steps and stages are all successful and even the
> speculation
> > is turned off .
> >
> > On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Are you certain you aren't getting any failed tasks or other errors?
> >> Output actions like foreach aren't exactly once and will be retried on
> >> failures.
> >>
> >>
> >> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:
> >>>
> >>> Dear fellow Spark Users,
> >>>
> >>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster)
> >>> listens to Campaigns based on live stock feeds and the batch duration
> is 5
> >>> seconds. The applications uses Kafka DirectStream and based on the feed
> >>> source there are three streams. As given in the code snippet I am
> doing a
> >>> union of three streams and I am trying to remove the duplicate
> campaigns
> >>> received using reduceByKey based on the customer and campaignId. I
> could see
> >>> lot of duplicate email being send out for the same key in the same
> batch.I
> >>> was expecting reduceByKey to remove the duplicate campaigns in a batch
> based
> >>> on customer and campaignId. In logs I am even printing the the
> key,batch
> >>> time before sending the email and I could clearly see duplicates. I
> could
> >>> see some duplicates getting removed after adding log in reduceByKey
> >>> Function, but its not eliminating completely .
> >>>
> >>> JavaDStream matchedCampaigns =
> >>> stream1.transform(CmpManager::getMatchedCampaigns)
> >>> .union(stream2).union(stream3).cache();
> >>>
> >>> JavaPairDStream<String, Campaign> uniqueCampaigns =
> >>> matchedCampaigns.mapToPair(campaign->{
> >>> String key=campaign.getCustomer()+"_"+campaign.getId();
> >>> return new Tuple2<String, Campaigns>(key, campaign);
> >>> })
> >>> .reduceByKey((campaign1, campaign2)->{return campaign1;});
> >>>
> >>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
> >>>
> >>> I am not able to figure out where I am going wrong here . Please help
> me
> >>> here to get rid of this weird problem. Previously we were using
> createStream
> >>> for listening to Kafka Queue (number of partitions 1) , there we
> didn't face
> >>> this issue. But when we moved to directStream (number of partitions
> 100) we
> >>> could easily reproduce this issue on high load .
> >>>
> >>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds
> >>> instead of reduceByKey Operation, But even that didn't
> >>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1,
> Durations.Seconds(5),
> >>> Durations.Seconds(5))
> >>>
> >>> I have even requested for help on Stackoverflow , But I haven't
> received
> >>> any solutions to this issue.
> >>>
> >>> Stack Overflow Link
> >>> 
> >>>
> >>> https://stackoverflow.com/questions/40559858/spark-
> streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
> >>>
> >>>
> >>> Thanks and Regards
> >>> Dev
> >
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Best Regards,
Ayan Guha


Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread Cody Koeninger
Can you come up with a minimal reproducible example?

Probably unrelated, but why are you doing a union of 3 streams?

On Sat, Nov 12, 2016 at 10:29 AM, dev loper <spark...@gmail.com> wrote:
> There are no failures or errors.  Irrespective of that I am seeing
> duplicates. The steps and stages are all successful and even the speculation
> is turned off .
>
> On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> Are you certain you aren't getting any failed tasks or other errors?
>> Output actions like foreach aren't exactly once and will be retried on
>> failures.
>>
>>
>> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:
>>>
>>> Dear fellow Spark Users,
>>>
>>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster)
>>> listens to Campaigns based on live stock feeds and the batch duration is 5
>>> seconds. The applications uses Kafka DirectStream and based on the feed
>>> source there are three streams. As given in the code snippet I am doing a
>>> union of three streams and I am trying to remove the duplicate campaigns
>>> received using reduceByKey based on the customer and campaignId. I could see
>>> lot of duplicate email being send out for the same key in the same batch.I
>>> was expecting reduceByKey to remove the duplicate campaigns in a batch based
>>> on customer and campaignId. In logs I am even printing the the key,batch
>>> time before sending the email and I could clearly see duplicates. I could
>>> see some duplicates getting removed after adding log in reduceByKey
>>> Function, but its not eliminating completely .
>>>
>>> JavaDStream matchedCampaigns =
>>> stream1.transform(CmpManager::getMatchedCampaigns)
>>> .union(stream2).union(stream3).cache();
>>>
>>> JavaPairDStream<String, Campaign> uniqueCampaigns =
>>> matchedCampaigns.mapToPair(campaign->{
>>> String key=campaign.getCustomer()+"_"+campaign.getId();
>>> return new Tuple2<String, Campaigns>(key, campaign);
>>> })
>>> .reduceByKey((campaign1, campaign2)->{return campaign1;});
>>>
>>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>>>
>>> I am not able to figure out where I am going wrong here . Please help me
>>> here to get rid of this weird problem. Previously we were using createStream
>>> for listening to Kafka Queue (number of partitions 1) , there we didn't face
>>> this issue. But when we moved to directStream (number of partitions 100) we
>>> could easily reproduce this issue on high load .
>>>
>>> Note: I even tried reduceByKeyAndWindow with duration of 5 seconds
>>> instead of reduceByKey Operation, But even that didn't
>>> help.uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5),
>>> Durations.Seconds(5))
>>>
>>> I have even requested for help on Stackoverflow , But I haven't received
>>> any solutions to this issue.
>>>
>>> Stack Overflow Link
>>> 
>>>
>>> https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>>
>>>
>>> Thanks and Regards
>>> Dev
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread dev loper
There are no failures or errors. Irrespective of that, I am seeing
duplicates. The steps and stages are all successful, and even
speculation is turned off.

On Sat, Nov 12, 2016 at 9:55 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Are you certain you aren't getting any failed tasks or other errors?
> Output actions like foreach aren't exactly once and will be retried on
> failures.
>
> On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:
>
>> Dear fellow Spark Users,
>>
>> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster)
>> listens to Campaigns based on live stock feeds and the batch duration is 5
>> seconds. The applications uses Kafka DirectStream and based on the feed
>> source there are three streams. As given in the code snippet I am doing a
>> union of three streams and I am trying to remove the duplicate campaigns
>> received using reduceByKey based on the customer and campaignId. I could
>> see lot of duplicate email being send out for the same key in the same
>> batch.I was expecting reduceByKey to remove the duplicate campaigns in a
>> batch based on customer and campaignId. In logs I am even printing the the
>> key,batch time before sending the email and I could clearly see duplicates.
>> I could see some duplicates getting removed after adding log in reduceByKey
>> Function, but its not eliminating completely .
>>
>> JavaDStream matchedCampaigns = 
>> stream1.transform(CmpManager::getMatchedCampaigns)
>> .union(stream2).union(stream3).cache();
>> JavaPairDStream<String, Campaign> uniqueCampaigns = 
>> matchedCampaigns.mapToPair(campaign->{
>> String key=campaign.getCustomer()+"_"+campaign.getId();
>> return new Tuple2<String, Campaigns>(key, campaign);
>> }).reduceByKey((campaign1, campaign2)->{return campaign1;});
>>
>> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>>
>> I am not able to figure out where I am going wrong here . Please help me
>> here to get rid of this weird problem. Previously we were using
>> createStream for listening to Kafka Queue (number of partitions 1) , there
>> we didn't face this issue. But when we moved to directStream (number of
>> partitions 100) we could easily reproduce this issue on high load .
>>
>> *Note:* I even tried reduceByKeyAndWindow with duration of 5 seconds
>> instead of reduceByKey Operation, But even that didn't help.
>> uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5),
>> Durations.Seconds(5))
>> I have even requested for help on Stackoverflow , But I haven't received
>> any solutions to this issue.
>>
>>
>> *Stack Overflow Link*
>> https://stackoverflow.com/questions/40559858/spark-streaming
>> -reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>>
>>
>> Thanks and Regards
>> Dev
>>
>


Re: Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread Cody Koeninger
Are you certain you aren't getting any failed tasks or other errors?
Output actions like foreach aren't exactly once and will be retried on
failures.

On Nov 12, 2016 06:36, "dev loper" <spark...@gmail.com> wrote:

> Dear fellow Spark Users,
>
> My Spark Streaming application (Spark 2.0 , on AWS EMR yarn cluster)
> listens to Campaigns based on live stock feeds and the batch duration is 5
> seconds. The applications uses Kafka DirectStream and based on the feed
> source there are three streams. As given in the code snippet I am doing a
> union of three streams and I am trying to remove the duplicate campaigns
> received using reduceByKey based on the customer and campaignId. I could
> see lot of duplicate email being send out for the same key in the same
> batch.I was expecting reduceByKey to remove the duplicate campaigns in a
> batch based on customer and campaignId. In logs I am even printing the the
> key,batch time before sending the email and I could clearly see duplicates.
> I could see some duplicates getting removed after adding log in reduceByKey
> Function, but its not eliminating completely .
>
> JavaDStream matchedCampaigns = 
> stream1.transform(CmpManager::getMatchedCampaigns)
> .union(stream2).union(stream3).cache();
> JavaPairDStream<String, Campaign> uniqueCampaigns = 
> matchedCampaigns.mapToPair(campaign->{
> String key=campaign.getCustomer()+"_"+campaign.getId();
> return new Tuple2<String, Campaigns>(key, campaign);
> }).reduceByKey((campaign1, campaign2)->{return campaign1;});
>
> uniqueCampaigns.foreachRDD(CmpManager::sendEmail);
>
> I am not able to figure out where I am going wrong here . Please help me
> here to get rid of this weird problem. Previously we were using
> createStream for listening to Kafka Queue (number of partitions 1) , there
> we didn't face this issue. But when we moved to directStream (number of
> partitions 100) we could easily reproduce this issue on high load .
>
> *Note:* I even tried reduceByKeyAndWindow with duration of 5 seconds
> instead of reduceByKey Operation, But even that didn't help.
> uniqueCampaigns.reduceByKeyAndWindow((c1,c2)=>c1, Durations.Seconds(5),
> Durations.Seconds(5))
> I have even requested for help on Stackoverflow , But I haven't received
> any solutions to this issue.
>
>
> *Stack Overflow Link*
> https://stackoverflow.com/questions/40559858/spark-
> streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch
>
>
> Thanks and Regards
> Dev
>


Spark Streaming- ReduceByKey not removing Duplicates for the same key in a Batch

2016-11-12 Thread dev loper
Dear fellow Spark Users,

My Spark Streaming application (Spark 2.0, on an AWS EMR YARN cluster)
listens to campaigns based on live stock feeds, and the batch duration is 5
seconds. The application uses Kafka DirectStream, and based on the feed
source there are three streams. As shown in the code snippet, I am doing a
union of the three streams and trying to remove duplicate campaigns
using reduceByKey on the customer and campaignId. I can see
a lot of duplicate emails being sent out for the same key in the same
batch. I was expecting reduceByKey to remove the duplicate campaigns in a
batch based on customer and campaignId. In the logs I even print the
key and batch time before sending the email, and I can clearly see duplicates.
I can see some duplicates getting removed after adding a log statement in the
reduceByKey function, but it is not eliminating them completely.

JavaDStream<Campaign> matchedCampaigns =
    stream1.transform(CmpManager::getMatchedCampaigns)
           .union(stream2).union(stream3).cache();
JavaPairDStream<String, Campaign> uniqueCampaigns =
    matchedCampaigns.mapToPair(campaign -> {
        String key = campaign.getCustomer() + "_" + campaign.getId();
        return new Tuple2<String, Campaign>(key, campaign);
    }).reduceByKey((campaign1, campaign2) -> campaign1);

uniqueCampaigns.foreachRDD(CmpManager::sendEmail);

I am not able to figure out where I am going wrong here . Please help me
here to get rid of this weird problem. Previously we were using
createStream for listening to Kafka Queue (number of partitions 1) , there
we didn't face this issue. But when we moved to directStream (number of
partitions 100) we could easily reproduce this issue on high load .

*Note:* I even tried reduceByKeyAndWindow with a duration of 5 seconds
instead of the reduceByKey operation, but even that didn't help:
uniqueCampaigns.reduceByKeyAndWindow((c1, c2) -> c1,
Durations.Seconds(5), Durations.Seconds(5))
I have even requested help on Stack Overflow, but I haven't received
any solutions to this issue.


*Stack Overflow Link*
https://stackoverflow.com/questions/40559858/spark-streaming-reducebykey-not-removing-duplicates-for-the-same-key-in-a-batch


Thanks and Regards
Dev


Re: Is there a reduceByKey functionality in DataFrame API?

2016-08-29 Thread Marius Soutier
Not really, a grouped DataFrame only provides SQL-like functions like sum and 
avg (at least in 1.5).

> On 29.08.2016, at 14:56, ayan guha <guha.a...@gmail.com> wrote:
> 
> If you are confused because of  the name of two APIs. I think DF API name 
> groupBy came from SQL, but it works similarly as reducebykey.
> 
> On 29 Aug 2016 20:57, "Marius Soutier" <mps@gmail.com 
> <mailto:mps@gmail.com>> wrote:
> In DataFrames (and thus in 1.5 in general) this is not possible, correct?
> 
>> On 11.08.2016, at 05:42, Holden Karau <hol...@pigscanfly.ca 
>> <mailto:hol...@pigscanfly.ca>> wrote:
>> 
>> Hi Luis,
>> 
>> You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you 
>> can do groupBy followed by a reduce on the GroupedDataset ( 
>> http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset
>>  
>> <http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset>
>>  ) - this works on a per-key basis despite the different name. In Spark 2.0 
>> you would use groupByKey on the Dataset followed by reduceGroups ( 
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset
>>  
>> <http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset>
>>  ).
>> 
>> Cheers,
>> 
>> Holden :)
>> 
>> On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com 
>> <mailto:luismat...@gmail.com>> wrote:
>> Hi everyone,
>> 
>> Consider the following code:
>> 
>> val result = df.groupBy("col1").agg(min("col2"))
>> 
>> I know that rdd.reduceByKey(func) produces the same RDD as
>> rdd.groupByKey().mapValues(value => value.reduce(func)) However reducerByKey
>> is more efficient as it avoids shipping each value to the reducer doing the
>> aggregation (it ships partial aggregations instead).
>> 
>> I wonder whether the DataFrame API optimizes the code doing something
>> similar to what RDD.reduceByKey does.
>> 
>> I am using Spark 1.6.2.
>> 
>> Regards,
>> Luis
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html
>>  
>> <http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html>
>> Sent from the Apache Spark User List mailing list archive at Nabble.com 
>> <http://nabble.com/>.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> <mailto:user-unsubscr...@spark.apache.org>
>> 
>> 
>> 
>> 
>> -- 
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau <https://twitter.com/holdenkarau>



Re: Is there a reduceByKey functionality in DataFrame API?

2016-08-29 Thread ayan guha
Maybe you are confused because of the names of the two APIs. I think the
DataFrame API name groupBy came from SQL, but it works similarly to reduceByKey.
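
To make that concrete, a small PySpark sketch (an added illustration; the exact
plan text varies by Spark version): the physical plan for a DataFrame
groupBy/agg shows a partial aggregation before the exchange, which is the same
map-side combining that makes reduceByKey cheaper than groupByKey on RDDs.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("groupBy-partial-agg").getOrCreate()
df = spark.createDataFrame([("a", 3), ("a", 1), ("b", 7)], ["col1", "col2"])

result = df.groupBy("col1").agg(F.min("col2"))
result.explain()
# The plan typically shows two aggregation nodes around the shuffle, roughly:
#   HashAggregate(partial_min(col2)) -> Exchange -> HashAggregate(min(col2))
# i.e. a partial aggregate is computed before the data is exchanged.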
On 29 Aug 2016 20:57, "Marius Soutier" <mps@gmail.com> wrote:

> In DataFrames (and thus in 1.5 in general) this is not possible, correct?
>
> On 11.08.2016, at 05:42, Holden Karau <hol...@pigscanfly.ca> wrote:
>
> Hi Luis,
>
> You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you
> can do groupBy followed by a reduce on the GroupedDataset (
> http://spark.apache.org/docs/1.6.2/api/scala/index.
> html#org.apache.spark.sql.GroupedDataset ) - this works on a per-key
> basis despite the different name. In Spark 2.0 you would use groupByKey on
> the Dataset followed by reduceGroups ( http://spark.apache.org/
> docs/latest/api/scala/index.html#org.apache.spark.sql.
> KeyValueGroupedDataset ).
>
> Cheers,
>
> Holden :)
>
> On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Consider the following code:
>>
>> val result = df.groupBy("col1").agg(min("col2"))
>>
>> I know that rdd.reduceByKey(func) produces the same RDD as
>> rdd.groupByKey().mapValues(value => value.reduce(func)) However
>> reducerByKey
>> is more efficient as it avoids shipping each value to the reducer doing
>> the
>> aggregation (it ships partial aggregations instead).
>>
>> I wonder whether the DataFrame API optimizes the code doing something
>> similar to what RDD.reduceByKey does.
>>
>> I am using Spark 1.6.2.
>>
>> Regards,
>> Luis
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-
>> in-DataFrame-API-tp27508.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>> <http://nabble.com>.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>
>


Re: Is there a reduceByKey functionality in DataFrame API?

2016-08-29 Thread Marius Soutier
In DataFrames (and thus in 1.5 in general) this is not possible, correct?

> On 11.08.2016, at 05:42, Holden Karau <hol...@pigscanfly.ca> wrote:
> 
> Hi Luis,
> 
> You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you 
> can do groupBy followed by a reduce on the GroupedDataset ( 
> http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset
>  ) - this works on a per-key basis despite the different name. In Spark 2.0 
> you would use groupByKey on the Dataset followed by reduceGroups ( 
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset
>  ).
> 
> Cheers,
> 
> Holden :)
> 
> On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com> wrote:
> Hi everyone,
> 
> Consider the following code:
> 
> val result = df.groupBy("col1").agg(min("col2"))
> 
> I know that rdd.reduceByKey(func) produces the same RDD as
> rdd.groupByKey().mapValues(value => value.reduce(func)) However reducerByKey
> is more efficient as it avoids shipping each value to the reducer doing the
> aggregation (it ships partial aggregations instead).
> 
> I wonder whether the DataFrame API optimizes the code doing something
> similar to what RDD.reduceByKey does.
> 
> I am using Spark 1.6.2.
> 
> Regards,
> Luis
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 
> 
> 
> -- 
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau


Re: Is there a reduceByKey functionality in DataFrame API?

2016-08-10 Thread Holden Karau
Hi Luis,

You might want to consider upgrading to Spark 2.0 - but in Spark 1.6.2 you
can do groupBy followed by a reduce on the GroupedDataset (
http://spark.apache.org/docs/1.6.2/api/scala/index.html#org.apache.spark.sql.GroupedDataset
) - this works on a per-key basis despite the different name. In Spark 2.0
you would use groupByKey on the Dataset followed by reduceGroups (
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.KeyValueGroupedDataset
).
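
For reference, here is a minimal self-contained sketch of the Spark 2.0 route
described above; the Record case class and the min-by-value reduce function are
illustrative assumptions, not code from this thread.

import org.apache.spark.sql.SparkSession

case class Record(col1: String, col2: Int)

object ReduceGroupsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ReduceGroupsExample").master("local[*]").getOrCreate()
    import spark.implicits._

    val ds = Seq(Record("a", 3), Record("a", 1), Record("b", 7)).toDS()

    // Typed per-key reduce: keep the record with the smallest col2 for each col1,
    // analogous to RDD.reduceByKey but on a Dataset.
    val minPerKey = ds
      .groupByKey(_.col1)
      .reduceGroups((a, b) => if (a.col2 <= b.col2) a else b)

    minPerKey.show()   // Dataset[(String, Record)]
    spark.stop()
  }
}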

Cheers,

Holden :)

On Wed, Aug 10, 2016 at 5:15 PM, luismattor <luismat...@gmail.com> wrote:

> Hi everyone,
>
> Consider the following code:
>
> val result = df.groupBy("col1").agg(min("col2"))
>
> I know that rdd.reduceByKey(func) produces the same RDD as
> rdd.groupByKey().mapValues(value => value.reduce(func)) However
> reducerByKey
> is more efficient as it avoids shipping each value to the reducer doing the
> aggregation (it ships partial aggregations instead).
>
> I wonder whether the DataFrame API optimizes the code doing something
> similar to what RDD.reduceByKey does.
>
> I am using Spark 1.6.2.
>
> Regards,
> Luis
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-
> API-tp27508.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Is there a reduceByKey functionality in DataFrame API?

2016-08-10 Thread luismattor
Hi everyone,

Consider the following code:

val result = df.groupBy("col1").agg(min("col2"))

I know that rdd.reduceByKey(func) produces the same RDD as
rdd.groupByKey().mapValues(value => value.reduce(func)). However, reduceByKey
is more efficient, as it avoids shipping each value to the reducer doing the
aggregation (it ships partial aggregations instead).

I wonder whether the DataFrame API optimizes the code doing something
similar to what RDD.reduceByKey does. 

I am using Spark 1.6.2. 

Regards,
Luis



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-a-reduceByKey-functionality-in-DataFrame-API-tp27508.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-23 Thread Nirav Patel
So it was indeed my merge function. I created a new result object for every
merge and it's working now.
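
For anyone hitting the same issue, here is a minimal sketch of the pattern that
fixed it: the reduce function builds and returns a fresh object instead of
mutating either argument. MyObj and its field are illustrative stand-ins, not
the original class from this thread.

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative stand-in for the real value type in this thread.
case class MyObj(counts: Map[String, Long])

object SafeMergeExample {
  // Pure merge: builds and returns a new MyObj instead of mutating either input.
  def merge(a: MyObj, b: MyObj): MyObj = {
    val keys = a.counts.keySet ++ b.counts.keySet
    MyObj(keys.map(k => k -> (a.counts.getOrElse(k, 0L) + b.counts.getOrElse(k, 0L))).toMap)
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SafeMergeExample").setMaster("local[*]"))
    val pairs = sc.parallelize(Seq("k1" -> MyObj(Map("x" -> 1L)), "k1" -> MyObj(Map("x" -> 2L))))
    pairs.reduceByKey(merge).collect().foreach(println)
    sc.stop()
  }
}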

Thanks

On Wed, Jun 22, 2016 at 3:46 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> PS. In my reduceByKey operation I have two mutable object. What I do is
> merge mutable2 into mutable1 and return mutable1. I read that it works for
> aggregateByKey so thought it will work for reduceByKey as well. I might be
> wrong here. Can someone verify if this will work or be un predictable?
>
> On Wed, Jun 22, 2016 at 11:52 AM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> Hi,
>>
>> I do not see any indication of errors or executor getting killed in spark
>> UI - jobs, stages, event timelines. No task failures. I also don't see any
>> errors in executor logs.
>>
>> Thanks
>>
>> On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> For the run which returned incorrect result, did you observe any error
>>> (on workers) ?
>>>
>>> Cheers
>>>
>>> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com>
>>> wrote:
>>>
>>>> I have an RDD[String, MyObj] which is a result of Join + Map operation.
>>>> It has no partitioner info. I run reduceByKey without passing any
>>>> Partitioner or partition counts.  I observed that output aggregation result
>>>> for given key is incorrect sometime. like 1 out of 5 times. It looks like
>>>> reduce operation is joining values from two different keys. There is no
>>>> configuration change between multiple runs. I am scratching my head over
>>>> this. I verified results by printing out RDD before and after reduce
>>>> operation; collecting subset at driver.
>>>>
>>>> Besides shuffle and storage memory fraction I use following options:
>>>>
>>>> sparkConf.set("spark.driver.userClassPathFirst","true")
>>>> sparkConf.set("spark.unsafe.offHeap","true")
>>>> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
>>>> sparkConf.set("spark.serializer",
>>>> "org.apache.spark.serializer.KryoSerializer")
>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>

-- 




Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Nirav Patel
PS. In my reduceByKey operation I have two mutable objects. What I do is
merge mutable2 into mutable1 and return mutable1. I read that this works for
aggregateByKey, so I thought it would work for reduceByKey as well. I might be
wrong here. Can someone verify whether this will work or be unpredictable?

On Wed, Jun 22, 2016 at 11:52 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> Hi,
>
> I do not see any indication of errors or executor getting killed in spark
> UI - jobs, stages, event timelines. No task failures. I also don't see any
> errors in executor logs.
>
> Thanks
>
> On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> For the run which returned incorrect result, did you observe any error
>> (on workers) ?
>>
>> Cheers
>>
>> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com>
>> wrote:
>>
>>> I have an RDD[String, MyObj] which is a result of Join + Map operation.
>>> It has no partitioner info. I run reduceByKey without passing any
>>> Partitioner or partition counts.  I observed that output aggregation result
>>> for given key is incorrect sometime. like 1 out of 5 times. It looks like
>>> reduce operation is joining values from two different keys. There is no
>>> configuration change between multiple runs. I am scratching my head over
>>> this. I verified results by printing out RDD before and after reduce
>>> operation; collecting subset at driver.
>>>
>>> Besides shuffle and storage memory fraction I use following options:
>>>
>>> sparkConf.set("spark.driver.userClassPathFirst","true")
>>> sparkConf.set("spark.unsafe.offHeap","true")
>>> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
>>> sparkConf.set("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>>
>>>
>>>
>>
>>
>>
>

-- 




Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Nirav Patel
Hi,

I do not see any indication of errors or executors getting killed in the Spark
UI - jobs, stages, event timelines. No task failures. I also don't see any
errors in the executor logs.

Thanks

On Wed, Jun 22, 2016 at 2:32 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> For the run which returned incorrect result, did you observe any error (on
> workers) ?
>
> Cheers
>
> On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com>
> wrote:
>
>> I have an RDD[String, MyObj] which is a result of Join + Map operation.
>> It has no partitioner info. I run reduceByKey without passing any
>> Partitioner or partition counts.  I observed that output aggregation result
>> for given key is incorrect sometime. like 1 out of 5 times. It looks like
>> reduce operation is joining values from two different keys. There is no
>> configuration change between multiple runs. I am scratching my head over
>> this. I verified results by printing out RDD before and after reduce
>> operation; collecting subset at driver.
>>
>> Besides shuffle and storage memory fraction I use following options:
>>
>> sparkConf.set("spark.driver.userClassPathFirst","true")
>> sparkConf.set("spark.unsafe.offHeap","true")
>> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
>> sparkConf.set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>
>>
>>
>
>
>

-- 




Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Ted Yu
For the run which returned incorrect result, did you observe any error (on
workers) ?

Cheers

On Tue, Jun 21, 2016 at 10:42 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I have an RDD[String, MyObj] which is a result of Join + Map operation. It
> has no partitioner info. I run reduceByKey without passing any Partitioner
> or partition counts.  I observed that output aggregation result for given
> key is incorrect sometime. like 1 out of 5 times. It looks like reduce
> operation is joining values from two different keys. There is no
> configuration change between multiple runs. I am scratching my head over
> this. I verified results by printing out RDD before and after reduce
> operation; collecting subset at driver.
>
> Besides shuffle and storage memory fraction I use following options:
>
> sparkConf.set("spark.driver.userClassPathFirst","true")
> sparkConf.set("spark.unsafe.offHeap","true")
> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
>
>


Re: Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-22 Thread Takeshi Yamamuro
Hi,

Could you check whether the issue also occurs in v1.6.1 and v2.0?

// maropu

On Wed, Jun 22, 2016 at 2:42 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I have an RDD[String, MyObj] which is a result of Join + Map operation. It
> has no partitioner info. I run reduceByKey without passing any Partitioner
> or partition counts.  I observed that output aggregation result for given
> key is incorrect sometime. like 1 out of 5 times. It looks like reduce
> operation is joining values from two different keys. There is no
> configuration change between multiple runs. I am scratching my head over
> this. I verified results by printing out RDD before and after reduce
> operation; collecting subset at driver.
>
> Besides shuffle and storage memory fraction I use following options:
>
> sparkConf.set("spark.driver.userClassPathFirst","true")
> sparkConf.set("spark.unsafe.offHeap","true")
> sparkConf.set("spark.reducer.maxSizeInFlight","128m")
> sparkConf.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer")
>
>
>




-- 
---
Takeshi Yamamuro


Spark 1.5.2 - Different results from reduceByKey over multiple iterations

2016-06-21 Thread Nirav Patel
I have an RDD[(String, MyObj)] which is the result of a join + map operation. It
has no partitioner info. I run reduceByKey without passing any Partitioner
or partition count. I observed that the aggregated output for a given
key is sometimes incorrect, like 1 out of 5 times. It looks like the reduce
operation is combining values from two different keys. There is no
configuration change between runs. I am scratching my head over
this. I verified the results by printing out the RDD before and after the
reduce operation and collecting a subset at the driver.

Besides shuffle and storage memory fraction I use following options:

sparkConf.set("spark.driver.userClassPathFirst","true")
sparkConf.set("spark.unsafe.offHeap","true")
sparkConf.set("spark.reducer.maxSizeInFlight","128m")
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")

-- 




Re: Dataset - reduceByKey

2016-06-07 Thread Jacek Laskowski
Hi Bryan,

What about groupBy [1] and agg [2]? What about UserDefinedAggregateFunction [3]?

[1] 
https://home.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset@groupBy(col1:String,cols:String*):org.apache.spark.sql.RelationalGroupedDataset
[2] 
https://home.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/scala/index.html#org.apache.spark.sql.RelationalGroupedDataset
[3] 
https://home.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/api/scala/index.html#org.apache.spark.sql.expressions.UserDefinedAggregateFunction
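
To make the third pointer concrete, here is a minimal UserDefinedAggregateFunction
sketch - a trivial long sum; the class name and the usage shown in the comment
are illustrative, not code from this thread.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// A trivial long-sum UDAF; usage (sketch): df.groupBy("key").agg(new LongSum()(col("value")))
class LongSum extends UserDefinedAggregateFunction {
  override def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)

  // merge combines partial aggregates, much like reduceByKey's combiner step.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)

  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}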

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Tue, Jun 7, 2016 at 8:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
> Hello.
>
> I am looking at the option of moving RDD based operations to Dataset based
> operations.  We are calling 'reduceByKey' on some pair RDDs we have.  What
> would the equivalent be in the Dataset interface - I do not see a simple
> reduceByKey replacement.
>
> Regards,
>
> Bryan Jeffrey
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
All,

Thank you for the replies.  It seems as though the Dataset API is still far
behind the RDD API.  This is unfortunate as the Dataset API potentially
provides a number of performance benefits.  I will move to using it in a
more limited set of cases for the moment.

Thank you!

Bryan Jeffrey

On Tue, Jun 7, 2016 at 2:50 PM, Richard Marscher <rmarsc...@localytics.com>
wrote:

> There certainly are some gaps between the richness of the RDD API and the
> Dataset API. I'm also migrating from RDD to Dataset and ran into
> reduceByKey and join scenarios.
>
> In the spark-dev list, one person was discussing reduceByKey being
> sub-optimal at the moment and it spawned this JIRA
> https://issues.apache.org/jira/browse/SPARK-15598. But you might be able
> to get by with groupBy().reduce() for now, check performance though.
>
> As for join, the approach would be using the joinWith function on Dataset.
> Although the API isn't as sugary as it was for RDD IMO, something which
> I've been discussing in a separate thread as well. I can't find a weblink
> for it but the thread subject is "Dataset Outer Join vs RDD Outer Join".
>
> On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> It would also be nice if there was a better example of joining two
>> Datasets. I am looking at the documentation here:
>> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems
>> a little bit sparse - is there a better documentation source?
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
>> wrote:
>>
>>> Hello.
>>>
>>> I am looking at the option of moving RDD based operations to Dataset
>>> based operations.  We are calling 'reduceByKey' on some pair RDDs we have.
>>> What would the equivalent be in the Dataset interface - I do not see a
>>> simple reduceByKey replacement.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>>
>>
>
>
> --
> *Richard Marscher*
> Senior Software Engineer
> Localytics
> Localytics.com <http://localytics.com/> | Our Blog
> <http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
> Facebook <http://facebook.com/localytics> | LinkedIn
> <http://www.linkedin.com/company/1148792?trk=tyah>
>


Re: Dataset - reduceByKey

2016-06-07 Thread Richard Marscher
There certainly are some gaps between the richness of the RDD API and the
Dataset API. I'm also migrating from RDD to Dataset and ran into
reduceByKey and join scenarios.

In the spark-dev list, one person was discussing reduceByKey being
sub-optimal at the moment and it spawned this JIRA
https://issues.apache.org/jira/browse/SPARK-15598. But you might be able to
get by with groupBy().reduce() for now, check performance though.

As for join, the approach would be using the joinWith function on Dataset.
Although the API isn't as sugary as it was for RDD IMO, something which
I've been discussing in a separate thread as well. I can't find a weblink
for it but the thread subject is "Dataset Outer Join vs RDD Outer Join".
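
For reference, a small sketch of the joinWith route against Spark 2.x; the case
classes and the join key are illustrative assumptions, not code from this thread.

import org.apache.spark.sql.SparkSession

case class Person(id: Long, name: String)
case class Order(personId: Long, amount: Double)

object JoinWithExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JoinWithExample").master("local[*]").getOrCreate()
    import spark.implicits._

    val people = Seq(Person(1L, "Ann"), Person(2L, "Bob")).toDS()
    val orders = Seq(Order(1L, 10.0), Order(1L, 5.5)).toDS()

    // joinWith keeps both sides as typed objects: the result is a Dataset[(Person, Order)].
    val joined = people.joinWith(orders, people("id") === orders("personId"), "inner")
    joined.show()
    spark.stop()
  }
}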

On Tue, Jun 7, 2016 at 2:40 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> It would also be nice if there was a better example of joining two
> Datasets. I am looking at the documentation here:
> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems
> a little bit sparse - is there a better documentation source?
>
> Regards,
>
> Bryan Jeffrey
>
> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> Hello.
>>
>> I am looking at the option of moving RDD based operations to Dataset
>> based operations.  We are calling 'reduceByKey' on some pair RDDs we have.
>> What would the equivalent be in the Dataset interface - I do not see a
>> simple reduceByKey replacement.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>


-- 
*Richard Marscher*
Senior Software Engineer
Localytics
Localytics.com <http://localytics.com/> | Our Blog
<http://localytics.com/blog> | Twitter <http://twitter.com/localytics> |
Facebook <http://facebook.com/localytics> | LinkedIn
<http://www.linkedin.com/company/1148792?trk=tyah>


Re: Dataset - reduceByKey

2016-06-07 Thread Takeshi Yamamuro
It seems you can see the docs for 2.0 here for now:
https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/spark-2.0.0-SNAPSHOT-2016_06_07_07_01-1e2c931-docs/

// maropu

On Tue, Jun 7, 2016 at 11:40 AM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> It would also be nice if there was a better example of joining two
> Datasets. I am looking at the documentation here:
> http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems
> a little bit sparse - is there a better documentation source?
>
> Regards,
>
> Bryan Jeffrey
>
> On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
> wrote:
>
>> Hello.
>>
>> I am looking at the option of moving RDD based operations to Dataset
>> based operations.  We are calling 'reduceByKey' on some pair RDDs we have.
>> What would the equivalent be in the Dataset interface - I do not see a
>> simple reduceByKey replacement.
>>
>> Regards,
>>
>> Bryan Jeffrey
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
It would also be nice if there was a better example of joining two
Datasets. I am looking at the documentation here:
http://spark.apache.org/docs/latest/sql-programming-guide.html. It seems a
little bit sparse - is there a better documentation source?

Regards,

Bryan Jeffrey

On Tue, Jun 7, 2016 at 2:32 PM, Bryan Jeffrey <bryan.jeff...@gmail.com>
wrote:

> Hello.
>
> I am looking at the option of moving RDD based operations to Dataset based
> operations.  We are calling 'reduceByKey' on some pair RDDs we have.  What
> would the equivalent be in the Dataset interface - I do not see a simple
> reduceByKey replacement.
>
> Regards,
>
> Bryan Jeffrey
>
>


Dataset - reduceByKey

2016-06-07 Thread Bryan Jeffrey
Hello.

I am looking at the option of moving RDD based operations to Dataset based
operations.  We are calling 'reduceByKey' on some pair RDDs we have.  What
would the equivalent be in the Dataset interface - I do not see a simple
reduceByKey replacement.

Regards,

Bryan Jeffrey


How to change Spark DataFrame groupby("col1",..,"coln") into reduceByKey()?

2016-05-22 Thread unk1102
Hi, I have a Spark job which does a group by, and I can't avoid it because of my
use case. I have a large dataset, around 1 TB, which I need to process/update in
a DataFrame. My job shuffles huge amounts of data and slows down because of the
shuffling and the group by. One reason I see is that my data is skewed: some of
my group-by keys are empty. How do I avoid empty group-by keys in a DataFrame?
Does DataFrame avoid empty group-by keys? I have around 8 keys on which I group
by.

sourceFrame.select("blabla").groupBy("col1","col2","col3",..."col8").agg("blabla");

How do I change the above code to use reduceByKey(), and can we apply an
aggregation on reduceByKey()? Please guide. Thanks in advance.
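
A hedged sketch of what a reduceByKey version could look like; the schema (two
string key columns and one numeric measure, instead of the poster's 8 keys) and
the sum aggregate are illustrative guesses, not the poster's real code.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}

object GroupByToReduceByKey {
  // Same "group by composite key, then aggregate" shape, expressed with reduceByKey;
  // partial sums are computed inside each partition before the shuffle.
  def sumByKeys(sourceFrame: DataFrame): RDD[((String, String), Double)] = {
    sourceFrame.rdd
      .map((row: Row) => ((row.getString(0), row.getString(1)), row.getDouble(2)))  // ((col1, col2), measure)
      .reduceByKey(_ + _)
  }
}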



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-change-Spark-DataFrame-groupby-col1-coln-into-reduceByKey-tp26998.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Executor memory requirement for reduceByKey

2016-05-17 Thread Raghavendra Pandey
Even though it does not sound intuitive, reduceByKey expects all values
for a particular key within a partition to fit into memory. So once you
increase the number of partitions, you can run the job.
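
A minimal sketch of that suggestion; the data is synthetic and the partition
count of 400 mirrors the fix reported later in this thread.

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyPartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReduceByKeyPartitions").setMaster("local[*]"))

    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 75000, 1L))

    // Pass an explicit partition count to the shuffle so each reduce task holds fewer keys.
    val reduced = pairs.reduceByKey(_ + _, 400)

    println(reduced.count())
    sc.stop()
  }
}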


Re: Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
Ok, so that worked flawlessly after I upped the number of partitions to 400
from 40.

Thanks!

On Fri, May 13, 2016 at 7:28 PM, Sung Hwan Chung <coded...@cs.stanford.edu>
wrote:

> I'll try that, as of now I have a small number of partitions in the order
> of 20~40.
>
> It would be great if there's some documentation on the memory requirement
> wrt the number of keys and the number of partitions per executor (i.e., the
> Spark's internal memory requirement outside of the user space).
>
> Otherwise, it's like shooting in the dark.
>
> On Fri, May 13, 2016 at 7:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Have you taken a look at SPARK-11293 ?
>>
>> Consider using repartition to increase the number of partitions.
>>
>> FYI
>>
>> On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung <
>> coded...@cs.stanford.edu> wrote:
>>
>>> Hello,
>>>
>>> I'm using Spark version 1.6.0 and have trouble with memory when trying
>>> to do reducebykey on a dataset with as many as 75 million keys. I.e. I get
>>> the following exception when I run the task.
>>>
>>> There are 20 workers in the cluster. It is running under the standalone
>>> mode with 12 GB assigned per executor and 4 cores per worker. The
>>> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>>>
>>> What might be the problem here? Since I'm using the version 1.6.0, this
>>> doesn't seem to be related to  SPARK-12155. This problem always happens
>>> during the shuffle read phase.
>>>
>>> Is there a minimum  amount of memory required for executor
>>> (spark.memory.fraction) for shuffle read?
>>>
>>> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>>> at 
>>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197)
>>> at 
>>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212)
>>> at 
>>> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>>> at 
>>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>
>


Re: Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
I'll try that; as of now I have a small number of partitions, on the order
of 20~40.

It would be great if there's some documentation on the memory requirement
wrt the number of keys and the number of partitions per executor (i.e., the
Spark's internal memory requirement outside of the user space).

Otherwise, it's like shooting in the dark.

On Fri, May 13, 2016 at 7:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Have you taken a look at SPARK-11293 ?
>
> Consider using repartition to increase the number of partitions.
>
> FYI
>
> On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung <
> coded...@cs.stanford.edu> wrote:
>
>> Hello,
>>
>> I'm using Spark version 1.6.0 and have trouble with memory when trying to
>> do reducebykey on a dataset with as many as 75 million keys. I.e. I get the
>> following exception when I run the task.
>>
>> There are 20 workers in the cluster. It is running under the standalone
>> mode with 12 GB assigned per executor and 4 cores per worker. The
>> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>>
>> What might be the problem here? Since I'm using the version 1.6.0, this
>> doesn't seem to be related to  SPARK-12155. This problem always happens
>> during the shuffle read phase.
>>
>> Is there a minimum  amount of memory required for executor
>> (spark.memory.fraction) for shuffle read?
>>
>> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>>  at 
>> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197)
>>  at 
>> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212)
>>  at 
>> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>>  at 
>> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>  at 
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>  at 
>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>>
>>
>


Re: Executor memory requirement for reduceByKey

2016-05-13 Thread Ted Yu
Have you taken a look at SPARK-11293 ?

Consider using repartition to increase the number of partitions.

FYI

On Fri, May 13, 2016 at 12:14 PM, Sung Hwan Chung <coded...@cs.stanford.edu>
wrote:

> Hello,
>
> I'm using Spark version 1.6.0 and have trouble with memory when trying to
> do reducebykey on a dataset with as many as 75 million keys. I.e. I get the
> following exception when I run the task.
>
> There are 20 workers in the cluster. It is running under the standalone
> mode with 12 GB assigned per executor and 4 cores per worker. The
> spark.memory.fraction is set to 0.5 and I'm not using any caching.
>
> What might be the problem here? Since I'm using the version 1.6.0, this
> doesn't seem to be related to  SPARK-12155. This problem always happens
> during the shuffle read phase.
>
> Is there a minimum  amount of memory required for executor
> (spark.memory.fraction) for shuffle read?
>
> java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
>   at 
> org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
>   at 
> org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
>   at 
> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:197)
>   at 
> org.apache.spark.unsafe.map.BytesToBytesMap.(BytesToBytesMap.java:212)
>   at 
> org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.(UnsafeFixedWidthAggregationMap.java:103)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.(TungstenAggregationIterator.scala:483)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
>   at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>
>


Executor memory requirement for reduceByKey

2016-05-13 Thread Sung Hwan Chung
Hello,

I'm using Spark version 1.6.0 and have trouble with memory when trying to
do reduceByKey on a dataset with as many as 75 million keys, i.e. I get the
following exception when I run the task.

There are 20 workers in the cluster. It is running under the standalone
mode with 12 GB assigned per executor and 4 cores per worker. The
spark.memory.fraction is set to 0.5 and I'm not using any caching.

What might be the problem here? Since I'm using the version 1.6.0, this
doesn't seem to be related to  SPARK-12155. This problem always happens
during the shuffle read phase.

Is there a minimum amount of executor memory (spark.memory.fraction)
required for shuffle read?

java.lang.OutOfMemoryError: Unable to acquire 262144 bytes of memory, got 0
at 
org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:91)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:735)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:197)
at 
org.apache.spark.unsafe.map.BytesToBytesMap.<init>(BytesToBytesMap.java:212)
at 
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.<init>(UnsafeFixedWidthAggregationMap.java:103)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:483)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
at 
org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)


Re: reduceByKey as Action or Transformation

2016-04-25 Thread Sumedh Wale

  
  
On Monday 25 April 2016 11:28 PM, Weiping Qu wrote:

> Dear Ted,
>
> You are right. ReduceByKey is a transformation. My fault.
> I would rephrase my question using the following code snippet.
>
> object ScalaApp {
>
>   def main(args: Array[String]): Unit = {
>     val conf = new SparkConf().setAppName("ScalaApp").setMaster("local")
>     val sc = new SparkContext(conf)
>     //val textFile: RDD[String] =
>     val file = sc.textFile("/home/usr/test.dat")
>     val output = file.flatMap(line => line.split(" "))
>       .map(word => (word, 1))
>       .reduceByKey(_ + _)
>
>     output.persist()
>     output.count()
>     output.collect()
>   }
> }
>
> It's a simple code snippet.
> I realize that the first action count() would trigger the execution
> based on the HadoopRDD and MapPartitionsRDD, and the reduceByKey will take the
> ShuffleRDD as input to perform the count.

The count() will trigger both the execution as well as the
persistence of the output RDD (as each partition is iterated).

> The second action collect just performs the collect
> over the same ShuffleRDD.

It will use the persisted ShuffleRDD blocks.

> I think the re-calculation will also be carried out
> over the ShuffleRDD instead of re-executing the preceding HadoopRDD and
> MapPartitionsRDD in case one partition of the persisted output is
> missing.
> Am I right?

Since it is a partition of the persisted ShuffleRDD that is missing, the
partition will have to be recreated from the base HadoopRDD. To
avoid it, one can checkpoint the ShuffleRDD if required.

> Thanks and Regards,
> Weiping


regards
-- 
Sumedh Wale
SnappyData (http://www.snappydata.io)
  


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: reduceByKey as Action or Transformation

2016-04-25 Thread Weiping Qu

Dear Ted,

You are right. ReduceByKey is a transformation. My fault.
I would rephrase my question using the following code snippet:

import org.apache.spark.{SparkConf, SparkContext}

object ScalaApp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ScalaApp").setMaster("local")
    val sc = new SparkContext(conf)
    //val textFile: RDD[String] =
    val file = sc.textFile("/home/usr/test.dat")
    val output = file.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    output.persist()
    output.count()
    output.collect()
  }
}

It's a simple code snippet.
I realize that the first action count() would trigger the execution
based on the HadoopRDD and MapPartitionsRDD, and the reduceByKey will take the
ShuffleRDD as input to perform the count.

The second action collect just performs the collect over the same ShuffleRDD.
I think the re-calculation will also be carried out over the ShuffleRDD
instead of re-executing the preceding HadoopRDD and MapPartitionsRDD in case
one partition of the persisted output is missing.

Am I right?

Thanks and Regards,
Weiping

On 25.04.2016 17:46, Ted Yu wrote:

Can you show snippet of your code which demonstrates what you observed ?

Thansk

On Mon, Apr 25, 2016 at 8:38 AM, Weiping Qu <q...@informatik.uni-kl.de 
<mailto:q...@informatik.uni-kl.de>> wrote:


Thanks.
I read that from the specification.
I thought the way people distinguish actions and transformations
depends on whether they are lazily executed or not.
As far as I saw from my codes, the reduceByKey will be executed
without any operations in the Action category.
Please correct me if I am wrong.

Thanks,
Regards,
Weiping

On 25.04.2016 17:20, Chadha Pooja wrote:

Reduce By Key is a Transformation


http://spark.apache.org/docs/latest/programming-guide.html#transformations

Thanks

_

Pooja Chadha
Senior Architect
THE BOSTON CONSULTING GROUP
Mobile +1 617 794 3862


_




-Original Message-
From: Weiping Qu [mailto:q...@informatik.uni-kl.de
<mailto:q...@informatik.uni-kl.de>]
Sent: Monday, April 25, 2016 11:05 AM
To: u...@spark.incubator.apache.org
    <mailto:u...@spark.incubator.apache.org>
Subject: reduceByKey as Action or Transformation

Hi,

I'd like just to verify that whether reduceByKey is
transformation or
actions.
As written in RDD papers, spark flow will not be triggered only if
actions are reached.
I tried and saw that the my flow will be executed once there is a
reduceByKey while it is categorized into transformations in
Spark 1.6.1
specification.

Thanks and Regards,
Weiping

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: user-h...@spark.apache.org
<mailto:user-h...@spark.apache.org>


__
The Boston Consulting Group, Inc.
  This e-mail message may contain confidential and/or
privileged information.
If you are not an addressee or otherwise authorized to receive
this message,
you should not use, copy, disclose or take any action based on
this e-mail or
any information contained in the message. If you have received
this material
in error, please advise the sender immediately by reply e-mail
and delete this
message. Thank you.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: user-h...@spark.apache.org
<mailto:user-h...@spark.apache.org>






Re: reduceByKey as Action or Transformation

2016-04-25 Thread Ted Yu
Can you show snippet of your code which demonstrates what you observed ?

Thansk

On Mon, Apr 25, 2016 at 8:38 AM, Weiping Qu <q...@informatik.uni-kl.de> wrote:

> Thanks.
> I read that from the specification.
> I thought the way people distinguish actions and transformations depends
> on whether they are lazily executed or not.
> As far as I saw from my codes, the reduceByKey will be executed without
> any operations in the Action category.
> Please correct me if I am wrong.
>
> Thanks,
> Regards,
> Weiping
>
> On 25.04.2016 17:20, Chadha Pooja wrote:
>
>> Reduce By Key is a Transformation
>>
>> http://spark.apache.org/docs/latest/programming-guide.html#transformations
>>
>> Thanks
>>
>> _
>>
>> Pooja Chadha
>> Senior Architect
>> THE BOSTON CONSULTING GROUP
>> Mobile +1 617 794 3862
>>
>>
>> _
>>
>>
>>
>> -Original Message-----
>> From: Weiping Qu [mailto:q...@informatik.uni-kl.de]
>> Sent: Monday, April 25, 2016 11:05 AM
>> To: u...@spark.incubator.apache.org
>> Subject: reduceByKey as Action or Transformation
>>
>> Hi,
>>
>> I'd like just to verify that whether reduceByKey is transformation or
>> actions.
>> As written in RDD papers, spark flow will not be triggered only if
>> actions are reached.
>> I tried and saw that the my flow will be executed once there is a
>> reduceByKey while it is categorized into transformations in Spark 1.6.1
>> specification.
>>
>> Thanks and Regards,
>> Weiping
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>> __
>> The Boston Consulting Group, Inc.
>>   This e-mail message may contain confidential and/or privileged
>> information.
>> If you are not an addressee or otherwise authorized to receive this
>> message,
>> you should not use, copy, disclose or take any action based on this
>> e-mail or
>> any information contained in the message. If you have received this
>> material
>> in error, please advise the sender immediately by reply e-mail and delete
>> this
>> message. Thank you.
>>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: reduceByKey as Action or Transformation

2016-04-25 Thread Weiping Qu

Thanks.
I read that from the specification.
I thought the way people distinguish actions from transformations depends
on whether they are lazily executed or not.
As far as I saw from my code, the reduceByKey was executed even without
any operation from the Action category.

Please correct me if I am wrong.
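
A quick way to check this, as a self-contained sketch (a tiny in-memory input
replaces the file read): building the lineage through reduceByKey submits no
job; the first job only appears when count() runs.

import org.apache.spark.{SparkConf, SparkContext}

object LazyReduceByKey {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LazyReduceByKey").setMaster("local[*]"))

    val counts = sc.parallelize(Seq("a a b", "b c"))
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)   // transformation only: no job shows up in the UI yet

    println("lineage built, no job submitted so far")
    println(counts.count()) // the first job is submitted here
    sc.stop()
  }
}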

Thanks,
Regards,
Weiping

On 25.04.2016 17:20, Chadha Pooja wrote:

Reduce By Key is a Transformation

http://spark.apache.org/docs/latest/programming-guide.html#transformations

Thanks
_

Pooja Chadha
Senior Architect
THE BOSTON CONSULTING GROUP
Mobile +1 617 794 3862

_


-Original Message-
From: Weiping Qu [mailto:q...@informatik.uni-kl.de]
Sent: Monday, April 25, 2016 11:05 AM
To: u...@spark.incubator.apache.org
Subject: reduceByKey as Action or Transformation

Hi,

I'd like just to verify that whether reduceByKey is transformation or
actions.
As written in RDD papers, spark flow will not be triggered only if
actions are reached.
I tried and saw that the my flow will be executed once there is a
reduceByKey while it is categorized into transformations in Spark 1.6.1
specification.

Thanks and Regards,
Weiping

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

__
The Boston Consulting Group, Inc.
  
This e-mail message may contain confidential and/or privileged information.

If you are not an addressee or otherwise authorized to receive this message,
you should not use, copy, disclose or take any action based on this e-mail or
any information contained in the message. If you have received this material
in error, please advise the sender immediately by reply e-mail and delete this
message. Thank you.



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



reduceByKey as Action or Transformation

2016-04-25 Thread Weiping Qu

Hi,

I'd just like to verify whether reduceByKey is a transformation or an
action.
As written in the RDD paper, a Spark flow will not be triggered until an
action is reached.
I tried it and saw that my flow was executed once there was a
reduceByKey, even though it is categorized as a transformation in the
Spark 1.6.1 documentation.


Thanks and Regards,
Weiping

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



What is the relationship between reduceByKey and spark.driver.maxResultSize?

2015-12-11 Thread Tom Seddon
I have a job that is running into intermittent errors with [SparkDriver]
java.lang.OutOfMemoryError: Java heap space. Before I was getting this
error I was getting errors saying the result size exceeded
spark.driver.maxResultSize.
This does not make any sense to me, as there are no actions in my job that
send data to the driver - just a pull of data from S3, a map and
reduceByKey, and then a conversion to a DataFrame and a saveAsTable action
that puts the results back on S3.

I've found a few references to reduceByKey and spark.driver.maxResultSize
having some importance, but cannot fathom how this setting could be related.

Would greatly appreciate any advice.

Thanks in advance,

Tom


Re: What is the relationship between reduceByKey and spark.driver.maxResultSize?

2015-12-11 Thread Zhan Zhang
I think you are fetching too many results to the driver. Typically, it is not 
recommended to collect much data to driver. But if you have to, you can 
increase the driver memory, when submitting jobs.

Thanks.

Zhan Zhang

On Dec 11, 2015, at 6:14 AM, Tom Seddon 
<mr.tom.sed...@gmail.com<mailto:mr.tom.sed...@gmail.com>> wrote:

I have a job that is running into intermittent errors with  [SparkDriver] 
java.lang.OutOfMemoryError: Java heap space.  Before I was getting this error I 
was getting errors saying the result size exceed the 
spark.driver.maxResultSize.  This does not make any sense to me, as there are 
no actions in my job that send data to the driver - just a pull of data from 
S3, a map and reduceByKey and then conversion to dataframe and saveAsTable 
action that puts the results back on S3.

I've found a few references to reduceByKey and spark.driver.maxResultSize 
having some importance, but cannot fathom how this setting could be related.

Would greatly appreciated any advice.

Thanks in advance,

Tom



Re: What is the relationship between reduceByKey and spark.driver.maxResultSize?

2015-12-11 Thread Eugen Cepoi
Do you have a large number of tasks? This can happen if you have a large
number of tasks and a small driver, or if you use accumulators of list-like
data structures.

2015-12-11 11:17 GMT-08:00 Zhan Zhang <zzh...@hortonworks.com>:

> I think you are fetching too many results to the driver. Typically, it is
> not recommended to collect much data to driver. But if you have to, you can
> increase the driver memory, when submitting jobs.
>
> Thanks.
>
> Zhan Zhang
>
> On Dec 11, 2015, at 6:14 AM, Tom Seddon <mr.tom.sed...@gmail.com> wrote:
>
> I have a job that is running into intermittent errors with  [SparkDriver]
> java.lang.OutOfMemoryError: Java heap space.  Before I was getting this
> error I was getting errors saying the result size exceed the 
> spark.driver.maxResultSize.
> This does not make any sense to me, as there are no actions in my job that
> send data to the driver - just a pull of data from S3, a map and
> reduceByKey and then conversion to dataframe and saveAsTable action that
> puts the results back on S3.
>
> I've found a few references to reduceByKey and spark.driver.maxResultSize
> having some importance, but cannot fathom how this setting could be related.
>
> Would greatly appreciated any advice.
>
> Thanks in advance,
>
> Tom
>
>
>


Re: Data in one partition after reduceByKey

2015-11-25 Thread Ruslan Dautkhanov
public long getTime()

Returns the number of milliseconds since January 1, 1970, 00:00:00 GMT
represented by this Date object.

http://docs.oracle.com/javase/7/docs/api/java/util/Date.html#getTime%28%29

Based on what you did, it might be easier to build a date partitioner from that.
Also, to get an even more even distribution you could use a hash function of
that value, not just a remainder.
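
A sketch of that idea, assuming Joda DateTime keys as in Patrick's code; the
partitioner name is made up. It hashes the epoch-millis value instead of
summing date fields, and the usage comment shows how it could be passed to
reduceByKey.

import org.apache.spark.Partitioner
import org.joda.time.DateTime

// Spreads minute-spaced DateTime keys by hashing their millisecond timestamp.
class EpochMillisPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case dt: DateTime =>
      val millis = dt.getMillis
      val hash = (millis ^ (millis >>> 32)).toInt
      ((hash % numPartitions) + numPartitions) % numPartitions  // non-negative modulo
    case _ => 0
  }
}

// usage (sketch): mapped.reduceByKey(new EpochMillisPartitioner(16), _ + _)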




-- 
Ruslan Dautkhanov

On Mon, Nov 23, 2015 at 6:35 AM, Patrick McGloin <mcgloin.patr...@gmail.com>
wrote:

> I will answer my own question, since I figured it out.  Here is my answer
> in case anyone else has the same issue.
>
> My DateTimes were all without seconds and milliseconds since I wanted to
> group data belonging to the same minute. The hashCode() for Joda DateTimes
> which are one minute apart is a constant:
>
> scala> val now = DateTime.now
> now: org.joda.time.DateTime = 2015-11-23T11:14:17.088Z
>
> scala> now.withSecondOfMinute(0).withMillisOfSecond(0).hashCode - 
> now.minusMinutes(1).withSecondOfMinute(0).withMillisOfSecond(0).hashCode
> res42: Int = 6
>
> As can be seen by this example, if the hashCode values are similarly
> spaced, they can end up in the same partition:
>
> scala> val nums = for(i <- 0 to 100) yield ((i*20 % 1000), i)
> nums: scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((0,0), 
> (20,1), (40,2), (60,3), (80,4), (100,5), (120,6), (140,7), (160,8), (180,9), 
> (200,10), (220,11), (240,12), (260,13), (280,14), (300,15), (320,16), 
> (340,17), (360,18), (380,19), (400,20), (420,21), (440,22), (460,23), 
> (480,24), (500,25), (520,26), (540,27), (560,28), (580,29), (600,30), 
> (620,31), (640,32), (660,33), (680,34), (700,35), (720,36), (740,37), 
> (760,38), (780,39), (800,40), (820,41), (840,42), (860,43), (880,44), 
> (900,45), (920,46), (940,47), (960,48), (980,49), (0,50), (20,51), (40,52), 
> (60,53), (80,54), (100,55), (120,56), (140,57), (160,58), (180,59), (200,60), 
> (220,61), (240,62), (260,63), (280,64), (300,65), (320,66), (340,67), 
> (360,68), (380,69), (400,70), (420,71), (440,72), (460,73), (480,74), (500...
>
> scala> val rddNum = sc.parallelize(nums)
> rddNum: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at 
> parallelize at :23
>
> scala> val reducedNum = rddNum.reduceByKey(_+_)
> reducedNum: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at 
> reduceByKey at :25
>
> scala> reducedNum.mapPartitions(iter => Array(iter.size).iterator, 
> true).collect.toList
>
> res2: List[Int] = List(50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 
> 0, 0)
>
> To distribute my data more evenly across the partitions I created my own
> custom Partitoiner:
>
> class JodaPartitioner(rddNumPartitions: Int) extends Partitioner {
>   def numPartitions: Int = rddNumPartitions
>   def getPartition(key: Any): Int = {
> key match {
>   case dateTime: DateTime =>
> val sum = dateTime.getYear + dateTime.getMonthOfYear +  
> dateTime.getDayOfMonth + dateTime.getMinuteOfDay  + dateTime.getSecondOfDay
> sum % numPartitions
>   case _ => 0
> }
>   }
> }
>
>
> On 20 November 2015 at 17:17, Patrick McGloin <mcgloin.patr...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have Spark application which contains the following segment:
>>
>> val reparitioned = rdd.repartition(16)
>> val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, 
>> endDate)
>> val mapped: RDD[(DateTime, myData)] = filtered.map(kv => (kv._1.processingTime, 
>> kv._2))
>> val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)
>>
>> When I run this with some logging this is what I see:
>>
>> reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, 
>> 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
>> filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
>> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
>> mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
>> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
>> reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]
>>
>> My logging is done using these two lines:
>>
>> val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, 
>> true)log.info(s"rdd ==> [${sizes.collect.toList}]")
>>
>> My question is why does my data end up in one partition after the
>> reduceByKey? After the filter it can be seen that the data is evenly
>> distributed, but the reduceByKey results in data in only one partition.
>>
>> Thanks,
>>
>> Patrick
>>
>
>


Re: Data in one partition after reduceByKey

2015-11-23 Thread Patrick McGloin
I will answer my own question, since I figured it out.  Here is my answer
in case anyone else has the same issue.

My DateTimes were all without seconds and milliseconds since I wanted to
group data belonging to the same minute. The hashCode() values for Joda DateTimes
which are one minute apart differ by a constant:

scala> val now = DateTime.now
now: org.joda.time.DateTime = 2015-11-23T11:14:17.088Z

scala> now.withSecondOfMinute(0).withMillisOfSecond(0).hashCode -
now.minusMinutes(1).withSecondOfMinute(0).withMillisOfSecond(0).hashCode
res42: Int = 6

As can be seen by this example, if the hashCode values are similarly
spaced, they can end up in the same partition:

scala> val nums = for(i <- 0 to 100) yield ((i*20 % 1000), i)
nums: scala.collection.immutable.IndexedSeq[(Int, Int)] =
Vector((0,0), (20,1), (40,2), (60,3), (80,4), (100,5), (120,6),
(140,7), (160,8), (180,9), (200,10), (220,11), (240,12), (260,13),
(280,14), (300,15), (320,16), (340,17), (360,18), (380,19), (400,20),
(420,21), (440,22), (460,23), (480,24), (500,25), (520,26), (540,27),
(560,28), (580,29), (600,30), (620,31), (640,32), (660,33), (680,34),
(700,35), (720,36), (740,37), (760,38), (780,39), (800,40), (820,41),
(840,42), (860,43), (880,44), (900,45), (920,46), (940,47), (960,48),
(980,49), (0,50), (20,51), (40,52), (60,53), (80,54), (100,55),
(120,56), (140,57), (160,58), (180,59), (200,60), (220,61), (240,62),
(260,63), (280,64), (300,65), (320,66), (340,67), (360,68), (380,69),
(400,70), (420,71), (440,72), (460,73), (480,74), (500...

scala> val rddNum = sc.parallelize(nums)
rddNum: org.apache.spark.rdd.RDD[(Int, Int)] =
ParallelCollectionRDD[0] at parallelize at <console>:23

scala> val reducedNum = rddNum.reduceByKey(_+_)
reducedNum: org.apache.spark.rdd.RDD[(Int, Int)] = ShuffledRDD[1] at
reduceByKey at <console>:25

scala> reducedNum.mapPartitions(iter => Array(iter.size).iterator,
true).collect.toList

res2: List[Int] = List(50, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0)
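
For reference, the default HashPartitioner places a key in the partition given by a
non-negative key.hashCode modulo numPartitions, and with 20 partitions (the length of
the list above) every key 0, 20, 40, ... 980 maps to partition 0. A quick sketch to
check that (the partition count of 20 is taken from the output above):

import org.apache.spark.HashPartitioner

// Every key produced by (i * 20 % 1000) is a multiple of 20, so modulo 20 they all
// collapse onto partition 0.
val hp = new HashPartitioner(20)
val partitionIds = (0 to 100).map(i => hp.getPartition(i * 20 % 1000)).distinct
println(partitionIds)   // Vector(0)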

To distribute my data more evenly across the partitions I created my own
custom Partitioner:

class JodaPartitioner(rddNumPartitions: Int) extends Partitioner {
  def numPartitions: Int = rddNumPartitions
  def getPartition(key: Any): Int = {
key match {
  case dateTime: DateTime =>
val sum = dateTime.getYear + dateTime.getMonthOfYear +
dateTime.getDayOfMonth + dateTime.getMinuteOfDay  +
dateTime.getSecondOfDay
sum % numPartitions
  case _ => 0
}
  }
}
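
A usage sketch (not part of the original post) showing how the partitioner above can
be passed straight to reduceByKey; the sample data and the partition count of 16 are
illustrative, and the JodaPartitioner class is assumed to be the one defined above:

import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime

val sc = new SparkContext(new SparkConf().setAppName("joda-partitioner-sketch").setMaster("local[4]"))

// One record per minute, truncated to the minute as in the thread above.
val base = DateTime.now.withSecondOfMinute(0).withMillisOfSecond(0)
val byMinute = sc.parallelize(0 until 100).map(i => (base.minusMinutes(i), 1))

// reduceByKey accepts a Partitioner directly, replacing the default HashPartitioner.
val reduced = byMinute.reduceByKey(new JodaPartitioner(16), _ + _)
println(reduced.mapPartitions(it => Iterator(it.size), preservesPartitioning = true).collect.toList)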


On 20 November 2015 at 17:17, Patrick McGloin <mcgloin.patr...@gmail.com>
wrote:

> Hi,
>
> I have Spark application which contains the following segment:
>
> val reparitioned = rdd.repartition(16)
> val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned, startDate, 
> endDate)
> val mapped: RDD[(DateTime, myData)] = filtered.map(kv => (kv._1.processingTime, 
> kv._2))
> val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)
>
> When I run this with some logging this is what I see:
>
> reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512, 2508, 
> 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
> filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
> mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076, 2042, 
> 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
> reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]
>
> My logging is done using these two lines:
>
> val sizes: RDD[Int] = rdd.mapPartitions(iter => Array(iter.size).iterator, 
> true)log.info(s"rdd ==> [${sizes.collect.toList}]")
>
> My question is why does my data end up in one partition after the
> reduceByKey? After the filter it can be seen that the data is evenly
> distributed, but the reduceByKey results in data in only one partition.
>
> Thanks,
>
> Patrick
>


Data in one partition after reduceByKey

2015-11-20 Thread Patrick McGloin
Hi,

I have Spark application which contains the following segment:

val reparitioned = rdd.repartition(16)
val filtered: RDD[(MyKey, myData)] = MyUtils.filter(reparitioned,
startDate, endDate)
val mapped: RDD[(DateTime, myData)] =
filtered.map(kv => (kv._1.processingTime, kv._2))
val reduced: RDD[(DateTime, myData)] = mapped.reduceByKey(_+_)

When I run this with some logging this is what I see:

reparitioned ==> [List(2536, 2529, 2526, 2520, 2519, 2514, 2512,
2508, 2504, 2501, 2496, 2490, 2551, 2547, 2543, 2537)]
filtered ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076,
2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
mapped ==> [List(2081, 2063, 2043, 2040, 2063, 2050, 2081, 2076,
2042, 2066, 2032, 2001, 2031, 2101, 2050, 2068)]
reduced ==> [List(0, 0, 0, 0, 0, 0, 922, 0, 0, 0, 0, 0, 0, 0, 0, 0)]

My logging is done using these two lines:

val sizes: RDD[Int] = rdd.mapPartitions(iter =>
Array(iter.size).iterator, true)log.info(s"rdd ==>
[${sizes.collect.toList}]")

My question is why does my data end up in one partition after the
reduceByKey? After the filter it can be seen that the data is evenly
distributed, but the reduceByKey results in data in only one partition.

Thanks,

Patrick


Re: Incorrect results with reduceByKey

2015-11-18 Thread tovbinm
Deep copying the data solved the issue:
data.map(r => {val t = SpecificData.get().deepCopy(r.getSchema, r); (t.id,
List(t)) }).reduceByKey(_ ++ _)

(noted here:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1003)
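
For anyone hitting the same thing: Hadoop input formats reuse the record object they
hand to each map call, so buffering references in a list (as List(t) above does) only
works after a copy. A minimal sketch of the pattern, with the key extractor left as a
hypothetical parameter:

import org.apache.avro.mapred.AvroWrapper
import org.apache.avro.specific.{SpecificData, SpecificRecord}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// rows: the RDD[(AvroWrapper[T], NullWritable)] returned by sparkContext.hadoopFile.
def groupDeepCopied[T <: SpecificRecord](rows: RDD[(AvroWrapper[T], NullWritable)],
                                         key: T => String): RDD[(String, List[T])] =
  rows.map { case (wrapper, _) =>
    // Copy before buffering: the wrapper's datum is reused by the record reader.
    val rec: T = SpecificData.get().deepCopy(wrapper.datum().getSchema, wrapper.datum())
    (key(rec), List(rec))
  }.reduceByKey(_ ++ _)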


Thanks Igor Berman, for pointing that out.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incorrect-results-with-reduceByKey-tp25410p25420.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Incorrect results with reduceByKey

2015-11-17 Thread tovbinm
Howdy,

We've noticed a strange behavior with Avro serialized data and reduceByKey
RDD functionality. Please see below:

 // We're reading a bunch of Avro serialized data
val data: RDD[T] = sparkContext.hadoopFile(path,
classOf[AvroInputFormat[T]], classOf[AvroWrapper[T]], classOf[NullWritable])
// Incorrect data returned
val bad: RDD[(String,List[T])] = data.map(r => (r.id,
List(r))).reduceByKey(_ ++ _)
// After adding the partitioner we get everything as expected
val good: RDD[(String,List[T])] = data.map(r => (r.id,
List(r))).partitionBy(Partitioner.defaultPartitioner(data)).reduceByKey(_ ++
_)


Any ideas? 

Thanks in advance



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incorrect-results-with-reduceByKey-tp25410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: job hangs when using pipe() with reduceByKey()

2015-11-01 Thread Gylfi
Hi. 

What is slow exactly? 
In code base 1: 
When you run the persist() + count() you store the result in RAM. 
Then the map + reduceByKey is done on in-memory data. 

In the latter case (all in one line) you are doing both steps at the same
time.

So you are saying that if you sum up the time to do both steps in the first
code base it is still much faster than the latter code base? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242p25248.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: job hangs when using pipe() with reduceByKey()

2015-11-01 Thread hotdog
Yes, the first version takes only 30 mins,
but with the second method I waited for 5 hours and it only finished 10%.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242p25249.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




job hangs when using pipe() with reduceByKey()

2015-10-31 Thread hotdog
I meet a situation:
When I use 
val a = rdd.pipe("./my_cpp_program").persist()
a.count()  // just use it to persist a
val b = a.map(s => (s, 1)).reduceByKey().count()
it 's so fast

but when I use
val b = rdd.pipe("./my_cpp_program").map(s => (s, 1)).reduceByKey().count()
it is so slow
and there are many such log in my executors:
15/10/31 19:53:58 INFO collection.ExternalSorter: Thread 78 spilling
in-memory map of 633.1 MB to disk (8 times so far)
15/10/31 19:54:14 INFO collection.ExternalSorter: Thread 74 spilling
in-memory map of 633.1 MB to disk (8 times so far)
15/10/31 19:54:17 INFO collection.ExternalSorter: Thread 79 spilling
in-memory map of 633.1 MB to disk (8 times so far)
15/10/31 19:54:29 INFO collection.ExternalSorter: Thread 77 spilling
in-memory map of 633.1 MB to disk (8 times so far)
15/10/31 19:54:50 INFO collection.ExternalSorter: Thread 76 spilling
in-memory map of 633.1 MB to disk (9 times so far)
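
For reference, a sketch of the second form written out with an explicit reduce
function (the snippet above paraphrases reduceByKey without its function argument)
and a larger reduce-side partition count, which keeps each task's in-memory map
smaller and can cut down on spilling. The input path, the binary name and the value
400 are all illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("pipe-reduce-sketch"))
val rdd = sc.textFile("hdfs:///path/to/input")   // hypothetical input
val counts = rdd
  .pipe("./my_cpp_program")                      // the external binary from the post
  .map(s => (s, 1))
  .reduceByKey(_ + _, 400)                       // explicit reduce function + more partitions
counts.count()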



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re:Re: job hangs when using pipe() with reduceByKey()

2015-10-31 Thread 李森栋
spark 1.4.1
hadoop 2.6.0
centos 6.6






At 2015-10-31 23:14:46, "Ted Yu" <yuzhih...@gmail.com> wrote:

Which Spark release are you using ?


Which OS ?


Thanks


On Sat, Oct 31, 2015 at 5:18 AM, hotdog <lisend...@163.com> wrote:
I meet a situation:
When I use
val a = rdd.pipe("./my_cpp_program").persist()
a.count()  // just use it to persist a
val b = a.map(s => (s, 1)).reduceByKey().count()
it 's so fast

but when I use
val b = rdd.pipe("./my_cpp_program").map(s => (s, 1)).reduceByKey().count()
it is so slow
and there are many such log in my executors:
15/10/31 19:53:58 INFO collection.ExternalSorter: Thread 78 spilling
in-memory map of 633.1 MB to disk (8 times so far)
15/10/31 19:54:14 INFO collection.ExternalSorter: Thread 74 spilling
in-memory map of 633.1 MB to disk (8 times so far)
15/10/31 19:54:17 INFO collection.ExternalSorter: Thread 79 spilling
in-memory map of 633.1 MB to disk (8 times so far)
15/10/31 19:54:29 INFO collection.ExternalSorter: Thread 77 spilling
in-memory map of 633.1 MB to disk (8 times so far)
15/10/31 19:54:50 INFO collection.ExternalSorter: Thread 76 spilling
in-memory map of 633.1 MB to disk (9 times so far)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: job hangs when using pipe() with reduceByKey()

2015-10-31 Thread Ted Yu
Which Spark release are you using ?

Which OS ?

Thanks

On Sat, Oct 31, 2015 at 5:18 AM, hotdog <lisend...@163.com> wrote:

> I meet a situation:
> When I use
> val a = rdd.pipe("./my_cpp_program").persist()
> a.count()  // just use it to persist a
> val b = a.map(s => (s, 1)).reduceByKey().count()
> it 's so fast
>
> but when I use
> val b = rdd.pipe("./my_cpp_program").map(s => (s, 1)).reduceByKey().count()
> it is so slow
> and there are many such log in my executors:
> 15/10/31 19:53:58 INFO collection.ExternalSorter: Thread 78 spilling
> in-memory map of 633.1 MB to disk (8 times so far)
> 15/10/31 19:54:14 INFO collection.ExternalSorter: Thread 74 spilling
> in-memory map of 633.1 MB to disk (8 times so far)
> 15/10/31 19:54:17 INFO collection.ExternalSorter: Thread 79 spilling
> in-memory map of 633.1 MB to disk (8 times so far)
> 15/10/31 19:54:29 INFO collection.ExternalSorter: Thread 77 spilling
> in-memory map of 633.1 MB to disk (8 times so far)
> 15/10/31 19:54:50 INFO collection.ExternalSorter: Thread 76 spilling
> in-memory map of 633.1 MB to disk (9 times so far)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/job-hangs-when-using-pipe-with-reduceByKey-tp25242.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread swetha
Hi,

We currently use reduceByKey to reduce by a particular metric name in our
Streaming/Batch job. It seems to be doing a lot of shuffles and it has
impact on performance. Does using a custompartitioner before calling
reduceByKey improve performance?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread Tathagata Das
If you just want to control the number of reducers, then setting the
numPartitions is sufficient. If you want to control how exact partitioning
scheme (that is some other scheme other than hash-based) then you need to
implement a custom partitioner. It can be used to improve data skews, etc.
which ultimately improves performance.

On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> wrote:

> Hi,
>
> We currently use reduceByKey to reduce by a particular metric name in our
> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
> impact on performance. Does using a custompartitioner before calling
> reduceByKey improve performance?
>
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread swetha kasireddy
So, Wouldn't  using a customPartitioner on the rdd upon which  the
groupByKey  or reduceByKey is performed avoid shuffles and improve
performance? My code does groupByAndSort and reduceByKey on different
datasets as shown below. Would using a custom partitioner on those datasets
before using a  groupByKey or reduceByKey improve performance? My idea is
to avoid shuffles and improve performance. Also, right now I see a lot of
spills when there is a very large dataset for groupByKey and reduceByKey. I
think the memory is not sufficient. We need to group by sessionId and then
sort the Jsons based on the timeStamp as shown in the below code.


What is the alternative to using groupByKey for better performance? And in
case of reduceByKey, would using a customPartitioner on the RDD upon which
the reduceByKey is performed would reduce the shuffles and improve the
performance?


rdd.partitionBy(customPartitioner)

def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String,
List[(Long, String)])] =
{ val grpdRecs = rdd.groupByKey(); val srtdRecs =
grpdRecs.mapValues[(List[(Long, String)])](iter =>
iter.toList.sortBy(_._1)) srtdRecs }

rdd.reduceByKey((a, b) => {
  (Math.max(a._1, b._1), (a._2 ++ b._2))
})
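
For the group-by-sessionId-then-sort-by-timestamp case, one commonly used alternative
(a sketch, not from this thread) is to key by (sessionId, timestamp), partition on the
sessionId alone, and let the shuffle do the ordering via
repartitionAndSortWithinPartitions, so no per-session list has to be built in memory:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Routes a (sessionId, timestamp) composite key by sessionId only, so all events of a
// session land in the same partition while the shuffle sorts by the full key.
class SessionPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions
  def getPartition(key: Any): Int = {
    val sessionId = key.asInstanceOf[(String, Long)]._1
    val mod = sessionId.hashCode % partitions
    if (mod < 0) mod + partitions else mod
  }
}

// rdd: RDD[(String, (Long, String))] as in getGrpdAndSrtdRecs above (sessionId -> (timestamp, json)).
def sortedWithinSessions(rdd: RDD[(String, (Long, String))], partitions: Int): RDD[((String, Long), String)] =
  rdd.map { case (sessionId, (ts, json)) => ((sessionId, ts), json) }
    .repartitionAndSortWithinPartitions(new SessionPartitioner(partitions))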



On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com> wrote:

> If you just want to control the number of reducers, then setting the
> numPartitions is sufficient. If you want to control how exact partitioning
> scheme (that is some other scheme other than hash-based) then you need to
> implement a custom partitioner. It can be used to improve data skews, etc.
> which ultimately improves performance.
>
> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com> wrote:
>
>> Hi,
>>
>> We currently use reduceByKey to reduce by a particular metric name in our
>> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
>> impact on performance. Does using a custompartitioner before calling
>> reduceByKey improve performance?
>>
>>
>> Thanks,
>> Swetha
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread Tathagata Das
if you specify the same partitioner (custom or otherwise) for both
partitionBy and groupBy, then may be it will help. The fundamental problem
is groupByKey, that takes a lot of working memory.
1. Try to avoid groupByKey. What is it that you want to after sorting the
list of grouped events? can you do that operation with a reduceByKey?
2. If not, use more partitions. That would cause lesser data in each
partition, so less spilling.
3. You can control the amount memory allocated for shuffles by changing the
configuration spark.shuffle.memoryFraction . More fraction would cause less
spilling.
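
A sketch of points 2 and 3 above (the values are illustrative, not recommendations):

import org.apache.spark.{SparkConf, SparkContext}

// Point 3: give shuffles a larger memory fraction (must be set before the context is created).
val conf = new SparkConf().set("spark.shuffle.memoryFraction", "0.4")   // 0.2 is the default in this era
val sc = new SparkContext(conf)

// Point 2: ask reduceByKey for more partitions so each task holds a smaller in-memory map.
val rdd = sc.parallelize(Seq(("s1", (1L, List("a"))), ("s1", (2L, List("b")))))   // stand-in data
val reduced = rdd.reduceByKey((a, b) => (math.max(a._1, b._1), a._2 ++ b._2), 400)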


On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> So, Wouldn't  using a customPartitioner on the rdd upon which  the
> groupByKey  or reduceByKey is performed avoid shuffles and improve
> performance? My code does groupByAndSort and reduceByKey on different
> datasets as shown below. Would using a custom partitioner on those datasets
> before using a  groupByKey or reduceByKey improve performance? My idea is
> to avoid shuffles and improve performance. Also, right now I see a lot of
> spills when there is a very large dataset for groupByKey and reduceByKey. I
> think the memory is not sufficient. We need to group by sessionId and then
> sort the Jsons based on the timeStamp as shown in the below code.
>
>
> What is the alternative to using groupByKey for better performance? And in
> case of reduceByKey, would using a customPartitioner on the RDD upon which
> the reduceByKey is performed would reduce the shuffles and improve the
> performance?
>
>
> rdd.partitionBy(customPartitioner)
>
> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String,
> List[(Long, String)])] =
> { val grpdRecs = rdd.groupByKey(); val srtdRecs =
> grpdRecs.mapValues[(List[(Long, String)])](iter =>
> iter.toList.sortBy(_._1)) srtdRecs }
>
> rdd.reduceByKey((a, b) => {
>   (Math.max(a._1, b._1), (a._2 ++ b._2))
> })
>
>
>
> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> If you just want to control the number of reducers, then setting the
>> numPartitions is sufficient. If you want to control how exact partitioning
>> scheme (that is some other scheme other than hash-based) then you need to
>> implement a custom partitioner. It can be used to improve data skews, etc.
>> which ultimately improves performance.
>>
>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We currently use reduceByKey to reduce by a particular metric name in our
>>> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
>>> impact on performance. Does using a custompartitioner before calling
>>> reduceByKey improve performance?
>>>
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread Tathagata Das
If it is streaming, you can look at updateStateByKey for maintaining active
sessions. But wont work for batch.

and I answered that before. it can improve performance if you change the
partitioning scheme from hash-based to something else. Its hard to say
anything beyond that without understand the data skew and other details of
your application. Before jumping into that, you should simple change the
number of partitions and see if the performance improves.



On Tue, Oct 27, 2015 at 7:10 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> After sorting the list of grouped events I would need to have an RDD that
> has a key which is nothing but the  sessionId and a list of values that are
> sorted by timeStamp for each input Json. So basically the return type would
> be RDD[(String, List[(Long, String)]  where the key is the sessionId and
>  a list of tuples that has a timeStamp and Json as the values. I will need
> to use groupByKey to do a groupBy sessionId and secondary sort by timeStamp
> and then get the list of JsonValues in a sorted order. Is there any
> alternative for that? Please find the code below that I used for the same.
>
>
> Also, does using a customPartitioner for a reduceByKey improve performance?
>
>
> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String,
> List[(Long, String)])] =
> { val grpdRecs = rdd.groupByKey(); val srtdRecs =
> grpdRecs.mapValues[(List[(Long, String)])](iter =>
> iter.toList.sortBy(_._1)) srtdRecs }
>
>
> On Tue, Oct 27, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> if you specify the same partitioner (custom or otherwise) for both
>> partitionBy and groupBy, then may be it will help. The fundamental problem
>> is groupByKey, that takes a lot of working memory.
>> 1. Try to avoid groupByKey. What is it that you want to after sorting the
>> list of grouped events? can you do that operation with a reduceByKey?
>> 2. If not, use more partitions. That would cause lesser data in each
>> partition, so less spilling.
>> 3. You can control the amount memory allocated for shuffles by changing
>> the configuration spark.shuffle.memoryFraction . More fraction would cause
>> less spilling.
>>
>>
>> On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <
>> swethakasire...@gmail.com> wrote:
>>
>>> So, Wouldn't  using a customPartitioner on the rdd upon which  the
>>> groupByKey  or reduceByKey is performed avoid shuffles and improve
>>> performance? My code does groupByAndSort and reduceByKey on different
>>> datasets as shown below. Would using a custom partitioner on those datasets
>>> before using a  groupByKey or reduceByKey improve performance? My idea is
>>> to avoid shuffles and improve performance. Also, right now I see a lot of
>>> spills when there is a very large dataset for groupByKey and reduceByKey. I
>>> think the memory is not sufficient. We need to group by sessionId and then
>>> sort the Jsons based on the timeStamp as shown in the below code.
>>>
>>>
>>> What is the alternative to using groupByKey for better performance? And
>>> in case of reduceByKey, would using a customPartitioner on the RDD upon
>>> which the reduceByKey is performed would reduce the shuffles and improve
>>> the performance?
>>>
>>>
>>> rdd.partitionBy(customPartitioner)
>>>
>>> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]):
>>> RDD[(String, List[(Long, String)])] =
>>> { val grpdRecs = rdd.groupByKey(); val srtdRecs =
>>> grpdRecs.mapValues[(List[(Long, String)])](iter =>
>>> iter.toList.sortBy(_._1)) srtdRecs }
>>>
>>> rdd.reduceByKey((a, b) => {
>>>   (Math.max(a._1, b._1), (a._2 ++ b._2))
>>> })
>>>
>>>
>>>
>>> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> If you just want to control the number of reducers, then setting the
>>>> numPartitions is sufficient. If you want to control how exact partitioning
>>>> scheme (that is some other scheme other than hash-based) then you need to
>>>> implement a custom partitioner. It can be used to improve data skews, etc.
>>>> which ultimately improves performance.
>>>>
>>>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We currently use reduceByKey to reduce by a particular metric name in
>>>>> our
>>>>> Streamin

Re: Does using Custom Partitioner before calling reduceByKey improve performance?

2015-10-27 Thread swetha kasireddy
After sorting the list of grouped events I would need to have an RDD that
has a key which is nothing but the  sessionId and a list of values that are
sorted by timeStamp for each input Json. So basically the return type would
be RDD[(String, List[(Long, String)]  where the key is the sessionId and  a
list of tuples that has a timeStamp and Json as the values. I will need to
use groupByKey to do a groupBy sessionId and secondary sort by timeStamp
and then get the list of JsonValues in a sorted order. Is there any
alternative for that? Please find the code below that I used for the same.


Also, does using a customPartitioner for a reduceByKey improve performance?


def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String,
List[(Long, String)])] =
{ val grpdRecs = rdd.groupByKey(); val srtdRecs =
grpdRecs.mapValues[(List[(Long, String)])](iter =>
iter.toList.sortBy(_._1)) srtdRecs }


On Tue, Oct 27, 2015 at 6:47 PM, Tathagata Das <t...@databricks.com> wrote:

> if you specify the same partitioner (custom or otherwise) for both
> partitionBy and groupBy, then may be it will help. The fundamental problem
> is groupByKey, that takes a lot of working memory.
> 1. Try to avoid groupByKey. What is it that you want to after sorting the
> list of grouped events? can you do that operation with a reduceByKey?
> 2. If not, use more partitions. That would cause lesser data in each
> partition, so less spilling.
> 3. You can control the amount memory allocated for shuffles by changing
> the configuration spark.shuffle.memoryFraction . More fraction would cause
> less spilling.
>
>
> On Tue, Oct 27, 2015 at 6:35 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> So, Wouldn't  using a customPartitioner on the rdd upon which  the
>> groupByKey  or reduceByKey is performed avoid shuffles and improve
>> performance? My code does groupByAndSort and reduceByKey on different
>> datasets as shown below. Would using a custom partitioner on those datasets
>> before using a  groupByKey or reduceByKey improve performance? My idea is
>> to avoid shuffles and improve performance. Also, right now I see a lot of
>> spills when there is a very large dataset for groupByKey and reduceByKey. I
>> think the memory is not sufficient. We need to group by sessionId and then
>> sort the Jsons based on the timeStamp as shown in the below code.
>>
>>
>> What is the alternative to using groupByKey for better performance? And
>> in case of reduceByKey, would using a customPartitioner on the RDD upon
>> which the reduceByKey is performed would reduce the shuffles and improve
>> the performance?
>>
>>
>> rdd.partitionBy(customPartitioner)
>>
>> def getGrpdAndSrtdRecs(rdd: RDD[(String, (Long, String))]): RDD[(String,
>> List[(Long, String)])] =
>> { val grpdRecs = rdd.groupByKey(); val srtdRecs =
>> grpdRecs.mapValues[(List[(Long, String)])](iter =>
>> iter.toList.sortBy(_._1)) srtdRecs }
>>
>> rdd.reduceByKey((a, b) => {
>>   (Math.max(a._1, b._1), (a._2 ++ b._2))
>> })
>>
>>
>>
>> On Tue, Oct 27, 2015 at 5:07 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> If you just want to control the number of reducers, then setting the
>>> numPartitions is sufficient. If you want to control how exact partitioning
>>> scheme (that is some other scheme other than hash-based) then you need to
>>> implement a custom partitioner. It can be used to improve data skews, etc.
>>> which ultimately improves performance.
>>>
>>> On Tue, Oct 27, 2015 at 1:20 PM, swetha <swethakasire...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> We currently use reduceByKey to reduce by a particular metric name in
>>>> our
>>>> Streaming/Batch job. It seems to be doing a lot of shuffles and it has
>>>> impact on performance. Does using a custompartitioner before calling
>>>> reduceByKey improve performance?
>>>>
>>>>
>>>> Thanks,
>>>> Swetha
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Does-using-Custom-Partitioner-before-calling-reduceByKey-improve-performance-tp25214.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: Configuring Spark for reduceByKey on on massive data sets

2015-10-12 Thread hotdog
Hi Daniel, 
Did you solve your problem?
I met the same problem when running reduceByKey on massive data on YARN.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Configuring-Spark-for-reduceByKey-on-on-massive-data-sets-tp5966p25023.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: "Too many open files" exception on reduceByKey

2015-10-11 Thread Tian Zhang
It turns out that Mesos can override the OS ulimit -n setting, so we have
increased the Mesos slave ulimit -n setting.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p25019.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: "Too many open files" exception on reduceByKey

2015-10-09 Thread tian zhang
You are right, I did find that Mesos overwrites this to a smaller number. So we 
will modify that and try to run again. Thanks!
Tian 


 On Thursday, October 8, 2015 4:18 PM, DB Tsai <dbt...@dbtsai.com> wrote:
   

 Try running the snippet below to see the actual ulimit. We found that Mesos overrides the ulimit, 
which causes the issue.
import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x=> Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect



Sincerely,

DB Tsai
--Blog: 
https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D

On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang <tzhang...@yahoo.com> wrote:

I hit this issue with spark 1.3.0 stateful application (with
updateStateByKey) function on mesos.  It will
fail after running fine for about 24 hours.
The error stack trace as below, I checked ulimit -n and we have very large
numbers set on the machines.
What else can be wrong?
15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
java.io.FileNotFoundException:
/media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
(Too many open files)
        at java.io.FileOutputStream.open(Native Method)
        at java.io.FileOutputStream.(FileOutputStream.java:221)
        at java.io.FileOutputStream.(FileOutputStream.java:171)
        at
org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
        at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





  

Re: "Too many open files" exception on reduceByKey

2015-10-08 Thread Tian Zhang
I hit this issue with spark 1.3.0 stateful application (with
updateStateByKey) function on mesos.  It will 
fail after running fine for about 24 hours.
The error stack trace as below, I checked ulimit -n and we have very large
numbers set on the machines.
What else can be wrong?
15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
java.io.FileNotFoundException:
/media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
(Too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.(FileOutputStream.java:221)
at java.io.FileOutputStream.(FileOutputStream.java:171)
at
org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: "Too many open files" exception on reduceByKey

2015-10-08 Thread DB Tsai
Try running the following to see the actual ulimit. We found that Mesos overrides the ulimit,
which causes the issue.

import sys.process._
val p = 1 to 100
val rdd = sc.parallelize(p, 100)
val a = rdd.map(x=> Seq("sh", "-c", "ulimit -n").!!.toDouble.toLong).collect




Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D
<https://pgp.mit.edu/pks/lookup?search=0x59DF55B8AF08DF8D>

On Thu, Oct 8, 2015 at 3:22 PM, Tian Zhang <tzhang...@yahoo.com> wrote:

> I hit this issue with spark 1.3.0 stateful application (with
> updateStateByKey) function on mesos.  It will
> fail after running fine for about 24 hours.
> The error stack trace as below, I checked ulimit -n and we have very large
> numbers set on the machines.
> What else can be wrong?
> 15/09/27 18:45:11 WARN scheduler.TaskSetManager: Lost task 2.0 in stage
> 113727.0 (TID 833758, ip-10-112-10-221.ec2.internal):
> java.io.FileNotFoundException:
>
> /media/ephemeral0/oncue/mesos-slave/slaves/20150512-215537-2165010442-5050-1730-S5/frameworks/20150825-175705-2165010442-5050-13705-0338/executors/0/runs/19342849-d076-483c-88da-747896e19b93/./spark-6efa2dcd-aea7-478e-9fa9-6e0973578eb4/blockmgr-33b1e093-6dd6-4462-938c-2597516272a9/27/shuffle_535_2_0.index
> (Too many open files)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)
> at java.io.FileOutputStream.(FileOutputStream.java:171)
> at
>
> org.apache.spark.shuffle.IndexShuffleBlockManager.writeIndexFile(IndexShuffleBlockManager.scala:85)
> at
>
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:69)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Too-many-open-files-exception-on-reduceByKey-tp2462p24985.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to make Group By/reduceByKey more efficient?

2015-09-24 Thread Adrian Tanase
All the *ByKey aggregations perform an efficient shuffle and preserve 
partitioning on the output. If all you need is to call reduceByKey, then don’t 
bother with groupBy. You should use groupBy if you really need all the 
datapoints from a key for a very custom operation.


From the docs:

Note: If you are grouping in order to perform an aggregation (such as a sum or 
average) over each key, using reduceByKey or aggregateByKey will yield much 
better performance. 
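
A tiny illustration of that note (names and data are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("bykey-sketch").setMaster("local[2]"))
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// groupByKey ships every value across the network and only then sums on the reduce side.
val viaGroup = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines map-side first, so far less data is shuffled for the same result.
val viaReduce = pairs.reduceByKey(_ + _)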


What you should worry about in more complex pipelines is that you’re actually 
preserving the partitioner between stages. For example, if you use a custom 
partitioner between a partitionBy and an updateStateByKey. Or if you use .map 
or .flatMap instead of .mapValues and .flatMapValues.

By the way, learn to use the Spark UI to understand the DAG / Execution plan 
and try to navigate the source code - I found the comments and the various 
preservesPartitioning options very educational.

-adrian





On 9/23/15, 8:43 AM, "swetha" <swethakasire...@gmail.com> wrote:

>Hi,
>
>How to make Group By more efficient? Is it recommended to use a custom
>partitioner and then do a Group By? And can we use a custom partitioner and
>then use a  reduceByKey for optimization?
>
>
>Thanks,
>Swetha
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-Group-By-reduceByKey-more-efficient-tp24780.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


reduceByKey inside updateStateByKey in Spark Streaming???

2015-09-24 Thread swetha
Hi,

How to use reduceByKey inside updateStateByKey? Suppose I have a bunch of
keys for which I need to do sum and average inside the  updateStateByKey by
joining with old state. How do I accomplish that?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-inside-updateStateByKey-in-Spark-Streaming-tp24808.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: reduceByKey inside updateStateByKey in Spark Streaming???

2015-09-24 Thread Adrian Tanase
The 2 operations can't be used inside one another.

If you need something like an all time average then you need to keep a tuple 
(sum, count) to which you add all the new values that come in every batch. The 
average is then just a map on the state DStream.
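
A minimal sketch of that (sum, count) state, where keyedValues is a hypothetical
DStream[(String, Double)] of per-key measurements:

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

def runningAverage(keyedValues: DStream[(String, Double)]): DStream[(String, Double)] = {
  val updateFunc = (newValues: Seq[Double], state: Option[(Double, Long)]) => {
    val (sum, count) = state.getOrElse((0.0, 0L))
    Some((sum + newValues.sum, count + newValues.size))
  }
  // State per key is the running (sum, count); the average is a mapValues on top of it.
  keyedValues.updateStateByKey(updateFunc).mapValues { case (sum, count) => sum / count }
}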

Makes sense? have I guessed your use case?

Sent from my iPhone

> On 24 Sep 2015, at 19:47, swetha <swethakasire...@gmail.com> wrote:
> 
> Hi,
> 
> How to use reduceByKey inside updateStateByKey? Suppose I have a bunch of
> keys for which I need to do sum and average inside the  updateStateByKey by
> joining with old state. How do I accomplish that?
> 
> 
> Thanks,
> Swetha
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/reduceByKey-inside-updateStateByKey-in-Spark-Streaming-tp24808.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to make Group By/reduceByKey more efficient?

2015-09-22 Thread swetha
Hi,

How to make Group By more efficient? Is it recommended to use a custom
partitioner and then do a Group By? And can we use a custom partitioner and
then use a  reduceByKey for optimization?


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-make-Group-By-reduceByKey-more-efficient-tp24780.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



reduceByKey not working on JavaPairDStream

2015-08-26 Thread Deepesh Maheshwari
Hi,
I have applied mapToPair and then a reduceByKey on a DStream to obtain a
JavaPairDStream<String, Map<String, Object>>.
I then have to apply a flatMapToPair and reduceByKey on the DStream obtained
above.
But I do not see any logs from the reduceByKey operation.
Can anyone explain why this is happening?


find My Code Below -



/**
 * GroupLevel1 Groups - articleId, host and tags
 */
JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
        .mapToPair(
                new PairFunction<Map<String, Object>, String, Map<String, Object>>() {

                    private static final long serialVersionUID = 5196132687044875422L;

                    @Override
                    public Tuple2<String, Map<String, Object>> call(
                            Map<String, Object> map) throws Exception {
                        String host = (String) map.get("host");
                        String articleId = (String) map.get("articleId");
                        List tags = (List) map.get("tags");

                        if (host == null || articleId == null) {
                            logger.error("*** Error Doc \n" + map);
                        }
                        String key = "articleId_" + articleId + "_host_" + host + "_tags_" + tags.toString();

                        //logger.info(key);
                        System.out.println("Printing Key - " + key);

                        map.put("articlecount", 1L);

                        return new Tuple2<String, Map<String, Object>>(key, map);
                    }
                })
        .reduceByKey(
                new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {

                    private static final long serialVersionUID = 1L;

                    @Override
                    public Map<String, Object> call(
                            Map<String, Object> map1,
                            Map<String, Object> map2) throws Exception {
                        Long count1 = (Long) map1.get("articlecount");
                        Long count2 = (Long) map2.get("articlecount");

                        map1.put("articlecount", count1 + count2);
                        return map1;
                    }
                });













/**
 * Grouping level 1 groups on articleId+host+tags
 * Tags can be multiple for an article.
 * Grouping level 2 does -
 *  1. For each tag in a row, find occurrence of that tag in other rows.
 *  2. If one tag found in another row, then add the articleCount of current and new row and put as articleCount for that tag.
 *  Note -
 *  Idea behind this grouping is to get all article counts that contain a particular tag and preserve this value.
 */


JavaPairDStream<String, Map<String, Object>> groupLevel2 =
        groupLevel1.flatMapToPair(new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
    @Override
    public Iterable<Tuple2<String, Map<String, Object>>> call(Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {

        System.out.println("group level 2 tuple 1 - " + stringMapTuple2._1());
        System.out.println("group level 2 tuple 2 - " + stringMapTuple2._2());
        ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
        ArrayList tagKeyList = new ArrayList();
        String host = (String) stringMapTuple2._2().get("host");
        StringBuilder key;
        for (String tag : tagList) {
            key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
            System.out.println("generated Key - " + key);
            tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
        }
        return tagKeyList;
    }
});

groupLevel2 = groupLevel2.reduceByKey(new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
    @Override
    public Map<String, Object> call(Map<String, Object> dataMap1, Map<String, Object> dataMap2) throws Exception {
        System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
        System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());
        Map<String, String> articleMap1 = (Map<String, String>) dataMap1.get("articleId");
        Map<String, String> articleMap2 = (Map<String, String>) dataMap2.get("articleId");

        if (articleMap1 == null || articleMap1.isEmpty()) {
            System.out.println("returning because map 1 null

Re: reduceByKey not working on JavaPairDStream

2015-08-26 Thread Sean Owen
I don't see that you invoke any action in this code. It won't do
anything unless you tell it to perform an action that requires the
transformations.
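
A minimal sketch of the missing piece (Scala shown for brevity; the Java API behaves
the same way, and the stream source and names are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("demo").setMaster("local[2]"), Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)   // stand-in for the real input stream
val counts = lines.map(l => (l, 1)).reduceByKey(_ + _)

counts.print()          // an output operation is what actually triggers the transformations
ssc.start()
ssc.awaitTermination()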

On Wed, Aug 26, 2015 at 7:05 AM, Deepesh Maheshwari
deepesh.maheshwar...@gmail.com wrote:
 Hi,
 I have applied mapToPair and then a reduceByKey on a DStream to obtain a
 JavaPairDStreamString, MapString, Object.
 I have to apply a flatMapToPair and reduceByKey on the DSTream Obtained
 above.
 But i do not see any logs from reduceByKey operation.
 Can anyone explain why is this happening..?


 find My Code Below -

  /***
  * GroupLevel1 Groups - articleId, host and tags
  */
 JavaPairDStreamString, MapString, Object groupLevel1 =
 inputDataMap

 .mapToPair(
 new PairFunctionMapString, Object, String,
 MapString, Object() {

 private static final long serialVersionUID =
 5196132687044875422L;

 @Override
 public Tuple2String, MapString, Object call(
 MapString, Object map) throws
 Exception {
 String host = (String) map.get(host);
 String articleId = (String)
 map.get(articleId);
 List tags = (List) map.get(tags);

 if (host == null || articleId == null) {
 logger.error(*** Error Doc
 \n + map);
 }
 String key = articleId_ + articleId +
 _host_ + host + _tags_ + tags.toString();

 //logger.info(key);
 System.out.println(Printing Key -  + key);
 map.put(articlecount, 1L);

 return new Tuple2String, MapString,
 Object(key, map);
 }
 })
 .reduceByKey(
 new Function2MapString, Object, MapString,
 Object, MapString, Object() {

 private static final long serialVersionUID = 1L;

 @Override
 public MapString, Object call(
 MapString, Object map1,
 MapString, Object map2) throws
 Exception {
 Long count1 = (Long)
 map1.get(articlecount);
 Long count2 = (Long)
 map2.get(articlecount);

 map1.put(articlecount, count1 + count2);
 return map1;
 }
 });





 /***
  * Grouping level 1 groups on articleId+host+tags
  * Tags can be multiple for an article.
  * Grouping level 2 does -
  *  1. For each tag in a row, find occurrence of that tag in other
 rows.
  *  2. If one tag found in another row, then add the articleCount of
 current and new row and put as articleCount for that tag.
  *  Note -
  *  Idea behind this grouping is to get all article counts that
 contain a particular tag and preserve this value.
  */


 JavaPairDStreamString, MapString, Object groupLevel2 =
 groupLevel1.flatMapToPair(new PairFlatMapFunctionTuple2String, MapString,
 Object, String, MapString, Object() {
 @Override
 public IterableTuple2String, MapString, Object
 call(Tuple2String, MapString, Object stringMapTuple2) throws Exception {
 System.out.println(group level 2 tuple 1 - +
 stringMapTuple2._1());
 System.out.println(group level 2 tuple 2 - +
 stringMapTuple2._2());
 ArrayListString tagList = (ArrayListString)
 stringMapTuple2._2().get(tags);
 ArrayList tagKeyList = new ArrayList();
 String host = (String) stringMapTuple2._2().get(host);
 StringBuilder key;
 for (String tag : tagList) {
 key = new
 StringBuilder(host_).append(host).append(_tag_).append(tag);
 System.out.println(generated Key - +key);
 tagKeyList.add(new Tuple2String, MapString,
 Object(key.toString(), stringMapTuple2._2()));
 }
 return tagKeyList;
 }
 });

 groupLevel2 = groupLevel2.reduceByKey(new Function2MapString,
 Object, MapString, Object, MapString, Object() {
 @Override
 public MapString, Object call(MapString, Object dataMap1,
 MapString, Object dataMap2) throws Exception {
 System.out.println(Type of article map in 1  +
 dataMap1.get(articleId).getClass());
 System.out.println(Type of article map in 2  +
 dataMap2.get(articleId).getClass

Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-24 Thread satish chandra j
HI All,

Please find below the fix details for anyone following the mail chain of this
issue:

*reduceByKey: Non working snippet*

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
val conf = new SparkConf()
val sc = new SparkContext(conf)

val DataRDD = sc.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4)))
DataRDD.reduceByKey(_+_).collect

Result: Array() is empty

*reduceByKey: Working snippet*

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)

val DataRDD = sc.makeRDD(Seq((0,1),(0,2),(1,2),(1,3),(2,4)))
DataRDD.reduceByKey(_+_).collect

Result: Array((0,3),(1,5),(2,4))
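
For reference, spark-shell (which dse spark wraps) already provides a SparkContext as
sc, so a -i script can usually use it directly instead of constructing a second
context; a minimal sketch:

// Uses the shell-provided `sc`; no new SparkContext (and no allowMultipleContexts) needed.
val data = sc.makeRDD(Seq((0, 1), (0, 2), (1, 2), (1, 3), (2, 4)))
data.reduceByKey(_ + _).collect.foreach(println)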

Regards,
Satish Chandra


On Sat, Aug 22, 2015 at 11:27 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 Currently using DSE 4.7 and Spark 1.2.2 version

 Regards,
 Satish

 On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote:

 What version of Spark you are using, or comes with DSE 4.7?

 We just cannot reproduce it in Spark.

 yzhang@localhost$ more test.spark
 val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs.reduceByKey((x,y) => x + y).collect
 yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.3.1
   /_/

 Using Scala version 2.10.4
 Spark context available as sc.
 SQL context available as sqlContext.
 Loading test.spark...
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at
 makeRDD at console:21
 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether
 UseCompressedOops is set; assuming yes
 res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Yong


 --
 Date: Fri, 21 Aug 2015 19:24:09 +0530
 Subject: Re: Transformation not happening for reduceByKey or GroupByKey
 From: jsatishchan...@gmail.com
 To: abhis...@tetrationanalytics.com
 CC: user@spark.apache.org


 HI Abhishek,

 I have even tried that but rdd2 is empty

 Regards,
 Satish

 On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:

 You had:

  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)

 Maybe try:

  rdd2 = RDD.reduceByKey((x,y) => x+y)
  rdd2.take(3)

 -Abhishek-

 On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

  HI All,
  I have data in RDD as mentioned below:
 
  RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
  I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key
 
  Code:
   RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)
 
  Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
  res:Array[(Int,Int)] = Array()
 
  Command as mentioned
 
  dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
  Please let me know what is missing in my code, as my resultant Array is
 empty
 
 
 
  Regards,
  Satish
 






Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-22 Thread satish chandra j
HI All,
Currently using DSE 4.7 and Spark 1.2.2 version

Regards,
Satish

On Fri, Aug 21, 2015 at 7:30 PM, java8964 java8...@hotmail.com wrote:

 What version of Spark you are using, or comes with DSE 4.7?

 We just cannot reproduce it in Spark.

 yzhang@localhost$ more test.spark
 val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs.reduceByKey((x,y) => x + y).collect
 yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
 Welcome to
     __
  / __/__  ___ _/ /__
 _\ \/ _ \/ _ `/ __/  '_/
/___/ .__/\_,_/_/ /_/\_\   version 1.3.1
   /_/

 Using Scala version 2.10.4
 Spark context available as sc.
 SQL context available as sqlContext.
 Loading test.spark...
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at
 makeRDD at console:21
 15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether
 UseCompressedOops is set; assuming yes
 res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Yong


 --
 Date: Fri, 21 Aug 2015 19:24:09 +0530
 Subject: Re: Transformation not happening for reduceByKey or GroupByKey
 From: jsatishchan...@gmail.com
 To: abhis...@tetrationanalytics.com
 CC: user@spark.apache.org


 HI Abhishek,

 I have even tried that but rdd2 is empty

 Regards,
 Satish

 On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
 abhis...@tetrationanalytics.com wrote:

 You had:

  RDD.reduceByKey((x,y) = x+y)
  RDD.take(3)

 Maybe try:

  rdd2 = RDD.reduceByKey((x,y) = x+y)
  rdd2.take(3)

 -Abhishek-

 On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

  HI All,
  I have data in RDD as mentioned below:
 
  RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
  I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key
 
  Code:
  RDD.reduceByKey((x,y) = x+y)
  RDD.take(3)
 
  Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)]= ShuffledRDD[1] at reduceByKey
 at console:73
  res:Array[(Int,Int)] = Array()
 
  Command as mentioned
 
  dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
  Please let me know what is missing in my code, as my resultant Array is
 empty
 
 
 
  Regards,
  Satish
 





RE: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread java8964
I believe spark-shell -i scriptFile is there. We also use it, at least in 
Spark 1.3.1.
dse spark will just wrap spark-shell command, underline it is just invoking 
spark-shell.
I don't know too much about the original problem though.
Yong
Date: Fri, 21 Aug 2015 18:19:49 +0800
Subject: Re: Transformation not happening for reduceByKey or GroupByKey
From: zjf...@gmail.com
To: jsatishchan...@gmail.com
CC: robin.e...@xense.co.uk; user@spark.apache.org

Hi Satish,
I don't see where spark support -i, so suspect it is provided by DSE. In that 
case, it might be bug of DSE.


On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com 
wrote:
HI Robin,Yes, it is DSE but issue is related to Spark only
Regards,Satish Chandra
On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:
Not sure, never used dse - it’s part of DataStax Enterprise right?
On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com wrote:
HI Robin,Yes, below mentioned piece or code works fine in Spark Shell but the 
same when place in Script File and executed with -i file name it creating an 
empty RDD
scala val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))pairs: 
org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at makeRDD at 
console:28

scala pairs.reduceByKey((x,y) = x + y).collectres43: Array[(Int, Int)] = 
Array((0,3), (1,50), (2,40))
Command:
dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile

I understand, I am missing something here due to which my final RDD does not 
have as required output
Regards,Satish Chandra
On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:
This works for me:
scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at 
makeRDD at <console>:28

scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))
On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com wrote:
HI All,
I have data in RDD as mentioned below:
RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))

I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
Values for each key
Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)
Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at 
<console>:73
res: Array[(Int,Int)] = Array()
Command as mentioned

dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile

Please let me know what is missing in my code, as my resultant Array is empty



Regards,
Satish









-- 
Best Regards

Jeff Zhang
  

Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI All,
Any inputs for the actual problem statement

Regards,
Satish


On Fri, Aug 21, 2015 at 5:57 PM, Jeff Zhang zjf...@gmail.com wrote:

 Yong, Thanks for your reply.

 I tried spark-shell -i script-file, it works fine for me. Not sure the
 difference with
 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile

 On Fri, Aug 21, 2015 at 7:01 PM, java8964 java8...@hotmail.com wrote:

 I believe spark-shell -i scriptFile is there. We also use it, at least
 in Spark 1.3.1.

  dse spark just wraps the spark-shell command; underneath it is simply
  invoking spark-shell.

 I don't know too much about the original problem though.

 Yong

 --
 Date: Fri, 21 Aug 2015 18:19:49 +0800
 Subject: Re: Transformation not happening for reduceByKey or GroupByKey
 From: zjf...@gmail.com
 To: jsatishchan...@gmail.com
 CC: robin.e...@xense.co.uk; user@spark.apache.org


 Hi Satish,

  I don't see where Spark supports -i, so I suspect it is provided by DSE.
  In that case, it might be a bug in DSE.



 On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j 
 jsatishchan...@gmail.com wrote:

 HI Robin,
 Yes, it is DSE but issue is related to Spark only

 Regards,
 Satish Chandra

 On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk
 wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below mentioned piece of code works fine in the Spark shell, but when
 the same is placed in a script file and executed with -i <file name>, it
 creates an empty RDD.

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand, I am missing something here due to which my final RDD does
 not have as required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish








 --
 Best Regards

 Jeff Zhang




 --
 Best Regards

 Jeff Zhang



Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread Abhishek R. Singh
You had:

 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

Maybe try:

 rdd2 = RDD.reduceByKey((x,y) => x+y)
 rdd2.take(3)

-Abhishek-

On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote:

 HI All,
 I have data in RDD as mentioned below:
 
 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
 Values for each key
 
 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)
 
 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at 
 <console>:73
 res:Array[(Int,Int)] = Array()
 
 Command as mentioned
 
 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
 Please let me know what is missing in my code, as my resultant Array is empty
 
 
 
 Regards,
 Satish
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Abhishek,

I have even tried that but rdd2 is empty

Regards,
Satish

On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:

 You had:

  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)

 Maybe try:

  rdd2 = RDD.reduceByKey((x,y) => x+y)
  rdd2.take(3)

 -Abhishek-

 On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com
 wrote:

  HI All,
  I have data in RDD as mentioned below:
 
  RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))
 
 
  I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key
 
  Code:
  RDD.reduceByKey((x,y) => x+y)
  RDD.take(3)
 
  Result in console:
  RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
  res:Array[(Int,Int)] = Array()
 
  Command as mentioned
 
  dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile
 
 
  Please let me know what is missing in my code, as my resultant Array is
 empty
 
 
 
  Regards,
  Satish
 




RE: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread java8964
What version of Spark are you using, or does it come with DSE 4.7?
We just cannot reproduce it in Spark.

yzhang@localhost$ more test.spark
val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs.reduceByKey((x,y) => x + y).collect

yzhang@localhost$ ~/spark/bin/spark-shell --master local -i test.spark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/

Using Scala version 2.10.4
Spark context available as sc.
SQL context available as sqlContext.
Loading test.spark...
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:21
15/08/21 09:58:51 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
res0: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))
Yong

Date: Fri, 21 Aug 2015 19:24:09 +0530
Subject: Re: Transformation not happening for reduceByKey or GroupByKey
From: jsatishchan...@gmail.com
To: abhis...@tetrationanalytics.com
CC: user@spark.apache.org

HI Abhishek,
I have even tried that but rdd2 is empty
Regards,
Satish
On Fri, Aug 21, 2015 at 6:47 PM, Abhishek R. Singh 
abhis...@tetrationanalytics.com wrote:
You had:



 RDD.reduceByKey((x,y) => x+y)

 RDD.take(3)



Maybe try:



 rdd2 = RDD.reduceByKey((x,y) => x+y)

 rdd2.take(3)



-Abhishek-



On Aug 20, 2015, at 3:05 AM, satish chandra j jsatishchan...@gmail.com wrote:



 HI All,

 I have data in RDD as mentioned below:



 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))





 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on 
 Values for each key



 Code:

 RDD.reduceByKey((x,y) => x+y)

 RDD.take(3)



 Result in console:

 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at 
 <console>:73

 res:Array[(Int,Int)] = Array()



 Command as mentioned



 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile





 Please let me know what is missing in my code, as my resultant Array is empty







 Regards,

 Satish






  

Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
Yes, DSE 4.7

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below mentioned piece of code works fine in the Spark shell, but when
 the same is placed in a script file and executed with -i <file name>, it
 creates an empty RDD.

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand, I am missing something here due to which my final RDD does
 not have as required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish







Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Robin,
Yes, it is DSE but issue is related to Spark only

Regards,
Satish Chandra

On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below mentioned piece of code works fine in the Spark shell, but when
 the same is placed in a script file and executed with -i <file name>, it
 creates an empty RDD.

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand, I am missing something here due to which my final RDD does
 not have as required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish







Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread Jeff Zhang
Hi Satish,

I don't see where Spark supports -i, so I suspect it is provided by DSE. In
that case, it might be a bug in DSE.



On Fri, Aug 21, 2015 at 6:02 PM, satish chandra j jsatishchan...@gmail.com
wrote:

 HI Robin,
 Yes, it is DSE but issue is related to Spark only

 Regards,
 Satish Chandra

 On Fri, Aug 21, 2015 at 3:06 PM, Robin East robin.e...@xense.co.uk
 wrote:

 Not sure, never used dse - it’s part of DataStax Enterprise right?

 On 21 Aug 2015, at 10:07, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI Robin,
 Yes, the below mentioned piece of code works fine in the Spark shell, but when
 the same is placed in a script file and executed with -i <file name>, it
 creates an empty RDD.

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 Command:

 dse spark --master local --jars postgresql-9.4-1201.jar -i
  ScriptFile

 I understand, I am missing something here due to which my final RDD does
 not have as required output

 Regards,
 Satish Chandra

 On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk
 wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77]
 at makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function
 on Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey
 at <console>:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish








-- 
Best Regards

Jeff Zhang


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-21 Thread satish chandra j
HI Robin,
Yes, the below mentioned piece of code works fine in the Spark shell, but when
the same is placed in a script file and executed with -i <file name>, it
creates an empty RDD.

scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
makeRDD at <console>:28


scala> pairs.reduceByKey((x,y) => x + y).collect
res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

Command:

dse spark --master local --jars postgresql-9.4-1201.jar -i
 ScriptFile

I understand I am missing something here, due to which my final RDD does
not have the required output.

Regards,
Satish Chandra

On Thu, Aug 20, 2015 at 8:23 PM, Robin East robin.e...@xense.co.uk wrote:

 This works for me:

 scala> val pairs = sc.makeRDD(Seq((0,1),(0,2),(1,20),(1,30),(2,40)))
 pairs: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[77] at
 makeRDD at <console>:28


 scala> pairs.reduceByKey((x,y) => x + y).collect
 res43: Array[(Int, Int)] = Array((0,3), (1,50), (2,40))

 On 20 Aug 2015, at 11:05, satish chandra j jsatishchan...@gmail.com
 wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
 Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at
 <console>:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish





Transformation not happening for reduceByKey or GroupByKey

2015-08-20 Thread satish chandra j
HI All,
I have data in RDD as mentioned below:

RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
Values for each key

Code:
RDD.reduceByKey((x,y) => x+y)
RDD.take(3)

Result in console:
RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at
<console>:73
res:Array[(Int,Int)] = Array()

Command as mentioned

dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


Please let me know what is missing in my code, as my resultant Array is
empty



Regards,
Satish


Re: Transformation not happening for reduceByKey or GroupByKey

2015-08-20 Thread satish chandra j
HI All,
Could anybody let me know what it is that I am missing here? It should work, as
it is a basic transformation.

Please let me know if any additional information required

Regards,
Satish

On Thu, Aug 20, 2015 at 3:35 PM, satish chandra j jsatishchan...@gmail.com
wrote:

 HI All,
 I have data in RDD as mentioned below:

 RDD : Array[(Int),(Int)] = Array((0,1), (0,2),(1,20),(1,30),(2,40))


 I am expecting output as Array((0,3),(1,50),(2,40)) just a sum function on
 Values for each key

 Code:
 RDD.reduceByKey((x,y) => x+y)
 RDD.take(3)

 Result in console:
 RDD: org.apache.spark.rdd.RDD[(Int,Int)] = ShuffledRDD[1] at reduceByKey at
 <console>:73
 res:Array[(Int,Int)] = Array()

 Command as mentioned

 dse spark --master local --jars postgresql-9.4-1201.jar -i  ScriptFile


 Please let me know what is missing in my code, as my resultant Array is
 empty



 Regards,
 Satish




Re: Apache Spark : Custom function for reduceByKey - missing arguments for method

2015-07-10 Thread Richard Marscher
Did you try it by adding the `_` after the method names to partially apply
them? Scala is saying that it's trying to immediately apply those methods
but can't find arguments. But you instead are trying to pass them along as
functions (which they aren't). Here is a link to a stackoverflow answer
that should help clarify: http://stackoverflow.com/a/19720808/72401. I
think there are two solutions: turn the getMax and getMin into functions by
using val, like so:

val getMax: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) =>
  if (a > b) a
  else b
}

val getMin: (DoubleDimension, DoubleDimension) => DoubleDimension = { (a, b) =>
  if (a < b) a
  else b
}

or just partially apply them:

maxVector = attribMap.reduceByKey( getMax _)
minVector = attribMap.reduceByKey( getMin _)
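
For what it is worth, a compact self-contained sketch of both options, runnable in a
plain spark-shell; it uses plain Double in place of the DoubleDimension wrapper purely
so the example stands on its own, and the attribute values are made up:

// toy stand-in for attribMap; Double replaces the custom DoubleDimension type
val attribs = sc.parallelize(Seq(("dage", 3.0), ("dage", 7.0), ("dhour89", 40.0)))

// option 1: a function value -- already a function, so no partial application needed
val maxF: (Double, Double) => Double = (a, b) => if (a > b) a else b
val maxVector1 = attribs.reduceByKey(maxF)

// option 2: a method, lifted to a function with the trailing underscore
def maxM(a: Double, b: Double): Double = if (a > b) a else b
val maxVector2 = attribs.reduceByKey(maxM _)

maxVector1.collect().foreach(println)   // (dage,7.0), (dhour89,40.0)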

On Thu, Jul 9, 2015 at 9:09 PM, ameyamm ameya.malond...@outlook.com wrote:

 I am trying to normalize a dataset (convert values for all attributes in
 the
 vector to 0-1 range). I created an RDD of tuple (attrib-name,
 attrib-value) for all the records in the dataset as follows:

 val attribMap : RDD[(String,DoubleDimension)] = contactDataset.flatMap(
   contact => {
     List(
       ("dage", contact.dage match { case Some(value) => DoubleDimension(value) ; case None => null }),
       ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value) ; case None => null }),
       ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value) ; case None => null }),
       ("ddepart", contact.ddepart match { case Some(value) => DoubleDimension(value) ; case None => null }),
       ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value) ; case None => null }),
       ("dhour89", contact.dhour89 match { case Some(value) => DoubleDimension(value) ; case None => null })
     )
   }
 )

 Here, contactDataset is of the type RDD[Contact]. The fields of Contact
 class are of type Option[Long].

 DoubleDimension is a simple wrapper over Double datatype. It extends the
 Ordered trait and implements corresponding compare method and equals
 method.

 To obtain the max and min attribute vector for computing the normalized
 values,

 maxVector = attribMap.reduceByKey( getMax )
 minVector = attribMap.reduceByKey( getMin )

 Implementation of getMax and getMin is as follows:

 def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension =
 {
 if (a > b) a
 else b
 }

 def getMin( a : DoubleDimension, b : DoubleDimension) : DoubleDimension = {
 if (a < b) a
 else b
 }

 I get a compile error at calls to the methods getMax and getMin stating:

 [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error:
 missing arguments for method getMax in class DatasetReader;

 [ERROR] follow this method with '_' if you want to treat it as a partially
 applied function

 [ERROR] maxVector = attribMap.reduceByKey( getMax )

 [ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error:
 missing arguments for method getMin in class DatasetReader;

 [ERROR] follow this method with '_' if you want to treat it as a partially
 applied function

 [ERROR] minVector = attribMap.reduceByKey( getMin )

 I am not sure what I am doing wrong here. My RDD is an RDD of Pairs and as
 per my knowledge, I can pass any method to it as long as the function is
 of
 the type f : (V, V) => V.

 I am really stuck here. Please help.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Custom-function-for-reduceByKey-missing-arguments-for-method-tp23756.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog
http://localytics.com/blog | Twitter http://twitter.com/localytics |
Facebook http://facebook.com/localytics | LinkedIn
http://www.linkedin.com/company/1148792?trk=tyah


Apache Spark : Custom function for reduceByKey - missing arguments for method

2015-07-09 Thread ameyamm
I am trying to normalize a dataset (convert values for all attributes in the
vector to 0-1 range). I created an RDD of tuple (attrib-name,
attrib-value) for all the records in the dataset as follows:

val attribMap : RDD[(String,DoubleDimension)] = contactDataset.flatMap(
  contact => {
    List(
      ("dage", contact.dage match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dancstry1", contact.dancstry1 match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dancstry2", contact.dancstry2 match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("ddepart", contact.ddepart match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dhispanic", contact.dhispanic match { case Some(value) => DoubleDimension(value) ; case None => null }),
      ("dhour89", contact.dhour89 match { case Some(value) => DoubleDimension(value) ; case None => null })
    )
  }
)

Here, contactDataset is of the type RDD[Contact]. The fields of Contact
class are of type Option[Long].

DoubleDimension is a simple wrapper over Double datatype. It extends the
Ordered trait and implements corresponding compare method and equals method.

To obtain the max and min attribute vector for computing the normalized
values,

maxVector = attribMap.reduceByKey( getMax )
minVector = attribMap.reduceByKey( getMin )

Implementation of getMax and getMin is as follows:

def getMax( a : DoubleDimension, b : DoubleDimension ) : DoubleDimension = {
if (a > b) a
else b
}

def getMin( a : DoubleDimension, b : DoubleDimension) : DoubleDimension = {
if (a < b) a
else b 
}

I get a compile error at calls to the methods getMax and getMin stating:

[ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:117: error:
missing arguments for method getMax in class DatasetReader;

[ERROR] follow this method with '_' if you want to treat it as a partially
applied function

[ERROR] maxVector = attribMap.reduceByKey( getMax )

[ERROR] .../com/ameyamm/input_generator/DatasetReader.scala:118: error:
missing arguments for method getMin in class DatasetReader;

[ERROR] follow this method with '_' if you want to treat it as a partially
applied function

[ERROR] minVector = attribMap.reduceByKey( getMin )

I am not sure what I am doing wrong here. My RDD is an RDD of Pairs and as
per my knowledge, I can pass any method to it as long as the function is of
the type f : (V, V) => V.

I am really stuck here. Please help.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-Custom-function-for-reduceByKey-missing-arguments-for-method-tp23756.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: run reduceByKey on huge data in spark

2015-06-30 Thread barge.nilesh
I 'm using 50 servers , 35 executors per server, 140GB memory per server

35 executors *per server* sounds kind of odd to me.

With 35 executors per server and each server having 140GB, each executor
is going to get only 4GB. That 4GB will be divided into shuffle/storage memory
fractions... assuming the default storage memory fraction of 0.6, that is 2.4GB
of working space for each executor, so if any partition (key group)
exceeds 2.4GB there will be an OOM...

Maybe you can try with fewer executors per server/node...






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/run-reduceByKey-on-huge-data-in-spark-tp23546p23555.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
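
To make that concrete, here is a hedged sketch of the kind of sizing being suggested,
written as a standalone application submitted with spark-submit. Every number below is
a placeholder to be tuned against the actual cluster and data, not a recommendation,
and the input/output paths are made up:

import org.apache.spark.{SparkConf, SparkContext}

// fewer, larger executors per node instead of 35 small ones (YARN-style settings)
val conf = new SparkConf()
  .setAppName("wordcount")
  .set("spark.executor.instances", "250")   // placeholder
  .set("spark.executor.cores", "5")         // placeholder
  .set("spark.executor.memory", "24g")      // placeholder
val sc = new SparkContext(conf)

val counts = sc.textFile("hdfs:///input")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  // many output partitions instead of 1, so no single reducer receives all the data
  .reduceByKey(_ + _, 10000)
counts.saveAsTextFile("hdfs:///output")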



run reduceByKey on huge data in spark

2015-06-30 Thread hotdog
I'm running reduceByKey in spark. My program is the simplest example of
spark:

val counts = textFile.flatMap(line => line.split(" ")).repartition(2)
 .map(word => (word, 1))
 .reduceByKey(_ + _, 1)
counts.saveAsTextFile("hdfs://...")
but it always runs out of memory...

I'm using 50 servers, 35 executors per server, 140GB memory per server.

The document volume is: 8TB of documents, 20 billion documents, 1000 billion
words in total, and the words after the reduce will be about 100 million.

I wonder how to set the configuration of spark?

I wonder what value should these parameters be?

1. the number of the maps ? 2 for example?
2. the number of the reduces ? 1 for example?
3. others parameters?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/run-reduceByKey-on-huge-data-in-spark-tp23546.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: run reduceByKey on huge data in spark

2015-06-30 Thread lisendong
Hello, I'm using Spark 1.4.2-SNAPSHOT.
I'm running in YARN mode :-)

I wonder if the spark.shuffle.memoryFraction or spark.shuffle.manager settings would help?
How do I set these parameters?
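
For reference, a sketch of how those two settings are typically supplied on Spark 1.x,
either on the SparkConf before the context is created or as --conf options to
spark-submit; the values here are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}

// both keys must be in place before the SparkContext is created;
// the same pairs can also be passed as --conf spark.shuffle.memoryFraction=0.4 etc. on spark-submit
val conf = new SparkConf()
  .setAppName("reduce-heavy-job")
  .set("spark.shuffle.memoryFraction", "0.4")   // illustrative value; the 1.x default is 0.2
  .set("spark.shuffle.manager", "sort")         // "sort" is the default from Spark 1.2 onwards
val sc = new SparkContext(conf)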
 On Jul 1, 2015, at 1:32 AM, Ted Yu yuzhih...@gmail.com wrote:
 
 Which Spark release are you using ?
 
 Are you running in standalone mode ?
 
 Cheers
 
 On Tue, Jun 30, 2015 at 10:03 AM, hotdog lisend...@163.com 
 mailto:lisend...@163.com wrote:
 I'm running reduceByKey in spark. My program is the simplest example of
 spark:
 
 val counts = textFile.flatMap(line => line.split(" ")).repartition(2)
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
 counts.saveAsTextFile("hdfs://...")
 but it always run out of memory...
 
 I 'm using 50 servers , 35 executors per server, 140GB memory per server.
 
 the documents volume is : 8TB documents, 20 billion documents, 1000 billion
 words in total. and the words after reduce will be about 100 million.
 
 I wonder how to set the configuration of spark?
 
 I wonder what value should these parameters be?
 
 1. the number of the maps ? 2 for example?
 2. the number of the reduces ? 1 for example?
 3. others parameters?
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/run-reduceByKey-on-huge-data-in-spark-tp23546.html
  
 http://apache-spark-user-list.1001560.n3.nabble.com/run-reduceByKey-on-huge-data-in-spark-tp23546.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
 mailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org 
 mailto:user-h...@spark.apache.org
 
 



Re: run reduceByKey on huge data in spark

2015-06-30 Thread Ted Yu
Which Spark release are you using ?

Are you running in standalone mode ?

Cheers

On Tue, Jun 30, 2015 at 10:03 AM, hotdog lisend...@163.com wrote:

 I'm running reduceByKey in spark. My program is the simplest example of
 spark:

 val counts = textFile.flatMap(line => line.split(" ")).repartition(2)
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
 counts.saveAsTextFile("hdfs://...")
 but it always run out of memory...

 I 'm using 50 servers , 35 executors per server, 140GB memory per server.

 the documents volume is : 8TB documents, 20 billion documents, 1000 billion
 words in total. and the words after reduce will be about 100 million.

 I wonder how to set the configuration of spark?

 I wonder what value should these parameters be?

 1. the number of the maps ? 2 for example?
 2. the number of the reduces ? 1 for example?
 3. others parameters?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/run-reduceByKey-on-huge-data-in-spark-tp23546.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




reduceByKey - add values to a list

2015-06-25 Thread Kannappan Sirchabesan
Hi,
  I am trying to see what is the best way to reduce the values of an RDD of 
(key, value) pairs into a (key, ListOfValues) pair. I know various ways of 
achieving this, but I am looking for an efficient, elegant one-liner if there is 
one. 

Example:
Input RDD: (USA, California), (UK, Yorkshire), (USA, Colorado)
Output RDD: (USA, [California, Colorado]), (UK, Yorkshire)

Is it possible to use reduceByKey or foldByKey to achieve this, instead of 
groupByKey? 

Something equivalent to the cons operator from LISP, so that I could just say 
reduceByKey(lambda x, y: (cons x y)). Maybe it is more a Python question than 
a Spark question: how to create a list from 2 elements without a starting 
empty list?

Thanks,
Kannappan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
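
One way to get that shape without groupByKey is combineByKey, which builds the list for
each key from its first value rather than from a pre-existing empty list. A sketch in
Scala follows (the question above is phrased in Python, but the shape is the same);
note that for pure collection this still shuffles roughly as much data as groupByKey,
since nothing is actually being reduced:

val regions = sc.parallelize(Seq(("USA", "California"), ("UK", "Yorkshire"), ("USA", "Colorado")))

val byCountry = regions.combineByKey(
  (v: String) => List(v),                          // createCombiner: wrap the first value seen for a key
  (acc: List[String], v: String) => v :: acc,      // mergeValue: cons further values onto the list
  (a: List[String], b: List[String]) => a ::: b    // mergeCombiners: concatenate partial lists across partitions
)

byCountry.collect().foreach(println)
// e.g. (UK,List(Yorkshire)), (USA,List(Colorado, California)) -- element order within a list is not guaranteed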


