Unsubscribe

2018-02-11 Thread Archit Thakur
Unsubscribe


Re: Spark streaming job hangs

2015-12-01 Thread Archit Thakur
Which version of spark you are runinng? Have you created Kafka-Directstream
? I am asking coz you might / might not be using receivers.
Also, When you say hangs, you mean there is no other log after this and
process still up?
Or do you mean, it kept on adding the jobs but did nothing else. (I am
optimistic :) ).

On Tue, Dec 1, 2015 at 4:12 PM, Paul Leclercq 
wrote:

> You might not have enough cores to process data from Kafka
>
>
>> When running a Spark Streaming program locally, do not use “local” or
>> “local[1]” as the master URL. Either of these means that only one thread
>> will be used for running tasks locally. If you are using a input DStream
>> based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single
>> thread will be used to run the receiver, leaving no thread for processing
>> the received data. *Hence, when running locally, always use “local[n]”
>> as the master URL, ​*where n > number of receivers to run (see Spark
>> Properties for information on how to set the master).*
>
>
>
>  
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#input-dstreams-and-receivers
> 
>
> 2015-12-01 7:13 GMT+01:00 Cassa L :
>
>> Hi,
>>  I am reading data from Kafka into spark. It runs fine for sometime but
>> then hangs forever with following output. I don't see and errors in logs.
>> How do I debug this?
>>
>> 2015-12-01 06:04:30,697 [dag-scheduler-event-loop] INFO
>> (Logging.scala:59) - Adding task set 19.0 with 4 tasks
>> 2015-12-01 06:04:30,872 [pool-13-thread-1] INFO  (Logging.scala:59) -
>> Disconnected from Cassandra cluster: APG DEV Cluster
>> 2015-12-01 06:04:35,060 [JobGenerator] INFO  (Logging.scala:59) - Added
>> jobs for time 1448949875000 ms
>> 2015-12-01 06:04:40,054 [JobGenerator] INFO  (Logging.scala:59) - Added
>> jobs for time 144894988 ms
>> 2015-12-01 06:04:45,034 [JobGenerator] INFO  (Logging.scala:59) - Added
>> jobs for time 1448949885000 ms
>> 2015-12-01 06:04:50,100 [JobGenerator] INFO  (Logging.scala:59) - Added
>> jobs for time 144894989 ms
>> 2015-12-01 06:04:55,064 [JobGenerator] INFO  (Logging.scala:59) - Added
>> jobs for time 1448949895000 ms
>> 2015-12-01 06:05:00,125 [JobGenerator] INFO  (Logging.scala:59) - Added
>> jobs for time 144894990 ms
>>
>>
>> Thanks
>> LCassa
>>
>
>
>
> --
>
> Paul Leclercq | Data engineer
>
>
>  paul.lecle...@tabmo.io  |  http://www.tabmo.fr/
>


Dynamic DAG use-case for spark streaming.

2015-09-29 Thread Archit Thakur
Hi,

 We are using spark streaming as our processing engine, and as part of
output we want to push the data to UI. Now there would be multiple users
accessing the system with there different filters on. Based on the filters
and other inputs we want to either run a SQL Query on DStream or do a
custom logic processing. This would need the system to read the
filters/query and generate the execution graph at runtime. I cant see any
support in spark streaming for generating the execution graph on the fly.
I think I can broadcast the query to executors and read the broadcasted
query at runtime but that would also limit my user to 1 at a time.

Do we not expect the spark streaming to take queries/filters from outside
world. Does output in spark streaming only means outputting to an external
source which could then be queried.

Thanks,
Archit Thakur.


Re: using multiple dstreams together (spark streaming)

2015-09-28 Thread Archit Thakur
@TD: Doesn't transformWith need both of the DStreams to be of same
slideDuration.
[Spark Version: 1.3.1]



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/using-multiple-dstreams-together-spark-streaming-tp9947p24839.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Parquet number of partitions

2015-05-07 Thread Archit Thakur
Hi.
No. of partitions are determined by the RDD it uses in the plan it creates.
It uses NewHadoopRDD which gives partitions by getSplits of input format it
is using. It uses FilteringParquetRowInputFormat subclass of
ParquetInputFormat. To change the no of partitions write a new input format
and make the NewHadoopRDD use your plan. or if u r ready to shuffle u can
use repartition api without change of code.

Thanks & Regards.

On Tue, May 5, 2015 at 7:56 PM, Masf  wrote:

> Hi Eric.
>
> Q1:
> When I read parquet files, I've tested that Spark generates so many
> partitions as parquet files exist in the path.
>
> Q2:
> To reduce the number of partitions you can use rdd.repartition(x), x=>
> number of partitions. Depend on your case, repartition could be a heavy task
>
>
> Regards.
> Miguel.
>
> On Tue, May 5, 2015 at 3:56 PM, Eric Eijkelenboom <
> eric.eijkelenb...@gmail.com> wrote:
>
>> Hello guys
>>
>> Q1: How does Spark determine the number of partitions when reading a
>> Parquet file?
>>
>> val df = sqlContext.parquetFile(path)
>>
>> Is it some way related to the number of Parquet row groups in my input?
>>
>> Q2: How can I reduce this number of partitions? Doing this:
>>
>> df.rdd.coalesce(200).count
>>
>> from the spark-shell causes job execution to hang…
>>
>> Any ideas? Thank you in advance.
>>
>> Eric
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
>
>
> Saludos.
> Miguel Ángel
>


Re: Number of input partitions in SparkContext.sequenceFile

2015-05-02 Thread Archit Thakur
Hi,

How did u check no of splits in ur file. Did i run ur mr job or calculated
it.?
 The formula for split size is
max(minSize, min(max size, block size)). Can u check if it satisfies ur
case.?

Thanks & Regards,
Archit Thakur.

On Saturday, April 25, 2015, Wenlei Xie  wrote:

> Hi,
>
> I checked the number of partitions by
>
> System.out.println("INFO: RDD with " + rdd.partitions().size() + "
> partitions created.");
>
>
> Each single split is about 100MB. I am currently loading the data from
> local file system, would this explains this observation?
>
> Thank you!
>
> Best,
> Wenlei
>
> On Tue, Apr 21, 2015 at 6:28 AM, Archit Thakur  > wrote:
>
>> Hi,
>>
>> It should generate the same no of partitions as the no. of splits.
>> Howd you check no of partitions.? Also please paste your file size and
>> hdfs-site.xml and mapred-site.xml here.
>>
>> Thanks and Regards,
>> Archit Thakur.
>>
>> On Sat, Apr 18, 2015 at 6:20 PM, Wenlei Xie > > wrote:
>>
>>> Hi,
>>>
>>> I am wondering the mechanism that determines the number of partitions
>>> created by SparkContext.sequenceFile ?
>>>
>>> For example, although my file has only 4 splits, Spark would create 16
>>> partitions for it. Is it determined by the file size? Is there any way to
>>> control it? (Looks like I can only tune minPartitions but not maxPartitions)
>>>
>>> Thank you!
>>>
>>> Best,
>>> Wenlei
>>>
>>>
>>>
>>
>
>
> --
> Wenlei Xie (谢文磊)
>
> Ph.D. Candidate
> Department of Computer Science
> 456 Gates Hall, Cornell University
> Ithaca, NY 14853, USA
> Email: wenlei@gmail.com
> 
>


Re: How to run spark programs in eclipse like mapreduce

2015-04-21 Thread Archit Thakur
You just need to specify your master as local and run the main clas that
created the sparkcontext object in eclipse.

On Mon, Apr 20, 2015 at 12:18 PM, Akhil Das 
wrote:

> Why not build the project and submit the build jar with Spark submit?
>
> If you want to run it within eclipse, then all you have to do is, create a
> SparkContext pointing to your cluster, do a
> sc.addJar("/path/to/your/project/jar") and then you can hit the run button
> to run the job (note that network shouldn't be a problem between your
> driver and the cluster)
>
> Thanks
> Best Regards
>
> On Mon, Apr 20, 2015 at 12:14 PM, sandeep vura 
> wrote:
>
>> Hi Sparkers,
>>
>> I have written a code in python in eclipse now that code should execute
>> in spark cluster like mapreduce jobs in hadoop cluster.Can anyone please
>> help me with instructions.
>>
>> Regards,
>> Sandeep.v
>>
>
>


Re: Custom Partitioning Spark

2015-04-21 Thread Archit Thakur
Hi,

This should work. How are you checking the no. of partitions.?

Thanks and Regards,
Archit Thakur.

On Mon, Apr 20, 2015 at 7:26 PM, mas  wrote:

> Hi,
>
> I aim to do custom partitioning on a text file. I first convert it into
> pairRDD and then try to use my custom partitioner. However, somehow it is
> not working. My code snippet is given below.
>
> val file=sc.textFile(filePath)
> val locLines=file.map(line => line.split("\t")).map(line=>
> ((line(2).toDouble,line(3).toDouble),line(5).toLong))
> val ck=locLines.partitionBy(new HashPartitioner(50)) // new
> CustomPartitioner(50) -- none of the way is working here.
>
> while reading the file using "textFile" method it automatically partitions
> the file. However when i explicitly want to partition the new rdd
> "locLines", It doesn't appear to do anything and even the number of
> partitions are same which is created by sc.textFile().
>
> Any help in this regard will be highly appreciated.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Custom-Partitioning-Spark-tp22571.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Number of input partitions in SparkContext.sequenceFile

2015-04-21 Thread Archit Thakur
Hi,

It should generate the same no of partitions as the no. of splits.
Howd you check no of partitions.? Also please paste your file size and
hdfs-site.xml and mapred-site.xml here.

Thanks and Regards,
Archit Thakur.

On Sat, Apr 18, 2015 at 6:20 PM, Wenlei Xie  wrote:

> Hi,
>
> I am wondering the mechanism that determines the number of partitions
> created by SparkContext.sequenceFile ?
>
> For example, although my file has only 4 splits, Spark would create 16
> partitions for it. Is it determined by the file size? Is there any way to
> control it? (Looks like I can only tune minPartitions but not maxPartitions)
>
> Thank you!
>
> Best,
> Wenlei
>
>
>


Re: mapPartitions vs foreachPartition

2015-04-20 Thread Archit Thakur
True.

On Mon, Apr 20, 2015 at 4:14 PM, Arun Patel  wrote:

> mapPartitions is a transformation and foreachPartition is a an action?
>
> Thanks
> Arun
>
> On Mon, Apr 20, 2015 at 4:38 AM, Archit Thakur 
> wrote:
>
>> The same, which is between map and foreach. map takes iterator returns
>> iterator foreach takes iterator returns Unit.
>>
>> On Mon, Apr 20, 2015 at 4:05 PM, Arun Patel 
>> wrote:
>>
>>> What is difference between mapPartitions vs foreachPartition?
>>>
>>> When to use these?
>>>
>>> Thanks,
>>> Arun
>>>
>>
>>
>


Re: mapPartitions vs foreachPartition

2015-04-20 Thread Archit Thakur
The same, which is between map and foreach. map takes iterator returns
iterator foreach takes iterator returns Unit.

On Mon, Apr 20, 2015 at 4:05 PM, Arun Patel  wrote:

> What is difference between mapPartitions vs foreachPartition?
>
> When to use these?
>
> Thanks,
> Arun
>


Re: Running spark over HDFS

2015-04-20 Thread Archit Thakur
There are lot of similar problems shared and resolved by users on this same
portal. I have been part of those discussions before, Search those, Please
Try them and let us know, if you still face problems.

Thanks and Regards,
Archit Thakur.

On Mon, Apr 20, 2015 at 3:05 PM, madhvi  wrote:

>  On Monday 20 April 2015 02:52 PM, SURAJ SHETH wrote:
>
>   Hi Madhvi,
> I think the memory requested by your job, i.e. 2.0 GB is higher than what
> is available.
> Please request for 256 MB explicitly while creating Spark Context and try
> again.
>
>  Thanks and Regards,
> Suraj Sheth
>
>
>   Tried the same but still no luck:|
>
> Madhvi
>


Re: Addition of new Metrics for killed executors.

2015-04-20 Thread Archit Thakur
Hi Twinkle,

We have a use case in where we want to debug the reason of how n why an
executor got killed.
Could be because of stackoverflow, GC or any other unexpected scenario.
If I see the driver UI there is no information present around killed
executors, So was just curious how do people usually debug those things
apart from scanning logs and understanding it. The metrics we are planning
to add are similar to what we have for non killed executors - [data per
stage specifically] - numFailedTasks, executorRunTime, inputBytes,
memoryBytesSpilled .. etc.

Apart from that we also intend to add all information present in an
executor tabs for running executors.

Thanks,
Archit Thakur.

On Mon, Apr 20, 2015 at 1:31 PM, twinkle sachdeva <
twinkle.sachd...@gmail.com> wrote:

> Hi Archit,
>
> What is your use case and what kind of metrics are you planning to add?
>
> Thanks,
> Twinkle
>
> On Fri, Apr 17, 2015 at 4:07 PM, Archit Thakur 
> wrote:
>
>> Hi,
>>
>> We are planning to add new Metrics in Spark for the executors that got
>> killed during the execution. Was just curious, why this info is not already
>> present. Is there some reason for not adding it.?
>> Any ideas around are welcome.
>>
>> Thanks and Regards,
>> Archit Thakur.
>>
>
>


Re: Custom partioner

2015-04-18 Thread Archit Thakur
Yes you can. Use partitionby method and pass partitioner to it.
On Apr 17, 2015 8:18 PM, "Jeetendra Gangele"  wrote:

> Ok is there a way, I can use  hash Partitioning so that I can improve the
> performance?
>
>
> On 17 April 2015 at 19:33, Archit Thakur 
> wrote:
>
>> By custom installation, I meant change the code and build it. I have not
>> done the complete impact analysis, just had a look on the code.
>>
>> When you say, same key goes to same node, It would need shuffling unless
>> the raw data you are reading is present that way.
>> On Apr 17, 2015 6:30 PM, "Jeetendra Gangele" 
>> wrote:
>>
>>> Hi Archit Thanks for reply.
>>> How can I don the costom compilation so reduce it to 4 bytes.I want to
>>> make it to 4 bytes in any case can you please guide?
>>>
>>> I am applying flatMapvalue in each step after ZipWithIndex it should be
>>> in same Node right? Why its suffling?
>>> Also I am running with very less records currently still its shuffling ?
>>>
>>> regards
>>> jeetendra
>>>
>>>
>>>
>>> On 17 April 2015 at 15:58, Archit Thakur 
>>> wrote:
>>>
>>>> I dont think you can change it to 4 bytes without any custom
>>>> compilation.
>>>> To make same key go to same node, you'll have to repartition the data,
>>>> which is shuffling anyway. Unless your raw data is such that the same key
>>>> is on same node, you'll have to shuffle atleast once to make same key on
>>>> same node.
>>>>
>>>> On Thu, Apr 16, 2015 at 10:16 PM, Jeetendra Gangele <
>>>> gangele...@gmail.com> wrote:
>>>>
>>>>> Hi All
>>>>>
>>>>> I have a RDD which has 1 million keys and each key is repeated from
>>>>> around 7000 values so total there will be around 1M*7K records in RDD.
>>>>>
>>>>> and each key is created from ZipWithIndex so key start from 0 to M-1
>>>>> the problem with ZipWithIndex is it take long for key which is 8
>>>>> bytes. can I reduce it to 4 bytes?
>>>>>
>>>>> Now how Can I make sure the record with same key will go the same node
>>>>> so that I can avoid shuffling. Also how default partition-er will work 
>>>>> here.
>>>>>
>>>>> Regards
>>>>> jeetendra
>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>
>
>
>


Re: Can't get SparkListener to work

2015-04-18 Thread Archit Thakur
Hi Praveen,
Can you try once removing throw exception in map. Do you still not get it.?
On Apr 18, 2015 8:14 AM, "Praveen Balaji" 
wrote:

> Thanks for the response, Imran. I probably chose the wrong methods for
> this email. I implemented all methods of SparkListener and the only
> callback I get is onExecutorMetricsUpdate.
>
> Here's the complete code:
>
> ==
>
> import org.apache.spark.scheduler._
>
> sc.addSparkListener(new SparkListener() {
>   override def onStageCompleted(e: SparkListenerStageCompleted) =
> println(" onStageCompleted");
>   override def onStageSubmitted(e: SparkListenerStageSubmitted) =
> println(" onStageSubmitted");
>   override def onTaskStart(e: SparkListenerTaskStart) = println("
> onTaskStart");
>   override def onTaskGettingResult(e: SparkListenerTaskGettingResult)
> = println(" onTaskGettingResult");
>   override def onTaskEnd(e: SparkListenerTaskEnd) = println("
> onTaskEnd");
>   override def onJobStart(e: SparkListenerJobStart) = println(">>>
> onJobStart");
>   override def onJobEnd(e: SparkListenerJobEnd) = println("
> onJobEnd");
>   override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate)
> = println(" onEnvironmentUpdate");
>   override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded)
> = println(" onBlockManagerAdded");
>   override def onBlockManagerRemoved(e:
> SparkListenerBlockManagerRemoved) = println(" onBlockManagerRemoved");
>   override def onUnpersistRDD(e: SparkListenerUnpersistRDD) =
> println(" onUnpersistRDD");
>   override def onApplicationStart(e: SparkListenerApplicationStart) =
> println(" onApplicationStart");
>   override def onApplicationEnd(e: SparkListenerApplicationEnd) =
> println(" onApplicationEnd");
>   override def onExecutorMetricsUpdate(e:
> SparkListenerExecutorMetricsUpdate) = println("
> onExecutorMetricsUpdate");
> });
>
> sc.parallelize(List(1, 2, 3)).map(throw new
> SparkException("test")).collect();
>
> =
>
> On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid 
> wrote:
>
>> when you start the spark-shell, its already too late to get the
>> ApplicationStart event.  Try listening for StageCompleted or JobEnd instead.
>>
>> On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji <
>> secondorderpolynom...@gmail.com> wrote:
>>
>>> I'm trying to create a simple SparkListener to get notified of error on
>>> executors. I do not get any call backs on my SparkListener. Here some
>>> simple code I'm executing in spark-shell. But I still don't get any
>>> callbacks on my listener. Am I doing something wrong?
>>>
>>> Thanks for any clue you can send my way.
>>>
>>> Cheers
>>> Praveen
>>>
>>> ==
>>> import org.apache.spark.scheduler.SparkListener
>>> import org.apache.spark.scheduler.SparkListenerApplicationStart
>>> import org.apache.spark.scheduler.SparkListenerApplicationEnd
>>> import org.apache.spark.SparkException
>>>
>>> sc.addSparkListener(new SparkListener() {
>>>   override def onApplicationStart(applicationStart:
>>> SparkListenerApplicationStart) {
>>> println(" onApplicationStart: " + applicationStart.appName);
>>>   }
>>>
>>>   override def onApplicationEnd(applicationEnd:
>>> SparkListenerApplicationEnd) {
>>> println(" onApplicationEnd: " + applicationEnd.time);
>>>   }
>>> });
>>>
>>> sc.parallelize(List(1, 2, 3)).map(throw new
>>> SparkException("test")).collect();
>>> ===
>>>
>>> output:
>>>
>>> scala> org.apache.spark.SparkException: hshsh
>>> at $iwC$$iwC$$iwC$$iwC.(:29)
>>> at $iwC$$iwC$$iwC.(:34)
>>> at $iwC$$iwC.(:36)
>>> at $iwC.(:38)
>>>
>>>
>>
>


Re: Custom partioner

2015-04-17 Thread Archit Thakur
By custom installation, I meant change the code and build it. I have not
done the complete impact analysis, just had a look on the code.

When you say, same key goes to same node, It would need shuffling unless
the raw data you are reading is present that way.
On Apr 17, 2015 6:30 PM, "Jeetendra Gangele"  wrote:

> Hi Archit Thanks for reply.
> How can I don the costom compilation so reduce it to 4 bytes.I want to
> make it to 4 bytes in any case can you please guide?
>
> I am applying flatMapvalue in each step after ZipWithIndex it should be in
> same Node right? Why its suffling?
> Also I am running with very less records currently still its shuffling ?
>
> regards
> jeetendra
>
>
>
> On 17 April 2015 at 15:58, Archit Thakur 
> wrote:
>
>> I dont think you can change it to 4 bytes without any custom compilation.
>> To make same key go to same node, you'll have to repartition the data,
>> which is shuffling anyway. Unless your raw data is such that the same key
>> is on same node, you'll have to shuffle atleast once to make same key on
>> same node.
>>
>> On Thu, Apr 16, 2015 at 10:16 PM, Jeetendra Gangele > > wrote:
>>
>>> Hi All
>>>
>>> I have a RDD which has 1 million keys and each key is repeated from
>>> around 7000 values so total there will be around 1M*7K records in RDD.
>>>
>>> and each key is created from ZipWithIndex so key start from 0 to M-1
>>> the problem with ZipWithIndex is it take long for key which is 8 bytes.
>>> can I reduce it to 4 bytes?
>>>
>>> Now how Can I make sure the record with same key will go the same node
>>> so that I can avoid shuffling. Also how default partition-er will work here.
>>>
>>> Regards
>>> jeetendra
>>>
>>>
>>
>
>
>


Re: Joined RDD

2015-04-17 Thread Archit Thakur
map phase of join*

On Fri, Apr 17, 2015 at 5:28 PM, Archit Thakur 
wrote:

> Ajay,
>
> This is true. When we call join again on two RDD's.Rather than computing
> the whole pipe again, It reads the map output of the map phase of an
> RDD(which it usually gets from shuffle manager).
>
> If you see the code:
>
>  override def compute(s: Partition, context: TaskContext): Iterator[(K,
> Array[Iterable[_]])] = {
>
> val sparkConf = SparkEnv.get.conf
>
> val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true
> )
>
> for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
>
>   case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
>
> // Read them from the parent
>
> val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[
> Product2[K, Any]]]
>
> rddIterators += ((it, depNum))
>
>
>   case ShuffleCoGroupSplitDep(handle) =>
>
> // Read map outputs of shuffle
>
> val it = SparkEnv.get.shuffleManager
>
>   .getReader(handle, split.index, split.index + 1, context)
>
>   .read()
>
> rddIterators += ((it, depNum))
>
> }
>
> This is CoGroupedRDD.scala, spark-1.3 code.
> If you see the UI, it shows these map stages as skipped.(And, this answers
> your question as well, Hoai-Thu Vong[in different thread about skipped
> stages.]).
>
> Thanks and Regards,
>
> Archit Thakur.
>
>
>
>
> On Thu, Nov 13, 2014 at 3:10 PM, ajay garg  wrote:
>
>> Yes that is my understanding of how it should work.
>> But in my case when I call collect first time, it reads the data from
>> files
>> on the disk.
>> Subsequent collect queries are not reading data files ( Verified from the
>> logs.)
>> On spark ui I see only shuffle read and no shuffle write.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Joined-RDD-tp18820p18829.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Joined RDD

2015-04-17 Thread Archit Thakur
Ajay,

This is true. When we call join again on two RDD's.Rather than computing
the whole pipe again, It reads the map output of the map phase of an
RDD(which it usually gets from shuffle manager).

If you see the code:

 override def compute(s: Partition, context: TaskContext): Iterator[(K,
Array[Iterable[_]])] = {

val sparkConf = SparkEnv.get.conf

val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)

for ((dep, depNum) <- split.deps.zipWithIndex) dep match {

  case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>

// Read them from the parent

val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[
Product2[K, Any]]]

rddIterators += ((it, depNum))


  case ShuffleCoGroupSplitDep(handle) =>

// Read map outputs of shuffle

val it = SparkEnv.get.shuffleManager

  .getReader(handle, split.index, split.index + 1, context)

  .read()

rddIterators += ((it, depNum))

}

This is CoGroupedRDD.scala, spark-1.3 code.
If you see the UI, it shows these map stages as skipped.(And, this answers
your question as well, Hoai-Thu Vong[in different thread about skipped
stages.]).

Thanks and Regards,

Archit Thakur.




On Thu, Nov 13, 2014 at 3:10 PM, ajay garg  wrote:

> Yes that is my understanding of how it should work.
> But in my case when I call collect first time, it reads the data from files
> on the disk.
> Subsequent collect queries are not reading data files ( Verified from the
> logs.)
> On spark ui I see only shuffle read and no shuffle write.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joined-RDD-tp18820p18829.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Addition of new Metrics for killed executors.

2015-04-17 Thread Archit Thakur
Hi,

We are planning to add new Metrics in Spark for the executors that got
killed during the execution. Was just curious, why this info is not already
present. Is there some reason for not adding it.?
Any ideas around are welcome.

Thanks and Regards,
Archit Thakur.


Re: Custom partioner

2015-04-17 Thread Archit Thakur
I dont think you can change it to 4 bytes without any custom compilation.
To make same key go to same node, you'll have to repartition the data,
which is shuffling anyway. Unless your raw data is such that the same key
is on same node, you'll have to shuffle atleast once to make same key on
same node.

On Thu, Apr 16, 2015 at 10:16 PM, Jeetendra Gangele 
wrote:

> Hi All
>
> I have a RDD which has 1 million keys and each key is repeated from around
> 7000 values so total there will be around 1M*7K records in RDD.
>
> and each key is created from ZipWithIndex so key start from 0 to M-1
> the problem with ZipWithIndex is it take long for key which is 8 bytes.
> can I reduce it to 4 bytes?
>
> Now how Can I make sure the record with same key will go the same node so
> that I can avoid shuffling. Also how default partition-er will work here.
>
> Regards
> jeetendra
>
>


Re: Use Case of mutable RDD - any ideas around will help.

2014-09-12 Thread Archit Thakur
LittleCode snippet:

line1: cacheTable(existingRDDTableName)
line2: //some operations which will materialize existingRDD dataset.
line3: existingRDD.union(newRDD).registerAsTable(new_existingRDDTableName)
line4: cacheTable(new_existingRDDTableName)
line5: //some operation that will materialize new _existingRDD.

now, what we expect is in line4 rather than caching both
existingRDDTableName and new_existingRDDTableName, it should cache only
new_existingRDDTableName. but we cannot explicitly uncache
existingRDDTableName because we want the union to use the cached
existingRDDTableName. since being lazy new_existingRDDTableName could be
materialized later and by then we cant lose existingRDDTableName from
cache.

What if keep the same name of the new table

so, cacheTable(existingRDDTableName)
existingRDD.union(newRDD).registerAsTable(existingRDDTableName)
cacheTable(existingRDDTableName) //might not be needed again.

Will our both cases be satisfied, that it uses existingRDDTableName from
cache for union and dont duplicate the data in the cache but somehow,
append to the older cacheTable.

Thanks and Regards,


Archit Thakur.
Sr Software Developer,
Guavus, Inc.

On Sat, Sep 13, 2014 at 12:01 AM, pankaj arora 
wrote:

> I think i should elaborate usecase little more.
>
> So we have UI dashboard whose response time is quite fast as all the data
> is
> cached. Users query data based on time range and also there is always new
> data coming into the system at predefined frequency lets say 1 hour.
>
> As you said i can uncache tables it will basically drop all data from
> memory.
> I cannot afford losing my cache even for short interval. As all queries
> from
> UI will get slow till the time cache loads again. UI response time needs to
> be predictable and shoudl be fast enough so that user does not get
> irritated.
>
> Also i cannot keep two copies of data(till newrdd materialize) into memory
> as it will surpass total available memory in system.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Re-Use-Case-of-mutable-RDD-any-ideas-around-will-help-tp14095p14112.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Anyone know hot to submit spark job to yarn in java code?

2014-08-29 Thread Archit Thakur
Hi,

I am facing the same problem.
Did you find any solution or work around?

Thanks and Regards,
Archit Thakur.


On Thu, Jan 16, 2014 at 6:22 AM, Liu, Raymond  wrote:

> Hi
>
> Regarding your question
>
> 1) when I run the above script, which jar is beed submitted to the yarn
> server ?
>
> What SPARK_JAR env point to and the --jar point to are both submitted to
> the yarn server
>
> 2) It like the  spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
> plays the role of client side and
> spark-examples-assembly-0.8.1-incubating.jar goes with spark runtime and
> examples which will be running in yarn, am I right?
>
> The spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar will also go to
> yarn cluster as runtime for app
> jar(spark-examples-assembly-0.8.1-incubating.jar)
>
> 3) Does anyone have any similar experience ? I did lots of hadoop MR stuff
> and want follow the same logic to submit spark job. For now I can only find
> the command line way to submit spark job to yarn. I believe there is a easy
> way to integration spark in a web allocation.
>
> You can use the yarn-client mode, you might want to take a look on docs/
> running-on-yarn.md, and probably you might want to try master branch to
> check our latest update on this part of docs. And in yarn client mode, the
> sparkcontext itself will do similar thing as what the command line is doing
> to submit a yarn job
>
> Then to use it with java, you might want to try out JavaSparkContext
> instead of SparkContext, I don't personally run it with complicated
> applications. But a small example app did works.
>
>
> Best Regards,
> Raymond Liu
>
> -Original Message-
> From: John Zhao [mailto:jz...@alpinenow.com]
> Sent: Thursday, January 16, 2014 2:25 AM
> To: u...@spark.incubator.apache.org
> Subject: Anyone know hot to submit spark job to yarn in java code?
>
> Now I am working on a web application and  I want to  submit a spark job
> to hadoop yarn.
> I have already do my own assemble and  can run it in command line by the
> following script:
>
> export YARN_CONF_DIR=/home/gpadmin/clusterConfDir/yarn
> export
> SPARK_JAR=./assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
> ./spark-class org.apache.spark.deploy.yarn.Client  --jar
> ./examples/target/scala-2.9.3/spark-examples-assembly-0.8.1-incubating.jar
> --class org.apache.spark.examples.SparkPi --args yarn-standalone
> --num-workers 3 --master-memory 1g --worker-memory 512m --worker-cores 1
>
> It works fine.
> The I realized that it is hard to submit the job from a web application
> .Looks like the spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar or
> spark-examples-assembly-0.8.1-incubating.jar is a really big jar. I believe
> it contains everything .
> So my question is :
> 1) when I run the above script, which jar is beed submitted to the yarn
> server ?
> 2) It loos like the  spark-assembly-0.8.1-incubating-hadoop2.0.5-alpha.jar
> plays the role of client side and
> spark-examples-assembly-0.8.1-incubating.jar goes with spark runtime and
> examples which will be running in yarn, am I right?
> 3) Does anyone have any similar experience ? I did lots of hadoop MR stuff
> and want follow the same logic to submit spark job. For now I can only find
> the command line way to submit spark job to yarn. I believe there is a easy
> way to integration spark in a web allocation.
>
>
> Thanks.
> John.
>


Re: Running Spark On Yarn without Spark-Submit

2014-08-29 Thread Archit Thakur
including user@spark.apache.org.


On Fri, Aug 29, 2014 at 2:03 PM, Archit Thakur 
wrote:

> Hi,
>
> My requirement is to run Spark on Yarn without using the script
> spark-submit.
>
> I have a servlet and a tomcat server. As and when request comes, it
> creates a new SC and keeps it alive for the further requests, I ma setting
> my master in sparkConf
>
> as sparkConf.setMaster("yarn-cluster")
>
> but the request is stuck indefinitely.
>
> This works when I set
> sparkConf.setMaster("yarn-client")
>
> I am not sure, why is it not launching job in yarn-cluster mode.
>
> Any thoughts?
>
> Thanks and Regards,
> Archit Thakur.
>
>
>
>


Running Spark On Yarn without Spark-Submit

2014-08-29 Thread Archit Thakur
Hi,

My requirement is to run Spark on Yarn without using the script
spark-submit.

I have a servlet and a tomcat server. As and when request comes, it creates
a new SC and keeps it alive for the further requests, I ma setting my
master in sparkConf

as sparkConf.setMaster("yarn-cluster")

but the request is stuck indefinitely.

This works when I set
sparkConf.setMaster("yarn-client")

I am not sure, why is it not launching job in yarn-cluster mode.

Any thoughts?

Thanks and Regards,
Archit Thakur.


Re: Logging in Spark through YARN.

2014-07-30 Thread Archit Thakur
Hi Marcelo,

Thanks for your quick comment. This doesn't seem working in 1.0.0. release.

-Archit Thakur.


On Thu, Jul 31, 2014 at 3:18 AM, Marcelo Vanzin  wrote:

> Hi Archit,
>
> Are you using spark-submit? If so, can you try adding the following to
> its command line:
>
>   --files /dir/log4j.properties
>
> I tested that locally with the master branch and it works, but I don't
> have a 1.0.x install readily available for me to test at the moment.
> Handling of SPARK_LOG4J_CONF should also be fixed on master, but I
> don't remember what was its status in the 1.0 branch...
>
>
> On Tue, Jul 29, 2014 at 11:37 PM, Archit Thakur
>  wrote:
> > Hi,
> >
> > I want to manage logging of containers when I run Spark through YARN. I
> > checked there is a environment variable exposed to custom
> log4j.properties.
> >
> > Setting SPARK_LOG4J_CONF to "/dir/log4j.properties" should ideally make
> > containers use
> > "/dir/log4j.properties" file for logging. This doesn't seem working.
> >
> > Also, there is a log4j.properties file present in the Spark assembly and
> I
> > guess it is picking that only.
> >
> > Quick Google gave me this:
> https://issues.apache.org/jira/browse/SPARK-2007
> >
> > Is this functnality broken/not working yet?
> > Or I m mistaken somewhere?
> >
> > Thanks.
> > Archit.
>
>
>
> --
> Marcelo
>


Logging in Spark through YARN.

2014-07-29 Thread Archit Thakur
Hi,

I want to manage logging of containers when I run Spark through YARN. I
checked there is a environment variable exposed to custom log4j.properties.

Setting SPARK_LOG4J_CONF to "/dir/log4j.properties" should ideally make
containers use
"/dir/log4j.properties" file for logging. This doesn't seem working.

Also, there is a log4j.properties file present in the Spark assembly and I
guess it is picking that only.

Quick Google gave me this: https://issues.apache.org/jira/browse/SPARK-2007

Is this functnality broken/not working yet?
Or I m mistaken somewhere?

Thanks.
Archit.


Re: java.lang.ClassNotFoundException

2014-05-12 Thread Archit Thakur
Hi Joe,

Your messages are going into spam folder for me.

Thx, Archit_Thakur.


On Fri, May 2, 2014 at 9:22 AM, Joe L  wrote:

> Hi, You should include the jar file of your project. for example:
> conf.set("yourjarfilepath.jar")
>
> Joe
>   On Friday, May 2, 2014 7:39 AM, proofmoore [via Apache Spark User List]
> <[hidden email]> wrote:
>   HelIo. I followed "A Standalone App in Java" part of the tutorial
> https://spark.apache.org/docs/0.8.1/quick-start.html
>
> Spark standalone cluster looks it's running without a problem :
> http://i.stack.imgur.com/7bFv8.png
>
> I have built a fat jar for running this JavaApp on the cluster. Before
> maven package:
>
> find .
>
> ./pom.xml
> ./src
> ./src/main
> ./src/main/java
> ./src/main/java/SimpleApp.java
>
>
> content of SimpleApp.java is :
>
>  import org.apache.spark.api.java.*;
>  import org.apache.spark.api.java.function.Function;
>  import org.apache.spark.SparkConf;
>  import org.apache.spark.SparkContext;
>
>
>  public class SimpleApp {
>  public static void main(String[] args) {
>
>  SparkConf conf =  new SparkConf()
>.setMaster("spark://10.35.23.13:7077")
>.setAppName("My app")
>.set("spark.executor.memory", "1g");
>
>  JavaSparkContext   sc = new JavaSparkContext (conf);
>  String logFile = "/home/ubuntu/spark-0.9.1/test_data";
>  JavaRDD logData = sc.textFile(logFile).cache();
>
>  long numAs = logData.filter(new Function() {
>   public Boolean call(String s) { return s.contains("a"); }
>  }).count();
>
>  System.out.println("Lines with a: " + numAs);
>  }
>  }
>
> This program only works when master is set as setMaster("local").
> Otherwise I get this error : http://i.stack.imgur.com/doRSn.png
>
> Thanks,
> Ibrahim
>
>
> --
>  If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-tp5191.html
>  To start a new topic under Apache Spark User List, email [hidden email]
> To unsubscribe from Apache Spark User List, click here.
> NAML
>
>
>
> --
> View this message in context: Re: java.lang.ClassNotFoundException
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>