[TorrentBroadcast] Pyspark Application terminated saying "Failed to get broadcast_1_piece0 of broadcast_1 in Spark 2.0.0"

2016-12-28 Thread Palash Gupta
Hi Apache Spark User team,


Greetings!
I started developing an application using Apache Hadoop and Spark with Python.
My PySpark application randomly terminates saying "Failed to get broadcast_1*",
and I have been searching for suggestions and support on Stack Overflow under
"Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application":

  
[Stack Overflow] Failed to get broadcast_1_piece0 of broadcast_1 in pyspark application
"I was building an application on Apache Spark 2.0.0 with Python 3.4 and trying
to load some CSV files from HDFS (...)"
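For reference, a minimal sketch of the pattern described above (the HDFS path and
options are placeholders, not the poster's code); one commonly cited cause of
"Failed to get broadcast_*" is a SparkContext that gets stopped and recreated
mid-application, so the session below is built once and stopped only at the end:

    # Minimal sketch; hdfs:///data/input/*.csv is a placeholder path.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("csv-loader").getOrCreate()

    df = spark.read.csv("hdfs:///data/input/*.csv",
                        header=True, inferSchema=False)
    df.show(5)

    spark.stop()   # stop exactly once, when the application is finished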

 

Could you please advise me on registering with the Apache user list, or on how I
can get suggestions or support to debug the problem I am facing?

Your response will be highly appreciated. 


 Thanks & Best Regards,
Engr. Palash Gupta
WhatsApp/Viber: +8801817181502
Skype: palash2494


   

Re: Spark streaming with Yarn: executors not fully utilized

2016-12-28 Thread Nishant Kumar
Any update on this guys ?

On Wed, Dec 28, 2016 at 10:19 AM, Nishant Kumar 
wrote:

> I have updated my question:
>
> http://stackoverflow.com/questions/41345552/spark-
> streaming-with-yarn-executors-not-fully-utilized
>
> On Wed, Dec 28, 2016 at 9:49 AM, Nishant Kumar 
> wrote:
>
>> Hi,
>>
>> I am running spark streaming with Yarn with -
>>
>> *spark-submit --master yarn --deploy-mode cluster --num-executors 2 
>> --executor-memory 8g --driver-memory 2g --executor-cores 8 ..*
>>
>> I am consuming Kafka through the DirectStream approach (no receiver). I
>> have 2 topics (each with 3 partitions).
>>
>> I repartition the RDD (I have one DStream) into 16 parts (assuming number of
>> executors * number of cores = 2 * 8 = 16; *is that correct?*) and then do
>> foreachPartition, write each partition to a local file, and send it to another
>> server (not Spark) over HTTP (using an Apache HttpClient multi-part POST).
>>
>> *When I checked the details of this step (or job; is that the correct naming?)
>> in the Spark UI, it showed that all 16 tasks executed on a single executor,
>> 8 tasks at a time.*
>>
>> These are the Spark UI details:
>>
>> *Details for Stage 717 (Attempt 0)*
>>
>> Index  ID    Attempt  Status   Locality Level  Executor ID / Host               Launch Time          Duration  GC Time  Shuffle Read Size / Records  Errors
>> 0      5080  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    313.3 KB / 6137
>> 1      5081  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    328.5 KB / 6452
>> 2      5082  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    324.3 KB / 6364
>> 3      5083  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    321.5 KB / 6306
>> 4      5084  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    324.8 KB / 6364
>> 5      5085  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    320.8 KB / 6307
>> 6      5086  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  2 s       11 ms    323.4 KB / 6356
>> 7      5087  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:46  3 s       11 ms    316.8 KB / 6207
>> 8      5088  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                317.7 KB / 6245
>> 9      5089  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                320.4 KB / 6280
>> 10     5090  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                323.0 KB / 6334
>> 11     5091  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                323.7 KB / 6371
>> 12     5092  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                316.7 KB / 6218
>> 13     5093  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                321.0 KB / 6301
>> 14     5094  0        SUCCESS  NODE_LOCAL      1 / executor1_machine_host_name  2016/12/27 12:11:48  2 s                321.4 KB / 6304
>>
>> I was expecting it to execute 16 parallel tasks (2 executors * 8 cores) across
>> one or more executors. I think I am missing something. Please help.
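A runnable sketch of the job's shape with spark.locality.wait lowered; one common
reason all tasks land on a single executor is that the scheduler keeps waiting for
NODE_LOCAL slots on that executor, and lowering the wait lets tasks fall back to
the idle executor sooner. The queueStream below is only a toy stand-in for the
Kafka DirectStream, and the "0s" value is an assumption to illustrate the trade-off:

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = (SparkConf()
            .setMaster("local[4]")
            .setAppName("locality-demo")
            .set("spark.locality.wait", "0s"))   # trade locality for parallelism

    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    def send_partition(records):
        # stand-in for "write the partition to a local file and POST it over HTTP"
        print("partition size:", sum(1 for _ in records))

    # toy stand-in for the Kafka DirectStream
    stream = ssc.queueStream([sc.parallelize(range(1000))])
    stream.repartition(16).foreachRDD(
        lambda rdd: rdd.foreachPartition(send_partition))

    ssc.start()
    ssc.awaitTerminationOrTimeout(30)
    ssc.stop(stopSparkContext=True)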
>>
>>



-- 
*Nishant Kumar*
Senior Software Engineer

Phone: +91 80088 42030
Skype: nishant.kumar_applift


*AppLift India*
107/3, 80 Feet Main Road,
Koramangala 4th Block,
Bangalore - 560034
www.AppLift.com 



Re: Dependency Injection and Microservice development with Spark

2016-12-28 Thread Miguel Morales
Hi

Not sure about Spring Boot, but if you try to use DI libraries you'll run into
serialization issues. I've had luck using an old version of Scaldi. Recently,
though, I've been passing the class types as arguments with default values; the
Spark code then instantiates them. So you're basically passing and serializing a
class name rather than a live object.

Sent from my iPhone
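The same idea sketched in Python for brevity (names are illustrative): only the
class reference travels to the executors, and the heavy instance is constructed
inside the task rather than serialized from the driver.

    from pyspark import SparkContext

    class Analyzer:                       # stand-in for an injected dependency
        def score(self, value):
            return value * 2

    def process_partition(records, analyzer_cls=Analyzer):
        analyzer = analyzer_cls()         # instantiated on the executor
        return [analyzer.score(r) for r in records]

    sc = SparkContext("local[2]", "di-sketch")
    result = (sc.parallelize(range(10), 2)
                .mapPartitions(lambda it: process_partition(it, Analyzer))
                .collect())
    print(result)
    sc.stop()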

> On Dec 28, 2016, at 1:55 PM, Lars Albertsson  wrote:
> 
> Do you really need dependency injection?
> 
> DI is often used for testing purposes. Data processing jobs are easy
> to test without DI, however, due to their functional and synchronous
> nature. Hence, DI is often unnecessary for testing data processing
> jobs, whether they are batch or streaming jobs.
> 
> Or do you want to use DI for other reasons?
> 
> 
> Lars Albertsson
> Data engineering consultant
> www.mapflat.com
> https://twitter.com/lalleal
> +46 70 7687109
> Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com
> 
> 
> On Fri, Dec 23, 2016 at 11:56 AM, Chetan Khatri
>  wrote:
>> Hello Community,
>> 
>> Current approach I am using for Spark Job Development with Scala + SBT and
>> Uber Jar with yml properties file to pass configuration parameters. But If i
>> would like to use Dependency Injection and MicroService Development like
>> Spring Boot feature in Scala then what would be the standard approach.
>> 
>> Thanks
>> 
>> Chetan
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Invert large matrix

2016-12-28 Thread Yanwei Wayne Zhang
Hi all,


I have a matrix X stored as RDD[SparseVector] that is high-dimensional, say 800
million rows and 2 million columns, and more than 95% of the entries are zero.

Is there a way to invert (X'X + eye) efficiently, where X' is the transpose of 
X and eye is the identity matrix? I am thinking of using RowMatrix but not sure 
if it is feasible.
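For what it is worth, a toy PySpark sketch of the RowMatrix route on tiny made-up
data: computeGramianMatrix returns X'X as a *local* matrix, so the inversion
happens on the driver, which is only workable when the column count is modest. At
2 million columns the Gram matrix itself would not fit, so this only illustrates
the API, not a solution at that scale.

    import numpy as np
    from pyspark.sql import SparkSession
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.linalg.distributed import RowMatrix

    spark = SparkSession.builder.master("local[2]").appName("gram-sketch").getOrCreate()
    rows = spark.sparkContext.parallelize([
        Vectors.sparse(4, {0: 1.0, 2: 3.0}),
        Vectors.sparse(4, {1: 2.0, 3: 1.0}),
    ])

    mat = RowMatrix(rows)
    xtx = mat.computeGramianMatrix().toArray()        # X'X, collected locally
    inv = np.linalg.inv(xtx + np.eye(xtx.shape[0]))   # (X'X + I)^-1 on the driver
    print(inv.shape)
    spark.stop()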

Any suggestion is highly appreciated.


Thanks.


Wayne



Re: [Spark 2.0.2 HDFS]: no data locality

2016-12-28 Thread Miguel Morales
If you're using Kubernetes you can group spark and hdfs to run in the
same stack.  Meaning they'll basically run in the same network space
and share ips.  Just gotta make sure there's no port conflicts.

On Wed, Dec 28, 2016 at 5:07 AM, Karamba  wrote:
>
> Good idea, thanks!
>
> But unfortunately that's not possible. All containers are connected to
> an overlay network.
>
> Is there any other possibility to tell Spark that it is on the same *NODE*
> as an HDFS data node?
>
>
> On 28.12.2016 12:00, Miguel Morales wrote:
>> It might have to do with your container ips, it depends on your
>> networking setup.  You might want to try host networking so that the
>> containers share the ip with the host.
>>
>> On Wed, Dec 28, 2016 at 1:46 AM, Karamba  wrote:
>>> Hi Sun Rui,
>>>
>>> thanks for answering!
>>>
>>>
 Although the Spark task scheduler is aware of rack-level data locality, it 
 seems that only YARN implements the support for it.
>>> This explains why the script that I configured in core-site.xml
>>> topology.script.file.name is not called in by the spark container.
>>> But at time of reading from hdfs in a spark program, the script is
>>> called in my hdfs namenode container.
>>>
 However, node-level locality can still work for Standalone.
>>> I have a couple of physical hosts that run spark and hdfs docker
>>> containers. How does spark standalone knows that spark and docker
>>> containers are on the same host?
>>>
 Data Locality involves in both task data locality and executor data 
 locality. Executor data locality is only supported on YARN with executor 
 dynamic allocation enabled. For standalone, by default, a Spark 
 application will acquire all available cores in the cluster, generally 
 meaning there is at least one executor on each node, in which case task 
 data locality can work because a task can be dispatched to an executor on 
 any of the preferred nodes of the task for execution.

 for your case, have you set spark.cores.max to limit the cores to acquire, 
 which means executors are available on a subset of the cluster nodes?
>>> I set "--total-executor-cores 1" in order to use only a small subset of
>>> the cluster.
>>>
>>>
>>>
>>> On 28.12.2016 02:58, Sun Rui wrote:
 Although the Spark task scheduler is aware of rack-level data locality, it 
 seems that only YARN implements the support for it. However, node-level 
 locality can still work for Standalone.

 It is not necessary to copy the hadoop config files into the Spark CONF 
 directory. Set HADOOP_CONF_DIR to point to the conf directory of your 
 Hadoop.

 Data Locality involves in both task data locality and executor data 
 locality. Executor data locality is only supported on YARN with executor 
 dynamic allocation enabled. For standalone, by default, a Spark 
 application will acquire all available cores in the cluster, generally 
 meaning there is at least one executor on each node, in which case task 
 data locality can work because a task can be dispatched to an executor on 
 any of the preferred nodes of the task for execution.

 for your case, have you set spark.cores.max to limit the cores to acquire, 
 which means executors are available on a subset of the cluster nodes?

> On Dec 27, 2016, at 01:39, Karamba  wrote:
>
> Hi,
>
> I am running a couple of docker hosts, each with an HDFS and a spark
> worker in a spark standalone cluster.
> In order to get data locality awareness, I would like to configure Racks
> for each host, so that a spark worker container knows from which hdfs
> node container it should load its data. Does this make sense?
>
> I configured HDFS container nodes via the core-site.xml in
> $HADOOP_HOME/etc and this works. hdfs dfsadmin -printTopology shows my
> setup.
>
> I configured SPARK the same way. I placed core-site.xml and
> hdfs-site.xml in the SPARK_CONF_DIR ... BUT this has no effect.
>
> Submitting a spark job via spark-submit to the spark-master that loads
> from HDFS just has Data locality ANY.
>
> It would be great if anybody would help me getting the right 
> configuration!
>
> Thanks and best regards,
> on
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



org.apache.spark.SparkException: PairwiseRDD: unexpected value: List([B@20b7e9d2)

2016-12-28 Thread prayag
Hi Guys,

I have a simple spark job:
"
    df = spark.read.csv(fpath, header=True, inferSchema=False)

    def map_func(line):
        map_keys = tuple([line['key1']] + [line[k] for k in KEYS])
        return map_keys, line

    d = df.rdd.map(map_func).groupByKey()
    d.map(complicated_func).saveAsTextFile(path)
"

This job is failing with:
16/12/28 05:06:19 ERROR Executor: Exception in task 175.1 in stage 10.1 (TID
51849)
org.apache.spark.SparkException: PairwiseRDD: unexpected value:
List([B@20b7e9d2)
at
org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:392)
at
org.apache.spark.api.python.PairwiseRDD$$anonfun$compute$2.apply(PythonRDD.scala:390)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:162)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The job works for 80 GB data set. This error occurs when I try with 500GB
data set. I'll appreciate any thoughts you have on this.

-Prayag



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-SparkException-PairwiseRDD-unexpected-value-List-B-20b7e9d2-tp28257.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Dependency Injection and Microservice development with Spark

2016-12-28 Thread Lars Albertsson
Do you really need dependency injection?

DI is often used for testing purposes. Data processing jobs are easy
to test without DI, however, due to their functional and synchronous
nature. Hence, DI is often unnecessary for testing data processing
jobs, whether they are batch or streaming jobs.

Or do you want to use DI for other reasons?


Lars Albertsson
Data engineering consultant
www.mapflat.com
https://twitter.com/lalleal
+46 70 7687109
Calendar: https://goo.gl/6FBtlS, https://freebusy.io/la...@mapflat.com


On Fri, Dec 23, 2016 at 11:56 AM, Chetan Khatri
 wrote:
> Hello Community,
>
> Current approach I am using for Spark Job Development with Scala + SBT and
> Uber Jar with yml properties file to pass configuration parameters. But If i
> would like to use Dependency Injection and MicroService Development like
> Spring Boot feature in Scala then what would be the standard approach.
>
> Thanks
>
> Chetan

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-28 Thread Justin Miller
Interesting, because a bug that seemed to be fixed in 2.1.0-SNAPSHOT doesn't 
appear to be fixed in 2.1.0 stable (it centered around a null-pointer exception 
during code gen). It seems to be fixed in 2.1.1-SNAPSHOT, but I can try stable 
again.

> On Dec 28, 2016, at 1:38 PM, Mark Hamstra  wrote:
> 
> A SNAPSHOT build is not a stable artifact, but rather floats to the top of 
> commits that are intended for the next release.  So, 2.1.1-SNAPSHOT comes 
> after the 2.1.0 release and contains any code at the time that the artifact 
> was built that was committed to the branch-2.1 maintenance branch and is, 
> therefore, intended for the eventual 2.1.1 maintenance release.  Once a 
> release is tagged and stable artifacts for it can be built, there is no 
> purpose for a SNAPSHOT of that release -- e.g. there is no longer any purpose 
> for a 2.1.0-SNAPSHOT release; if you want 2.1.0, then you should be using 
> stable artifacts now, not SNAPSHOTs.
> 
> The existence of a SNAPSHOT doesn't imply anything about the release date of 
> the associated finished version.  Rather, it only indicates a name that is 
> attached to all of the code that is currently intended for the associated 
> release number. 
> 
> On Wed, Dec 28, 2016 at 3:09 PM, Justin Miller  > wrote:
> It looks like the jars for 2.1.0-SNAPSHOT are gone?
> 
> https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/
>  
> 
> 
> Also:
> 
> 2.1.0-SNAPSHOT/   Fri Dec 23 16:31:42 UTC 2016
> 2.1.1-SNAPSHOT/   Wed Dec 28 20:01:10 UTC 2016
> 2.2.0-SNAPSHOT/   Wed Dec 28 19:12:38 UTC 2016
> 
> What's with 2.1.1-SNAPSHOT? Is that version about to be released as well?
> 
> Thanks!
> Justin
> 
>> On Dec 28, 2016, at 12:53 PM, Mark Hamstra > > wrote:
>> 
>> The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0 
>> 
>> 
>> On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers > > wrote:
>> seems like the artifacts are on maven central but the website is not yet 
>> updated.
>> 
>> strangely the tag v2.1.0 is not yet available on github. i assume its equal 
>> to v2.1.0-rc5
>> 
>> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller 
>> > wrote:
>> I'm curious about this as well. Seems like the vote passed.
>> 
>> > On Dec 23, 2016, at 2:00 AM, Aseem Bansal > > > wrote:
>> >
>> >
>> 
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 
>> 
>> 
>> 
> 
> 



Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-28 Thread Mark Hamstra
A SNAPSHOT build is not a stable artifact, but rather floats to the top of
commits that are intended for the next release.  So, 2.1.1-SNAPSHOT comes
after the 2.1.0 release and contains any code at the time that the artifact
was built that was committed to the branch-2.1 maintenance branch and is,
therefore, intended for the eventual 2.1.1 maintenance release.  Once a
release is tagged and stable artifacts for it can be built, there is no
purpose for a SNAPSHOT of that release -- e.g. there is no longer any
purpose for a 2.1.0-SNAPSHOT release; if you want 2.1.0, then you should be
using stable artifacts now, not SNAPSHOTs.

The existence of a SNAPSHOT doesn't imply anything about the release date
of the associated finished version.  Rather, it only indicates a name that
is attached to all of the code that is currently intended for the
associated release number.

On Wed, Dec 28, 2016 at 3:09 PM, Justin Miller <
justin.mil...@protectwise.com> wrote:

> It looks like the jars for 2.1.0-SNAPSHOT are gone?
>
> https://repository.apache.org/content/groups/snapshots/org/
> apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/
>
> Also:
>
> 2.1.0-SNAPSHOT/   Fri Dec 23 16:31:42 UTC 2016
> 2.1.1-SNAPSHOT/   Wed Dec 28 20:01:10 UTC 2016
> 2.2.0-SNAPSHOT/   Wed Dec 28 19:12:38 UTC 2016
>
> What's with 2.1.1-SNAPSHOT? Is that version about to be released as well?
>
> Thanks!
> Justin
>
> On Dec 28, 2016, at 12:53 PM, Mark Hamstra 
> wrote:
>
> The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0
>
> On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers  wrote:
>
>> seems like the artifacts are on maven central but the website is not yet
>> updated.
>>
>> strangely the tag v2.1.0 is not yet available on github. i assume its
>> equal to v2.1.0-rc5
>>
>> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller <
>> justin.mil...@protectwise.com> wrote:
>>
>>> I'm curious about this as well. Seems like the vote passed.
>>>
>>> > On Dec 23, 2016, at 2:00 AM, Aseem Bansal 
>>> wrote:
>>> >
>>> >
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>
>


Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-28 Thread Koert Kuipers
ah yes you are right. i must not have fetched correctly earlier

On Wed, Dec 28, 2016 at 2:53 PM, Mark Hamstra 
wrote:

> The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0
>
> On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers  wrote:
>
>> seems like the artifacts are on maven central but the website is not yet
>> updated.
>>
>> strangely the tag v2.1.0 is not yet available on github. i assume its
>> equal to v2.1.0-rc5
>>
>> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller <
>> justin.mil...@protectwise.com> wrote:
>>
>>> I'm curious about this as well. Seems like the vote passed.
>>>
>>> > On Dec 23, 2016, at 2:00 AM, Aseem Bansal 
>>> wrote:
>>> >
>>> >
>>>
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-28 Thread Justin Miller
It looks like the jars for 2.1.0-SNAPSHOT are gone?

https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-sql_2.11/2.1.0-SNAPSHOT/
 


Also:

2.1.0-SNAPSHOT/   Fri Dec 23 16:31:42 UTC 2016
2.1.1-SNAPSHOT/   Wed Dec 28 20:01:10 UTC 2016
2.2.0-SNAPSHOT/   Wed Dec 28 19:12:38 UTC 2016

What's with 2.1.1-SNAPSHOT? Is that version about to be released as well?

Thanks!
Justin

> On Dec 28, 2016, at 12:53 PM, Mark Hamstra  wrote:
> 
> The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0 
> 
> 
> On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers  > wrote:
> seems like the artifacts are on maven central but the website is not yet 
> updated.
> 
> strangely the tag v2.1.0 is not yet available on github. i assume its equal 
> to v2.1.0-rc5
> 
> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller 
> > wrote:
> I'm curious about this as well. Seems like the vote passed.
> 
> > On Dec 23, 2016, at 2:00 AM, Aseem Bansal  > > wrote:
> >
> >
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 



Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-28 Thread Mark Hamstra
The v2.1.0 tag is there: https://github.com/apache/spark/tree/v2.1.0

On Wed, Dec 28, 2016 at 2:04 PM, Koert Kuipers  wrote:

> seems like the artifacts are on maven central but the website is not yet
> updated.
>
> strangely the tag v2.1.0 is not yet available on github. i assume its
> equal to v2.1.0-rc5
>
> On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller <
> justin.mil...@protectwise.com> wrote:
>
>> I'm curious about this as well. Seems like the vote passed.
>>
>> > On Dec 23, 2016, at 2:00 AM, Aseem Bansal  wrote:
>> >
>> >
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Spark/Mesos with GPU support

2016-12-28 Thread Ji Yan
Dear Spark Users,

Has anyone had successful experience running Spark on Mesos with GPU support? 
We have a Mesos cluster that can see and offer nvidia GPU resources. With 
Spark, it seems that the GPU support with Mesos 
(https://github.com/apache/spark/pull/14644 
) has only recently been merged 
into Spark Master which is not found in 2.0.2 release yet. We have a custom 
built Spark from 2.1-rc5 which is confirmed to have the above change. However 
when we try to run any code from Spark on this Mesos setup, the spark program 
hangs and keeps saying

“WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your 
cluster UI to ensure that workers are registered and have sufficient resources”

We are pretty sure that the cluster has enough resources as there is nothing 
running on it. If we disable the GPU support in configuration and restart mesos 
and retry the same program, it would work.
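One configuration angle worth checking, sketched below with placeholder values:
in the 2.1 Mesos backend, GPU resources are only requested when
spark.mesos.gpus.max is set (it defaults to 0), so making it explicit is a cheap
experiment. The master URL and numbers here are assumptions, not a confirmed fix.

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("mesos-gpu-check")
             .master("mesos://zk://mesos-master:2181/mesos")   # placeholder master URL
             .config("spark.mesos.gpus.max", "1")
             .config("spark.cores.max", "4")
             .getOrCreate())

    print(spark.sparkContext.parallelize(range(100)).sum())
    spark.stop()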

Any comment/advice on this greatly appreciated

Thanks,
Ji




Re: [PySpark - 1.6] - Avoid object serialization

2016-12-28 Thread Chawla,Sumit
Would this work for you?

def processRDD(rdd):
    analyzer = ShortTextAnalyzer(root_dir)
    rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))

ssc.union(*streams).filter(lambda x: x[1] != None) \
    .foreachRDD(lambda rdd: processRDD(rdd))
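A further sketch of keeping the heavy object per executor, reusing the names from
the quoted snippet (ShortTextAnalyzer, root_dir, ssc, streams) and assuming these
functions live in a module shipped to the workers (for example via --py-files)
rather than inline in the driver script, so the module-level cache survives across
tasks in the same worker process:

    _analyzer = None

    def get_analyzer():
        global _analyzer
        if _analyzer is None:                        # built once per worker process
            _analyzer = ShortTextAnalyzer(root_dir)
        return _analyzer

    def analyze_partition(records):
        analyzer = get_analyzer()
        for record in records:
            analyzer.analyze_short_text_event(record[1])

    ssc.union(*streams).filter(lambda x: x[1] is not None) \
        .foreachRDD(lambda rdd: rdd.foreachPartition(analyze_partition))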



Regards
Sumit Chawla


On Wed, Dec 28, 2016 at 7:57 AM, Sidney Feiner 
wrote:

> Hey,
>
> I just posted this question on Stack Overflow (link here)
> and decided to try my luck here as well :)
>
>
>
> I'm writing a PySpark job but I got into some performance issues.
> Basically, all it does is read events from Kafka and logs the
> transformations made. Thing is, the transformation is calculated based on
> an object's function, and that object is pretty heavy as it contains a
> Graph and an inner-cache which gets automatically updated as it processes
> rdd's. So when I write the following piece of code:
>
> analyzer = ShortTextAnalyzer(root_dir)
>
> logger.info("Start analyzing the documents from kafka")
>
> ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(lambda rdd: 
> rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1])))
>
>
>
> It serializes my analyzer which takes a lot of time because of the graph,
> and as it is copied to the executor, the cache is only relevant for that
> specific RDD.
>
> If the job was written in Scala, I could have written an Object which
> would exist in every executor and then my object wouldn't have to be
> serialized each time.
>
> I've read in a post (http://www.spark.tc/deserialization-in-pyspark-
> storage/) that prior to PySpark 2.0, objects are always serialized. So
> does that mean that I have no way to avoid the serialization?
>
> I'd love to hear about a way to avoid serialization in PySpark if it
> exists. To have my object created once for each executor and then it could
> avoid the serialization process, gain time and actually have a working
> cache system?
>
> Thanks in advance :)
>
> *Sidney Feiner*   */*  SW Developer
>
> M: +972.528197720 <+972%2052-819-7720>  */*  Skype: sidney.feiner.startapp
>
>
>
> [image: StartApp] 
>
>
>


Re: Is there any scheduled release date for Spark 2.1.0?

2016-12-28 Thread Koert Kuipers
seems like the artifacts are on maven central but the website is not yet
updated.

strangely the tag v2.1.0 is not yet available on github. i assume its equal
to v2.1.0-rc5

On Fri, Dec 23, 2016 at 10:52 AM, Justin Miller <
justin.mil...@protectwise.com> wrote:

> I'm curious about this as well. Seems like the vote passed.
>
> > On Dec 23, 2016, at 2:00 AM, Aseem Bansal  wrote:
> >
> >
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Error: at sqlContext.createDataFrame with RDD and Schema

2016-12-28 Thread Chetan Khatri
I resolved the above error by creating a SparkSession:

val spark = SparkSession.builder().appName("Hbase - Spark
POC").getOrCreate()

Error after:

spark.sql("SELECT * FROM student").show()

But calling the show() action on the DataFrame throws the error below:

scala> sqlContext.sql("select * from student").show()
16/12/28 21:04:23 ERROR executor.Executor: Exception in task 0.0 in stage
2.0 (TID 2)
java.lang.RuntimeException: Error while encoding:
java.lang.RuntimeException: java.lang.Integer is not a valid external type
for schema of string
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid),
StringType), true) AS Rowid#35
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid),
StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 0
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid),
StringType), true)
  +- validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid),
StringType)
 +- getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 0, Rowid)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top
level row object)
   +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths),
StringType), true) AS maths#36
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths),
StringType), true)
   :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt
   :  :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object)
   :  :  +- input[0, org.apache.spark.sql.Row, true]
   :  +- 1
   :- null
   +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String,
StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths),
StringType), true)
  +- validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths),
StringType)
 +- getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 1, maths)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top
level row object)
   +- input[0, org.apache.spark.sql.Row, true]

if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row
object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 2, english),
StringType), true) AS english#37
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level
row object).isNullAt) null else staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString,
validateexternaltype(getexternalrowfield(assertnotnull(input[0,
org.apache.spark.sql.Row, true], top level row object), 2, english),
StringType), true)

Kindly help; I am unable to figure out from the error what exactly is wrong.

Thanks.,
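From the code in the original posting, the Row fields for maths, english, etc.
arrive as Ints while every field in stdSchema is declared StringType, which is
what "java.lang.Integer is not a valid external type for schema of string" is
complaining about: the declared schema has to match the actual value types, or
the values have to be converted to strings first. A small PySpark sketch of the
same mismatch and fix, with made-up data:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    spark = SparkSession.builder.master("local[2]").appName("schema-mismatch").getOrCreate()
    rows = spark.sparkContext.parallelize([("r1", 90, 85), ("r2", 70, 95)])

    bad_schema = StructType([StructField(f, StringType(), True)
                             for f in ["Rowid", "maths", "english"]])
    good_schema = StructType([StructField("Rowid", StringType(), True),
                              StructField("maths", IntegerType(), True),
                              StructField("english", IntegerType(), True)])

    # spark.createDataFrame(rows, bad_schema).show()   # the mismatched variant
    spark.createDataFrame(rows, good_schema).show()    # types line up with the data
    spark.stop()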


On Wed, Dec 28, 2016 at 9:00 PM, Chetan Khatri 
wrote:

> Hello Spark Community,
>
> I am reading an HBase table from Spark and getting an RDD, and now I want to
> convert that RDD of Spark Rows to a DataFrame.
>
> *Source Code:*
>
> bin/spark-shell --packages 
> it.nerdammer.bigdata:spark-hbase-connector_2.10:1.0.3
> --conf spark.hbase.host=127.0.0.1
>
> import it.nerdammer.spark.hbase._
> import org.apache.spark.{SparkConf, SparkContext}

[PySpark - 1.6] - Avoid object serialization

2016-12-28 Thread Sidney Feiner
Hey,
I just posted this question on Stack Overflow (link 
here)
 and decided to try my luck here as well :)


I'm writing a PySpark job but I got into some performance issues. Basically, 
all it does is read events from Kafka and logs the transformations made. Thing 
is, the transformation is calculated based on an object's function, and that 
object is pretty heavy as it contains a Graph and an inner-cache which gets 
automatically updated as it processes rdd's. So when I write the following 
piece of code:

analyzer = ShortTextAnalyzer(root_dir)

logger.info("Start analyzing the documents from kafka")

ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(lambda rdd: 
rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1])))



It serializes my analyzer which takes a lot of time because of the graph, and 
as it is copied to the executor, the cache is only relevant for that specific 
RDD.

If the job was written in Scala, I could have written an Object which would 
exist in every executor and then my object wouldn't have to be serialized each 
time.

I've read in a post (http://www.spark.tc/deserialization-in-pyspark-storage/) 
that prior to PySpark 2.0, objects are always serialized. So does that mean 
that I have no way to avoid the serialization?

I'd love to hear about a way to avoid serialization in PySpark if it exists. To 
have my object created once for each executor and then it could avoid the 
serialization process, gain time and actually have a working cache system?

Thanks in advance :)
Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp

[StartApp]



Error: at sqlContext.createDataFrame with RDD and Schema

2016-12-28 Thread Chetan Khatri
Hello Spark Community,

I am reading an HBase table from Spark and getting an RDD, and now I want to
convert that RDD of Spark Rows to a DataFrame.

*Source Code:*

bin/spark-shell --packages
it.nerdammer.bigdata:spark-hbase-connector_2.10:1.0.3 --conf
spark.hbase.host=127.0.0.1

import it.nerdammer.spark.hbase._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType

val sparkConf = new SparkConf().setAppName("HBase Spark POC")

val sparkContext = new SparkContext(sparkConf)

val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)

val hBaseRDD = sc.hbaseTable[(Option[String], Option[Int], Option[Int],
Option[Int], Option[Int], Option[Int])]("university").select("maths",
"english","science","history","computer").inColumnFamily("school")

val rowRDD = hBaseRDD.map(i =>
Row(i._1.get,i._2.get,i._3.get,i._4.get,i._5.get,i._6.get))

val stdSchemaString= "Rowid,maths,english,science,history,computer"

val stdSchema= StructType(stdSchemaString.split(",").map(fieldName =>
StructField(fieldName, StringType, true)))

val stdDf = sqlContext.createDataFrame(rowRDD,stdSchema);

// Getting Error

stdDf.registerTempTable("student")

sqlContext.sql("select * from student").show()

*Error*

scala> val stdDf = sqlContext.createDataFrame(rowRDD,stdSchema);
16/12/28 20:50:59 ERROR metastore.RetryingHMSHandler:
AlreadyExistsException(message:Database default already exists)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:891)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
at com.sun.proxy.$Proxy21.create_database(Unknown Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createDatabase(HiveMetaStoreClient.java:644)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:156)
at com.sun.proxy.$Proxy22.createDatabase(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.createDatabase(Hive.java:306)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply$mcV$sp(HiveClientImpl.scala:309)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:309)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$createDatabase$1.apply(HiveClientImpl.scala:309)
at
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:280)
at
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
at
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
at
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:269)
at
org.apache.spark.sql.hive.client.HiveClientImpl.createDatabase(HiveClientImpl.scala:308)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply$mcV$sp(HiveExternalCatalog.scala:99)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply(HiveExternalCatalog.scala:99)
at
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$createDatabase$1.apply(HiveExternalCatalog.scala:99)
at
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:72)
at
org.apache.spark.sql.hive.HiveExternalCatalog.createDatabase(HiveExternalCatalog.scala:98)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:147)
at
org.apache.spark.sql.catalyst.catalog.SessionCatalog.(SessionCatalog.scala:89)
at
org.apache.spark.sql.hive.HiveSessionCatalog.(HiveSessionCatalog.scala:51)
at
org.apache.spark.sql.hive.HiveSessionState.catalog$lzycompute(HiveSessionState.scala:49)
at
org.apache.spark.sql.hive.HiveSessionState.catalog(HiveSessionState.scala:48)
at
org.apache.spark.sql.hive.HiveSessionState$$anon$1.(HiveSessionState.scala:63)
at
org.apache.spark.sql.hive.HiveSessionState.analyzer$lzycompute(HiveSessionState.scala:63)
at
org.apache.spark.sql.hive.HiveSessionState.analyzer(HiveSessionState.scala:62)
at
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:542)
at 

Re: [Spark 2.0.2 HDFS]: no data locality

2016-12-28 Thread Karamba

Good idea, thanks!

But unfortunately that's not possible. All containers are connected to
an overlay network.

Is there any other possibility to tell Spark that it is on the same *NODE*
as an HDFS data node?


On 28.12.2016 12:00, Miguel Morales wrote:
> It might have to do with your container ips, it depends on your
> networking setup.  You might want to try host networking so that the
> containers share the ip with the host.
>
> On Wed, Dec 28, 2016 at 1:46 AM, Karamba  wrote:
>> Hi Sun Rui,
>>
>> thanks for answering!
>>
>>
>>> Although the Spark task scheduler is aware of rack-level data locality, it 
>>> seems that only YARN implements the support for it.
>> This explains why the script that I configured in core-site.xml
>> topology.script.file.name is not called in by the spark container.
>> But at time of reading from hdfs in a spark program, the script is
>> called in my hdfs namenode container.
>>
>>> However, node-level locality can still work for Standalone.
>> I have a couple of physical hosts that run spark and hdfs docker
>> containers. How does spark standalone knows that spark and docker
>> containers are on the same host?
>>
>>> Data Locality involves in both task data locality and executor data 
>>> locality. Executor data locality is only supported on YARN with executor 
>>> dynamic allocation enabled. For standalone, by default, a Spark application 
>>> will acquire all available cores in the cluster, generally meaning there is 
>>> at least one executor on each node, in which case task data locality can 
>>> work because a task can be dispatched to an executor on any of the 
>>> preferred nodes of the task for execution.
>>>
>>> for your case, have you set spark.cores.max to limit the cores to acquire, 
>>> which means executors are available on a subset of the cluster nodes?
>> I set "--total-executor-cores 1" in order to use only a small subset of
>> the cluster.
>>
>>
>>
>> On 28.12.2016 02:58, Sun Rui wrote:
>>> Although the Spark task scheduler is aware of rack-level data locality, it 
>>> seems that only YARN implements the support for it. However, node-level 
>>> locality can still work for Standalone.
>>>
>>> It is not necessary to copy the hadoop config files into the Spark CONF 
>>> directory. Set HADOOP_CONF_DIR to point to the conf directory of your 
>>> Hadoop.
>>>
>>> Data Locality involves in both task data locality and executor data 
>>> locality. Executor data locality is only supported on YARN with executor 
>>> dynamic allocation enabled. For standalone, by default, a Spark application 
>>> will acquire all available cores in the cluster, generally meaning there is 
>>> at least one executor on each node, in which case task data locality can 
>>> work because a task can be dispatched to an executor on any of the 
>>> preferred nodes of the task for execution.
>>>
>>> for your case, have you set spark.cores.max to limit the cores to acquire, 
>>> which means executors are available on a subset of the cluster nodes?
>>>
 On Dec 27, 2016, at 01:39, Karamba  wrote:

 Hi,

 I am running a couple of docker hosts, each with an HDFS and a spark
 worker in a spark standalone cluster.
 In order to get data locality awareness, I would like to configure Racks
 for each host, so that a spark worker container knows from which hdfs
 node container it should load its data. Does this make sense?

 I configured HDFS container nodes via the core-site.xml in
 $HADOOP_HOME/etc and this works. hdfs dfsadmin -printTopology shows my
 setup.

 I configured SPARK the same way. I placed core-site.xml and
 hdfs-site.xml in the SPARK_CONF_DIR ... BUT this has no effect.

 Submitting a spark job via spark-submit to the spark-master that loads
 from HDFS just has Data locality ANY.

 It would be great if anybody would help me getting the right configuration!

 Thanks and best regards,
 on

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org

>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Getting values list per partition

2016-12-28 Thread Rohit Verma
Hi

I am trying something like


final Dataset<String> df = spark.read().csv("src/main/resources/star2000.csv")
        .select("_c1").as(Encoders.STRING());
final Dataset<ArrayList> arrayListDataset = df.mapPartitions(
        new MapPartitionsFunction<String, ArrayList>() {
            @Override
            public Iterator<ArrayList> call(Iterator<String> iterator) throws Exception {
                ArrayList<String> s = new ArrayList<>();
                iterator.forEachRemaining(it -> s.add(it));
                return Iterators.singletonIterator(s);
            }
        }, Encoders.javaSerialization(ArrayList.class));
JavaEsSparkSQL.saveToEs(arrayListDataset, "spark/docs");

Is there a better/performant way of building arrayListDataset above.
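A hedged aside in PySpark terms: RDD.glom() materializes each partition as a list
in one call, which is the same "values per partition" shape the snippet builds by
hand; whether it is faster still depends on each partition fitting in memory, just
like the ArrayList approach.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").appName("glom-sketch").getOrCreate()
    per_partition = (spark.range(0, 10, numPartitions=3)
                     .rdd.map(lambda r: r.id)
                     .glom()                 # one list per partition
                     .collect())
    print(per_partition)
    spark.stop()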

Rohit


Re: Apache Hive with Spark Configuration

2016-12-28 Thread Gourav Sengupta
Hi,
I think that you can configure the hive metastore versions in SPARK.
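A sketch of that knob with illustrative values: spark.sql.hive.metastore.version
and spark.sql.hive.metastore.jars let Spark talk to an external metastore of a
given version. Note that the Spark 2.0.2 documentation lists supported metastore
versions from 0.12.0 up to 1.2.1, so a Hive 2.0.1 metastore falls outside that
range; the values below are placeholders, not a recommendation.

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("hive-metastore-version")
             .config("spark.sql.hive.metastore.version", "1.2.1")   # placeholder
             .config("spark.sql.hive.metastore.jars", "maven")
             .enableHiveSupport()
             .getOrCreate())

    spark.sql("SHOW DATABASES").show()
    spark.stop()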


Regards,
Gourav

On Wed, Dec 28, 2016 at 12:22 PM, Chetan Khatri  wrote:

> Hello Users / Developers,
>
> I am using Hive 2.0.1 with MySQL as the metastore. Can you tell me which
> version is most compatible with Spark 2.0.2?
>
> Thanks
>


Apache Hive with Spark Configuration

2016-12-28 Thread Chetan Khatri
Hello Users / Developers,

I am using Hive 2.0.1 with MySQL as the metastore. Can you tell me which
version is most compatible with Spark 2.0.2?

Thanks


Re: [Spark 2.0.2 HDFS]: no data locality

2016-12-28 Thread Miguel Morales
It might have to do with your container ips, it depends on your
networking setup.  You might want to try host networking so that the
containers share the ip with the host.

On Wed, Dec 28, 2016 at 1:46 AM, Karamba  wrote:
>
> Hi Sun Rui,
>
> thanks for answering!
>
>
>> Although the Spark task scheduler is aware of rack-level data locality, it 
>> seems that only YARN implements the support for it.
>
> This explains why the script that I configured in core-site.xml
> topology.script.file.name is not called in by the spark container.
> But at time of reading from hdfs in a spark program, the script is
> called in my hdfs namenode container.
>
>> However, node-level locality can still work for Standalone.
>
> I have a couple of physical hosts that run spark and hdfs docker
> containers. How does spark standalone knows that spark and docker
> containers are on the same host?
>
>> Data Locality involves in both task data locality and executor data 
>> locality. Executor data locality is only supported on YARN with executor 
>> dynamic allocation enabled. For standalone, by default, a Spark application 
>> will acquire all available cores in the cluster, generally meaning there is 
>> at least one executor on each node, in which case task data locality can 
>> work because a task can be dispatched to an executor on any of the preferred 
>> nodes of the task for execution.
>>
>> for your case, have you set spark.cores.max to limit the cores to acquire, 
>> which means executors are available on a subset of the cluster nodes?
> I set "--total-executor-cores 1" in order to use only a small subset of
> the cluster.
>
>
>
> On 28.12.2016 02:58, Sun Rui wrote:
>> Although the Spark task scheduler is aware of rack-level data locality, it 
>> seems that only YARN implements the support for it. However, node-level 
>> locality can still work for Standalone.
>>
>> It is not necessary to copy the hadoop config files into the Spark CONF 
>> directory. Set HADOOP_CONF_DIR to point to the conf directory of your Hadoop.
>>
>> Data Locality involves in both task data locality and executor data 
>> locality. Executor data locality is only supported on YARN with executor 
>> dynamic allocation enabled. For standalone, by default, a Spark application 
>> will acquire all available cores in the cluster, generally meaning there is 
>> at least one executor on each node, in which case task data locality can 
>> work because a task can be dispatched to an executor on any of the preferred 
>> nodes of the task for execution.
>>
>> for your case, have you set spark.cores.max to limit the cores to acquire, 
>> which means executors are available on a subset of the cluster nodes?
>>
>>> On Dec 27, 2016, at 01:39, Karamba  wrote:
>>>
>>> Hi,
>>>
>>> I am running a couple of docker hosts, each with an HDFS and a spark
>>> worker in a spark standalone cluster.
>>> In order to get data locality awareness, I would like to configure Racks
>>> for each host, so that a spark worker container knows from which hdfs
>>> node container it should load its data. Does this make sense?
>>>
>>> I configured HDFS container nodes via the core-site.xml in
>>> $HADOOP_HOME/etc and this works. hdfs dfsadmin -printTopology shows my
>>> setup.
>>>
>>> I configured SPARK the same way. I placed core-site.xml and
>>> hdfs-site.xml in the SPARK_CONF_DIR ... BUT this has no effect.
>>>
>>> Submitting a spark job via spark-submit to the spark-master that loads
>>> from HDFS just has Data locality ANY.
>>>
>>> It would be great if anybody would help me getting the right configuration!
>>>
>>> Thanks and best regards,
>>> on
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: how to integrate Apache Kafka with spark ?

2016-12-28 Thread Tushar Adeshara
Please see the links below, depending on your version of Spark:

2.x:   http://spark.apache.org/docs/latest/streaming-kafka-integration.html
       (Spark Streaming + Kafka Integration Guide - Spark 2.0.2)

1.6.x: https://spark.apache.org/docs/1.6.3/streaming-kafka-integration.html
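A minimal PySpark sketch of the direct (receiver-less) approach those guides
describe, assuming the spark-streaming-kafka-0-8 package is on the classpath and
a broker at localhost:9092 with a topic named "events" (both placeholders); the
0.8 integration is the one with a Python API in this era of Spark.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext("local[2]", "kafka-direct-sketch")
    ssc = StreamingContext(sc, 5)

    stream = KafkaUtils.createDirectStream(
        ssc, topics=["events"],
        kafkaParams={"metadata.broker.list": "localhost:9092"})

    stream.map(lambda kv: kv[1]).pprint()   # print the message values

    ssc.start()
    ssc.awaitTermination()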


Regards,
Tushar Adeshara
Technical Specialist - Analytics Practice
Persistent Systems Ltd. | Partners in Innovation | 
www.persistentsys.com



From: sathyanarayanan mudhaliyar 
Sent: 28 December 2016 12:27
To: user@spark.apache.org
Subject: how to integrate Apache Kafka with spark ?

How do I take input from Apache Kafka into Apache Spark Streaming for stream 
processing ?

-sathya




Re: [Spark 2.0.2 HDFS]: no data locality

2016-12-28 Thread Karamba

Hi Sun Rui,

thanks for answering!


> Although the Spark task scheduler is aware of rack-level data locality, it 
> seems that only YARN implements the support for it. 

This explains why the script that I configured in core-site.xml
(topology.script.file.name) is not called by the Spark container.
But at the time of reading from HDFS in a Spark program, the script is
called in my HDFS namenode container.

> However, node-level locality can still work for Standalone.

I have a couple of physical hosts that each run Spark and HDFS docker
containers. How does Spark standalone know that the Spark and HDFS
containers are on the same host?

> Data Locality involves in both task data locality and executor data locality. 
> Executor data locality is only supported on YARN with executor dynamic 
> allocation enabled. For standalone, by default, a Spark application will 
> acquire all available cores in the cluster, generally meaning there is at 
> least one executor on each node, in which case task data locality can work 
> because a task can be dispatched to an executor on any of the preferred nodes 
> of the task for execution.
>
> for your case, have you set spark.cores.max to limit the cores to acquire, 
> which means executors are available on a subset of the cluster nodes?
I set "--total-executor-cores 1" in order to use only a small subset of
the cluster.
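A small sketch of that point in configuration form (placeholder values): on
standalone, leaving spark.cores.max / --total-executor-cores unset lets the
application get an executor on every worker, so HDFS blocks have a co-located
executor and tasks can be NODE_LOCAL; capping it at 1 core leaves a single
executor, and most reads then fall back to ANY.

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("locality-check")
             # .config("spark.cores.max", "1")    # the capped setting used above
             .getOrCreate())

    df = spark.read.text("hdfs://namenode:8020/some/path")   # placeholder HDFS URI
    print(df.count())
    spark.stop()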



On 28.12.2016 02:58, Sun Rui wrote:
> Although the Spark task scheduler is aware of rack-level data locality, it 
> seems that only YARN implements the support for it. However, node-level 
> locality can still work for Standalone.
>
> It is not necessary to copy the hadoop config files into the Spark CONF 
> directory. Set HADOOP_CONF_DIR to point to the conf directory of your Hadoop.
>
> Data Locality involves in both task data locality and executor data locality. 
> Executor data locality is only supported on YARN with executor dynamic 
> allocation enabled. For standalone, by default, a Spark application will 
> acquire all available cores in the cluster, generally meaning there is at 
> least one executor on each node, in which case task data locality can work 
> because a task can be dispatched to an executor on any of the preferred nodes 
> of the task for execution.
>
> for your case, have you set spark.cores.max to limit the cores to acquire, 
> which means executors are available on a subset of the cluster nodes?
>
>> On Dec 27, 2016, at 01:39, Karamba  wrote:
>>
>> Hi,
>>
>> I am running a couple of docker hosts, each with an HDFS and a spark
>> worker in a spark standalone cluster.
>> In order to get data locality awareness, I would like to configure Racks
>> for each host, so that a spark worker container knows from which hdfs
>> node container it should load its data. Does this make sense?
>>
>> I configured HDFS container nodes via the core-site.xml in
>> $HADOOP_HOME/etc and this works. hdfs dfsadmin -printTopology shows my
>> setup.
>>
>> I configured SPARK the same way. I placed core-site.xml and
>> hdfs-site.xml in the SPARK_CONF_DIR ... BUT this has no effect.
>>
>> Submitting a spark job via spark-submit to the spark-master that loads
>> from HDFS just has Data locality ANY.
>>
>> It would be great if anybody would help me getting the right configuration!
>>
>> Thanks and best regards,
>> on
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark Dataframe: Save to hdfs is taking long time

2016-12-28 Thread Raju Bairishetti
Try setting the number of partitions to (number of executors * number of cores)
while writing to the destination location.

Be careful when setting the number of partitions, as an incorrect value may lead
to an expensive shuffle.
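A sketch of that suggestion with illustrative numbers (say 10 executors x 4
cores), written in Python for brevity and reusing the names from the quoted
snippet below; note that coalesce/repartition return a new DataFrame, so the
result has to be reassigned before the write for the setting to have any effect.

    num_partitions = 10 * 4   # executors * cores; placeholder values

    results_dataframe = results_dataframe.repartition(num_partitions)
    results_dataframe.write.mode("overwrite").parquet(outputPath)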

On Fri, Dec 16, 2016 at 12:56 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> I am trying to save the files as Paraquet.
>
> On Thu, Dec 15, 2016 at 10:41 PM, Felix Cheung 
> wrote:
>
>> What is the format?
>>
>>
>> --
>> *From:* KhajaAsmath Mohammed 
>> *Sent:* Thursday, December 15, 2016 7:54:27 PM
>> *To:* user @spark
>> *Subject:* Spark Dataframe: Save to hdfs is taking long time
>>
>> Hi,
>>
>> I am running into an issue while saving the dataframe back to HDFS. It's
>> taking a long time to run.
>>
>> val results_dataframe = sqlContext.sql("select gt.*,ct.* from 
>> PredictTempTable pt,ClusterTempTable ct,GamificationTempTable gt where 
>> gt.vin=pt.vin and pt.cluster=ct.cluster")
>> results_dataframe.coalesce(numPartitions)
>> results_dataframe.persist(StorageLevel.MEMORY_AND_DISK)
>>
>> dataFrame.write.mode(saveMode).format(format)
>>   .option(Codec, compressCodec) //"org.apache.hadoop.io.compress.snappyCodec"
>>   .save(outputPath)
>>
>> It was taking long time and total number of records for  this dataframe is 
>> 4903764
>>
>> I even increased number of partitions from 10 to 20, still no luck. Can 
>> anyone help me in resolving this performance issue
>>
>> Thanks,
>>
>> Asmath
>>
>>
>


-- 

--
Thanks,
Raju Bairishetti,
www.lazada.com