Re: How to get the HDFS path for each RDD

2015-09-26 Thread Fengdong Yu
Hi Anchit,

this is not what I expected, because you specified the HDFS directory in your code.
I've solved it like this:

   import org.apache.hadoop.io.{LongWritable, Text}
   import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
   import org.apache.spark.rdd.HadoopRDD

   val text = sc.hadoopFile(Args.input,
     classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
   val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]

   hadoopRdd.mapPartitionsWithInputSplit((inputSplit, iterator) => {
     val file = inputSplit.asInstanceOf[FileSplit]
     iterator.map(tp => (tp._1, new Text(file.toString + "," + tp._2.toString)))
   })
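
As a side note, here is a minimal sketch (assuming the /data/<source>/dt=<date>
layout from the quoted discussion below; the helper name is purely illustrative)
of how the split's path string could then be parsed into the "source" and
"date" fields:

   // Sketch only: derive ("source", "date") from a path ending in .../<source>/dt=<date>
   def sourceAndDate(path: String): (String, String) = {
     val parts  = path.split("/").filter(_.nonEmpty)
     val source = parts(parts.length - 2)                     // e.g. "test1"
     val date   = parts(parts.length - 1).stripPrefix("dt=")  // e.g. "20100101"
     (source, date)
   }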




> On Sep 25, 2015, at 13:12, Anchit Choudhry  wrote:
> 
> Hi Fengdong,
> 
> So I created two files in HDFS under a test folder.
> 
> test/dt=20100101.json
> { "key1" : "value1" }
> 
> test/dt=20100102.json
> { "key2" : "value2" }
> 
> Then inside PySpark shell
> 
> rdd = sc.wholeTextFiles('./test/*')
> rdd.collect()
> [(u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'{ "key1" : "value1" }'),
>  (u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json', u'{ "key2" : "value2" }')]
> import json
> def editMe(y, x):
>   j = json.loads(y)
>   j['source'] = x
>   return j
> 
> rdd.map(lambda (x,y): editMe(y,x)).collect()
> [{'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', 
> u'key1': u'value1'}, {u'key2': u'value2', 'source': 
> u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json'}]
> 
> Similarly you could modify the function to return 'source' and 'date' with 
> some string manipulation per your requirements.
> 
> Let me know if this helps.
> 
> Thanks,
> Anchit
> 
> 
> On 24 September 2015 at 23:55, Fengdong Yu  > wrote:
> 
> yes. such as I have two data sets:
> 
> date set A: /data/test1/dt=20100101
> data set B: /data/test2/dt=20100202
> 
> 
> all data has the same JSON format, such as:
> {"key1" : "value1", "key2" : "value2"}
> 
> 
> my expected output:
> {"key1" : "value1", "key2" : "value2", "source" : "test1", "date" : "20100101"}
> {"key1" : "value1", "key2" : "value2", "source" : "test2", "date" : "20100202"}
> 
> 
>> On Sep 25, 2015, at 11:52, Anchit Choudhry > > wrote:
>> 
>> Sure. May I ask for a sample input(could be just few lines) and the output 
>> you are expecting to bring clarity to my thoughts?
>> 
>> On Thu, Sep 24, 2015, 23:44 Fengdong Yu > > wrote:
>> Hi Anchit, 
>> 
>> Thanks for the quick answer.
>> 
>> my exact question is : I want to add HDFS location into each line in my JSON 
>>  data.
>> 
>> 
>> 
>>> On Sep 25, 2015, at 11:25, Anchit Choudhry >> > wrote:
>>> 
>>> Hi Fengdong,
>>> 
>>> Thanks for your question.
>>> 
>>> Spark already has a function called wholeTextFiles within sparkContext 
>>> which can help you with that:
>>> 
>>> Python
>>> hdfs://a-hdfs-path/part-0
>>> hdfs://a-hdfs-path/part-1
>>> ...
>>> hdfs://a-hdfs-path/part-n
>>> rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>> (a-hdfs-path/part-0, its content)
>>> (a-hdfs-path/part-1, its content)
>>> ...
>>> (a-hdfs-path/part-n, its content)
>>> More info: http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=wholetext#pyspark.SparkContext.wholeTextFiles
>>> 
>>> 
>>> 
>>> Scala
>>> 
>>> val rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")
>>> 
>>> More info: 
>>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@wholeTextFiles(String,Int):RDD[(String,String)]
>>>  
>>> Let us know if this helps or you need more help.
>>> 
>>> Thanks,
>>> Anchit Choudhry
>>> 
>>> On 24 September 2015 at 23:12, Fengdong Yu >> > wrote:
>>> Hi,
>>> 
>>> I have  multiple files with JSON format, such as:
>>> 
>>> /data/test1_data/sub100/test.data
>>> /data/test2_data/sub200/test.data
>>> 
>>> 
>>> I can do sc.textFile("/data/*/*"),
>>> 
>>> but I want to add {"source" : "HDFS_LOCATION"} to each line, then save
>>> it to one target HDFS location.
>>> 
>>> How can I do that? Thanks.
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> -
>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org 
>>> 
>>> For additional commands, e-mail: dev-h...@spark.apache.org 
>>> 
>>> 
>>> 
>> 
> 
> 



Re: RFC: packaging Spark without assemblies

2015-09-26 Thread Steve Loughran

> On 25 Sep 2015, at 19:11, Marcelo Vanzin  wrote:
> 
> - People who ship the assembly with their application. As Matei
> suggested (and I agree), that is kinda weird. But currently that is
> the easiest way to embed Spark and get, for example, the YARN backend
> working. There are ways around that but they are tricky. The code
> changes I propose would make that much easier to do without the need
> for an assembly.

not weird if you are bypassing bin/spark


> 
> - People who somehow depend on the layout of the Spark distribution.
> Meaning they expect a "lib/" directory with an assembly in there
> matching a specific file name pattern. Although I kinda consider that
> to be an invalid use case (as in "you're doing it wrong").

well, the spark-submit and spark-example shells do something close to this, though
primarily as error checking against more than one artifact and classpath confusion

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Spark Streaming with Tachyon : Data Loss on Receiver Failure due to WAL error

2015-09-26 Thread Dibyendu Bhattacharya
Hi,

Recently I was working on a PR to use Tachyon as OFF_HEAP store for Spark
Streaming and make sure Spark Streaming can recover from Driver failure and
recover the blocks form Tachyon.

The motivation for this PR is:

If a Streaming application stores its blocks OFF_HEAP, it may not need any
WAL-like feature to recover from Driver failure. As long as the writing of
blocks to Tachyon from the Streaming receiver is durable, the blocks should be
recoverable from Tachyon directly on Driver failure.
This avoids the expensive WAL writes and the duplication of blocks in both
MEMORY and the WAL, while still guaranteeing an end-to-end no-data-loss
channel using the OFF_HEAP store.

https://github.com/apache/spark/pull/8817

This PR is still under review, but having done various failover tests in
my environment, I have seen it work perfectly fine without any data loss.
Let's see what TD and others have to say on this PR.

Below is the configuration I used to test this PR:


Spark : 1.6 from Master
Tachyon : 0.7.1

SparkConfiguration Details :

SparkConf conf = new SparkConf().setAppName("TestTachyon")
    .set("spark.streaming.unpersist", "true")
    .set("spark.local.dir", "/mnt1/spark/tincan")
    .set("tachyon.zookeeper.address", "10.252.5.113:2182")
    .set("tachyon.usezookeeper", "true")
    .set("spark.externalBlockStore.url", "tachyon-ft://ip-10-252-5-113.asskickery.us:19998")
    .set("spark.externalBlockStore.baseDir", "/sparkstreaming")
    .set("spark.externalBlockStore.folderName", "pearson")
    .set("spark.externalBlockStore.dirId", "subpub")
    .set("spark.externalBlockStore.keepExternalBlockStoreDirOnShutdown", "true");

JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(1));

String checkpointDirectory = "hdfs://10.252.5.113:9000/user/hadoop/spark/wal";

jsc.checkpoint(checkpointDirectory);


// I am using my receiver-based consumer (https://github.com/dibbhatt/kafka-spark-consumer),
// but KafkaUtils.createStream will also work.

JavaDStream unionStreams = ReceiverLauncher.launch(
    jsc, props, numberOfReceivers, StorageLevel.OFF_HEAP());




Regards,
Dibyendu

On Sat, Sep 26, 2015 at 11:59 AM, N B  wrote:

> Hi Dibyendu,
>
> How does one go about configuring spark streaming to use tachyon as its
> place for storing checkpoints? Also, can one do this with tachyon running
> on a completely different node than where spark processes are running?
>
> Thanks
> Nikunj
>
>
> On Thu, May 21, 2015 at 8:35 PM, Dibyendu Bhattacharya <
> dibyendu.bhattach...@gmail.com> wrote:
>
>> Hi Tathagata,
>>
>> Thanks for looking into this. Further investigating I found that the
>> issue is with Tachyon does not support File Append. The streaming receiver
>> which writes to WAL when failed, and again restarted, not able to append to
>> same WAL file after restart.
>>
>> I raised this with Tachyon user group, and Haoyuan told that within 3
>> months time Tachyon file append will be ready. Will revisit this issue
>> again then .
>>
>> Regards,
>> Dibyendu
>>
>>
>> On Fri, May 22, 2015 at 12:24 AM, Tathagata Das 
>> wrote:
>>
>>> Looks like somehow the file size reported by the FSInputDStream of
>>> Tachyon's FileSystem interface, is returning zero.
>>>
>>> On Mon, May 11, 2015 at 4:38 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
 Just to follow up this thread further .

 I was doing some fault-tolerance testing of Spark Streaming with Tachyon
 as the OFF_HEAP block store. As I said in an earlier email, I was able to solve
 the BlockNotFound exception when I used Tachyon's Hierarchical Storage,
 which is good.

 I continued doing some testing around storing the Spark Streaming WAL
 and checkpoint files in Tachyon as well. Here are a few findings:


 When I store the Spark Streaming checkpoint location in Tachyon, the
 throughput is much higher. I tested the Driver and Receiver failure cases,
 and Spark Streaming is able to recover without any data loss on Driver
 failure.

 *But on Receiver failure, Spark Streaming loses data*, as I see an
 exception while reading the WAL file from the Tachyon "receivedData" location
 for the same Receiver id which just failed.

 If I change the checkpoint location back to HDFS, Spark Streaming can
 recover from both Driver and Receiver failure.

 Here is the Log details when Spark Streaming receiver failed ...I
 raised a JIRA for the same issue :
 https://issues.apache.org/jira/browse/SPARK-7525



 INFO : org.apache.spark.scheduler.DAGScheduler - *Executor lost: 2
 (epoch 1)*
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Trying to
 remove executor 2 from BlockManagerMaster.
 INFO : org.apache.spark.storage.BlockManagerMasterEndpoint - Removing
 block manager BlockManagerId(2, 10.252.5.54, 45789)
 INFO : org.apache.spark.storage.BlockManagerMaster - Removed 2

Re: How to get the HDFS path for each RDD

2015-09-26 Thread Fengdong Yu
Anchit,

please ignore my input. You are right. Thanks.



> On Sep 26, 2015, at 17:27, Fengdong Yu  wrote:
> 
> Hi Anchit,
> 
> this is not what I expected, because you specified the HDFS directory in your code.
> I've solved it like this:
> 
>    val text = sc.hadoopFile(Args.input,
>      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 2)
>    val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
> 
>    hadoopRdd.mapPartitionsWithInputSplit((inputSplit, iterator) => {
>      val file = inputSplit.asInstanceOf[FileSplit]
>      iterator.map(tp => (tp._1, new Text(file.toString + "," + tp._2.toString)))
>    })
> 
> 
> 
> 
>> On Sep 25, 2015, at 13:12, Anchit Choudhry > > wrote:
>> 
>> Hi Fengdong,
>> 
>> So I created two files in HDFS under a test folder.
>> 
>> test/dt=20100101.json
>> { "key1" : "value1" }
>> 
>> test/dt=20100102.json
>> { "key2" : "value2" }
>> 
>> Then inside PySpark shell
>> 
>> rdd = sc.wholeTextFiles('./test/*')
>> rdd.collect()
>> [(u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', u'{ "key1" : 
>> "value1" }), (u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json', 
>> u'{ "key2" : "value2" })]
>> import json
>> def editMe(y, x):
>>   j = json.loads(y)
>>   j['source'] = x
>>   return j
>> 
>> rdd.map(lambda (x,y): editMe(y,x)).collect()
>> [{'source': u'hdfs://localhost:9000/user/hduser/test/dt=20100101.json', 
>> u'key1': u'value1'}, {u'key2': u'value2', 'source': 
>> u'hdfs://localhost:9000/user/hduser/test/dt=20100102.json'}]
>> 
>> Similarly you could modify the function to return 'source' and 'date' with 
>> some string manipulation per your requirements.
>> 
>> Let me know if this helps.
>> 
>> Thanks,
>> Anchit
>> 
>> 
>> On 24 September 2015 at 23:55, Fengdong Yu > > wrote:
>> 
>> yes. such as I have two data sets:
>> 
>> date set A: /data/test1/dt=20100101
>> data set B: /data/test2/dt=20100202
>> 
>> 
>> all data has the same JSON format , such as:
>> {“key1” : “value1”, “key2” : “value2” }
>> 
>> 
>> my output expected:
>> {“key1” : “value1”, “key2” : “value2” , “source” : “test1”, “date” : 
>> “20100101"}
>> {“key1” : “value1”, “key2” : “value2” , “source” : “test2”, “date” : 
>> “20100202"}
>> 
>> 
>>> On Sep 25, 2015, at 11:52, Anchit Choudhry >> > wrote:
>>> 
>>> Sure. May I ask for a sample input(could be just few lines) and the output 
>>> you are expecting to bring clarity to my thoughts?
>>> 
>>> On Thu, Sep 24, 2015, 23:44 Fengdong Yu >> > wrote:
>>> Hi Anchit, 
>>> 
>>> Thanks for the quick answer.
>>> 
>>> my exact question is : I want to add HDFS location into each line in my 
>>> JSON  data.
>>> 
>>> 
>>> 
 On Sep 25, 2015, at 11:25, Anchit Choudhry > wrote:
 
 Hi Fengdong,
 
 Thanks for your question.
 
 Spark already has a function called wholeTextFiles within sparkContext 
 which can help you with that:
 
 Python
 hdfs://a-hdfs-path/part-0
 hdfs://a-hdfs-path/part-1
 ...
 hdfs://a-hdfs-path/part-n
 rdd = sparkContext.wholeTextFiles(“hdfs://a- <>hdfs-path”)
 (a-hdfs-path/part-0, its content)
 (a-hdfs-path/part-1, its content)
 ...
 (a-hdfs-path/part-n, its content)
 More info: http://spark 
 .apache.org/docs/latest/api/python/pyspark.html?highlight=wholetext#pyspark.SparkContext.wholeTextFiles
 
 
 
 Scala
 
 val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")
 
 More info: 
 https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@wholeTextFiles(String,Int):RDD[(String,String)]
  
 Let us know if this helps or you need more help.
 
 Thanks,
 Anchit Choudhry
 
 On 24 September 2015 at 23:12, Fengdong Yu > wrote:
 Hi,
 
 I have  multiple files with JSON format, such as:
 
 /data/test1_data/sub100/test.data
 /data/test2_data/sub200/test.data
 
 
 I can sc.textFile(“/data/*/*”)
 
 but I want to add the {“source” : “HDFS_LOCATION”} to each line, then save 
 it the one target HDFS location.
 
 how to do it, Thanks.
 
 
 
 
 
 
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org 
 
 For additional commands, e-mail: dev-h...@spark.apache.org 
 
 
 
>>> 
>> 
>> 

Re: RDD API patterns

2015-09-26 Thread Mike Hynes
Hello Devs,

This email concerns some timing results for a treeAggregate in
computing a (stochastic) gradient over an RDD of labelled points, as
is currently done in the MLlib optimization routine for SGD.

In SGD, the underlying RDD is downsampled by a fraction f \in (0,1],
and the subgradients over all the instances in the downsampled RDD are
aggregated to the driver as a dense vector. However, we have noticed
some unusual behaviour when f < 1: it takes the same amount of time to
compute the stochastic gradient for a stochastic minibatch as it does
for a full batch (f = 1).
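
For concreteness, the pattern in question is roughly the following (a
simplified sketch with toy dimensions and placeholder operators rather than
the actual MLlib gradient code, assuming an existing SparkContext sc, e.g. in
spark-shell):

   val dim  = 8
   val data = sc.parallelize(1 to 100000, 256).map(i => Array.fill(dim)(i.toDouble)) // stand-in for subgradients
   val f    = 1e-3  // miniBatchFraction

   val gradientSum = data
     .sample(withReplacement = false, fraction = f, seed = 42L) // downsampling step
     .treeAggregate(Array.fill(dim)(0.0))(
       (acc, v) => { var j = 0; while (j < dim) { acc(j) += v(j); j += 1 }; acc }, // seqOp
       (a, b)   => { var j = 0; while (j < dim) { a(j) += b(j); j += 1 }; a },     // combOp
       depth = 4)  // four aggregation levels, as in the plots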

Attached are two plots of the mean task timing metrics for each level
in the aggregation, which has been performed with 4 levels (level 4 is
the final level, in which the results are communicated to the driver).
16 nodes are used, and the RDD has 256 partitions. We run in (client)
standalone mode. Here, the total time for the tasks is shown (\tau)
alongside the execution time (not counting GC),
serialization/deserialization time, the GC time, and the difference
between tau and all other times, assumed to be variable
IO/communication/waiting time. The RDD in this case is a labelled
point representation of the KDD Bridge to Algebra dataset, with 20M
(sparse) instances and a problem dimension of 30M. The sparsity of the
instances is very high---each individual instance vector may have only
a hundred nonzeros. All metrics have been taken from the JSON Spark
event logs.

The plot gradient_f1.pdf shows the times for a gradient computation
with f = 1, and gradient_f-3.pdf shows the same metrics with f = 1e-3.
For other f values in {1e-1 1e-2 ... 1e-5}, the same effect is
observed.

What I would like to mention about these plots, and ask if anyone has
experience with, is the following:
1. The times are essentially identical; I would have thought that
downsampling the RDD before aggregating the subgradients would at
least reduce the execution time required, if not the
communication/serialization times.
2. The serialization time in level 4 is almost entirely from the
result serialization to the driver, and not the task deserialization.
In each level of the treeAggregation, however, the local (dense)
gradients have to be communicated between compute nodes, so I am
surprised that it takes so much longer to return the vectors to the
driver.

I initially wondered if the large IO overhead in the last stage had
anything to do with client mode vs cluster mode, since, from what I
understand, only a single core is allocated to the driver thread in
client mode. However, when running tests in the two modes, I have
previously seen no appreciable difference in the running time for
other (admittedly smaller) problems. Furthermore, I am still very
confused about why the execution time for each task is just as large
for the downsampled RDD. It seems unlikely that sampling each
partition would be as expensive as the gradient computations, even for
sparse feature vectors.

If anyone has experience working with the sampling in minibatch SGD or
has tested the scalability of the treeAggregation operation for
vectors, I'd really appreciate your thoughts.

Thanks,
Mike


gradient_f1.pdf
Description: Adobe PDF document


gradient_f-3.pdf
Description: Adobe PDF document

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Re: treeAggregate timing / SGD performance with miniBatchFraction < 1

2015-09-26 Thread Evan R. Sparks
Off the top of my head, I'm not sure, but it looks like virtually all the
extra time between each stage is accounted for with T_{io} in your plot,
which I'm guessing is time spent communicating results over the network? Is
your driver running on the master or is it on a different node? If you look
at the code for treeAggregate, the last stage uses a .reduce() for the
final combination, which happens on the driver. In this case, the size of
the gradients is O(1GB) so if you've got to go over a slow link for the
last portion this could really make a difference.
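
For reference, the tail end of that code path boils down to something like the
following (a simplified sketch, not the actual Spark source):

   // After the intermediate levels have shrunk the number of partial aggregates,
   // the remaining partials are combined with a plain reduce; the per-partition
   // results of that reduce are shipped back to the driver and merged there.
   def finishTreeAggregate[U](partials: org.apache.spark.rdd.RDD[U])(combOp: (U, U) => U): U =
     partials.reduce(combOp)   // the final combine happens on the driver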

On Sat, Sep 26, 2015 at 10:20 AM, Mike Hynes <91m...@gmail.com> wrote:

> Hi Evan,
>
> (I just realized my initial email was a reply to the wrong thread; I'm
> very sorry about this).
>
> Thanks for your email, and your thoughts on the sampling. That the
> gradient computations are essentially the cost of a pass through each
> element of the partition makes sense, especially given the sparsity of
> the feature vectors.
>
> Would you have any idea why the communication time is so much larger
> in the final level of the aggregation, however? I can't immediately
> see why it should take longer to transfer the local gradient vectors
> in that level, since they are dense in every level. Furthermore, the
> driver is receiving the result of only 4 tasks, which is relatively
> small.
>
> Mike
>
>
> On 9/26/15, Evan R. Sparks  wrote:
> > Mike,
> >
> > I believe the reason you're seeing near identical performance on the
> > gradient computations is twofold
> > 1) Gradient computations for GLM models are computationally pretty cheap
> > from a FLOPs/byte read perspective. They are essentially a BLAS "gemv"
> call
> > in the dense case, which is well known to be bound by memory bandwidth on
> > modern processors. So, you're basically paying the cost of a scan of the
> > points you've sampled to do the gradient computation.
> > 2) The default sampling mechanism used by the GradientDescent optimizer
> in
> > MLlib is implemented via RDD.sample, which does reservoir sampling on
> each
> > partition. This requires a full scan of each partition at every iteration
> > to collect the samples.
> >
> > So - you're going to pay the cost of a scan to do the sampling anyway,
> and
> > the gradient computation is essentially free at this point (and can be
> > pipelined, etc.).
> >
> > It is quite possible to improve #2 by coming up with a better sampling
> > algorithm. One easy algorithm would be to assume the data is already
> > randomly shuffled (or do that once) and then use the first
> > miniBatchFraction*partitionSize records on the first iteration, the
> second
> > set on the second set on the second iteration, and so on. You could
> > protoype this algorithm pretty easily by converting your data to an
> > RDD[Array[DenseVector]] and doing some bookkeeping at each iteration.
> >
> > That said - eventually the overheads of the platform catch up to you. As
> a
> > rule of thumb I estimate about 50ms/iteration as a floor for things like
> > task serialization and other platform overheads. You've got to balance
> how
> > much computation you want to do vs. the amount of time you want to spend
> > waiting for the platform.
> >
> > - Evan
> >
> > On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <91m...@gmail.com> wrote:
> >
> >> Hello Devs,
> >>
> >> This email concerns some timing results for a treeAggregate in
> >> computing a (stochastic) gradient over an RDD of labelled points, as
> >> is currently done in the MLlib optimization routine for SGD.
> >>
> >> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1],
> >> and the subgradients over all the instances in the downsampled RDD are
> >> aggregated to the driver as a dense vector. However, we have noticed
> >> some unusual behaviour when f < 1: it takes the same amount of time to
> >> compute the stochastic gradient for a stochastic minibatch as it does
> >> for a full batch (f = 1).
> >>
> >> Attached are two plots of the mean task timing metrics for each level
> >> in the aggregation, which has been performed with 4 levels (level 4 is
> >> the final level, in which the results are communicated to the driver).
> >> 16 nodes are used, and the RDD has 256 partitions. We run in (client)
> >> standalone mode. Here, the total time for the tasks is shown (\tau)
> >> alongside the execution time (not counting GC),
> >> serialization/deserialization time, the GC time, and the difference
> >> between tau and all other times, assumed to be variable
> >> IO/communication/waiting time. The RDD in this case is a labelled
> >> point representation of the KDD Bridge to Algebra dataset, with 20M
> >> (sparse) instances and a problem dimension of 30M. The sparsity of the
> >> instances is very high---each individual instance vector may have only
> >> a hundred nonzeros. All metrics have been taken from the JSON Spark
> >> event logs.
> >>
> >> The plot gradient_f1.pdf shows the times for a gradient 

treeAggregate timing / SGD performance with miniBatchFraction < 1

2015-09-26 Thread Mike Hynes
Hi Evan,

(I just realized my initial email was a reply to the wrong thread; I'm
very sorry about this).

Thanks for your email, and your thoughts on the sampling. That the
gradient computations are essentially the cost of a pass through each
element of the partition makes sense, especially given the sparsity of
the feature vectors.

Would you have any idea why the communication time is so much larger
in the final level of the aggregation, however? I can't immediately
see why it should take longer to transfer the local gradient vectors
in that level, since they are dense in every level. Furthermore, the
driver is receiving the result of only 4 tasks, which is relatively
small.

Mike


On 9/26/15, Evan R. Sparks  wrote:
> Mike,
>
> I believe the reason you're seeing near identical performance on the
> gradient computations is twofold
> 1) Gradient computations for GLM models are computationally pretty cheap
> from a FLOPs/byte read perspective. They are essentially a BLAS "gemv" call
> in the dense case, which is well known to be bound by memory bandwidth on
> modern processors. So, you're basically paying the cost of a scan of the
> points you've sampled to do the gradient computation.
> 2) The default sampling mechanism used by the GradientDescent optimizer in
> MLlib is implemented via RDD.sample, which does reservoir sampling on each
> partition. This requires a full scan of each partition at every iteration
> to collect the samples.
>
> So - you're going to pay the cost of a scan to do the sampling anyway, and
> the gradient computation is essentially free at this point (and can be
> pipelined, etc.).
>
> It is quite possible to improve #2 by coming up with a better sampling
> algorithm. One easy algorithm would be to assume the data is already
> randomly shuffled (or do that once) and then use the first
> miniBatchFraction*partitionSize records on the first iteration, the second
> set on the second set on the second iteration, and so on. You could
> protoype this algorithm pretty easily by converting your data to an
> RDD[Array[DenseVector]] and doing some bookkeeping at each iteration.
>
> That said - eventually the overheads of the platform catch up to you. As a
> rule of thumb I estimate about 50ms/iteration as a floor for things like
> task serialization and other platform overheads. You've got to balance how
> much computation you want to do vs. the amount of time you want to spend
> waiting for the platform.
>
> - Evan
>
> On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Hello Devs,
>>
>> This email concerns some timing results for a treeAggregate in
>> computing a (stochastic) gradient over an RDD of labelled points, as
>> is currently done in the MLlib optimization routine for SGD.
>>
>> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1],
>> and the subgradients over all the instances in the downsampled RDD are
>> aggregated to the driver as a dense vector. However, we have noticed
>> some unusual behaviour when f < 1: it takes the same amount of time to
>> compute the stochastic gradient for a stochastic minibatch as it does
>> for a full batch (f = 1).
>>
>> Attached are two plots of the mean task timing metrics for each level
>> in the aggregation, which has been performed with 4 levels (level 4 is
>> the final level, in which the results are communicated to the driver).
>> 16 nodes are used, and the RDD has 256 partitions. We run in (client)
>> standalone mode. Here, the total time for the tasks is shown (\tau)
>> alongside the execution time (not counting GC),
>> serialization/deserialization time, the GC time, and the difference
>> between tau and all other times, assumed to be variable
>> IO/communication/waiting time. The RDD in this case is a labelled
>> point representation of the KDD Bridge to Algebra dataset, with 20M
>> (sparse) instances and a problem dimension of 30M. The sparsity of the
>> instances is very high---each individual instance vector may have only
>> a hundred nonzeros. All metrics have been taken from the JSON Spark
>> event logs.
>>
>> The plot gradient_f1.pdf shows the times for a gradient computation
>> with f = 1, and gradient_f-3.pdf shows the same metrics with f = 1e-3.
>> For other f values in {1e-1 1e-2 ... 1e-5}, the same effect is
>> observed.
>>
>> What I would like to mention about these plots, and ask if anyone has
>> experience with, is the following:
>> 1. The times are essentially identical; I would have thought that
>> downsampling the RDD before aggregating the subgradients would at
>> least reduce the execution time required, if not the
>> communication/serialization times.
>> 2. The serialization time in level 4 is almost entirely from the
>> result serialization to the driver, and not the task deserialization.
>> In each level of the treeAggregation, however, the local (dense)
>> gradients have to be communicated between compute nodes, so I am
>> surprised that it takes so much 

Re: ClassCastException using DataFrame only when num-executors > 2 ...

2015-09-26 Thread Olivier Girardot
Sorry for the delay; yes, it still happens.
I'm still trying to figure out whether it comes from bad data, and trying to
isolate the bug itself...

2015-09-11 0:28 GMT+02:00 Reynold Xin :

> Does this still happen on 1.5.0 release?
>
>
> On Mon, Aug 31, 2015 at 9:31 AM, Olivier Girardot 
> wrote:
>
>> tested now against Spark 1.5.0 rc2, and same exceptions happen when
>> num-executors > 2 :
>>
>> 15/08/25 10:31:10 WARN scheduler.TaskSetManager: Lost task 0.1 in stage
>> 5.0 (TID 501, xxx): java.lang.ClassCastException: java.lang.Double
>> cannot be cast to java.lang.Long
>> at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
>> at
>> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41)
>> at
>> org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:220)
>> at
>> org.apache.spark.sql.catalyst.expressions.JoinedRow.getLong(JoinedRow.scala:85)
>> at
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
>> Source)
>> at
>> org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:325)
>> at
>> org.apache.spark.sql.execution.Window$$anonfun$8$$anon$1.next(Window.scala:252)
>> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>
>>
>> 2015-08-26 11:47 GMT+02:00 Olivier Girardot :
>>
>>> Hi everyone,
>>> I know this "post title" doesn't seem very logical and I agree,
>>> we have a very complex computation using "only" pyspark dataframes and
>>> when launching the computation on a CDH 5.3 cluster using Spark 1.5.0 rc1
>>> (problem is reproduced with 1.4.x).
>>> If the number of executors is the default 2, the computation is very
>>> long but doesn't fail.
>>> If the number of executors is 3 or more (tested up to 20), then the
>>> computation fails very quickly with the following error :
>>>
>>> *Caused by: java.lang.ClassCastException: java.lang.Double cannot be
>>> cast to java.lang.Long*
>>>
>>> The complete stracktrace being :
>>>
>>> Driver stacktrace:
>>> at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1267)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1255)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1254)
>>> at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1254)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:684)
>>> at scala.Option.foreach(Option.scala:236)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:684)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1480)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1442)
>>> at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1431)
>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:554)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1805)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1818)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1831)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1902)
>>> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:905)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>>> at org.apache.spark.rdd.RDD.collect(RDD.scala:904)
>>> at org.apache.spark.RangePartitioner$.sketch(Partitioner.scala:264)
>>> at org.apache.spark.RangePartitioner.<init>(Partitioner.scala:126)
>>> at
>>> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:156)
>>> at
>>> org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:141)
>>> at
>>> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
>>> ... 138 more
>>> *Caused by: java.lang.ClassCastException: java.lang.Double cannot be
>>> cast to java.lang.Long*
>>> at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:41)
>>> at
>>> 

Re: RDD API patterns

2015-09-26 Thread Evan R. Sparks
Mike,

I believe the reason you're seeing near identical performance on the
gradient computations is twofold
1) Gradient computations for GLM models are computationally pretty cheap
from a FLOPs/byte read perspective. They are essentially a BLAS "gemv" call
in the dense case, which is well known to be bound by memory bandwidth on
modern processors. So, you're basically paying the cost of a scan of the
points you've sampled to do the gradient computation.
2) The default sampling mechanism used by the GradientDescent optimizer in
MLlib is implemented via RDD.sample, which does reservoir sampling on each
partition. This requires a full scan of each partition at every iteration
to collect the samples.

So - you're going to pay the cost of a scan to do the sampling anyway, and
the gradient computation is essentially free at this point (and can be
pipelined, etc.).

It is quite possible to improve #2 by coming up with a better sampling
algorithm. One easy algorithm would be to assume the data is already
randomly shuffled (or do that once) and then use the first
miniBatchFraction*partitionSize records on the first iteration, the second
set on the second iteration, and so on. You could
prototype this algorithm pretty easily by converting your data to an
RDD[Array[DenseVector]] and doing some bookkeeping at each iteration.
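
A rough prototype of that idea might look like the following (the names and
bookkeeping are purely illustrative, not an existing MLlib API):

   import org.apache.spark.rdd.RDD
   import org.apache.spark.mllib.linalg.DenseVector

   // On iteration k, take the k-th contiguous block of each partition's array,
   // assuming the data has been randomly shuffled once up front.
   def nextMiniBatch(data: RDD[Array[DenseVector]],
                     miniBatchFraction: Double,
                     iteration: Int): RDD[DenseVector] =
     data.flatMap { chunk =>
       val batchSize  = math.max(1, (chunk.length * miniBatchFraction).toInt)
       val numBatches = math.max(1, chunk.length / batchSize)
       val start      = (iteration % numBatches) * batchSize
       chunk.slice(start, start + batchSize)   // contiguous slice: no per-record RNG, no reservoir pass
     }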

That said - eventually the overheads of the platform catch up to you. As a
rule of thumb I estimate about 50ms/iteration as a floor for things like
task serialization and other platform overheads. You've got to balance how
much computation you want to do vs. the amount of time you want to spend
waiting for the platform.

- Evan

On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <91m...@gmail.com> wrote:

> Hello Devs,
>
> This email concerns some timing results for a treeAggregate in
> computing a (stochastic) gradient over an RDD of labelled points, as
> is currently done in the MLlib optimization routine for SGD.
>
> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1],
> and the subgradients over all the instances in the downsampled RDD are
> aggregated to the driver as a dense vector. However, we have noticed
> some unusual behaviour when f < 1: it takes the same amount of time to
> compute the stochastic gradient for a stochastic minibatch as it does
> for a full batch (f = 1).
>
> Attached are two plots of the mean task timing metrics for each level
> in the aggregation, which has been performed with 4 levels (level 4 is
> the final level, in which the results are communicated to the driver).
> 16 nodes are used, and the RDD has 256 partitions. We run in (client)
> standalone mode. Here, the total time for the tasks is shown (\tau)
> alongside the execution time (not counting GC),
> serialization/deserialization time, the GC time, and the difference
> between tau and all other times, assumed to be variable
> IO/communication/waiting time. The RDD in this case is a labelled
> point representation of the KDD Bridge to Algebra dataset, with 20M
> (sparse) instances and a problem dimension of 30M. The sparsity of the
> instances is very high---each individual instance vector may have only
> a hundred nonzeros. All metrics have been taken from the JSON Spark
> event logs.
>
> The plot gradient_f1.pdf shows the times for a gradient computation
> with f = 1, and gradient_f-3.pdf shows the same metrics with f = 1e-3.
> For other f values in {1e-1 1e-2 ... 1e-5}, the same effect is
> observed.
>
> What I would like to mention about these plots, and ask if anyone has
> experience with, is the following:
> 1. The times are essentially identical; I would have thought that
> downsampling the RDD before aggregating the subgradients would at
> least reduce the execution time required, if not the
> communication/serialization times.
> 2. The serialization time in level 4 is almost entirely from the
> result serialization to the driver, and not the task deserialization.
> In each level of the treeAggregation, however, the local (dense)
> gradients have to be communicated between compute nodes, so I am
> surprised that it takes so much longer to return the vectors to the
> driver.
>
> I initially wondered if the large IO overhead in the last stage had
> anything to do with client mode vs cluster mode, since, from what I
> understand, only a single core is allocated to the driver thread in
> client mode. However, when running tests in the two modes, I have
> previously seen no appreciable difference in the running time for
> other (admittedly smaller) problems. Furthermore, I am still very
> confused about why the execution time for each task is just as large
> for the downsampled RDD. It seems unlikely that sampling each
> partition would be as expensive as the gradient computations, even for
> sparse feature vectors.
>
> If anyone has experience working with the sampling in minibatch SGD or
> has tested the scalability of the treeAggregation operation for
> vectors, I'd really 

Re: [VOTE] Release Apache Spark 1.5.1 (RC1)

2015-09-26 Thread robineast
+1


build/mvn clean package -DskipTests -Pyarn -Phadoop-2.6
OK
Basic graph tests
  Load graph using edgeListFile...SUCCESS
  Run PageRank...SUCCESS
Minimum Spanning Tree Algorithm
  Run basic Minimum Spanning Tree algorithm...SUCCESS
  Run Minimum Spanning Tree taxonomy creation...SUCCESS



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/VOTE-Release-Apache-Spark-1-5-1-RC1-tp14310p14380.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: treeAggregate timing / SGD performance with miniBatchFraction < 1

2015-09-26 Thread Mike Hynes
That is an interesting point; I run the driver as a background process
on the master node so that I can still pipe the stdout/stderr
filestreams to the (network) filesystem.
I should mention that the master is connected to the slaves with a 10
Gb link on the same managed switch that the slaves use.

On 9/26/15, Evan R. Sparks  wrote:
> Off the top of my head, I'm not sure, but it looks like virtually all the
> extra time between each stage is accounted for with T_{io} in your plot,
> which I'm guessing is time spent communicating results over the network? Is
> your driver running on the master or is it on a different node? If you look
> at the code for treeAggregate, the last stage uses a .reduce() for the
> final combination, which happens on the driver. In this case, the size of
> the gradients is O(1GB) so if you've got to go over a slow link for the
> last portion this could really make a difference.
>
> On Sat, Sep 26, 2015 at 10:20 AM, Mike Hynes <91m...@gmail.com> wrote:
>
>> Hi Evan,
>>
>> (I just realized my initial email was a reply to the wrong thread; I'm
>> very sorry about this).
>>
>> Thanks for your email, and your thoughts on the sampling. That the
>> gradient computations are essentially the cost of a pass through each
>> element of the partition makes sense, especially given the sparsity of
>> the feature vectors.
>>
>> Would you have any idea why the communication time is so much larger
>> in the final level of the aggregation, however? I can't immediately
>> see why it should take longer to transfer the local gradient vectors
>> in that level, since they are dense in every level. Furthermore, the
>> driver is receiving the result of only 4 tasks, which is relatively
>> small.
>>
>> Mike
>>
>>
>> On 9/26/15, Evan R. Sparks  wrote:
>> > Mike,
>> >
>> > I believe the reason you're seeing near identical performance on the
>> > gradient computations is twofold
>> > 1) Gradient computations for GLM models are computationally pretty
>> > cheap
>> > from a FLOPs/byte read perspective. They are essentially a BLAS "gemv"
>> call
>> > in the dense case, which is well known to be bound by memory bandwidth
>> > on
>> > modern processors. So, you're basically paying the cost of a scan of
>> > the
>> > points you've sampled to do the gradient computation.
>> > 2) The default sampling mechanism used by the GradientDescent optimizer
>> in
>> > MLlib is implemented via RDD.sample, which does reservoir sampling on
>> each
>> > partition. This requires a full scan of each partition at every
>> > iteration
>> > to collect the samples.
>> >
>> > So - you're going to pay the cost of a scan to do the sampling anyway,
>> and
>> > the gradient computation is essentially free at this point (and can be
>> > pipelined, etc.).
>> >
>> > It is quite possible to improve #2 by coming up with a better sampling
>> > algorithm. One easy algorithm would be to assume the data is already
>> > randomly shuffled (or do that once) and then use the first
>> > miniBatchFraction*partitionSize records on the first iteration, the
>> second
>> > set on the second set on the second iteration, and so on. You could
>> > protoype this algorithm pretty easily by converting your data to an
>> > RDD[Array[DenseVector]] and doing some bookkeeping at each iteration.
>> >
>> > That said - eventually the overheads of the platform catch up to you.
>> > As
>> a
>> > rule of thumb I estimate about 50ms/iteration as a floor for things
>> > like
>> > task serialization and other platform overheads. You've got to balance
>> how
>> > much computation you want to do vs. the amount of time you want to
>> > spend
>> > waiting for the platform.
>> >
>> > - Evan
>> >
>> > On Sat, Sep 26, 2015 at 9:27 AM, Mike Hynes <91m...@gmail.com> wrote:
>> >
>> >> Hello Devs,
>> >>
>> >> This email concerns some timing results for a treeAggregate in
>> >> computing a (stochastic) gradient over an RDD of labelled points, as
>> >> is currently done in the MLlib optimization routine for SGD.
>> >>
>> >> In SGD, the underlying RDD is downsampled by a fraction f \in (0,1],
>> >> and the subgradients over all the instances in the downsampled RDD are
>> >> aggregated to the driver as a dense vector. However, we have noticed
>> >> some unusual behaviour when f < 1: it takes the same amount of time to
>> >> compute the stochastic gradient for a stochastic minibatch as it does
>> >> for a full batch (f = 1).
>> >>
>> >> Attached are two plots of the mean task timing metrics for each level
>> >> in the aggregation, which has been performed with 4 levels (level 4 is
>> >> the final level, in which the results are communicated to the driver).
>> >> 16 nodes are used, and the RDD has 256 partitions. We run in (client)
>> >> standalone mode. Here, the total time for the tasks is shown (\tau)
>> >> alongside the execution time (not counting GC),
>> >> serialization/deserialization time, the GC time, and the difference
>> >> between tau 

Spark-Kafka Connector issue

2015-09-26 Thread Ratika Prasad
Hi All,

I am trying out Spark Streaming and reading messages from Kafka topics, which
are later turned into streams as below. I have Kafka set up on a VM and the
topics created; however, when I run the program below from my Spark VM, I get
an error even though the Kafka server and ZooKeeper are up and running:

./bin/spark-submit --class org.stream.processing.JavaKafkaStreamEventProcessing 
--master local spark-stream-processing-0.0.1-SNAPSHOT-jar-with-dependencies.jar 
172.28.161.32:2181 redemption_inbound

Exception in thread "main" org.apache.spark.SparkException: 
java.io.EOFException: Received -1 when reading from channel, socket has likely 
been closed.
at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at 
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at 
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
at 
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:532)
at 
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at 
org.stream.processing.JavaKafkaStreamEventProcessing.main(JavaKafkaStreamEventProcessing.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Program

public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: DirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    String brokers = args[0];
    String topics = args[1];

    // Create context with 2 second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaStreamEventProcessing");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer i1, Integer i2) {
            return i1 + i2;
          }
        });
    wordCounts.print();
    System.out.println("Word Counts are : " + wordCounts.toString());

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}
}