Re: SparkSQL: map type MatchError when inserting into Hive table

2014-09-26 Thread Cheng Lian
Would you mind to provide the DDL of this partitioned table together 
with the query you tried? The stacktrace suggests that the query was 
trying to cast a map into something else, which is not supported in 
Spark SQL. And I doubt whether Hive support casting a complex type to 
some other type.


On 9/27/14 7:48 AM, Du Li wrote:

Hi,

I was loading data into a partitioned table on Spark 1.1.0
beeline-thriftserver. The table has complex data types such as map and array>. The query is like ³insert overwrite
table a partition (Š) select Š² and the select clause worked if run
separately. However, when running the insert query, there was an error as
follows.

The source code of Cast.scala seems to only handle the primitive data
types, which is perhaps why the MatchError was thrown.

I just wonder if this is still work in progress, or I should do it
differently.

Thanks,
Du



scala.MatchError: MapType(StringType,StringType,true) (of class
org.apache.spark.sql.catalyst.types.MapType)

org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:2
47)
 org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
 org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)

org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala
:84)

org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.appl
y(Projection.scala:66)

org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.appl
y(Projection.scala:50)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sq
l$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.sca
la:149)

org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHive
File$1.apply(InsertIntoHiveTable.scala:158)

org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHive
File$1.apply(InsertIntoHiveTable.scala:158)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1
145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:
615)
 java.lang.Thread.run(Thread.java:722)






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: problem with HiveContext inside Actor

2014-09-26 Thread Cheng Lian
This fix is reasonable, since the actual constructor gets called is 
|Driver()| rather than |Driver(HiveConf)|. The former initializes the 
|conf| field by:


|conf = SessionState.get().getConf()
|

And |SessionState.get()| reads a TSS value. Thus executing SQL queries 
within another thread causes NPE since the |Driver| is created in a 
thread different from the one |HiveContext| (and the contained 
|SessionState|) gets constructed.


On 9/19/14 3:31 AM, Du Li wrote:


I have figured it out.

As shown in the code below, if the HiveContext hc were created in the 
actor object and used to create db in response to message, it would 
throw null pointer exception. This is fixed by creating the 
HiveContext inside the MyActor class instead. I also tested the code 
by replacing Actor with Thread. The problem and fix are similar.


Du

——
abstract class MyMessage
case object CreateDB extends MyMessage

object MyActor {
  def init(_sc: SparkContext) = {
if( actorSystem == null || actorRef == null ) {
  actorSystem = ActorSystem(“root")
  actorRef = actorSystem.actorOf(Props(new MyActor(_sc)), “myactor")
}
//hc = new MyHiveContext(_sc)
  }

  def !(m: MyMessage) {
actorRef ! m
  }

  //var hc: MyHiveContext = _
  private var actorSystem: ActorSystem = null
  private var actorRef: ActorRef = null
}

class MyActor(sc: SparkContext) extends Actor {
  val hc = new MyHiveContext(sc)
  def receive: Receiver = {
case CreateDB => hc.createDB()
  }
}

class MyHiveContext(sc: SparkContext) extends HiveContext(sc) {
  def createDB() {...}
}


From:  "Chester @work" >

Date:  Thursday, September 18, 2014 at 7:17 AM
To:  Du Li >
Cc:  Michael Armbrust >, "Cheng, Hao" >, "user@spark.apache.org 
" >

Subject:  Re: problem with HiveContext inside Actor


Akka actor are managed under a thread pool, so the same actor can be 
under different thread.


If you create HiveContext in the actor, is it possible that you are 
essentially create different instance of HiveContext ?


Sent from my iPhone

On Sep 17, 2014, at 10:14 PM, Du Li > wrote:




Thanks for your reply.

Michael: No. I only create one HiveContext in the code.

Hao: Yes. I subclass HiveContext and defines own function to create 
database and then subclass akka Actor to call that function in 
response to an abstract message. By your suggestion, I called 
println(sessionState.getConf.getAllProperties) that printed
tons of properties; however, the same NullPointerException was still 
thrown.


As mentioned, the weird thing is that everything worked fine if I 
simply called actor.hiveContext.createDB() directly. But it throws the 
null pointer exception from Driver.java if I do "actor ! 
CreateSomeDB”, which seems to me just the same thing because

the actor does nothing but call createDB().

Du





From: Michael Armbrust >

Date: Wednesday, September 17, 2014 at 7:40 PM
To: "Cheng, Hao" mailto:hao.ch...@intel.com>>
Cc: Du Li >, "user@spark.apache.org 
" >

Subject: Re: problem with HiveContext inside Actor


- dev

Is it possible that you are constructing more than one HiveContext in 
a single JVM?  Due to global state in Hive code this is not allowed.


Michael


On Wed, Sep 17, 2014 at 7:21 PM, Cheng, Hao
mailto:hao.ch...@intel.com>> wrote:

Hi, Du
I am not sure what you mean “triggers the HiveContext to create a 
database”, do you create the sub class
of HiveContext? Just be sure you call the “HiveContext.sessionState” 
eagerly, since it will set the proper “hiveconf” into the 
SessionState, otherwise the HiveDriver will always get the null value 
when retrieving HiveConf.

Cheng Hao
From: Du Li [mailto:l...@yahoo-inc.com.INVALID]

Sent: Thursday, September 18, 2014 7:51 AM
To: user@spark.apache.org ;
d...@spark.apache.org 
Subject: problem with HiveContext inside Actor


Hi,


Wonder anybody had similar experience or any suggestion here.


I have an akka Actor that processes database requests in high-level 
messages. Inside this Actor, it creates a HiveContext object that does the
actual db work. The main thread creates the needed SparkContext and 
passes in to the Actor to create the HiveContext.



When a message is sent to the Actor, it is processed properly except 
that, when the message triggers the HiveContext to create a database, it
throws a NullPointerException in hive.ql.Driver.java which suggests 
that its conf variable is not initialized.



Ironically, it works fine if my main thread directly calls 
actor.hiveContext to create the database. The spark version is 1.1.0.



Thanks,

Du











​


Re: SparkSQL: map type MatchError when inserting into Hive table

2014-09-26 Thread Cheng Lian
Would you mind to provide the DDL of this partitioned table together 
with the query you tried? The stacktrace suggests that the query was 
trying to cast a map into something else, which is not supported in 
Spark SQL. And I doubt whether Hive support casting a complex type to 
some other type.


On 9/27/14 7:48 AM, Du Li wrote:

Hi,

I was loading data into a partitioned table on Spark 1.1.0
beeline-thriftserver. The table has complex data types such as map and array>. The query is like ³insert overwrite
table a partition (Š) select Š² and the select clause worked if run
separately. However, when running the insert query, there was an error as
follows.

The source code of Cast.scala seems to only handle the primitive data
types, which is perhaps why the MatchError was thrown.

I just wonder if this is still work in progress, or I should do it
differently.

Thanks,
Du



scala.MatchError: MapType(StringType,StringType,true) (of class
org.apache.spark.sql.catalyst.types.MapType)
 
org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:2

47)
 org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
 org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
 
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala

:84)
 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.appl

y(Projection.scala:66)
 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.appl

y(Projection.scala:50)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sq

l$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.sca
la:149)
 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHive

File$1.apply(InsertIntoHiveTable.scala:158)
 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHive

File$1.apply(InsertIntoHiveTable.scala:158)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
 org.apache.spark.scheduler.Task.run(Task.scala:54)
 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1

145)
 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:

615)
 java.lang.Thread.run(Thread.java:722)






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: flume spark streaming receiver host random

2014-09-26 Thread centerqi hu
the receiver is not running on the machine I expect



2014-09-26 14:09 GMT+08:00 Sean Owen :
> I think you may be missing a key word here. Are you saying that the machine
> has multiple interfaces and it is not using the one you expect or the
> receiver is not running on the machine you expect?

-- 
cente...@gmail.com|齐忠

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL: map type MatchError when inserting into Hive table

2014-09-26 Thread Du Li

It might be a problem when inserting into a partitioned table. It worked
fine to when the target table was unpartitioned.

Can you confirm this?

Thanks,
Du



On 9/26/14, 4:48 PM, "Du Li"  wrote:

>Hi,
>
>I was loading data into a partitioned table on Spark 1.1.0
>beeline-thriftserver. The table has complex data types such as mapstring> and array>. The query is like ³insert overwrite
>table a partition (Š) select Š² and the select clause worked if run
>separately. However, when running the insert query, there was an error as
>follows.
>
>The source code of Cast.scala seems to only handle the primitive data
>types, which is perhaps why the MatchError was thrown.
>
>I just wonder if this is still work in progress, or I should do it
>differently.
>
>Thanks,
>Du
>
>
>
>scala.MatchError: MapType(StringType,StringType,true) (of class
>org.apache.spark.sql.catalyst.types.MapType)
>
>org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:
>2
>47)
>
>org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
>
>org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
>
>org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scal
>a
>:84)
>
>org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.app
>l
>y(Projection.scala:66)
>
>org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.app
>l
>y(Projection.scala:50)
>scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>
>org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$s
>q
>l$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.sc
>a
>la:149)
>
>org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiv
>e
>File$1.apply(InsertIntoHiveTable.scala:158)
>
>org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiv
>e
>File$1.apply(InsertIntoHiveTable.scala:158)
>org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>org.apache.spark.scheduler.Task.run(Task.scala:54)
>
>org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>
>java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:
>1
>145)
>
>java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java
>:
>615)
>java.lang.Thread.run(Thread.java:722)
>
>
>
>
>
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>



SparkSQL: map type MatchError when inserting into Hive table

2014-09-26 Thread Du Li
Hi,

I was loading data into a partitioned table on Spark 1.1.0
beeline-thriftserver. The table has complex data types such as map and array>. The query is like ³insert overwrite
table a partition (Š) select Š² and the select clause worked if run
separately. However, when running the insert query, there was an error as
follows.

The source code of Cast.scala seems to only handle the primitive data
types, which is perhaps why the MatchError was thrown.

I just wonder if this is still work in progress, or I should do it
differently.

Thanks,
Du



scala.MatchError: MapType(StringType,StringType,true) (of class
org.apache.spark.sql.catalyst.types.MapType)

org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:2
47)
org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)

org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala
:84)

org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.appl
y(Projection.scala:66)

org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.appl
y(Projection.scala:50)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sq
l$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.sca
la:149)

org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHive
File$1.apply(InsertIntoHiveTable.scala:158)

org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHive
File$1.apply(InsertIntoHiveTable.scala:158)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)

org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1
145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:
615)
java.lang.Thread.run(Thread.java:722)






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-26 Thread Andy Davidson
Many many thanks

Andy

From:  Nicholas Chammas 
Date:  Friday, September 26, 2014 at 11:24 AM
To:  Andrew Davidson 
Cc:  Davies Liu , "user@spark.apache.org"

Subject:  Re: problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line
magic function `%matplotlib` not found

> Are you able to use the regular PySpark shell on your EC2 cluster? That would
> be the first thing to confirm is working.
> 
> I don’t know whether the version of Python on the cluster would affect whether
> IPython works or not, but if you want to try manually upgrading Python on a
> cluster launched by spark-ec2, there are some instructions in the comments
> here   for doing so.
> 
> Nick
> 
> ​
> 
> On Fri, Sep 26, 2014 at 2:18 PM, Andy Davidson 
> wrote:
>> Hi Davies
>> 
>> The real issue is about cluster management. I am new to the spark world and
>> am not a system administrator.  It seem like the problem is with the
>> spark-ec2 launch script. It is installing  old version of python
>> 
>> In the mean time I am trying to figure out how I can manually install the
>> correct version on all the machines in my cluster
>> 
>> Thanks
>> 
>> Andy
>> 
>> From:  Davies Liu 
>> Date:  Thursday, September 25, 2014 at 9:58 PM
>> To:  Andrew Davidson 
>> Cc:  "user@spark.apache.org" 
>> Subject:  Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found
>> 
>>> Maybe you have Python 2.7 on master but Python 2.6 in cluster,
>>> you should upgrade python to 2.7 in cluster, or use python 2.6 in
>>> master by set PYSPARK_PYTHON=python2.6
>>> 
>>> On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson
>>>  wrote:
  Hi
 
  I am running into trouble using iPython notebook on my cluster. Use the
  following command to set the cluster up
 
  $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE
  --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME
 
 
  On master I launch python as follows
 
  $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000"
  $SPARK_HOME/bin/pyspark
 
 
  It looks like the problem is the cluster is using an old version of python
  and python. Any idea how I can easily upgrade ? The following version
 works
  on my mac
 
  Thanks
 
  Andy
 
  {'commit_hash': '681fd77',
   'commit_source': 'installation',
   'default_encoding': 'UTF-8',
   'ipython_path': '/Library/Python/2.7/site-packages/IPython',
   'ipython_version': '2.1.0',
   'os_name': 'posix',
   'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
   'sys_executable': '/usr/bin/python',
   'sys_platform': 'darwin',
   'sys_version': '2.7.5 (default, Mar  9 2014, 22:15:05) \n[GCC 4.2.1
  Compatible Apple LLVM 5.0 (clang-500.0.68)]’}
 
 
 
 
>>> 
> 




Communication between threads within a worker

2014-09-26 Thread lokesh.gidra
Hello,

Can someone please explain me how the various threads within a single worker
(and hence a single JVM instance) communicate with each other. I mean how do
they send intermediate data/RDDs to each other? Is it done through network?

Please also point me to the location in source code where I can look at the
relevant code.


Thanks,
Lokesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Communication-between-threads-within-a-worker-tp15262.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Fwd: Spark SQL question: is cached SchemaRDD storage controlled by "spark.storage.memoryFraction"?

2014-09-26 Thread Liquan Pei
-- Forwarded message --
From: Liquan Pei 
Date: Fri, Sep 26, 2014 at 1:33 AM
Subject: Re: Spark SQL question: is cached SchemaRDD storage controlled by
"spark.storage.memoryFraction"?
To: Haopu Wang 


Hi Haopu,

Internally, cactheTable on a schemaRDD is implemented as a cache() on a
MapPartitionsRDD. As memory reserved for caching RDDs is controlled by
spark.storage.memoryFraction,
memory storage of cached schemaRDD is controlled by
spark.storage.memoryFraction.

Hope this helps!
Liquan

On Fri, Sep 26, 2014 at 1:04 AM, Haopu Wang  wrote:

> Hi, I'm querying a big table using Spark SQL. I see very long GC time in
> some stages. I wonder if I can improve it by tuning the storage
> parameter.
>
> The question is: the schemaRDD has been cached with "cacheTable()"
> function. So is the cached schemaRDD part of memory storage controlled
> by the "spark.storage.memoryFraction" parameter?
>
> Thanks!
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst



-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


SF Scala: Spark and Machine Learning Videos

2014-09-26 Thread Alexy Khrabrov
Folks -- we're happy to share the videos of Spark talks made at SF
Scala meetup (sfscala.org) and Scala By the Bay conference
(scalabythebay.org).  We thank Databricks for presenting and also
sponsoring the first talk video, which was a joint event with SF Bay
Area Machine Learning meetup.


9/22/2014 -- SF Scala and SF Bay Area Machine Learning, Joseph
Bradley: Decision Trees on Spark

http://functional.tv/post/98342564544/sfscala-sfbaml-joseph-bradley-decision-trees-on-spark


8/9/2014 -- Scala By the Bay, Matei Zaharia: Next-Generation Languages
meet Next-Generation Big Data: Leveraging Scala in Spark

http://functional.tv/post/9769999/scala-by-the-bay2014-matei-zaharia-next-generation-langu


8/8/2014 -- Scala By the Bay, Tathagata Das, Large scale, real-time
stream processing using Spark Streaming

http://functional.tv/post/97739069219/scala-by-the-bay-2014-tathagata-das-large-scale-real-tim


Functional.TV has all of the Scala By the Bay and SF Scala talks, and
we publish them within days of our events.

If you have a great talk on Scala and Spark, let us at SF Scala know
(reply to me with the idea).

Enjoy!
A+

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.NegativeArraySizeException in pyspark

2014-09-26 Thread Brad Miller
> What is the error? Could you file a JIRA for it?

Turns out there's actually 3 separate errors (indicated below), one of
which **silently returns the wrong value to the user*.*  Should I file a
separate JIRA for each one?  What level should I mark these as (critical,
major, etc.)?

I'm not sure that all of these are bugs as much as feature requests since
it looks like the design of FramedSerializer includes some size constraints
(https://github.com/apache/spark/blob/master/python/pyspark/serializers.py
"Serializer that writes objects as a stream of (length, data) pairs, where
C{length} is a 32-bit integer and data is C{length} bytes.").

Attempting to reproduce the bug in isolation in iPython notebook I've
observed the following. Note that I'm running python 2.7.3 on all machines
and using the Spark 1.1.0 binaries.

**BLOCK 1**  [no problem]
import cPickle
from pyspark import SparkContext

def check_pre_serialized(size):
msg = cPickle.dumps(range(2 ** size))
print 'serialized length:', len(msg)
bvar = sc.broadcast(msg)
print 'length recovered from broadcast variable:', len(bvar.value)
print 'correct value recovered:', msg == bvar.value
bvar.unpersist()

def check_unserialized(size):
msg = range(2 ** size)
bvar = sc.broadcast(msg)
print 'correct value recovered:', msg == bvar.value
bvar.unpersist()

SparkContext.setSystemProperty('spark.executor.memory', '15g')
SparkContext.setSystemProperty('spark.cores.max', '5')
sc = SparkContext('spark://crosby.research.intel-research.net:7077',
'broadcast_bug')

**BLOCK 2**  [no problem]
check_pre_serialized(20)
> serialized length: 9374656
> length recovered from broadcast variable: 9374656
> correct value recovered: True

**BLOCK 3**  [no problem]
check_unserialized(20)
> correct value recovered: True

**BLOCK 4**  [no problem]
check_pre_serialized(27)
> serialized length: 1499501632
> length recovered from broadcast variable: 1499501632
> correct value recovered: True

**BLOCK 5**  [no problem]
check_unserialized(27)
> correct value recovered: True

***BLOCK 6**  [ERROR 1: unhandled error from cPickle.dumps inside
sc.broadcast]*
check_pre_serialized(28)
.
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
> 354
> 355 def dumps(self, obj):
> --> 356 return cPickle.dumps(obj, 2)
> 357
> 358 loads = cPickle.loads
>
> SystemError: error return without exception set

**BLOCK 7**  [no problem]
check_unserialized(28)
> correct value recovered: True

***BLOCK 8**  [ERROR 2: no error occurs and *incorrect result* is returned]*
check_pre_serialized(29)
> serialized length: 6331339840
> length recovered from broadcast variable: 2036372544
> correct value recovered: False

***BLOCK 9**  [ERROR 3: unhandled error from zlib.compress inside
sc.broadcast]*
check_unserialized(29)
..
> /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
> 418
> 419 def dumps(self, obj):
> --> 420 return zlib.compress(self.serializer.dumps(obj), 1)
> 421
> 422 def loads(self, obj):
>
> OverflowError: size does not fit in an int

***BLOCK 10**  [ERROR 1]*
check_pre_serialized(30)
...same as above...

***BLOCK 11**  [ERROR 3]*
check_unserialized(30)
...same as above...

On Thu, Sep 25, 2014 at 2:55 PM, Davies Liu  wrote:
>
> On Thu, Sep 25, 2014 at 11:25 AM, Brad Miller
>  wrote:
> > Hi Davies,
> >
> > Thanks for your help.
> >
> > I ultimately re-wrote the code to use broadcast variables, and then
received
> > an error when trying to broadcast self.all_models that the size did not
fit
> > in an int (recall that broadcasts use 32 bit ints to store size),
>
> What is the error? Could you file a JIRA for it?
>
> > that it was in fact over 2G.  I don't know why the previous tests
(described
> > above) where duplicated portions of self.all_models worked (it could
have
> > been an error in either my debugging or notes), but splitting the
> > self.all_models into a separate broadcast variable for each element
worked.
> > I avoided broadcast variables for a while since there was no way to
> > unpersist them in pyspark, but now that there is you're completely right
> > that using broadcast is the correct way to code this.
>
> In 1.1, you could use broadcast.unpersist() to release it, also the
performance
> of Python Broadcast was much improved in 1.1.
>
>
> > best,
> > -Brad
> >
> > On Tue, Sep 23, 2014 at 12:16 PM, Davies Liu 
wrote:
> >>
> >> Or maybe there is a bug related to the base64 in py4j, could you
> >> dumps the serialized bytes of closure to verify this?
> >>
> >> You could add a line in spark/python/pyspark/rdd.py:
> >>
> >> ser = CloudPickleSerializer()
> >> pickled_command = ser.dumps(command)
> >> +  print len(pickled_command), repr(pickled_command)
> >>
> >>
> >> On Tue, Sep 23, 2014 at 11:02 AM, Brad Miller
> >>  wrote:
> >> > Hi Davies,
> >> >
> >> > That's interesting to know.  Here's more details about my code.  The
> 

Re: problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-26 Thread Nicholas Chammas
Are you able to use the regular PySpark shell on your EC2 cluster? That
would be the first thing to confirm is working.

I don’t know whether the version of Python on the cluster would affect
whether IPython works or not, but if you want to try manually upgrading
Python on a cluster launched by spark-ec2, there are some instructions in
the comments here  for
doing so.

Nick
​

On Fri, Sep 26, 2014 at 2:18 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Davies
>
> The real issue is about cluster management. I am new to the spark world
> and am not a system administrator.  It seem like the problem is with the
> spark-ec2 launch script. It is installing  old version of python
>
> In the mean time I am trying to figure out how I can manually install the
> correct version on all the machines in my cluster
>
> Thanks
>
> Andy
>
> From: Davies Liu 
> Date: Thursday, September 25, 2014 at 9:58 PM
> To: Andrew Davidson 
> Cc: "user@spark.apache.org" 
> Subject: Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found
>
> Maybe you have Python 2.7 on master but Python 2.6 in cluster,
> you should upgrade python to 2.7 in cluster, or use python 2.6 in
> master by set PYSPARK_PYTHON=python2.6
>
> On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson
>  wrote:
>
> Hi
>
> I am running into trouble using iPython notebook on my cluster. Use the
> following command to set the cluster up
>
> $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE
> --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME
>
>
> On master I launch python as follows
>
> $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000"
> $SPARK_HOME/bin/pyspark
>
>
> It looks like the problem is the cluster is using an old version of python
> and python. Any idea how I can easily upgrade ? The following version works
> on my mac
>
> Thanks
>
> Andy
>
> {'commit_hash': '681fd77',
>   'commit_source': 'installation',
>   'default_encoding': 'UTF-8',
>   'ipython_path': '/Library/Python/2.7/site-packages/IPython',
>   'ipython_version': '2.1.0',
>   'os_name': 'posix',
>   'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
>   'sys_executable': '/usr/bin/python',
>   'sys_platform': 'darwin',
>   'sys_version': '2.7.5 (default, Mar  9 2014, 22:15:05) \n[GCC 4.2.1
> Compatible Apple LLVM 5.0 (clang-500.0.68)]’}
>
>
>
>
>
>


problem with spark-ec2 launch script Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

2014-09-26 Thread Andy Davidson
Hi Davies

The real issue is about cluster management. I am new to the spark world and
am not a system administrator.  It seem like the problem is with the
spark-ec2 launch script. It is installing  old version of python

In the mean time I am trying to figure out how I can manually install the
correct version on all the machines in my cluster

Thanks

Andy

From:  Davies Liu 
Date:  Thursday, September 25, 2014 at 9:58 PM
To:  Andrew Davidson 
Cc:  "user@spark.apache.org" 
Subject:  Re: spark-ec2 ERROR: Line magic function `%matplotlib` not found

> Maybe you have Python 2.7 on master but Python 2.6 in cluster,
> you should upgrade python to 2.7 in cluster, or use python 2.6 in
> master by set PYSPARK_PYTHON=python2.6
> 
> On Thu, Sep 25, 2014 at 5:11 PM, Andy Davidson
>  wrote:
>>  Hi
>> 
>>  I am running into trouble using iPython notebook on my cluster. Use the
>>  following command to set the cluster up
>> 
>>  $ ./spark-ec2 --key-pair=$KEY_PAIR --identity-file=$KEY_FILE
>>  --region=$REGION --slaves=$NUM_SLAVES launch $CLUSTER_NAME
>> 
>> 
>>  On master I launch python as follows
>> 
>>  $ IPYTHON_OPTS="notebook --pylab inline --no-browser --port=7000"
>>  $SPARK_HOME/bin/pyspark
>> 
>> 
>>  It looks like the problem is the cluster is using an old version of python
>>  and python. Any idea how I can easily upgrade ? The following version works
>>  on my mac
>> 
>>  Thanks
>> 
>>  Andy
>> 
>>  {'commit_hash': '681fd77',
>>   'commit_source': 'installation',
>>   'default_encoding': 'UTF-8',
>>   'ipython_path': '/Library/Python/2.7/site-packages/IPython',
>>   'ipython_version': '2.1.0',
>>   'os_name': 'posix',
>>   'platform': 'Darwin-13.3.0-x86_64-i386-64bit',
>>   'sys_executable': '/usr/bin/python',
>>   'sys_platform': 'darwin',
>>   'sys_version': '2.7.5 (default, Mar  9 2014, 22:15:05) \n[GCC 4.2.1
>>  Compatible Apple LLVM 5.0 (clang-500.0.68)]¹}
>> 
>> 
>> 
>> 
> 




Re: java.io.IOException Error in task deserialization

2014-09-26 Thread Brad Miller
FWIW I suspect that each count operation is an opportunity for you to
trigger the bug, and each filter operation increases the likelihood of
setting up the bug.  I normally don't come across this error until my job
has been running for an hour or two and had a chance to build up longer
lineages for some RDDs.  It sounds like your data is a bit smaller and it's
more feasible for you to build up longer lineages more quickly.

If you can reduce your number of filter operations (for example by
combining some into a single function) that may help.  It may also help to
introduce persistence or checkpointing at intermediate stages so that the
length of the lineages that have to get replayed isn't as long.

On Fri, Sep 26, 2014 at 11:10 AM, Arun Ahuja  wrote:

> No for me as well it is non-deterministic.  It happens in a piece of code
> that does many filter and counts on a small set of records (~1k-10k).  The
> originally set is persisted in memory and we have a Kryo serializer set for
> it.  The task itself takes in just a few filtering parameters.  This with
> the same setting has sometimes completed to sucess and sometimes failed
> during this step.
>
> Arun
>
> On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller 
> wrote:
>
>> I've had multiple jobs crash due to "java.io.IOException: unexpected
>> exception type"; I've been running the 1.1 branch for some time and am now
>> running the 1.1 release binaries. Note that I only use PySpark. I haven't
>> kept detailed notes or the tracebacks around since there are other problems
>> that have caused my greater grief (namely "key not found" errors).
>>
>> For me the exception seems to occur non-deterministically, which is a bit
>> interesting since the error message shows that the same stage has failed
>> multiple times.  Are you able to consistently re-produce the bug across
>> multiple invocations at the same place?
>>
>> On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja  wrote:
>>
>>> Has anyone else seen this erorr in task deserialization?  The task is
>>> processing a small amount of data and doesn't seem to have much data
>>> hanging to the closure?  I've only seen this with Spark 1.1
>>>
>>> Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, 
>>> most recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): 
>>> java.io.IOException: unexpected exception type
>>> 
>>> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>>> 
>>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>>> 
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>>> 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> 
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>> 
>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>> 
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> 
>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>> 
>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>> 
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>>> 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> java.lang.Thread.run(Thread.java:744)
>>>
>>>
>>
>


Re: java.io.IOException Error in task deserialization

2014-09-26 Thread Arun Ahuja
No for me as well it is non-deterministic.  It happens in a piece of code
that does many filter and counts on a small set of records (~1k-10k).  The
originally set is persisted in memory and we have a Kryo serializer set for
it.  The task itself takes in just a few filtering parameters.  This with
the same setting has sometimes completed to sucess and sometimes failed
during this step.

Arun

On Fri, Sep 26, 2014 at 1:32 PM, Brad Miller 
wrote:

> I've had multiple jobs crash due to "java.io.IOException: unexpected
> exception type"; I've been running the 1.1 branch for some time and am now
> running the 1.1 release binaries. Note that I only use PySpark. I haven't
> kept detailed notes or the tracebacks around since there are other problems
> that have caused my greater grief (namely "key not found" errors).
>
> For me the exception seems to occur non-deterministically, which is a bit
> interesting since the error message shows that the same stage has failed
> multiple times.  Are you able to consistently re-produce the bug across
> multiple invocations at the same place?
>
> On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja  wrote:
>
>> Has anyone else seen this erorr in task deserialization?  The task is
>> processing a small amount of data and doesn't seem to have much data
>> hanging to the closure?  I've only seen this with Spark 1.1
>>
>> Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most 
>> recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): 
>> java.io.IOException: unexpected exception type
>> 
>> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
>> 
>> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>> 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> 
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>> 
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> 
>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>> 
>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>> 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> java.lang.Thread.run(Thread.java:744)
>>
>>
>


Re: How to do operations on multiple RDD's

2014-09-26 Thread Daniel Siegmann
There are numerous ways to combine RDDs. In your case, it seems you have
several RDDs of the same type and you want to do an operation across all of
them as if they were a single RDD. The way to do this is SparkContext.union
or RDD.union, which have minimal overhead. The only difference between
these is the latter allows you to only union two at a time (but of course
you can just call reduce on your sequence to union them all).

Keep in mind this won't repartition anything, so if you find you have too
many partitions after the union you could use RDD.coalesce to merge them.

On Fri, Sep 26, 2014 at 11:55 AM, Johan Stenberg 
wrote:

> Hi,
>
> This is my first post to the email list so give me some feedback if I do
> something wrong.
>
> To do operations on two RDD's to produce a new one you can just use
> zipPartitions, but if I have an arbitrary number of RDD's that I would like
> to perform an operation on to produce a single RDD, how do I do that? I've
> been reading the docs but haven't found anything.
>
> For example: if I have a Seq of RDD[Array[Int]]'s and I want to take the
> majority of each array cell. So if all RDD's have one array which are like
> this:
>
> [1, 2, 3]
> [0, 0, 0]
> [1, 2, 0]
>
> Then the resulting RDD would have the array [1, 2, 0]. How do I approach
> this problem? It becomes too heavy to have an accumulator variable I guess?
> Otherwise it could be an array of maps with values as keys and frequency as
> values.
>
> Essentially I want something like zipPartitions but for arbitrarily many
> RDD's, is there any such functionality or how would I approach this problem?
>
> Cheers,
>
> Johan
>



-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: Build error when using spark with breeze

2014-09-26 Thread Xiangrui Meng
We removed commons-math3 from dependencies to avoid version conflict
with hadoop-common. hadoop-common-2.3+ depends on commons-math3-3.1.1,
while breeze depends on commons-math3-3.3. 3.3 is not backward
compatible with 3.1.1. So we removed it because the breeze functions
we use do not touch commons-math3 code. As Sean suggested, please
include breeze in the dependency set of your project. Do not rely on
transitive dependencies. -Xiangrui

On Fri, Sep 26, 2014 at 9:08 AM, Jaonary Rabarisoa  wrote:
> I solve the problem by including the commons-math3 package in my sbt
> dependencies as Sean suggested. Thanks.
>
> On Fri, Sep 26, 2014 at 6:05 PM, Ted Yu  wrote:
>>
>> You can use scope of runtime.
>>
>> See
>> http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope
>>
>> Cheers
>>
>> On Fri, Sep 26, 2014 at 8:57 AM, Jaonary Rabarisoa 
>> wrote:
>>>
>>> Thank Ted. Can you tell me how to adjust the scope ?
>>>
>>> On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu  wrote:

 spark-core's dependency on commons-math3 is @ test scope (core/pom.xml):
 
   org.apache.commons
   commons-math3
   3.3
   test
 

 Adjusting the scope should solve the problem below.

 On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa 
 wrote:
>
> Hi all,
>
> I'm using some functions from Breeze in a spark job but I get the
> following build error :
>
> Error:scalac: bad symbolic reference. A signature in RandBasis.class
> refers to term math3
> in package org.apache.commons which is not available.
> It may be completely missing from the current classpath, or the version
> on
> the classpath might be incompatible with the version used when
> compiling RandBasis.class.
>
> In my case, I just declare a new Gaussian distribution
>
> val g = new Gaussian(0d,1d)
>
> I'm using spark 1.1
>
>
> Any ideas to fix this ?
>
>
> Best regards,
>
>
> Jao


>>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.io.IOException Error in task deserialization

2014-09-26 Thread Brad Miller
I've had multiple jobs crash due to "java.io.IOException: unexpected
exception type"; I've been running the 1.1 branch for some time and am now
running the 1.1 release binaries. Note that I only use PySpark. I haven't
kept detailed notes or the tracebacks around since there are other problems
that have caused my greater grief (namely "key not found" errors).

For me the exception seems to occur non-deterministically, which is a bit
interesting since the error message shows that the same stage has failed
multiple times.  Are you able to consistently re-produce the bug across
multiple invocations at the same place?

On Fri, Sep 26, 2014 at 6:11 AM, Arun Ahuja  wrote:

> Has anyone else seen this erorr in task deserialization?  The task is
> processing a small amount of data and doesn't seem to have much data
> hanging to the closure?  I've only seen this with Spark 1.1
>
> Job aborted due to stage failure: Task 975 in stage 8.0 failed 4 times, most 
> recent failure: Lost task 975.3 in stage 8.0 (TID 24777, host.com): 
> java.io.IOException: unexpected exception type
> 
> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
> 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
> 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
> 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> 
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> 
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
> 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:744)
>
>


Re: how to run spark job on yarn with jni lib?

2014-09-26 Thread Marcelo Vanzin
I assume you did those things in all machines, not just on the machine
launching the job?

I've seen that workaround used successfully (well, actually, they
copied the library to /usr/lib or something, but same idea).

On Thu, Sep 25, 2014 at 7:45 PM, taqilabon  wrote:
> You're right, I'm suffering from SPARK-1719.
> I've tried to add their location to /etc/ld.so.conf and I've submitted my
> job as a yarn-client,
> but the problem is the same: my native libraries are not loaded.
> Does this method work in your case?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-spark-job-on-yarn-with-jni-lib-tp15146p15195.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark-ec2 script with Tachyon

2014-09-26 Thread mrm
Hi,

Did you manage to figure this out? I would appreciate if you could share the
answer.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-script-with-Tachyon-tp9996p15249.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Build error when using spark with breeze

2014-09-26 Thread Jaonary Rabarisoa
I solve the problem by including the commons-math3 package in my sbt
dependencies as Sean suggested. Thanks.

On Fri, Sep 26, 2014 at 6:05 PM, Ted Yu  wrote:

> You can use scope of runtime.
>
> See
> http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope
>
> Cheers
>
> On Fri, Sep 26, 2014 at 8:57 AM, Jaonary Rabarisoa 
> wrote:
>
>> Thank Ted. Can you tell me how to adjust the scope ?
>>
>> On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu  wrote:
>>
>>> spark-core's dependency on commons-math3 is @ test scope (core/pom.xml):
>>> 
>>>   org.apache.commons
>>>   commons-math3
>>>   3.3
>>>   test
>>> 
>>>
>>> Adjusting the scope should solve the problem below.
>>>
>>> On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa 
>>> wrote:
>>>
 Hi all,

 I'm using some functions from Breeze in a spark job but I get the
 following build error :

 *Error:scalac: bad symbolic reference. A signature in RandBasis.class
 refers to term math3*
 *in package org.apache.commons which is not available.*
 *It may be completely missing from the current classpath, or the
 version on*
 *the classpath might be incompatible with the version used when
 compiling RandBasis.class.*

 In my case, I just declare a new Gaussian distribution

 *val g = new Gaussian(0d,1d)*

 I'm using spark 1.1


 Any ideas to fix this ?


 Best regards,


 Jao

>>>
>>>
>>
>


Re: Build error when using spark with breeze

2014-09-26 Thread Ted Yu
You can use scope of runtime.

See
http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope

Cheers

On Fri, Sep 26, 2014 at 8:57 AM, Jaonary Rabarisoa 
wrote:

> Thank Ted. Can you tell me how to adjust the scope ?
>
> On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu  wrote:
>
>> spark-core's dependency on commons-math3 is @ test scope (core/pom.xml):
>> 
>>   org.apache.commons
>>   commons-math3
>>   3.3
>>   test
>> 
>>
>> Adjusting the scope should solve the problem below.
>>
>> On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa 
>> wrote:
>>
>>> Hi all,
>>>
>>> I'm using some functions from Breeze in a spark job but I get the
>>> following build error :
>>>
>>> *Error:scalac: bad symbolic reference. A signature in RandBasis.class
>>> refers to term math3*
>>> *in package org.apache.commons which is not available.*
>>> *It may be completely missing from the current classpath, or the version
>>> on*
>>> *the classpath might be incompatible with the version used when
>>> compiling RandBasis.class.*
>>>
>>> In my case, I just declare a new Gaussian distribution
>>>
>>> *val g = new Gaussian(0d,1d)*
>>>
>>> I'm using spark 1.1
>>>
>>>
>>> Any ideas to fix this ?
>>>
>>>
>>> Best regards,
>>>
>>>
>>> Jao
>>>
>>
>>
>


Re: Build error when using spark with breeze

2014-09-26 Thread Sean Owen
Shouldn't the user's application depend on commons-math3 if it uses
it? it shouldn't require a Spark change. Maybe I misunderstand.

On Fri, Sep 26, 2014 at 4:47 PM, Ted Yu  wrote:
> spark-core's dependency on commons-math3 is @ test scope (core/pom.xml):
> 
>   org.apache.commons
>   commons-math3
>   3.3
>   test
> 
>
> Adjusting the scope should solve the problem below.
>
> On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa 
> wrote:
>>
>> Hi all,
>>
>> I'm using some functions from Breeze in a spark job but I get the
>> following build error :
>>
>> Error:scalac: bad symbolic reference. A signature in RandBasis.class
>> refers to term math3
>> in package org.apache.commons which is not available.
>> It may be completely missing from the current classpath, or the version on
>> the classpath might be incompatible with the version used when compiling
>> RandBasis.class.
>>
>> In my case, I just declare a new Gaussian distribution
>>
>> val g = new Gaussian(0d,1d)
>>
>> I'm using spark 1.1
>>
>>
>> Any ideas to fix this ?
>>
>>
>> Best regards,
>>
>>
>> Jao
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is it possible to use Parquet with Dremel encoding

2014-09-26 Thread Frank Austin Nothaft
Matthes,

Ah, gotcha! Repeated items in Parquet seem to correspond to the ArrayType in 
Spark-SQL. I only use Spark, but it does looks like that should be supported in 
Spark-SQL 1.1.0. I’m not sure though if you can apply predicates on repeated 
items from Spark-SQL.

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Sep 26, 2014, at 8:48 AM, matthes  wrote:

> Hi Frank,
> 
> thanks al lot for your response, this is a very helpful!
> 
> Actually I'm try to figure out does the current spark version supports
> Repetition levels
> (https://blog.twitter.com/2013/dremel-made-simple-with-parquet) but now it
> looks good to me.
> It is very hard to find some good things about that. Now I found this as
> well: 
> https://git-wip-us.apache.org/repos/asf?p=spark.git;a=blob;f=sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala;h=1dc58633a2a68cd910c1bab01c3d5ee1eb4f8709;hb=f479cf37
> 
> I wasn't sure of that because nested data can be many different things!
> If it works with SQL, to find the firstRepeatedid or secoundRepeatedid would
> be awesome. But if it only works with kind of map/reduce job than it also
> good. The most important thing is to filter the first or secound  repeated
> value as fast as possible and in combination as well.
> I start now to play with this things to get the best search results!
> 
> Me schema looks like this:
> 
> val nestedSchema =
>"""message nestedRowSchema 
> {
> int32 firstRepeatedid;
> repeated group level1
> {
>   int64 secoundRepeatedid;
>   repeated group level2 
> {
>   int64   value1;
>   int32   value2;
> }
> }
>   }
>"""
> 
> Best,
> Matthes
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-Parquet-with-Dremel-encoding-tp15186p15239.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Build error when using spark with breeze

2014-09-26 Thread Jaonary Rabarisoa
Thank Ted. Can you tell me how to adjust the scope ?

On Fri, Sep 26, 2014 at 5:47 PM, Ted Yu  wrote:

> spark-core's dependency on commons-math3 is @ test scope (core/pom.xml):
> 
>   org.apache.commons
>   commons-math3
>   3.3
>   test
> 
>
> Adjusting the scope should solve the problem below.
>
> On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa 
> wrote:
>
>> Hi all,
>>
>> I'm using some functions from Breeze in a spark job but I get the
>> following build error :
>>
>> *Error:scalac: bad symbolic reference. A signature in RandBasis.class
>> refers to term math3*
>> *in package org.apache.commons which is not available.*
>> *It may be completely missing from the current classpath, or the version
>> on*
>> *the classpath might be incompatible with the version used when compiling
>> RandBasis.class.*
>>
>> In my case, I just declare a new Gaussian distribution
>>
>> *val g = new Gaussian(0d,1d)*
>>
>> I'm using spark 1.1
>>
>>
>> Any ideas to fix this ?
>>
>>
>> Best regards,
>>
>>
>> Jao
>>
>
>


How to do operations on multiple RDD's

2014-09-26 Thread Johan Stenberg
Hi,

This is my first post to the email list so give me some feedback if I do
something wrong.

To do operations on two RDD's to produce a new one you can just use
zipPartitions, but if I have an arbitrary number of RDD's that I would like
to perform an operation on to produce a single RDD, how do I do that? I've
been reading the docs but haven't found anything.

For example: if I have a Seq of RDD[Array[Int]]'s and I want to take the
majority of each array cell. So if all RDD's have one array which are like
this:

[1, 2, 3]
[0, 0, 0]
[1, 2, 0]

Then the resulting RDD would have the array [1, 2, 0]. How do I approach
this problem? It becomes too heavy to have an accumulator variable I guess?
Otherwise it could be an array of maps with values as keys and frequency as
values.

Essentially I want something like zipPartitions but for arbitrarily many
RDD's, is there any such functionality or how would I approach this problem?

Cheers,

Johan


Re: Is it possible to use Parquet with Dremel encoding

2014-09-26 Thread matthes
Hi Frank,

thanks al lot for your response, this is a very helpful!

Actually I'm try to figure out does the current spark version supports
Repetition levels
(https://blog.twitter.com/2013/dremel-made-simple-with-parquet) but now it
looks good to me.
It is very hard to find some good things about that. Now I found this as
well: 
https://git-wip-us.apache.org/repos/asf?p=spark.git;a=blob;f=sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTestData.scala;h=1dc58633a2a68cd910c1bab01c3d5ee1eb4f8709;hb=f479cf37

I wasn't sure of that because nested data can be many different things!
If it works with SQL, to find the firstRepeatedid or secoundRepeatedid would
be awesome. But if it only works with kind of map/reduce job than it also
good. The most important thing is to filter the first or secound  repeated
value as fast as possible and in combination as well.
I start now to play with this things to get the best search results!

Me schema looks like this:

val nestedSchema =
"""message nestedRowSchema 
{
  int32 firstRepeatedid;
  repeated group level1
  {
int64 secoundRepeatedid;
repeated group level2 
  {
int64   value1;
int32   value2;
  }
  }
}
"""

Best,
Matthes



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-Parquet-with-Dremel-encoding-tp15186p15239.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Build error when using spark with breeze

2014-09-26 Thread Ted Yu
spark-core's dependency on commons-math3 is @ test scope (core/pom.xml):

  org.apache.commons
  commons-math3
  3.3
  test


Adjusting the scope should solve the problem below.

On Fri, Sep 26, 2014 at 8:42 AM, Jaonary Rabarisoa 
wrote:

> Hi all,
>
> I'm using some functions from Breeze in a spark job but I get the
> following build error :
>
> *Error:scalac: bad symbolic reference. A signature in RandBasis.class
> refers to term math3*
> *in package org.apache.commons which is not available.*
> *It may be completely missing from the current classpath, or the version
> on*
> *the classpath might be incompatible with the version used when compiling
> RandBasis.class.*
>
> In my case, I just declare a new Gaussian distribution
>
> *val g = new Gaussian(0d,1d)*
>
> I'm using spark 1.1
>
>
> Any ideas to fix this ?
>
>
> Best regards,
>
>
> Jao
>


Re: rsync problem

2014-09-26 Thread rapelly kartheek
Hi,

This is the command I am using for submitting my application, SimpleApp:

./bin/spark-submit --class org.apache.spark.examples.SimpleApp
--deploy-mode client --master spark://karthik:7077
$SPARK_HOME/examples/*/scala-*/spark-examples-*.jar /text-data


On Thu, Sep 25, 2014 at 6:52 AM, Tobias Pfeiffer  wrote:

> Hi,
>
> I assume you unintentionally did not reply to the list, so I'm adding it
> back to CC.
>
> How do you submit your job to the cluster?
>
> Tobias
>
>
> On Thu, Sep 25, 2014 at 2:21 AM, rapelly kartheek  > wrote:
>
>> How do I find out whether a node in the cluster is a master or slave??
>> Till now I was thinking that slaves file under the conf folder makes the
>> difference. Also, the MASTER_MASTER_IP in the spark-env.sh file.
>>
>> what else differentiates a slave from the master??
>>
>> On Wed, Sep 24, 2014 at 10:46 PM, rapelly kartheek <
>> kartheek.m...@gmail.com> wrote:
>>
>>> The job execution is taking place perfectly. Previously, all my print
>>> statements used to be stored in spark/work/*/stdout file. But, now after
>>> doing the rsync, I find that none of the prtint statements are getting
>>> reflected in the stdout file under work folder. But, when I go to the code,
>>> I find the statements in the code. But, they are not reflected into the
>>> stdout file as before.
>>>
>>> Can you please tell me where I went wrong.  All I want is to see my
>>> mofication in the code getting relected in output
>>> .
>>>
>>> On Wed, Sep 24, 2014 at 10:22 PM, rapelly kartheek <
>>> kartheek.m...@gmail.com> wrote:
>>>
 Hi,

 I have a very important and fundamental doubt: I have rsynced the
 entire spark folder from the master to all slaves in the cluster. When I
 execute a job, its working perfectly. But, when I rsync the entire spark
 folder of the master to all the slaves, is it not that I am sending the
 master configurations to all the slaves and making the slaves behave like
 master??

 First of all, is it correct to rsync the entire spark folder??
 But, if I change only one file, then how do I rsync it to all??

 On Fri, Sep 19, 2014 at 8:44 PM, rapelly kartheek <
 kartheek.m...@gmail.com> wrote:

> Thank you Soumya Simantha and Tobias. I've deleted the contents of the
> work folder in all the nodes.
> Now its working perfectly as it was before.
>
> Thank you
> Karthik
>
> On Fri, Sep 19, 2014 at 4:46 PM, Soumya Simanta <
> soumya.sima...@gmail.com> wrote:
>
>> One possible reason is maybe that the checkpointing directory
>> $SPARK_HOME/work is rsynced as well.
>> Try emptying the contents of the work folder on each node and try
>> again.
>>
>>
>>
>> On Fri, Sep 19, 2014 at 4:53 AM, rapelly kartheek <
>> kartheek.m...@gmail.com> wrote:
>>
>>> I
>>> * followed this command:rsync -avL --progress path/to/spark-1.0.0
>>> username@destinationhostname:*
>>>
>>>
>>> *path/to/destdirectory. Anyway, for now, I did it individually for
>>> each node.*
>>>
>>> I have copied to each node at a time individually using the above
>>> command. So, I guess the copying may not contain any mixture of files.
>>> Also, as of now, I am not facing any MethodNotFound exceptions. But, 
>>> there
>>> is no job execution taking place.
>>>
>>> After sometime, one by one, each goes down and the cluster shuts
>>> down.
>>>
>>> On Fri, Sep 19, 2014 at 2:15 PM, Tobias Pfeiffer 
>>> wrote:
>>>
 Hi,

 On Fri, Sep 19, 2014 at 5:17 PM, rapelly kartheek <
 kartheek.m...@gmail.com> wrote:

> > ,
>
> * you have copied a lot of files from various hosts to
> username@slave3:path*
> only from one node to all the other nodes...
>

 I don't think rsync can do that in one command as you described. My
 guess is that now you have a wild mixture of jar files all across your
 cluster which will lead to fancy exceptions like MethodNotFound etc.,
 that's maybe why your cluster is not working correctly.

 Tobias



>>>
>>
>

>>>
>>
>


Build error when using spark with breeze

2014-09-26 Thread Jaonary Rabarisoa
Hi all,

I'm using some functions from Breeze in a spark job but I get the following
build error :

*Error:scalac: bad symbolic reference. A signature in RandBasis.class
refers to term math3*
*in package org.apache.commons which is not available.*
*It may be completely missing from the current classpath, or the version on*
*the classpath might be incompatible with the version used when compiling
RandBasis.class.*

In my case, I just declare a new Gaussian distribution

*val g = new Gaussian(0d,1d)*

I'm using spark 1.1


Any ideas to fix this ?


Best regards,


Jao


Re: SPARK UI - Details post job processiong

2014-09-26 Thread Chester @work
I am working on a PR that allows one to send the same spark listener event 
message back to the application in yarn cluster mode. 

So far I have put this function in our application, our UI will receive and 
display the same spark job event message such as progress, job start, completed 
etc

Essentially, it establish a communication channel , you can send over progress, 
messages and detailed exceptions from spark job inside yarn to your 
application, on you application side , you can display , or log, make use it in 
other ways. 

You can send send message to the running spark job via the channel. 

I will cleanup the code and send PR soon

Chester
Alpine Data Lab

Sent from my iPhone

> On Sep 26, 2014, at 7:38 AM, Matt Narrell  wrote:
> 
> Yes, I’m running Hadoop’s Timeline server that does this for the YARN/Hadoop 
> logs (and works very nicely btw).  Are you saying I can do the same for the 
> SparkUI as well?  Also, where do I set these Spark configurations since this 
> will be executed inside a YARN container?  On the “client” machine via 
> spark-env.sh?  Do I pass these as command line arguments to spark-submit?  Do 
> I set them explicitly on my SparkConf?
> 
> Thanks in advance.
> 
> mn
> 
>> On Sep 25, 2014, at 9:13 PM, Andrew Ash  wrote:
>> 
>> Matt you should be able to set an HDFS path so you'll get logs written to a 
>> unified place instead of to local disk on a random box on the cluster.
>> 
>>> On Thu, Sep 25, 2014 at 1:38 PM, Matt Narrell  
>>> wrote:
>>> How does this work with a cluster manager like YARN?
>>> 
>>> mn
>>> 
 On Sep 25, 2014, at 2:23 PM, Andrew Or  wrote:
 
 Hi Harsha,
 
 You can turn on `spark.eventLog.enabled` as documented here: 
 http://spark.apache.org/docs/latest/monitoring.html. Then, if you are 
 running standalone mode, you can access the finished SparkUI through the 
 Master UI. Otherwise, you can start a HistoryServer to display finished 
 UIs.
 
 -Andrew
 
 2014-09-25 12:55 GMT-07:00 Harsha HN <99harsha.h@gmail.com>:
> Hi,
> 
> Details laid out in Spark UI for the job in progress is really 
> interesting and very useful. 
> But this gets vanished once the job is done. 
> Is there a way to get job details post processing? 
> 
> Looking for Spark UI data, not standard input,output and error info.
> 
> Thanks,
> Harsha
> 


Re: Is it possible to use Parquet with Dremel encoding

2014-09-26 Thread Frank Austin Nothaft
Hi Matthes,

Can you post an example of your schema? When you refer to nesting, are you 
referring to optional columns, nested schemas, or tables where there are 
repeated values? Parquet uses run-length encoding to compress down columns with 
repeated values, which is the case that your example seems to refer to. The 
point Matt is making in his post is that if you have a Parquet files with 
contain records with a nested schema, e.g.:

record MyNestedSchema {
  int nestedSchemaField;
}

record MySchema {
  int nonNestedField;
  MyNestedSchema nestedRecord;
}

Not all systems support queries against these schemas. If you want to load the 
data directly into Spark, it isn’t an issue. I’m not familiar with how SparkSQL 
is handling this, but I believe the bit you quoted is saying that support for 
nested queries (e.g., select ... from … where nestedRecord.nestedSchemaField == 
0) will be added in Spark 1.0.1 (which is currently available, BTW).

Regards,

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Sep 26, 2014, at 7:38 AM, matthes  wrote:

> Thank you Jey,
> 
> That is a nice introduction but it is a may be to old (AUG 21ST, 2013)
> 
> "Note: If you keep the schema flat (without nesting), the Parquet files you
> create can be read by systems like Shark and Impala. These systems allow you
> to query Parquet files as tables using SQL-like syntax. The Parquet files
> created by this sample application could easily be queried using Shark for
> example."
> 
> But in this post
> (http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Nested-CaseClass-Parquet-failure-td8377.html)
> I found this: Nested parquet is not supported in 1.0, but is part of the
> upcoming 1.0.1 release.
> 
> So the question now is, can I use it in the benefit way of nested parquet
> files to find fast with sql or do I have to write a special map/reduce job
> to transform and find my data?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-Parquet-with-Dremel-encoding-tp15186p15234.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Is it possible to use Parquet with Dremel encoding

2014-09-26 Thread matthes
Thank you Jey,

That is a nice introduction but it is a may be to old (AUG 21ST, 2013)

"Note: If you keep the schema flat (without nesting), the Parquet files you
create can be read by systems like Shark and Impala. These systems allow you
to query Parquet files as tables using SQL-like syntax. The Parquet files
created by this sample application could easily be queried using Shark for
example."

But in this post
(http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Nested-CaseClass-Parquet-failure-td8377.html)
I found this: Nested parquet is not supported in 1.0, but is part of the
upcoming 1.0.1 release.

So the question now is, can I use it in the benefit way of nested parquet
files to find fast with sql or do I have to write a special map/reduce job
to transform and find my data?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-use-Parquet-with-Dremel-encoding-tp15186p15234.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SPARK UI - Details post job processiong

2014-09-26 Thread Matt Narrell
Yes, I’m running Hadoop’s Timeline server that does this for the YARN/Hadoop 
logs (and works very nicely btw).  Are you saying I can do the same for the 
SparkUI as well?  Also, where do I set these Spark configurations since this 
will be executed inside a YARN container?  On the “client” machine via 
spark-env.sh?  Do I pass these as command line arguments to spark-submit?  Do I 
set them explicitly on my SparkConf?

Thanks in advance.

mn

On Sep 25, 2014, at 9:13 PM, Andrew Ash  wrote:

> Matt you should be able to set an HDFS path so you'll get logs written to a 
> unified place instead of to local disk on a random box on the cluster.
> 
> On Thu, Sep 25, 2014 at 1:38 PM, Matt Narrell  wrote:
> How does this work with a cluster manager like YARN?
> 
> mn
> 
> On Sep 25, 2014, at 2:23 PM, Andrew Or  wrote:
> 
>> Hi Harsha,
>> 
>> You can turn on `spark.eventLog.enabled` as documented here: 
>> http://spark.apache.org/docs/latest/monitoring.html. Then, if you are 
>> running standalone mode, you can access the finished SparkUI through the 
>> Master UI. Otherwise, you can start a HistoryServer to display finished UIs.
>> 
>> -Andrew
>> 
>> 2014-09-25 12:55 GMT-07:00 Harsha HN <99harsha.h@gmail.com>:
>> Hi,
>> 
>> Details laid out in Spark UI for the job in progress is really interesting 
>> and very useful. 
>> But this gets vanished once the job is done. 
>> Is there a way to get job details post processing? 
>> 
>> Looking for Spark UI data, not standard input,output and error info.
>> 
>> Thanks,
>> Harsha
>> 
> 
> 



Re: mappartitions data size

2014-09-26 Thread Daniel Siegmann
Use RDD.repartition (see here:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
).

On Fri, Sep 26, 2014 at 10:19 AM, jamborta  wrote:

> Hi all,
>
> I am using mappartitions to do some heavy computing on subsets of the data.
> I have a dataset with about 1m rows, running on a 32 core cluster.
> Unfortunately, is seems that mappartitions splits the data into two sets so
> it is only running on two cores.
>
> Is there a way to force it to split into smaller chunks?
>
> thanks,
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/mappartitions-data-size-tp15231.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


mappartitions data size

2014-09-26 Thread jamborta
Hi all,

I am using mappartitions to do some heavy computing on subsets of the data.
I have a dataset with about 1m rows, running on a 32 core cluster.
Unfortunately, is seems that mappartitions splits the data into two sets so
it is only running on two cores. 

Is there a way to force it to split into smaller chunks? 

thanks,




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mappartitions-data-size-tp15231.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL question: is cached SchemaRDD storage controlled by "spark.storage.memoryFraction"?

2014-09-26 Thread Cheng Lian
Yes it is. The in-memory storage used with |SchemaRDD| also uses 
|RDD.cache()| under the hood.


On 9/26/14 4:04 PM, Haopu Wang wrote:


Hi, I'm querying a big table using Spark SQL. I see very long GC time in
some stages. I wonder if I can improve it by tuning the storage
parameter.

The question is: the schemaRDD has been cached with "cacheTable()"
function. So is the cached schemaRDD part of memory storage controlled
by the "spark.storage.memoryFraction" parameter?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


​


Re: Access file name in map function

2014-09-26 Thread Cheng Lian
If the size of each file is small, you may try 
|SparkContext.wholeTextFiles|. Otherwise you can try something like this:


|val  filenames:  Seq[String] = ...
val  combined:  RDD[(String,String)] = filenames.map { name =>
  sc.textFile(name).map(line => name -> line)
}.reduce(_ ++ _)
|

On 9/26/14 6:45 PM, Shekhar Bansal wrote:


Hi
In one of our usecase, filename contains timestamp and we have to 
append it in the record for aggregation.

How can I access filename in map function?

Thanks!


​


java.io.IOException Error in task deserialization

2014-09-26 Thread Arun Ahuja
Has anyone else seen this erorr in task deserialization?  The task is
processing a small amount of data and doesn't seem to have much data
hanging to the closure?  I've only seen this with Spark 1.1

Job aborted due to stage failure: Task 975 in stage 8.0 failed 4
times, most recent failure: Lost task 975.3 in stage 8.0 (TID 24777,
host.com): java.io.IOException: unexpected exception type

java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1538)
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1025)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)

java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)

org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)

org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)


Re: Systematic error when re-starting Spark stream unless I delete all checkpoints

2014-09-26 Thread Svend Vanderveken
Hi all,

I apologise for re-posting this, I realise some mail systems are filtering
all the code samples from the original post.

I would greatly appreciate any pointer regarding, this issue basically
renders spark streaming not fault-tolerant for us.

Thanks in advance,

S



---

"
I experience spark streaming restart issues similar to what is discussed in
the 2 threads below (in which I failed to find a solution). Could anybody
let me know if anything is wrong in the way I start/stop or if this could
be a spark bug?

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html

My stream reads a Kafka topic, does some processing involving an
updatStateByKey and saves the result to HDFS.

The context is (re)-created at startup as follows:

def streamContext() = {

def newContext() = {
  val ctx = new StreamingContext(sparkConf, Duration(1))
  ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
  ctx
}


StreamingContext.getOrCreate("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/",
newContext)
  }


And the start-up and shutdown of the stream is handled as follows:

try {

val sparkContext = streamContext()

[.. build stream here...]

sparkContext.start()
sparkContext.awaitTermination()

  } catch {
  case e: Throwable =>
log.error("shutting down tabulation stream...", e)
sparkContext.stop()
log.info("...waiting termination...")
sparkContext.awaitTermination()
log.info("...tabulation stream stopped")
  }



When starting the stream for the first time (with spark-submit), the
processing happens successfully, folders are created on the target HDFS
folder and streaming stats are visible on http://sparkhost:4040/streaming.

After letting the streaming work several minutes and then stopping it
(ctrl-c on the command line), the following info is visible in the
checkpoint folder:

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes
where applicable
Found 11 items
drwxr-xr-x   - mnubohadoop hadoop  0 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
-rw-r--r--   3 mnubohadoop hadoop   5479 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165229
-rw-r--r--   3 mnubohadoop hadoop   5512 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165229.bk
-rw-r--r--   3 mnubohadoop hadoop   5479 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165230
-rw-r--r--   3 mnubohadoop hadoop   5507 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165230.bk
-rw-r--r--   3 mnubohadoop hadoop   5476 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165231
-rw-r--r--   3 mnubohadoop hadoop   5504 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165231.bk
-rw-r--r--   3 mnubohadoop hadoop   5477 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165232
-rw-r--r--   3 mnubohadoop hadoop   5506 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165232.bk
-rw-r--r--   3 mnubohadoop hadoop   5484 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165233
-rw-r--r--   3 mnubohadoop hadoop   5504 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165233.bk
mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load
native-hadoop library for your platform... using builtin-java classes
where applicable
Found 2 items
drwxr-xr-x   - mnubohadoop hadoop  0 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
drwxr-xr-x   - mnubohadoop hadoop  0 2014-09-25 09:38
hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542


(checkpoint clean-up seems to happen since the stream ran for much more
than 5 times 10 seconds)

When re-starting the stream, the startup fails with the error below,
http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is
added in the target folder and no new checkpoint are created:

09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp -
shutting down tabulation stream...
org.apache.spark.SparkException:
org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not
been initialized
   

Re: problem with HiveContext inside Actor

2014-09-26 Thread Cheng Lian
This is reasonable, since the actual constructor gets called is 
|Driver()| rather than |Driver(HiveConf)|. The former initializes the 
|conf| field by:


|conf = SessionState.get().getConf()
|

And |SessionState.get()| reads a TSS value. Thus executing SQL queries 
within another thread causes NPE since the |Driver| is created in a 
thread different from the one |HiveContext| (and the contained 
|SessionState|) gets constructed.


On 9/19/14 3:31 AM, Du Li wrote:


I have figured it out.

As shown in the code below, if the HiveContext hc were created in the 
actor object and used to create db in response to message, it would 
throw null pointer exception. This is fixed by creating the 
HiveContext inside the MyActor class instead. I also tested the code 
by replacing Actor with Thread. The problem and fix are similar.


Du

——
abstract class MyMessage
case object CreateDB extends MyMessage

object MyActor {
  def init(_sc: SparkContext) = {
if( actorSystem == null || actorRef == null ) {
  actorSystem = ActorSystem(“root")
  actorRef = actorSystem.actorOf(Props(new MyActor(_sc)), “myactor")
}
//hc = new MyHiveContext(_sc)
  }

  def !(m: MyMessage) {
actorRef ! m
  }

  //var hc: MyHiveContext = _
  private var actorSystem: ActorSystem = null
  private var actorRef: ActorRef = null
}

class MyActor(sc: SparkContext) extends Actor {
  val hc = new MyHiveContext(sc)
  def receive: Receiver = {
case CreateDB => hc.createDB()
  }
}

class MyHiveContext(sc: SparkContext) extends HiveContext(sc) {
  def createDB() {...}
}


From:  "Chester @work" >

Date:  Thursday, September 18, 2014 at 7:17 AM
To:  Du Li >
Cc:  Michael Armbrust >, "Cheng, Hao" >, "user@spark.apache.org 
" >

Subject:  Re: problem with HiveContext inside Actor


Akka actor are managed under a thread pool, so the same actor can be 
under different thread.


If you create HiveContext in the actor, is it possible that you are 
essentially create different instance of HiveContext ?


Sent from my iPhone

On Sep 17, 2014, at 10:14 PM, Du Li > wrote:




Thanks for your reply.

Michael: No. I only create one HiveContext in the code.

Hao: Yes. I subclass HiveContext and defines own function to create 
database and then subclass akka Actor to call that function in 
response to an abstract message. By your suggestion, I called 
println(sessionState.getConf.getAllProperties) that printed
tons of properties; however, the same NullPointerException was still 
thrown.


As mentioned, the weird thing is that everything worked fine if I 
simply called actor.hiveContext.createDB() directly. But it throws the 
null pointer exception from Driver.java if I do "actor ! 
CreateSomeDB”, which seems to me just the same thing because

the actor does nothing but call createDB().

Du





From: Michael Armbrust >

Date: Wednesday, September 17, 2014 at 7:40 PM
To: "Cheng, Hao" mailto:hao.ch...@intel.com>>
Cc: Du Li >, "user@spark.apache.org 
" >

Subject: Re: problem with HiveContext inside Actor


- dev

Is it possible that you are constructing more than one HiveContext in 
a single JVM?  Due to global state in Hive code this is not allowed.


Michael


On Wed, Sep 17, 2014 at 7:21 PM, Cheng, Hao
mailto:hao.ch...@intel.com>> wrote:

Hi, Du
I am not sure what you mean “triggers the HiveContext to create a 
database”, do you create the sub class
of HiveContext? Just be sure you call the “HiveContext.sessionState” 
eagerly, since it will set the proper “hiveconf” into the 
SessionState, otherwise the HiveDriver will always get the null value 
when retrieving HiveConf.

Cheng Hao
From: Du Li [mailto:l...@yahoo-inc.com.INVALID]

Sent: Thursday, September 18, 2014 7:51 AM
To: user@spark.apache.org ;
d...@spark.apache.org 
Subject: problem with HiveContext inside Actor


Hi,


Wonder anybody had similar experience or any suggestion here.


I have an akka Actor that processes database requests in high-level 
messages. Inside this Actor, it creates a HiveContext object that does the
actual db work. The main thread creates the needed SparkContext and 
passes in to the Actor to create the HiveContext.



When a message is sent to the Actor, it is processed properly except 
that, when the message triggers the HiveContext to create a database, it
throws a NullPointerException in hive.ql.Driver.java which suggests 
that its conf variable is not initialized.



Ironically, it works fine if my main thread directly calls 
actor.hiveContext to create the database. The spark version is 1.1.0.



Thanks,

Du











​


How to run hive scripts pro-grammatically in Spark 1.1.0 ?

2014-09-26 Thread Sherine
I am unable to run hive scripts in Spark 1.1.0 pro-grammatically in hadoop
prompt but I could do it manually.
Can anyone help me to run hive scripts pro-grammatically in spark1.1.0
cluster on EMR?

Manual running steps:-


hadoop@ip-10-151-71-224:~/tmpSpark/spark1.1/spark$ ./bin/spark-shell
--driver-memory 4G --executor-memory 4G 

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
14/09/26 11:54:29 INFO SecurityManager: Changing view acls to: hadoop,
14/09/26 11:54:29 INFO SecurityManager: Changing modify acls to: hadoop,
14/09/26 11:54:29 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(hadoop, );
users with modify permissions: Set(hadoop, )
14/09/26 11:54:29 INFO HttpServer: Starting HTTP Server
14/09/26 11:54:29 INFO Utils: Successfully started service 'HTTP class
server' on port 52081.
Welcome to
    __
 / __/__  ___ _/ /__
_\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.1.0
  /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java
1.7.0_40)
Type in expressions to have them evaluated.
Type :help for more information.
14/09/26 11:54:34 INFO SecurityManager: Changing view acls to: hadoop,
14/09/26 11:54:34 INFO SecurityManager: Changing modify acls to: hadoop,
14/09/26 11:54:34 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(hadoop, );
users with modify permissions: Set(hadoop, )
14/09/26 11:54:35 INFO Slf4jLogger: Slf4jLogger started
14/09/26 11:54:35 INFO Remoting: Starting remoting
14/09/26 11:54:35 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@ip-10-151-71-224.ec2.internal:46137]
14/09/26 11:54:35 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@ip-10-151-71-224.ec2.internal:46137]
14/09/26 11:54:35 INFO Utils: Successfully started service 'sparkDriver' on
port 46137.
14/09/26 11:54:35 INFO SparkEnv: Registering MapOutputTracker
14/09/26 11:54:35 INFO SparkEnv: Registering BlockManagerMaster
14/09/26 11:54:35 INFO DiskBlockManager: Created local directory at
/tmp/spark-local-20140926115435-fa1a
14/09/26 11:54:35 INFO Utils: Successfully started service 'Connection
manager for block manager' on port 47623.
14/09/26 11:54:35 INFO ConnectionManager: Bound socket to port 47623 with id
= ConnectionManagerId(ip-10-151-71-224.ec2.internal,47623)
14/09/26 11:54:35 INFO MemoryStore: MemoryStore started with capacity 2.1 GB
14/09/26 11:54:35 INFO BlockManagerMaster: Trying to register BlockManager
14/09/26 11:54:35 INFO BlockManagerMasterActor: Registering block manager
ip-10-151-71-224.ec2.internal:47623 with 2.1 GB RAM
14/09/26 11:54:35 INFO BlockManagerMaster: Registered BlockManager
14/09/26 11:54:35 INFO HttpFileServer: HTTP File server directory is
/tmp/spark-dc2260ea-18cc-4204-8f02-36bcc1df1126
14/09/26 11:54:35 INFO HttpServer: Starting HTTP Server
14/09/26 11:54:36 INFO Utils: Successfully started service 'HTTP file
server' on port 49299.
14/09/26 11:54:41 INFO Utils: Successfully started service 'SparkUI' on port
4040.
14/09/26 11:54:41 INFO SparkUI: Started SparkUI at
http://ip-10-151-71-224.ec2.internal:4040
14/09/26 11:54:41 INFO Executor: Using REPL class URI:
http://10.151.71.224:52081
14/09/26 11:54:41 INFO AkkaUtils: Connecting to HeartbeatReceiver:
akka.tcp://sparkDriver@ip-10-151-71-224.ec2.internal:46137/user/HeartbeatReceiver
14/09/26 11:54:41 INFO SparkILoop: Created spark context..
Spark context available as sc.

scala> 

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc);
hiveContext: org.apache.spark.sql.hive.HiveContext =
org.apache.spark.sql.hive.HiveContext@3e77175c

scala> hiveContext.hql("CREATE EXTERNAL TABLE IF NOT EXISTS test (time
string, id string) ROW FORMAT DELIMITED STORED AS TEXTFILE LOCATION
's3n://output/test/'"); 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-run-hive-scripts-pro-grammatically-in-Spark-1-1-0-tp15225.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: executorAdded event to DAGScheduler

2014-09-26 Thread praveen seluka
In Yarn, we can easily  have multiple containers allocated in the same node.

On Fri, Sep 26, 2014 at 6:05 PM, Nan Zhu  wrote:

>  just a quick reply, we cannot start two executors in the same host for a
> single application in the standard deployment (one worker per machine)
>
> I’m not sure if it will create an issue when you have multiple workers in
> the same host, as submitWaitingStages is called everywhere and I never
> try such a deployment mode
>
> Best,
>
> --
> Nan Zhu
>
> On Friday, September 26, 2014 at 8:02 AM, praveen seluka wrote:
>
> Can someone explain the motivation behind passing executorAdded event to
> DAGScheduler ? *DAGScheduler *does *submitWaitingStages *when *executorAdded
> *method is called by *TaskSchedulerImpl*. I see some issue in the below
> code,
>
> *TaskSchedulerImpl.scala code*
> if (!executorsByHost.contains(o.host)) {
> executorsByHost(o.host) = new HashSet[String]()
> executorAdded(o.executorId, o.host)
> newExecAvail = true
>   }
>
> Note that executorAdded is called only when there is a new host and not
> for every new executor. For instance, there can be two executors in the
> same host and in this case. (But DAGScheduler executorAdded is notified
> only for new host - so only once in this case). If this is indeed an issue,
> I would like to submit a patch for this quickly. [cc Andrew Or]
>
> - Praveen
>
>
>
>


Re: executorAdded event to DAGScheduler

2014-09-26 Thread Nan Zhu
just a quick reply, we cannot start two executors in the same host for a single 
application in the standard deployment (one worker per machine)  

I’m not sure if it will create an issue when you have multiple workers in the 
same host, as submitWaitingStages is called everywhere and I never try such a 
deployment mode

Best,  

--  
Nan Zhu


On Friday, September 26, 2014 at 8:02 AM, praveen seluka wrote:

> Can someone explain the motivation behind passing executorAdded event to 
> DAGScheduler ? DAGScheduler does submitWaitingStages when executorAdded 
> method is called by TaskSchedulerImpl. I see some issue in the below code,
>  
> TaskSchedulerImpl.scala code
> if (!executorsByHost.contains(o.host)) {
> executorsByHost(o.host) = new HashSet[String]()
> executorAdded(o.executorId, o.host)
> newExecAvail = true
>   }
>  
>  
> Note that executorAdded is called only when there is a new host and not for 
> every new executor. For instance, there can be two executors in the same host 
> and in this case. (But DAGScheduler executorAdded is notified only for new 
> host - so only once in this case). If this is indeed an issue, I would like 
> to submit a patch for this quickly. [cc Andrew Or]
>  
> - Praveen
>  
>  



Re: SparkSQL Thriftserver in Mesos

2014-09-26 Thread Cheng Lian
You can avoid install Spark on each node by uploading Spark distribution 
tarball file to HDFS setting |spark.executor.uri| to the HDFS location. 
In this way, Mesos will download and the tarball file before launching 
containers. Please refer to this Spark documentation page 
 for details.


However, using |spark.executor.uri| together with fine-grained mode 
(which is the default mode) really kills performance, because Mesos 
downloads and extracts the tarball every time a Spark /task/ (not 
application) is launched.


On 9/21/14 1:16 AM, John Omernik wrote:

I am running the Thrift server in SparkSQL, and running it on the node 
I compiled spark on.  When I run it, tasks only work if they landed on 
that node, other executors started on nodes I didn't compile spark on 
(and thus don't have the compile directory) fail.  Should spark be 
distributed properly with the executor uri in my spark-defaults for 
mesos?


Here is the error on nodes with Lost executors

sh: 1: /opt/mapr/spark/spark-1.1.0-SNAPSHOT/sbin/spark-executor: not found


​


Re: executorAdded event to DAGScheduler

2014-09-26 Thread praveen seluka
Some corrections.

On Fri, Sep 26, 2014 at 5:32 PM, praveen seluka 
wrote:

> Can someone explain the motivation behind passing executorAdded event to
> DAGScheduler ? *DAGScheduler *does *submitWaitingStages *when *executorAdded
> *method is called by *TaskSchedulerImpl*. I see some issue in the below
> code,
>
> *TaskSchedulerImpl.scala code*
> if (!executorsByHost.contains(o.host)) {
> executorsByHost(o.host) = new HashSet[String]()
> executorAdded(o.executorId, o.host)
> newExecAvail = true
>   }
>
> Note that executorAdded is called only when there is a new host and not
> for every new executor. For instance, there can be two executors in the
> same host and in this case the DAGscheduler is notified only once. If this
> is indeed an issue, I would like to submit a patch for this quickly. [cc
> Andrew Or]
>
> - Praveen
>
>
>


executorAdded event to DAGScheduler

2014-09-26 Thread praveen seluka
Can someone explain the motivation behind passing executorAdded event to
DAGScheduler ? *DAGScheduler *does *submitWaitingStages *when *executorAdded
*method is called by *TaskSchedulerImpl*. I see some issue in the below
code,

*TaskSchedulerImpl.scala code*
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
newExecAvail = true
  }

Note that executorAdded is called only when there is a new host and not for
every new executor. For instance, there can be two executors in the same
host and in this case. (But DAGScheduler executorAdded is notified only for
new host - so only once in this case). If this is indeed an issue, I would
like to submit a patch for this quickly. [cc Andrew Or]

- Praveen


Re: Using one sql query's result inside another sql query

2014-09-26 Thread Cheng Lian

H Twinkle,

The failure is caused by case sensitivity. The temp table actually 
stores the original un-analyzed logical plan, thus field names remain 
capital (F1, F2, etc.). I believe this issue has already been fixed by 
PR #2382 . As a workaround, 
you can use lowercase letters in field names instead.


Cheng

On 9/25/14 1:18 PM, twinkle sachdeva wrote:


Hi,

I am using Hive Context to fire the sql queries inside spark. I have 
created a schemaRDD( Let's call it cachedSchema ) inside my code.

If i fire a sql query ( Query 1 ) on top of it, then it works.

But if I refer to Query1's result inside another sql, that fails. Note 
that I have already registered Query1's result as temp table.


registerTempTable(cachedSchema)
Queryresult1 = Query1 using cachedSchema  [ works ]
registerTempTable(Queryresult1)

Queryresult2 = Query2 using Queryresult1  [ FAILS ]

Is it expected?? Any known work around?

Following is the exception I am receiving :


*org.apache.spark.sql.catalyst.errors.package$TreeNodeException: 
Unresolved attributes: 'f1,'f2,'f3,'f4, tree:*


*Project ['f1,'f2,'f3,'f4]*

* Filter ('count > 3)*

*  LowerCaseSchema *

*   Subquery x*

*Project ['F1,'F2,'F3,'F4,'F6,'Count]*

* LowerCaseSchema *

*  Subquery src*

*   SparkLogicalPlan (ExistingRdd 
[F1#0,F2#1,F3#2,F4#3,F5#4,F6#5,Count#6], MappedRDD[4] at map at 
SQLBlock.scala:64)*



*at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$anonfun$apply$1.applyOrElse(Analyzer.scala:72)*


*at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$anonfun$apply$1.applyOrElse(Analyzer.scala:70)*


*at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)*


*at 
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)*


*at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:70)*


*at 
org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:68)*


*at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:61)*


*at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1$anonfun$apply$2.apply(RuleExecutor.scala:59)*


*at 
scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)*


*at 
scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)*


*at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34)*

*at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:59)*


*at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$anonfun$apply$1.apply(RuleExecutor.scala:51)*


*at scala.collection.immutable.List.foreach(List.scala:318)*

*at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)*


*at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:397)*


*at 
org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:397)*


*at 
org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358)*


*at 
org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357)*


*at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)*


*at 
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)*


*at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)*


*at 
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)*


*at 
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)*


*at 
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)*


*at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:120)*

*at org.apache.spark.rdd.RDD$anonfun$dependencies$2.apply(RDD.scala:191)*

*at org.apache.spark.rdd.RDD$anonfun$dependencies$2.apply(RDD.scala:189)*


​


Re: Issue with Spark-1.1.0 and the start-thriftserver.sh script

2014-09-26 Thread Cheng Lian

Hi Helene,

Thanks for the report. In Spark 1.1, we use a special exit code to 
indicate |SparkSubmit| fails because of class not found. But 
unfortunately I chose a not so special exit code — 1… So whenever the 
process exit with 1 as exit code, the |-Phive| error message is shown. A 
PR that changes 1 to 101 has been merged to master, hopefully to reduce 
potential exit code conflicts.


Cheng

On 9/25/14 5:44 PM, Hélène Delanoeye wrote:


Hi

We've just experienced an issue with the new Spark-1.1.0 and the 
start-thriftserver.sh script.



We tried to launch start-thriftserver.sh with "--master yarn" option 
and got the following error message :


/Failed to load Hive Thrift server main class 
org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.

You need to build Spark with -Phive. /


In fact Spark was built with -Phive option, but the real problem was 
this one :


/Application appattempt_1411058337040_0118_01 submitted by user 
x  to unknown queue: default/


So the solution was to specified the queue, and it works :
/opt/spark/sbin/start-thriftserver.sh --master yarn --queue spark-batch

Hope this could help, as the error message is not really clear (and 
rather wrong).


Helene

--
Kelkoo  

*Hélène Delanoeye *Software engineer / Search team

*E*helene.delano...@kelkoo.com  
*Y!Messenger* kelkoohelened

*T* (+33) 4 56 09 07 57
*A* 6, rue des Méridiens 38130 Echirolles FRANCE





Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

Ce message et les pièces jointes sont confidentiels et établis à 
l'attention exclusive de leurs destinataires. Si vous n'êtes pas le 
destinataire de ce message, merci de le détruire et d'en avertir 
l'expéditeur.


​


Access file name in map function

2014-09-26 Thread Shekhar Bansal
Hi
In one of our usecase, filename contains timestamp and we have to append it in 
the record for aggregation.
How can I access filename in map function?

Thanks!

Re: Job cancelled because SparkContext was shut down

2014-09-26 Thread jamborta
Just wanted to answer my question in case someone else runs into the same
problem.

It is related to the problem discussed here:

http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html

and here:

https://issues.apache.org/jira/browse/SPARK-2121

seems yarn kills some of the executors as they request more memory than
expected.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Job-cancelled-because-SparkContext-was-shut-down-tp15189p15216.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Log hdfs blocks sending

2014-09-26 Thread Alexey Romanchuk
Hello Andrew!

Thanks for reply. Which logs and on what level should I check? Driver,
master or worker?

I found this on master node, but there is only ANY locality requirement.
Here it is the driver (spark sql) log -
https://gist.github.com/13h3r/c91034307caa33139001 and one of the workers
log - https://gist.github.com/13h3r/6e5053cf0dbe33f2

Do you have any idea where to look at?

Thanks!

On Fri, Sep 26, 2014 at 10:35 AM, Andrew Ash  wrote:

> Hi Alexey,
>
> You should see in the logs a locality measure like NODE_LOCAL,
> PROCESS_LOCAL, ANY, etc.  If your Spark workers each have an HDFS data node
> on them and you're reading out of HDFS, then you should be seeing almost
> all NODE_LOCAL accesses.  One cause I've seen for mismatches is if Spark
> uses short hostnames and Hadoop uses FQDNs -- in that case Spark doesn't
> think the data is local and does remote reads which really kills
> performance.
>
> Hope that helps!
> Andrew
>
> On Thu, Sep 25, 2014 at 12:09 AM, Alexey Romanchuk <
> alexey.romanc...@gmail.com> wrote:
>
>> Hello again spark users and developers!
>>
>> I have standalone spark cluster (1.1.0) and spark sql running on it. My
>> cluster consists of 4 datanodes and replication factor of files is 3.
>>
>> I use thrift server to access spark sql and have 1 table with 30+
>> partitions. When I run query on whole table (something simple like select
>> count(*) from t) spark produces a lot of network activity filling all
>> available 1gb link. Looks like spark sent data by network instead of local
>> reading.
>>
>> Is it any way to log which blocks were accessed locally and which are not?
>>
>> Thanks!
>>
>
>


Spark SQL question: is cached SchemaRDD storage controlled by "spark.storage.memoryFraction"?

2014-09-26 Thread Haopu Wang
Hi, I'm querying a big table using Spark SQL. I see very long GC time in
some stages. I wonder if I can improve it by tuning the storage
parameter.

The question is: the schemaRDD has been cached with "cacheTable()"
function. So is the cached schemaRDD part of memory storage controlled
by the "spark.storage.memoryFraction" parameter?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Access by name in "tuples" in Scala with Spark

2014-09-26 Thread Sean Owen
I think you are simply looking for a case class in Scala. It is a
simple way to define an object with named, typed fields.

On Fri, Sep 26, 2014 at 8:31 AM, rzykov  wrote:
> Could you advise the  best practice of using some "named tuples" in Scala
> with Spark RDD.
> Currently we can access by a field number in a tuple:
>
> RDD.map{_.2}
>
> But want to see such construction:
>
> RDD.map{_.itemId}
>
> This one will be helpful for debugging purposes.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Access-by-name-in-tuples-in-Scala-with-Spark-tp15212.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Access by name in "tuples" in Scala with Spark

2014-09-26 Thread rzykov
Could you advise the  best practice of using some "named tuples" in Scala
with Spark RDD.
Currently we can access by a field number in a tuple:

RDD.map{_.2}

But want to see such construction:

RDD.map{_.itemId}

This one will be helpful for debugging purposes.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Access-by-name-in-tuples-in-Scala-with-Spark-tp15212.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming + Actors

2014-09-26 Thread Madabhattula Rajesh Kumar
Hi Team,

Could you please respond on my below request.

Regards,
Rajesh



On Thu, Sep 25, 2014 at 11:38 PM, Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi Team,
>
> Can I use Actors in Spark Streaming based on events type? Could you please
> review below Test program and let me know if any thing I need to change
> with respect to best practices
>
> import akka.actor.Actor
> import akka.actor.{ActorRef, Props}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.StreamingContext
> import org.apache.spark.streaming.Seconds
> import akka.actor.ActorSystem
>
> case class one(r: org.apache.spark.rdd.RDD[String])
> case class two(s: org.apache.spark.rdd.RDD[String])
>
> class Events extends Actor
> {
>   def receive = {
> // Based on event type - Invoke respective methods asynchronously
> case one(r) => println("ONE COUNT" + r.count) // Invoke respective
> functions
> case two(s) => println("TWO COUNT" + s.count) // Invoke respective
> functions
>   }
> }
>
> object Test {
>
> def main(args: Array[String]) {
> val system = ActorSystem("System")
> val event: ActorRef = system.actorOf(Props[Events], "events")
> val sparkConf = new SparkConf() setAppName("AlertsLinesCount")
> setMaster("local")
> val ssc = new StreamingContext(sparkConf, Seconds(30))
> val lines = ssc
> textFileStream("hdfs://localhost:9000/user/rajesh/EventsDirectory/")
> lines foreachRDD(x => {
>   event ! one(x)
>   event ! two(x)
> })
> ssc.start
> ssc.awaitTermination
> }
> }
>
> Regards,
> Rajesh
>