Re: How to set java.library.path in a spark cluster

2014-09-09 Thread qihong
Add something like the following to spark-env.sh:
export LD_LIBRARY_PATH=<path to the mosek native libraries>:$LD_LIBRARY_PATH

(and remove all 5 exports you listed). Then restart all worker nodes, and
try
again.
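
If editing spark-env.sh on every node is inconvenient, roughly the same thing can be
done per application through SparkConf. This is only a sketch (it assumes a Spark
version that supports the spark.executor.extraLibraryPath setting, and it reuses the
mosek path from the original post):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: prepend the directory holding the mosek native libraries to the executors'
// library search path instead of exporting LD_LIBRARY_PATH on every worker.
val conf = new SparkConf()
  .setAppName("mosek-app")
  .set("spark.executor.extraLibraryPath",
       "/home/chanda/mosek/7/tools/platform/linux64x86/bin")
val sc = new SparkContext(conf)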

Good luck!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-java-library-path-in-a-spark-cluster-tp13854p13857.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Records - Input Byte

2014-09-09 Thread Mayur Rustagi
What do you mean by "control your input"? Are you trying to pace your Spark
Streaming job by the number of words? If so, that is not supported as of now; you
can only control the batch time and consume all files that arrive within that period.
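
For reference, that time-based control is just the batch interval passed to the
StreamingContext; a minimal sketch (the 10-second value is only an example):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Every 10 seconds Spark Streaming consumes whatever input arrived in that window
// (e.g. all new files in a monitored directory); there is no per-record pacing.
val conf = new SparkConf().setAppName("batch-interval-example")
val ssc = new StreamingContext(conf, Seconds(10))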
-- 
Regards,
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi

On Tue, Sep 9, 2014 at 2:24 AM, danilopds  wrote:

> Hi,
> I was reading the paper of Spark Streaming:
> "Discretized Streams: Fault-Tolerant Streaming Computation at Scale"
> So,
> I read that performance evaluation used 100-byte input records in test Grep
> and WordCount.
> I don't have much experience and I'd like to know how can I control this
> value in my records (like words in an input file)?
> Can anyone suggest me something to start?
> Thanks!
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Records-Input-Byte-tp13733.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

Re: Table not found: using jdbc console to query sparksql hive thriftserver

2014-09-09 Thread Du Li
You need to run mvn install so that the package you built is put into the
local maven repo. Then when compiling your own app (with the right
dependency specified), the package will be retrieved.



On 9/9/14, 8:16 PM, "alexandria1101"  wrote:

>I think the package does not exist because I need to change the pom file:
>
>   <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-assembly_2.10</artifactId>
>     <version>1.0.1</version>
>     <type>pom</type>
>     <scope>provided</scope>
>   </dependency>
>
>I changed the version number to 1.1.1, but that still causes a build
>error:
>
>Failure to find org.apache.spark:spark-assembly_2.10:pom:1.1.1 in
>http://repo.maven.apache.org/maven2 was cached in the local repository,
>resolution will not be reattempted until the update interval of central
>has
>elapsed or updates are forced -> [Help 1]
>
>Is there a way to get past this?
>
>
>
>--
>View this message in context:
>http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-
>jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13851.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to choose right DStream batch interval

2014-09-09 Thread qihong
Hi Mayur,

Thanks for your response. I wrote a simple test that sets up a DStream with
5 batches. The batch duration is 1 second, and the 3rd batch takes an extra
2 seconds. The output of the test shows that the 3rd batch causes a backlog,
and Spark Streaming does catch up on the 4th and 5th batches (DStream.print
was modified to output the system time):

---
Time: 1409959708000 ms, system time: 1409959708269
---
1155
---
Time: 1409959709000 ms, system time: 1409959709033
---
2255
delay 2000 ms
---
Time: 1409959710000 ms, system time: 1409959712036
---
3355
---
Time: 1409959711000 ms, system time: 1409959712059
---
4455
---
Time: 1409959712000 ms, system time: 1409959712083
---


Thanks!
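
For reference, a test along these lines can be put together with a queueStream and
an artificial delay in one batch. This is only a sketch of the idea, not the original
test code:

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("batch-backlog-test").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))          // 1-second batches

// Pre-load 5 batches of data; queueStream hands out one RDD per batch interval.
val queue = new mutable.Queue[RDD[Int]]()
(1 to 5).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
val stream = ssc.queueStream(queue, oneAtATime = true)

var batch = 0
stream.foreachRDD { rdd =>
  batch += 1
  if (batch == 3) Thread.sleep(2000)                      // make the 3rd batch run long
  println(s"batch $batch (${rdd.count()} records) done at ${System.currentTimeMillis()}")
}

ssc.start()
ssc.awaitTermination()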



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-choose-right-DStream-batch-interval-tp13578p13855.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to set java.library.path in a spark cluster

2014-09-09 Thread ayandas84
Hi,

I am working on a 3-machine Cloudera cluster. Whenever I submit a Spark job
as a jar file with a native dependency on mosek, it shows the following error:

java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path

How should I set java.library.path? I printed the environment variable
and it shows:
-Djava.library.path= -Xms512m -Xmx512m,

I added the following lines to the spark-env.sh file, but they were of no help.
The path contains both the mosek.jar and the libmosek_7.0.so files.

export
SPARK_LIBRARY_PATH=${SPARK_HOME}/lib:/home/chanda/mosek/7/tools/platform/linux64x86/bin
export
SPARK_MASTER_OPTS='-Djava.library.path="/home/chanda/mosek/7/tools/platform/linux64x86/bin'
export
SPARK_WORKER_OPTS='/home/chanda/mosek/7/tools/platform/linux64x86/bin'
export
SPARK_HISTORY_OPTS='/home/chanda/mosek/7/tools/platform/linux64x86/bin'
export
SPARK_DAEMON_JAVA_OPTS='/home/chanda/mosek/7/tools/platform/linux64x86/bin'

Please help




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-java-library-path-in-a-spark-cluster-tp13854.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to setup steady state stream partitions

2014-09-09 Thread qihong
Thanks for your response. I do have something like:

val inputDStream = ...
val keyedDStream = inputDStream.map(...)  // use sensorId as key
val partitionedDStream = keyedDStream.transform(rdd => rdd.partitionBy(new
MyPartitioner(...)))
val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction)

The partitionedDStream does have steady partitions, but the stateDStream does
not, i.e., partition 0 of partitionedDStream only contains data for sensors 0
to 999, but partition 0 of stateDStream contains data for only some of the
sensors in the 0-999 range, plus lots of sensors from other partitions of
partitionedDStream.

I want partition 0 of stateDStream to contain only the data from partition 0
of partitionedDStream, partition 1 of stateDStream only data from partition 1
of partitionedDStream, and so on. Does anyone know how to implement that?
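
One option worth noting here (an addition to the thread, not from the original reply):
updateStateByKey has an overload that takes a Partitioner, so the state RDDs can be
partitioned the same way as the input. A sketch, where the state type, the value type
(Double), and the update function body are made-up placeholders:

import org.apache.spark.HashPartitioner
import org.apache.spark.SparkContext._                  // pair-RDD operations (Spark 1.x)
import org.apache.spark.streaming.StreamingContext._    // pair-DStream operations (Spark 1.x)

type SensorState = Seq[Double]                          // placeholder state type
def updateFunction(newValues: Seq[Double], state: Option[SensorState]): Option[SensorState] =
  Some(state.getOrElse(Seq.empty) ++ newValues)         // placeholder logic

val partitioner = new HashPartitioner(10)               // or the custom MyPartitioner above

// Use the same partitioner for the input and the state, so state partition i only
// ever holds keys that route to partition i and the state is not reshuffled per batch.
val partitionedDStream = keyedDStream.transform(_.partitionBy(partitioner))
val stateDStream = partitionedDStream.updateStateByKey[SensorState](updateFunction _, partitioner)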

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850p13853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: how to setup steady state stream partitions

2014-09-09 Thread x
Using your own partitioner didn't work?

e.g.
yourRdd.partitionBy(new HashPartitioner(numPartitions))

xj @ Tokyo

On Wed, Sep 10, 2014 at 12:03 PM, qihong  wrote:

> I'm working on a DStream application.  The input is sensors' measurements,
> the data format is 
>
> There are 10 thousand sensors, and updateStateByKey is used to maintain
> the states of the sensors; the code looks like the following:
>
> val inputDStream = ...
> val keyedDStream = inputDStream.map(...)  // use sensorId as key
> val stateDStream = keyedDStream.updateStateByKey[...](updateFunction)
>
> Here's the question:
> In a cluster with 10 worker nodes, is it possible to partition the input
> dstream, so that node 1 handles sensors 0-999, node 2 handles 1000-1999,
> and so on?
>
> Also, is it possible to keep the state stream for sensors 0-999 on node 1,
> 1000 to 1999 on node 2, etc.? Right now, I see the sensor state stream being
> shuffled for every batch, which uses a lot of network bandwidth and is
> unnecessary.
>
> Any suggestions?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Table not found: using jdbc console to query sparksql hive thriftserver

2014-09-09 Thread alexandria1101
I think the package does not exist because I need to change the pom file:

   <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-assembly_2.10</artifactId>
     <version>1.0.1</version>
     <type>pom</type>
     <scope>provided</scope>
   </dependency>

I changed the version number to 1.1.1, but that still causes a build
error:

Failure to find org.apache.spark:spark-assembly_2.10:pom:1.1.1 in
http://repo.maven.apache.org/maven2 was cached in the local repository,
resolution will not be reattempted until the update interval of central has
elapsed or updates are forced -> [Help 1]

Is there a way to get past this?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13851.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to setup steady state stream partitions

2014-09-09 Thread qihong
I'm working on a DStream application.  The input is sensors' measurements,
the data format is 

There are 10 thousand sensors, and updateStateByKey is used to maintain
the states of the sensors; the code looks like the following:

val inputDStream = ...
val keyedDStream = inputDStream.map(...)  // use sensorId as key
val stateDStream = keyedDStream.updateStateByKey[...](updateFunction)

Here's the question:
In a cluster with 10 worker nodes, is it possible to partition the input
dstream, so that node 1 handles sensors 0-999, node 2 handles 1000-1999,
and so on?

Also, is it possible to keep the state stream for sensors 0-999 on node 1, 1000
to 1999 on node 2, etc.? Right now, I see the sensor state stream being shuffled
for every batch, which uses a lot of network bandwidth and is unnecessary.

Any suggestions?

Thanks! 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-setup-steady-state-stream-partitions-tp13850.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming for synchronous API

2014-09-09 Thread Tobias Pfeiffer
Hi again,

On Tue, Sep 9, 2014 at 2:20 PM, Tobias Pfeiffer  wrote:
>
> On Tue, Sep 9, 2014 at 2:02 PM, Ron's Yahoo!  wrote:
>>
>>   For example, let’s say there’s a particular topic T1 in a Kafka queue.
>> If I have a new set of requests coming from a particular client A, I was
>> wondering if I could create a partition A.
>>   The streaming job is submitted to listen to T1.A and will write to a
>> topic T2.A, which the REST endpoint would be listening on.
>>
>
> That doesn't seem like a good way to use Kafka. It may be possible, but I
> am pretty sure you should create a new topic T_A instead of a partition A
> in an existing topic. With some modifications of Spark Streaming's
> KafkaReceiver you *might* be able to get it to work as you imagine, but it
> was not meant to be that way, I think.
>

Maybe I was wrong about a new topic being the better way. Looking, for
example, at the way that Samza consumes Kafka streams <
http://samza.incubator.apache.org/learn/documentation/latest/introduction/concepts.html>,
it seems like there is one task per partition and data can go into
partitions keyed by user ID. So maybe a new partition is actually the
conceptually better way.

Nonetheless, the built-in KafkaReceiver doesn't support assignment of
partitions to receivers AFAIK ;-)
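
For completeness, the per-client-topic alternative mentioned earlier would look roughly
like this with the built-in receiver (a sketch only; the ZooKeeper quorum, the consumer
group id, and the "T1_A"-style topic naming are placeholders based on this thread):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// One receiver per client-specific topic (e.g. "T1_A" for client A), rather than
// trying to pin a receiver to a single partition of a shared topic.
def streamForClient(ssc: StreamingContext, client: String) =
  KafkaUtils.createStream(ssc, "zkhost:2181", s"rest-endpoint-$client", Map(s"T1_$client" -> 1))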

Tobias


Re: Spark EC2 standalone - Utils.fetchFile no such file or directory

2014-09-09 Thread luanjunyi
I've encountered probably the same problem and just figured out the solution.

The error was caused because Spark tried to write to the scratch directory
but the path didn't exist.

It's likely you are running the app on the master node only. In the
spark-ec2 setup, the scratch directory for Spark (spark.local.dir) is set
to /mnt/spark in conf/spark-env.sh. This path exists on all slave nodes but
not on the master node, hence the error.

So if you set the master URL to spark://your-master-node-domain:7077, the
error will go away, since all the executors then run on slave nodes. If you
need to test on the master node, either create /mnt/spark yourself or
change the entry (SPARK_LOCAL_DIRS) in conf/spark-env.sh to some existing
path with write permission.

Note that the environment variables defined in conf/spark-env.sh are meant
for machine-specific settings thus they will override the settings in the
SparkConf object even if you provided one.
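
For the application-side variant, a sketch (the path is a placeholder; as noted above,
SPARK_LOCAL_DIRS in conf/spark-env.sh will still take precedence over this setting):

import org.apache.spark.SparkConf

// Point the scratch directory at a path that exists and is writable on every node
// the app runs on. Remember that spark-env.sh settings override this.
val conf = new SparkConf()
  .setAppName("local-dir-example")
  .set("spark.local.dir", "/tmp/spark-scratch")   // placeholder path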




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-EC2-standalone-Utils-fetchFile-no-such-file-or-directory-tp12683p13848.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Table not found: using jdbc console to query sparksql hive thriftserver

2014-09-09 Thread alexandria1101
Thanks so much!

That makes complete sense.  However, when I compile I get the error "package
org.apache.spark.sql.hive does not exist."

Has anyone else seen this, and does anyone have an idea why that might be?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Table-not-found-using-jdbc-console-to-query-sparksql-hive-thriftserver-tp13840p13847.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-09 Thread Shao, Saisai
Hi Luis,

The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can both be used 
to remove stale streaming data; the difference is that “spark.cleaner.ttl” is a 
time-based cleaner that cleans not only streaming input data but also Spark’s stale 
metadata, while “spark.streaming.unpersist” is a reference-based cleaning mechanism: 
streaming data is removed once it falls out of the slide duration.

Both of these parameters can reduce the memory occupied by Spark Streaming. But if 
data floods into Spark Streaming at startup, as in your situation with Kafka, these 
two parameters cannot fully mitigate the problem. You actually need to control the 
input rate so data is not injected so fast; you can try 
“spark.streaming.receiver.maxRate” to control the injection rate.
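
A minimal sketch of setting these knobs together (the values are only illustrative):

import org.apache.spark.SparkConf

// Illustrative values: cap each receiver's intake, unpersist generated RDDs once they
// fall out of the window, and time out old metadata after an hour.
val conf = new SparkConf()
  .setAppName("streaming-memory-tuning")
  .set("spark.streaming.receiver.maxRate", "10000")   // records per second per receiver
  .set("spark.streaming.unpersist", "true")
  .set("spark.cleaner.ttl", "3600")                   // seconds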

Thanks
Jerry

From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
Sent: Wednesday, September 10, 2014 5:21 AM
To: user@spark.apache.org
Subject: spark.cleaner.ttl and spark.streaming.unpersist

The executors of my spark streaming application are being killed due to memory 
issues. The memory consumption is quite high on startup because it is the first 
run and there are quite a few events on the kafka queues that are consumed at a 
rate of 100K events per sec.

I wonder if it's recommended to use spark.cleaner.ttl and 
spark.streaming.unpersist together to mitigate that problem. And I also wonder 
if new RDDs are being batched while an RDD is being processed.
Regards,

Luis


Re: Table not found: using jdbc console to query sparksql hive thriftserver

2014-09-09 Thread Du Li
Your tables were registered in the SqlContext, whereas the thrift server
works with HiveContext. They seem to be in two different worlds today.
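
For illustration, the two worlds in code (a sketch assuming Spark 1.1-era APIs; sc is
an existing SparkContext):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new SQLContext(sc)     // temp tables registered here are local to this context
val hiveContext = new HiveContext(sc)   // HiveQL plus access to the Hive metastore

// The thrift server runs its own HiveContext in its own JVM, so in practice a table
// has to exist in the shared Hive metastore for a JDBC client to see it.
hiveContext.sql("SHOW TABLES").collect().foreach(println)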



On 9/9/14, 5:16 PM, "alexandria1101"  wrote:

>Hi,
>
>I want to use the sparksql thrift server in my application and make sure
>everything is loading and working. I built Spark 1.1 SNAPSHOT and ran the
>thrift server using ./sbin/start-thrift-server.  In my application I load
>tables into schemaRDDs and I expect that the thrift-server should pick
>them
>up.   In the app I then perform SQL queries on a table called mutation
>(the
>same name as the table I registered from the schemaRDD).
>
>I set the driver to "org.apache.hive.jdbc.HiveDriver" and the url to
>"jdbc:hive2://localhost:1/mutation?zeroDateTimeBehavior=convertToNull"
>.
>
>When I check the terminal for the thrift server output, it gets the
>query. 
>However, I cannot use a jdbc console to communicate with it to show all of
>the databases and tables to see if mutation is loaded.
>
>
>I get the following errors:
>
>14/09/09 16:51:02 WARN component.AbstractLifeCycle: FAILED
>SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address
>already
>in use
>java.net.BindException: Address already in use
>   at sun.nio.ch.Net.bind0(Native Method)
>   at sun.nio.ch.Net.bind(Net.java:444)
>   at sun.nio.ch.Net.bind(Net.java:436)
>   at
>sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
>   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
>   at
>org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConn
>ector.java:187)
>   at
>org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:
>316)
>   at
>org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelC
>onnector.java:265)
>   at
>org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle
>.java:64)
>   at org.eclipse.jetty.server.Server.doStart(Server.java:293)
>   at
>org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle
>.java:64)
>   at
>org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(
>JettyUtils.scala:192)
>   at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202)
>   at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202)
>   at
>org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Ut
>ils.scala:1446)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
>   at 
> org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202)
>   at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
>   at org.apache.spark.SparkContext.(SparkContext.scala:224)
>   at
>org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:5
>3)
>   at com.illumina.phoenix.util.Runner.createSparkContext(Runner.java:144)
>   at
>com.illumina.phoenix.etl.EtlPipelineRunner.main(EtlPipelineRunner.java:116
>)
>1053 [main] WARN org.eclipse.jetty.util.component.AbstractLifeCycle  -
>FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException:
>Address
>already in use
>java.net.BindException: Address already in use
>   at sun.nio.ch.Net.bind0(Native Method)
>   at sun.nio.ch.Net.bind(Net.java:444)
>   at sun.nio.ch.Net.bind(Net.java:436)
>   at
>sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
>   at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
>   at
>org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConn
>ector.java:187)
>   at
>org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:
>316)
>   at
>org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelC
>onnector.java:265)
>   at
>org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle
>.java:64)
>   at org.eclipse.jetty.server.Server.doStart(Server.java:293)
>   at
>org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle
>.java:64)
>   at
>org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(
>JettyUtils.scala:192)
>   at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202)
>   at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202)
>   at
>org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Ut
>ils.scala:1446)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>   at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
>   at 
> org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202)
>   at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
>   at org.apache.spark.SparkContext.(SparkContext.scala:224)
>   at
>org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:5
>3)
> 

EOFException when reading from HDFS

2014-09-09 Thread kent
I ran the SimpleApp program from spark tutorial
(https://spark.apache.org/docs/1.0.0/quick-start.html), which works fine. 

However, if I change the file location from local to hdfs, then I get an
EOFException. 

I did some searching online, which suggests this error is caused by Hadoop
version conflicts. I made the suggested modification in my sbt file, but I
still get the same error.

libraryDependencies += "org.apache.hadoop" % "hadoop-client" %
"2.3.0-cdh5.1.0" 

I am using CDH5.1, full error message is below.  Any help is greatly
appreciated. 

Thanks 


[hdfs@plogs001 test1]$ spark-submit --class SimpleApp --master
spark://172.16.30.164:7077 target/scala-2.10/simple-project_2.10-1.0.jar 
14/09/09 16:56:41 INFO spark.SecurityManager: Changing view acls to: hdfs 
14/09/09 16:56:41 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hdfs) 
14/09/09 16:56:41 INFO slf4j.Slf4jLogger: Slf4jLogger started 
14/09/09 16:56:41 INFO Remoting: Starting remoting 
14/09/09 16:56:41 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sp...@plogs001.sjc.domain.com:34607] 
14/09/09 16:56:41 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sp...@plogs001.sjc.domain.com:34607] 
14/09/09 16:56:41 INFO spark.SparkEnv: Registering MapOutputTracker 
14/09/09 16:56:41 INFO spark.SparkEnv: Registering BlockManagerMaster 
14/09/09 16:56:41 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140909165641-375e 
14/09/09 16:56:41 INFO storage.MemoryStore: MemoryStore started with
capacity 294.9 MB. 
14/09/09 16:56:41 INFO network.ConnectionManager: Bound socket to port 40833
with id = ConnectionManagerId(plogs001.sjc.domain.com,40833) 
14/09/09 16:56:41 INFO storage.BlockManagerMaster: Trying to register
BlockManager 
14/09/09 16:56:41 INFO storage.BlockManagerInfo: Registering block manager
plogs001.sjc.domain.com:40833 with 294.9 MB RAM 
14/09/09 16:56:41 INFO storage.BlockManagerMaster: Registered BlockManager 
14/09/09 16:56:41 INFO spark.HttpServer: Starting HTTP Server 
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT 
14/09/09 16:56:42 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:47419 
14/09/09 16:56:42 INFO broadcast.HttpBroadcast: Broadcast server started at
http://172.16.30.161:47419
14/09/09 16:56:42 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-7026d0b6-777e-4dd3-9bbb-e79d7487e7d7 
14/09/09 16:56:42 INFO spark.HttpServer: Starting HTTP Server 
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT 
14/09/09 16:56:42 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:42388 
14/09/09 16:56:42 INFO server.Server: jetty-8.y.z-SNAPSHOT 
14/09/09 16:56:42 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040 
14/09/09 16:56:42 INFO ui.SparkUI: Started SparkUI at
http://plogs001.sjc.domain.com:4040
14/09/09 16:56:42 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable 
14/09/09 16:56:42 INFO spark.SparkContext: Added JAR
file:/home/hdfs/kent/test1/target/scala-2.10/simple-project_2.10-1.0.jar at
http://172.16.30.161:42388/jars/simple-project_2.10-1.0.jar with timestamp
1410307002737 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Connecting to master
spark://plogs004.sjc.domain.com:7077... 
14/09/09 16:56:42 INFO storage.MemoryStore: ensureFreeSpace(155704) called
with curMem=0, maxMem=309225062 
14/09/09 16:56:42 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 152.1 KB, free 294.8 MB) 
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Connected to
Spark cluster with app ID app-20140909165642-0041 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added:
app-20140909165642-0041/0 on
worker-20140902113555-plogs005.sjc.domain.com-7078
(plogs005.sjc.domain.com:7078) with 24 cores 
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20140909165642-0041/0 on hostPort plogs005.sjc.domain.com:7078 with
24 cores, 1024.0 MB RAM 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added:
app-20140909165642-0041/1 on
worker-20140902113555-plogs006.sjc.domain.com-7078
(plogs006.sjc.domain.com:7078) with 24 cores 
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20140909165642-0041/1 on hostPort plogs006.sjc.domain.com:7078 with
24 cores, 1024.0 MB RAM 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor added:
app-20140909165642-0041/2 on
worker-20140902113556-plogs004.sjc.domain.com-7078
(plogs004.sjc.domain.com:7078) with 24 cores 
14/09/09 16:56:42 INFO cluster.SparkDeploySchedulerBackend: Granted executor
ID app-20140909165642-0041/2 on hostPort plogs004.sjc.domain.com:7078 with
24 cores, 1024.0 MB RAM 
14/09/09 16:56:42 INFO client.AppClient$ClientActor: Executor updated:
app-20140909165642-0041/2 is n

serialization changes -- OOM

2014-09-09 Thread Manku Timma
Has anything changed in the last 30 days w.r.t serialization? I had 620MB
of compressed data which used to get serialized-in-spark-memory with 4GB
executor memory. Now it fails to get serialized in memory even at 10GB of
executor memory.

-- Bharath


Re: Cannot run SimpleApp as regular Java app

2014-09-09 Thread ericacm
Hi Yana - 

I added the following to spark-class:

echo RUNNER: $RUNNER
echo CLASSPATH: $CLASSPATH
echo JAVA_OPTS: $JAVA_OPTS
echo '$@': $@

Here's the output:

$ ./spark-submit --class experiments.SimpleApp --master
spark://myhost.local:7077
/IdeaProjects/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar

Spark assembly has been built with Hive, including Datanucleus jars on
classpath

RUNNER:
/Library/Java/JavaVirtualMachines/jdk1.7.0_13.jdk/Contents/Home/bin/java

CLASSPATH:
::/dev/spark-1.0.2-bin-hadoop2/conf:/dev/spark-1.0.2-bin-hadoop2/lib/spark-assembly-1.0.2-hadoop2.2.0.jar:/dev/spark-1.0.2-bin-hadoop2/lib/datanucleus-api-jdo-3.2.1.jar:/dev/spark-1.0.2-bin-hadoop2/lib/datanucleus-core-3.2.2.jar:/dev/spark-1.0.2-bin-hadoop2/lib/datanucleus-rdbms-3.2.1.jar

JAVA_OPTS: -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m

$@: org.apache.spark.deploy.SparkSubmit --class experiments.SimpleApp
--master spark://myhost.local:7077
/IdeaProjects/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar

The differences I can see in the code that runs via my standalone Java app:
- Does not have -Djava.library.path=  (should not make a difference)
- Main class is org.apache.spark.executor.CoarseGrainedExecutorBackend
instead of org.apache.spark.deploy.SparkSubmit (should not make a
difference)
- My jar's classes are directly available when running via spark-submit (it
runs the jar, so they will be in the main classloader), but they are only
available via conf.setJars() in the standalone Java app.  They should still be
available indirectly in the classloader that is created in the executor:

14/09/08 10:04:06 INFO Executor: Adding
file:/dev/spark-1.0.2-bin-hadoop2/work/app-20140908100358-0002/1/./spark-experiments-1.0-SNAPSHOT.jar
to class loader

I've been assuming that my conf.setJars() is the proper way to provide my
code to Spark.  
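
For reference, the SparkConf setup being described is roughly this (a sketch; the
master URL and jar path are taken from the output above):

import org.apache.spark.{SparkConf, SparkContext}

// Standalone Java/Scala driver: the application jar is not launched via spark-submit,
// so it is shipped to the executors explicitly with setJars.
val conf = new SparkConf()
  .setAppName("spark-experiments")
  .setMaster("spark://myhost.local:7077")
  .setJars(Seq("/IdeaProjects/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar"))
val sc = new SparkContext(conf)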

Thanks!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-run-SimpleApp-as-regular-Java-app-tp13695p13842.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to run python examples in spark 1.1?

2014-09-09 Thread freedafeng
I'm mostly interested in the hbase examples in the repo. I saw two examples
hbase_inputformat.py and hbase_outputformat.py in the 1.1 branch. Can you
show me how to run them? 

The compile step is done. I tried to run the examples, but they failed.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-run-python-examples-in-spark-1-1-tp13841.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Table not found: using jdbc console to query sparksql hive thriftserver

2014-09-09 Thread alexandria1101
Hi,

I want to use the sparksql thrift server in my application and make sure
everything is loading and working. I built Spark 1.1 SNAPSHOT and ran the
thrift server using ./sbin/start-thrift-server.  In my application I load
tables into schemaRDDs and I expect that the thrift-server should pick them
up.   In the app I then perform SQL queries on a table called mutation (the
same name as the table I registered from the schemaRDD).

I set the driver to "org.apache.hive.jdbc.HiveDriver" and the url to
"jdbc:hive2://localhost:1/mutation?zeroDateTimeBehavior=convertToNull".

When I check the terminal for the thrift server output, it gets the query. 
However, I cannot use a jdbc console to communicate with it to show all of
the databases and tables to see if mutation is loaded.


I get the following errors:

14/09/09 16:51:02 WARN component.AbstractLifeCycle: FAILED
SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already
in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:192)
at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202)
at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
at 
org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
at org.apache.spark.SparkContext.(SparkContext.scala:224)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:53)
at com.illumina.phoenix.util.Runner.createSparkContext(Runner.java:144)
at
com.illumina.phoenix.etl.EtlPipelineRunner.main(EtlPipelineRunner.java:116)
1053 [main] WARN org.eclipse.jetty.util.component.AbstractLifeCycle  -
FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:444)
at sun.nio.ch.Net.bind(Net.java:436)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187)
at
org.eclipse.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316)
at
org.eclipse.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at org.eclipse.jetty.server.Server.doStart(Server.java:293)
at
org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
at
org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:192)
at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202)
at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:202)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1446)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1442)
at 
org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:202)
at org.apache.spark.ui.WebUI.bind(WebUI.scala:102)
at org.apache.spark.SparkContext.(SparkContext.scala:224)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:53)
at com.illumina.phoenix.util.Runner.createSparkContext(Runner.java:144)
at
com.illumina.phoenix.etl.EtlPipelineRunner.main(EtlPipelineRunner.java:116)
14/09/09 16:51:02 WARN component.AbstractLifeCycle: FAILED
org.eclipse.jetty.server.Server@35241119: java.net.BindExcep

Spark + AccumuloInputFormat

2014-09-09 Thread Russ Weeks
Hi,

I'm trying to execute Spark SQL queries on top of the AccumuloInputFormat.
Not sure if I should be asking on the Spark list or the Accumulo list, but
I'll try here. The problem is that the workload to process SQL queries
doesn't seem to be distributed across my cluster very well.

My Spark SQL app is running in yarn-client mode. The query I'm running is
"select count(*) from audit_log" (or a similarly simple query) where my
audit_log table has 14.3M rows, 504M key value pairs spread fairly evenly
across 8 tablet servers. Looking at the Accumulo monitor app, I only ever
see a maximum of 2 tablet servers with active scans. Since the data is
spread across all the tablet servers, I hoped to see 8!

I realize there are a lot of moving parts here, but I'd appreciate any advice
about where to start looking.

Using Spark 1.0.1 with Accumulo 1.6.
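
One thing that may be worth checking (an addition here, not part of the original post):
how many partitions the Accumulo-backed RDD actually has, since that bounds how many
concurrent scans Spark will issue. A sketch, assuming sc is an existing SparkContext and
the Hadoop Configuration has already been populated with the Accumulo connector info,
table name and ranges:

import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat
import org.apache.accumulo.core.data.{Key, Value}
import org.apache.hadoop.conf.Configuration

// Assumed: jobConf already carries connector info, table name and ranges
// (set elsewhere via AccumuloInputFormat's configuration methods).
val jobConf = new Configuration()

val raw = sc.newAPIHadoopRDD(jobConf, classOf[AccumuloInputFormat],
                             classOf[Key], classOf[Value])

// Few partitions means few concurrent tablet-server scans, regardless of cluster size.
println(s"partitions: ${raw.partitions.length}")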

Thanks!
-Russ


Re: Crawler and Scraper with different priorities

2014-09-09 Thread Peng Cheng
Hi Sandeep,

would you be interested in joining my open source project?

https://github.com/tribbloid/spookystuff

IMHO Spark is indeed not meant for general-purpose crawling, where the distributed
jobs are highly homogeneous, but it is good enough for directional scraping, which
involves heterogeneous input and deep graph following & extraction. Please drop me
a line if you have a use case, and I'll try to integrate it as a feature.

Yours Peng



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Crawler-Scraper-with-different-priorities-tp13645p13838.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Deregistered receiver for stream 0: Stopped by driver

2014-09-09 Thread Sing Yip
When I stop the Spark StreamingContext by calling stop(), I always get the
following error:

ERROR Deregistered receiver for stream 0: Stopped by driver  
class=org.apache.spark.streaming.scheduler.ReceiverTracker
WARN Stopped executor without error  
class=org.apache.spark.streaming.receiver.ReceiverSupervisorImpl
WARN All of the receivers have not deregistered, Map(0 -> 
ReceiverInfo(0,EventReceiver-0,null,false,localhost,Stopped by driver,))  
class=org.apache.spark.streaming.scheduler.ReceiverTracker

Is there a way to avoid these error and warning messages?
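
For what it's worth, a graceful shutdown (a sketch; the two-argument stop exists in
Spark 1.x) gives the receivers a chance to deregister before the context is torn down,
which may reduce these messages:

// Stop gracefully: wait for the receivers to stop and for received data to be
// processed before shutting down the underlying SparkContext.
ssc.stop(stopSparkContext = true, stopGracefully = true)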

Thanks,
-Sing

Spark caching questions

2014-09-09 Thread Vladimir Rodionov
Hi, users

1. Disk-based cache eviction policy? Is it the same LRU?

2. What is the scope of a cached RDD? Does it survive the application? What
happens if I run the Java app next time? Will the RDD be re-created or read from
the cache?

If the answer is YES, then ...


3. Is there any way to invalidate a cached RDD automatically? RDD
partitions? Some API like RDD.isValid()?

4. HadoopRDD is InputFormat-based. Some partitions (splits) may become
invalid in the cache. Can we reload only those partitions into the cache?

-Vladimir


Re: clarification for some spark on yarn configuration options

2014-09-09 Thread Andrew Or
Hi Greg,

SPARK_EXECUTOR_INSTANCES is the total number of workers in the cluster. The
equivalent "spark.executor.instances" is just another way to set the same
thing in your spark-defaults.conf. Maybe this should be documented. :)

"spark.yarn.executor.memoryOverhead" is just an additional margin added to
"spark.executor.memory" for the container. In addition to the executor's
memory, the container in which the executor is launched needs some extra
memory for system processes, and this is what this "overhead" (somewhat of
a misnomer) is for. The same goes for the driver equivalent.

"spark.driver.memory" behaves differently depending on which version of
Spark you are using. If you are using Spark 1.1+ (this was released very
recently), you can directly set "spark.driver.memory" and this will take
effect. Otherwise, setting this doesn't actually do anything for client
deploy mode, and you have two alternatives: (1) set the environment
variable equivalent SPARK_DRIVER_MEMORY in spark-env.sh, and (2) if you are
using Spark submit (or bin/spark-shell, or bin/pyspark, which go through
bin/spark-submit), pass the "--driver-memory" command line argument.

If you want your PySpark application (driver) to pick up extra class path,
you can pass the "--driver-class-path" to Spark submit. If you are using
Spark 1.1+, you may set "spark.driver.extraClassPath" in your
spark-defaults.conf. There is also an environment variable you could set
(SPARK_CLASSPATH), though this is now deprecated.

Let me know if you have more questions about these options,
-Andrew


2014-09-08 6:59 GMT-07:00 Greg Hill :

>  Is SPARK_EXECUTOR_INSTANCES the total number of workers in the cluster
> or the workers per slave node?
>
>  Is spark.executor.instances an actual config option?  I found that in a
> commit, but it's not in the docs.
>
>  What is the difference between spark.yarn.executor.memoryOverhead and 
> spark.executor.memory
> ?  Same question for the 'driver' variant, but I assume it's the same
> answer.
>
>  Is there a spark.driver.memory option that's undocumented or do you have
> to use the environment variable SPARK_DRIVER_MEMORY?
>
>  What config option or environment variable do I need to set to get
> pyspark interactive to pick up the yarn class path?  The ones that work for
> spark-shell and spark-submit don't seem to work for pyspark.
>
>  Thanks in advance.
>
>  Greg
>


Re: spark-streaming "Could not compute split" exception

2014-09-09 Thread Marcelo Vanzin
Your executor is exiting or crashing unexpectedly:

On Tue, Sep 9, 2014 at 3:13 PM, Penny Espinoza
 wrote:
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
> code from container container_1410224367331_0006_01_03 is : 1
> 2014-09-09 21:47:26,345 WARN
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor:
> Exception from container-launch with container ID:
> container_1410224367331_0006_01_03 and exit code: 1

You can check the app logs (yarn logs --applicationId [id]) and see
why the container is exiting. There's probably an exception happening
somewhere.


-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Yarn Driver OOME (Java heap space) when executors request map output locations

2014-09-09 Thread Kostas Sakellis
Hey,

If you are interested in more details there is also a thread about this
issue here:
http://apache-spark-developers-list.1001551.n3.nabble.com/Eliminate-copy-while-sending-data-any-Akka-experts-here-td7127.html

Kostas

On Tue, Sep 9, 2014 at 3:01 PM, jbeynon  wrote:

> Thanks Marcelo, that looks like the same thing. I'll follow the Jira ticket
> for updates.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Driver-OOME-Java-heap-space-when-executors-request-map-output-locations-tp13827p13829.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark-streaming "Could not compute split" exception

2014-09-09 Thread Penny Espinoza
The node manager log looks like this - not exactly sure what this means, but 
the container messages seem to indicate there is still plenty of memory.

2014-09-09 21:47:00,718 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 319.7 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:03,728 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 321.2 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:06,736 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 321.5 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:09,743 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 321.5 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:12,755 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 321.9 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:15,762 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 322.8 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:18,430 INFO 
org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl: Sending out 
status for container: container_id { app_attempt_id { application_id { id: 6 
cluster_timestamp: 1410224367331 } attemptId: 1 } id: 3 } state: C_RUNNING 
diagnostics: "" exit_status: -1000
2014-09-09 21:47:18,769 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 322.8 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:21,777 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 322.8 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:24,784 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl:
 Memory usage of ProcessTree 26819 for container-id 
container_1410224367331_0006_01_03: 324.1 MB of 1.5 GB physical memory 
used; 1.7 GB of 7.5 GB virtual memory used
2014-09-09 21:47:26,345 WARN 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code 
from container container_1410224367331_0006_01_03 is : 1
2014-09-09 21:47:26,345 WARN 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception 
from container-launch with container ID: container_1410224367331_0006_01_03 
and exit code: 1
org.apache.hadoop.util.Shell$ExitCodeException:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:464)
at org.apache.hadoop.util.Shell.run(Shell.java:379)
at 
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:283)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:79)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
2014-09-09 21:47:26,345 INFO 
org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor:
2014-09-09 21:47:26,345 WARN 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Container exited with a non-zero exit code 1
2014-09-09 21:47:26,346 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: 
Container container_1410224367331_0006_01_03 transitioned from RUNNING to 
EXITED_WITH_FAILURE
2014-09-09 21:47:26,346 INFO 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
 Cleaning up container 

Re: Yarn Driver OOME (Java heap space) when executors request map output locations

2014-09-09 Thread jbeynon
Thanks Marcelo, that looks like the same thing. I'll follow the Jira ticket
for updates.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Driver-OOME-Java-heap-space-when-executors-request-map-output-locations-tp13827p13829.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Yarn Driver OOME (Java heap space) when executors request map output locations

2014-09-09 Thread Marcelo Vanzin
Hi,

Yes, this is a problem, and I'm not aware of any simple workarounds
(or complex ones, for that matter). There are people working to fix
this, you can follow progress here:
https://issues.apache.org/jira/browse/SPARK-1239

On Tue, Sep 9, 2014 at 2:54 PM, jbeynon  wrote:
> I'm running on Yarn with relatively small instances with 4gb memory. I'm not
> caching any data but when the map stage ends and shuffling begins all of the
> executors request the map output locations at the same time which seems to
> kill the driver when the number of executors is turned up.
>
> For example, the "size of output statuses" is about 10mb and with 500
> executors the driver appears to be making 500 (5gb of data) copies of this
> data to send out and running out of memory. When I turn down the number of
> executors everything runs fine.
>
> Has anyone else run into this? Maybe I'm misunderstanding the underlying
> cause. I don't have a copy of the stack trace handy but can recreate it if
> necessary. It was somewhere in the <init> for HeapByteBuffer. Any advice
> would be helpful.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Driver-OOME-Java-heap-space-when-executors-request-map-output-locations-tp13827.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Yarn Driver OOME (Java heap space) when executors request map output locations

2014-09-09 Thread jbeynon
I'm running on Yarn with relatively small instances with 4gb memory. I'm not
caching any data but when the map stage ends and shuffling begins all of the
executors request the map output locations at the same time which seems to
kill the driver when the number of executors is turned up.

For example, the "size of output statuses" is about 10mb and with 500
executors the driver appears to be making 500 (5gb of data) copies of this
data to send out and running out of memory. When I turn down the number of
executors everything runs fine.

Has anyone else run into this? Maybe I'm misunderstanding the underlying
cause. I don't have a copy of the stack trace handy but can recreate it if
necessary. It was somewhere in the <init> for HeapByteBuffer. Any advice
would be helpful.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-Driver-OOME-Java-heap-space-when-executors-request-map-output-locations-tp13827.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark.cleaner.ttl and spark.streaming.unpersist

2014-09-09 Thread Luis Ángel Vicente Sánchez
The executors of my spark streaming application are being killed due to
memory issues. The memory consumption is quite high on startup because it is
the first run and there are quite a few events on the kafka queues that are
consumed at a rate of 100K events per sec.

I wonder if it's recommended to use spark.cleaner.ttl and
spark.streaming.unpersist together to mitigate that problem. And I also
wonder if new RDDs are being batched while an RDD is being processed.

Regards,

Luis


Re: spark-streaming "Could not compute split" exception

2014-09-09 Thread Marcelo Vanzin
This has all the symptoms of Yarn killing your executors due to them
exceeding their memory limits. Could you check your RM/NM logs to see
if that's the case?

(The error was because of an executor at
domU-12-31-39-0B-F1-D1.compute-1.internal, so you can check that NM's
log file.)

If that's the case, you'll need to play with the
"spark.yarn.executor.memoryOverhead" config (see
http://spark.apache.org/docs/latest/running-on-yarn.html).
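
A minimal sketch of bumping that setting (the value is illustrative; in these Spark
versions it is an amount in megabytes added on top of spark.executor.memory):

import org.apache.spark.SparkConf

// Illustrative value: reserve an extra 768 MB per YARN container for JVM/off-heap
// overhead so the container is not killed for exceeding its memory limit.
val conf = new SparkConf()
  .set("spark.yarn.executor.memoryOverhead", "768")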

On Tue, Sep 9, 2014 at 1:13 PM, Penny Espinoza
 wrote:
> Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs
> successfully using master = local[4].  However, when I run it on a Hadoop
> 2.2 EMR cluster using master yarn-client, it fails after running for about 5
> minutes.  My main method does something like this:
>
> gets streaming context
> creates a DStream from KafkaUtils.createStream (let’s call this a)
> creates another DStream from a.flatMap (let’s call this b)
> creates another DStream from b.updateStateByKey (c)
> creates another DStream from c.flatMap (d)
> runs d.foreachRDD, and uses Tuplejump’s Calliope cql3SaveToCassandra to save
> data to Cassandra
>
>
> Here’s the error message:
>
> 14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to
> java.lang.Exception
> java.lang.Exception: Could not compute split, block input-0-1410293154000
> not found
> at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at
> org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
> at org.apache.spark.scheduler.Task.run(Task.scala:51)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
> According to logs, the block referenced in the error message was added to
> memory only about 10 seconds before this error appears, and there is no
> evidence in the log of any blocks being cleared from memory, or of it
> running out of memory.  Below is a snippet of the log:
>
> 14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000
> in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4
> KB, free: 490.8 MB)
> … ( several other messages like the one above, but for different blocks)
> 14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000
> in memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB,
> free: 457.3 MB)
> 14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21
> blocks
> 14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time
> 1410293160000 ms to StorageLevel(false, true, false, false, 1) at time
> 1410293160000 ms
> 14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time
> 1410293160000 ms for checkpointing at time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time
> 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time
> 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for
> time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for
> time 1410293160000 ms
> 14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job
> 1410293160000 ms.0 from job set of time 1410293160000 ms
> 14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for
> time 1410293160000 ms to file
> 'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-1410293160000'
> 14/09/09 20:06:00 INFO spark.SparkContext: Starting job:
> saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap
> at FlatMappedDStream.scala:35)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12
> (saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) with 2 output
> partitions (allowLocal=false)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage
> 23(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203)
> 14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage:
> List(Stage 24)
> 14/09/09 20:06:00 INFO

Spark HiveQL support plan

2014-09-09 Thread XUE, Xiaohui

Hi,

On the Spark website, there’s a plan to support HiveQL on top of Spark SQL and also 
to support JDBC/ODBC.

I would like to know whether, in this “HiveQL” supported by Spark (or Spark SQL 
accessible through JDBC/ODBC), there is a plan to add extensions to leverage 
other Spark features like machine learning and streaming.
I’m asking because we’re considering using Spark from a remote machine, which 
would be easier through JDBC/ODBC. But it would be good to also take advantage of 
Spark features beyond the ones that exist in HiveQL.

Thanks,
Xiaohui Xue


Distributed Deep Learning Workshop with Scala, Akka, and Spark

2014-09-09 Thread Alexy Khrabrov
On September 25-26, SF Scala teams up with Adam Gibson, the creator of
deeplearning4j.org, to teach the first ever Distributed Deep Learning
with Scala Akka, and Spark workshop.  Deep Learning is enabling
break-through advances in the areas such as image recognition and
natural language processing.  It is a renaissance of neural networks,
made possible by the vast quantities of data to train them, so that
they match human performance on a variety of tasks.  Deep Learning is
causing a lot of interest across a variety of startups in
domains backed by “big” data, as it’s one of the “easiest” ways to
make sense out of data — once you put the algorithms and systems in
place to make it possible.  It turns out that in order to take
advantage of the “big” data available, you need to scale out properly
— and this is where Scala, Akka, and Spark shine.  We’ll present a
distributed system approach which makes you, armed with Scala stack, a
rival of Google.  As we’re offering this course for the first time,
we’re keeping the early bird pricing longer for this workshop.
Seating is very limited as we’ll work collaboratively.  Please sign up
at

http://bythebay.ticketleap.com/deep-learning-september-2014/

SF Scala is also offering the Typesafe-certified course, Fast Track to Scala.

http://bythebay.ticketleap.com/fast-track-to-scala/

Scala is underpinning multiple Machine Learning stacks such as
FACTORIE, ScalaNLP, and has fantastic numerical libraries such as
Breeze, Spire, and Saddle.  Scala is at the heart of Spark and BDAS.
It is the choice of data scientists who want their prototypes work at
web-scale and be ready for production.  This hands-on workshop will
get you from zero to Scala hero in just two days.

If you have any questions about these courses, please email
train...@bythebay.io.

A+ (Alexy)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: prepending jars to the driver class path for spark-submit on YARN

2014-09-09 Thread Penny Espinoza
I finally seem to have gotten past this issue.  Here’s what I did:

  *   rather than using the binary distribution, I built Spark from scratch to 
eliminate the 4.1 version of org.apache.httpcomponents from the assembly
 *   git clone https://github.com/apache/spark.git
 *   cd spark
 *   git checkout v1.0.2
 *   edited pom.xml to remove the modules sql/hive and examples
 *   export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M 
-XX:ReservedCodeCacheSize=512m”
 *   mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean 
package
  *   rebuilt my own assembly, eliminating all exclusions I had previously 
included to force use of org.apache.httpcomponents 4.1

On Sep 8, 2014, at 12:03 PM, Penny Espinoza 
mailto:pesp...@societyconsulting.com>> wrote:

I have tried using the spark.files.userClassPathFirst option (which, 
incidentally, is documented now, but marked as experimental), but it just 
causes different errors.  I am using spark-streaming-kafka.  If I mark 
spark-core and spark-streaming as provided and also exclude them from the 
spark-streaming-kafka dependency, I get this error:

14/09/08 18:34:23 WARN scheduler.TaskSetManager: Loss was due to 
java.lang.ClassCastException
java.lang.ClassCastException: cannot assign instance of 
com.oncue.rna.realtime.streaming.spark.BaseKafkaExtractorJob$$anonfun$getEventsStream$1
 to fie
ld org.apache.spark.rdd.MappedRDD.f of type scala.Function1 in instance of 
org.apache.spark.rdd.MappedRDD
   at 
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
   at 
java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
   at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
   at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
   at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
   at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
   at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:606)
   at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
   at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
   at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
   at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
   at java.io.ObjectInputStream.readObje

spark-streaming "Could not compute split" exception

2014-09-09 Thread Penny Espinoza
Hey - I have a Spark 1.0.2 job (using spark-streaming-kafka) that runs 
successfully using master = local[4].  However, when I run it on a Hadoop 2.2 
EMR cluster using master yarn-client, it fails after running for about 5 
minutes.  My main method does something like this:


  1.  gets streaming context
  2.  creates a DStream from KafkaUtils.createStream (let’s call this a)
  3.  creates another DStream from a.flatMap (let’s call this b)
  4.  creates another DStream from b.updateStateByKey (c)
  5.  creates another DStream from c.flatMap (d)
  6.  runs d.foreachRDD, and uses Tuplejump’s Calliope cql3SaveToCassandra to 
save data to Cassandra

Here’s the error message:

14/09/09 20:06:04 WARN scheduler.TaskSetManager: Loss was due to 
java.lang.Exception
java.lang.Exception: Could not compute split, block input-0-1410293154000 not 
found
at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

According to logs, the block referenced in the error message was added to 
memory only about 10 seconds before this error appears, and there is no 
evidence in the log of any blocks being cleared from memory, or of it running 
out of memory.  Below is a snippet of the log:

14/09/09 20:05:54 INFO storage.BlockManagerInfo: Added input-0-1410293154000 in 
memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 214.4 KB, 
free: 490.8 MB)
… ( several other messages like the one above, but for different blocks)
14/09/09 20:05:58 INFO storage.BlockManagerInfo: Added input-0-1410293158000 in 
memory on domU-12-31-39-07-B0-11.compute-1.internal:55682 (size: 3.1 MB, free: 
457.3 MB)
14/09/09 20:06:00 INFO scheduler.ReceiverTracker: Stream 0 received 21 blocks
14/09/09 20:06:00 INFO dstream.StateDStream: Persisting RDD 57 for time 
141029316 ms to StorageLevel(false, true, false, false, 1) at time 
141029316 ms
14/09/09 20:06:00 INFO dstream.StateDStream: Marking RDD 57 for time 
141029316 ms for checkpointing at time 141029316 ms
14/09/09 20:06:00 INFO scheduler.JobScheduler: Added jobs for time 
141029316 ms
14/09/09 20:06:00 INFO scheduler.JobGenerator: Checkpointing graph for time 
141029316 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Updating checkpoint data for 
time 141029316 ms
14/09/09 20:06:00 INFO streaming.DStreamGraph: Updated checkpoint data for time 
141029316 ms
14/09/09 20:06:00 INFO scheduler.JobScheduler: Starting job streaming job 
141029316 ms.0 from job set of time 141029316 ms
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Saving checkpoint for time 
141029316 ms to file 
'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-141029316'
14/09/09 20:06:00 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile 
at CassandraRDDFunctions.scala:203
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Registering RDD 54 (flatMap at 
FlatMappedDStream.scala:35)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Got job 12 
(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203) with 2 output 
partitions (allowLocal=false)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Final stage: Stage 
23(saveAsNewAPIHadoopFile at CassandraRDDFunctions.scala:203)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Parents of final stage: 
List(Stage 24)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Missing parents: List(Stage 24)
14/09/09 20:06:00 INFO scheduler.DAGScheduler: Submitting Stage 24 
(FlatMappedRDD[54] at flatMap at FlatMappedDStream.scala:35), which has no 
missing parents
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Deleting 
hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-141029311.bk
14/09/09 20:06:00 INFO streaming.CheckpointWriter: Checkpoint for time 
141029316 ms saved to file 
'hdfs://10.240.49.124:9000/user/hadoop/checkpoint/checkpoint-141029316', 
took 4351 bytes and 55 ms
14/09/09 20:06:00 INFO streaming.D

Re: Spark processes not doing on killing corresponding YARN application

2014-09-09 Thread didata
I figured out this issue (in our case)... and I'll vent a little in my reply here. =:)

Fedora's well-intentioned firewall (firewall-cmd) requires you to open (enable) any port/service on a
host that you need to connect to (including SSH/22, which is enabled by default, of course). So when
launching client applications that use ephemeral ports to connect back to (as a Spark app does for
remote YARN ResourceManager/NodeManagers to connect back to), you can't know what that port will be
in order to enable it, unless the application allows you to specify it as a launch property (which you
can for Spark apps via -Dspark.driver.port="N"). Again, well intentioned, but always a pain. So you
have to either disable the firewall capability in Fedora, or open/enable a range of ports and tell
your applications to use one of those.

Also note that as of this writing, firewall-cmd's ability to port-forward from the HOST to GUESTS in
Libvirt/KVM-based Hadoop/YARN/HDFS test/dev clusters doesn't work (it never has -- it's on the TODO
list). It's another capability that you'll need in order to reach daemon ports running *inside* the
KVM cluster (for example, UI ports). The work-around here (besides, again, disabling the Fedora
firewall altogether) is to use same-subnet BRIDGING (not NAT-ting). Doing that eliminates the need for
port-forwarding (which, again, doesn't work). I've filed bugs in the past for this.

So that is why YARN applications weren't terminating correctly for Spark apps, or for that matter
working at all, since Spark uses ephemeral ports (by necessity). So whatever port your Spark
application uses, remember to issue the command:

user@driverHost$ sudo firewall-cmd --zone=public --add-port=<SparkAppPort>/tcp

or, better yet, use the port-deterministic strategy mentioned earlier (see the sketch below).
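For example, a minimal sketch of pinning the driver port from the application itself (the app name and
the port number 50100 are placeholders; the port must be one you have already opened with firewall-cmd):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("FixedDriverPortApp")        // placeholder name
      .set("spark.driver.port", "50100")       // a port you opened in firewalld
    val sc = new SparkContext(conf)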
(Hopefully the verbosity here will help someone in their future search. Fedora aside, the original
problem here can be network related, as I discovered.)

sincerely,
didata



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-processes-not-doing-on-killing-corresponding-YARN-application-tp13443p13819.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

spark on yarn history server + hdfs permissions issue

2014-09-09 Thread Greg Hill
I am running Spark on Yarn with the HDP 2.1 technical preview.  I'm having 
issues getting the spark history server permissions to read the spark event 
logs from hdfs.  Both sides are configured to write/read logs from:

hdfs:///apps/spark/events

The history server is running as user spark, the jobs are running as user 
lavaqe.  Both users are in the  hdfs group on all the nodes in the cluster.

That root logs folder is globally writeable, but owned by the spark user:

drwxrwxrwx   - spark hdfs  0 2014-09-09 18:19 /apps/spark/events

All good so far.  Spark jobs create subfolders and put their event logs in 
there just fine.  The problem is that the history server, running as the spark 
user, cannot read those logs.  They're written as the user that initiates the 
job, but still in the same hdfs group:

drwxrwx---   - lavaqe hdfs  0 2014-09-09 19:24 
/apps/spark/events/spark-pi-1410290714996

The files are group readable/writable, but this is the error I get:

Permission denied: user=spark, access=READ_EXECUTE, 
inode="/apps/spark/events/spark-pi-1410290714996":lavaqe:hdfs:drwxrwx---

So, two questions, I guess:

1. Do group permissions just plain not work in hdfs or am I missing something?
2. Is there a way to tell Spark to log with more permissive permissions so the 
history server can read the generated logs?

Greg


Re: streaming: code to simulate a network socket data source

2014-09-09 Thread danilopds
I use this code separately, but the program blocks at this line:
val socket = listener.accept()

Do you have any suggestion?
Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-tp3431p13817.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Problem in running mosek in spark cluster - java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)

2014-09-09 Thread Yana Kadiyska
If that library has native dependencies you'd need to make sure that the
native code is on all boxes and in the path with export
SPARK_LIBRARY_PATH=...

On Tue, Sep 9, 2014 at 10:17 AM, ayandas84  wrote:

>  We have a small apache spark cluster of 6 computers. We are trying to
> solve
> a distributed problem which requires solving a optimization problem at each
> machine during a spark map operation.
>
> We decided to use mosek as the solver and I collected an academic license
> to
> this end. We observed that mosek works fine in a single system. However,
> when we prepare a jar file, include the mosek.jar into the library and try
> to run the jar in the cluster as a spark job it gives errors.
>
> java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path
>
> Does this problem has any thing to do with the license? We have set the
> necessary path variables i n the profile of the user in the master machine
> but we are not sure about what changes needs to be made to the other
> machines in the cluster.
>
> We shall be greatly obliged if you please suggest the necessary solution
> and
> help us out.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-running-mosek-in-spark-cluster-java-lang-UnsatisfiedLinkError-no-mosekjava7-0-in-java-lib-tp13799.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Cannot run SimpleApp as regular Java app

2014-09-09 Thread Yana Kadiyska
spark-submit is a script which calls the spark-class script. Can you output the
command that spark-class runs (say, by putting set -x before the very last
line)? You should see the java command that is being run. The scripts do
some parameter setting, so it's possible you're missing something. It seems
to me you think your worker memory is 2G, but the executor is clearly
launched with "-Xms512M" "-Xmx512M"... so that's all you'd get.
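If you actually want the executors to get more of that 2G, a minimal sketch (the 1g value is just an
example and must fit within SPARK_WORKER_MEMORY):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("Simple Application")
      .set("spark.executor.memory", "1g")      // instead of "512m"
      .setMaster("spark://myhost.local:7077")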

On Mon, Sep 8, 2014 at 10:16 AM, ericacm  wrote:

> Dear all:
>
> I am a brand new Spark user trying out the SimpleApp from the Quick Start
> page.
>
> Here is the code:
>
> object SimpleApp {
>   def main(args: Array[String]) {
> val logFile = "/dev/spark-1.0.2-bin-hadoop2/README.md" // Should be
> some
> file on your system
> val conf = new SparkConf()
> .setAppName("Simple Application")
> .set("spark.executor.memory", "512m")
> .setMaster("spark://myhost.local:7077")
>
>
> .setJars(Seq("/spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar"))
> val sc = new SparkContext(conf)
> try {
>   val logData = sc.textFile(logFile, 2).cache()
>   val numAs = logData.filter(line => line.contains("a")).count()
>   val numBs = logData.filter(line => line.contains("b")).count()
>   println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
> } finally {
>   sc.stop()
> }
>   }
> }
>
> I am using Spark 1.0.2 and Scala 2.10.4.  In spark-env.sh I have
> SPARK_WORKER_MEMORY=2g.
>
> I am trying to run this as a standalone Java app in my IDE.
>
> Note that this code *does* work when I either
> - Change the master to "local" (works running from IDE)
> - Run it using spark-submit
>
> The application/driver log is:
>
> 14/09/08 10:03:55 INFO spark.SecurityManager: Changing view acls to: eric
> 14/09/08 10:03:55 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(eric)
> 14/09/08 10:03:56 INFO slf4j.Slf4jLogger: Slf4jLogger started
> 14/09/08 10:03:56 INFO Remoting: Starting remoting
> 14/09/08 10:03:56 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://spark@10.0.1.5:61645]
> 14/09/08 10:03:56 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://spark@10.0.1.5:61645]
> 14/09/08 10:03:56 INFO spark.SparkEnv: Registering MapOutputTracker
> 14/09/08 10:03:56 INFO spark.SparkEnv: Registering BlockManagerMaster
> 14/09/08 10:03:56 INFO storage.DiskBlockManager: Created local directory at
>
> /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-local-20140908100356-2496
> 14/09/08 10:03:56 INFO storage.MemoryStore: MemoryStore started with
> capacity 279.5 MB.
> 14/09/08 10:03:56 INFO network.ConnectionManager: Bound socket to port
> 61646
> with id = ConnectionManagerId(10.0.1.5,61646)
> 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Trying to register
> BlockManager
> 14/09/08 10:03:56 INFO storage.BlockManagerInfo: Registering block manager
> 10.0.1.5:61646 with 279.5 MB RAM
> 14/09/08 10:03:56 INFO storage.BlockManagerMaster: Registered BlockManager
> 14/09/08 10:03:56 INFO spark.HttpServer: Starting HTTP Server
> 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031
> 14/09/08 10:03:57 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:61647
> 14/09/08 10:03:57 INFO broadcast.HttpBroadcast: Broadcast server started at
> http://10.0.1.5:61647
> 14/09/08 10:03:57 INFO spark.HttpFileServer: HTTP File server directory is
>
> /var/folders/j1/5rzyf1x97q9_7gj3mdc79t3cgn/T/spark-d5637279-5caa-4c14-a00f-650f1dd915bc
> 14/09/08 10:03:57 INFO spark.HttpServer: Starting HTTP Server
> 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031
> 14/09/08 10:03:57 INFO server.AbstractConnector: Started
> SocketConnector@0.0.0.0:61648
> 14/09/08 10:03:57 INFO server.Server: jetty-8.1.14.v20131031
> 14/09/08 10:03:57 INFO server.AbstractConnector: Started
> SelectChannelConnector@0.0.0.0:4040
> 14/09/08 10:03:57 INFO ui.SparkUI: Started SparkUI at http://10.0.1.5:4040
> 2014-09-08 10:03:57.567 java[58736:1703] Unable to load realm info from
> SCDynamicStore
> 14/09/08 10:03:57 INFO spark.SparkContext: Added JAR
> /spark-experiments/target/spark-experiments-1.0-SNAPSHOT.jar at
> http://10.0.1.5:61648/jars/spark-experiments-1.0-SNAPSHOT.jar with
> timestamp
> 1410185037723
> 14/09/08 10:03:57 INFO client.AppClient$ClientActor: Connecting to master
> spark://myhost.local:7077...
> 14/09/08 10:03:57 INFO storage.MemoryStore: ensureFreeSpace(32960) called
> with curMem=0, maxMem=293063884
> 14/09/08 10:03:57 INFO storage.MemoryStore: Block broadcast_0 stored as
> values to memory (estimated size 32.2 KB, free 279.5 MB)
> 14/09/08 10:03:58 INFO cluster.SparkDeploySchedulerBackend: Connected to
> Spark cluster with app ID app-20140908100358-0002
> 14/09/08 10:03:58 INFO client.AppClient$ClientActor: Executor added:
> app-20140908100358-0002/0 on worker-20140908100129-10.0.1.5-61526
> (10.0.1.5:61526) with 8 cor

Re: streaming: code to simulate a network socket data source

2014-09-09 Thread danilopds
Hello Diana,

How can I include this implementation in my code, so that this task starts
together with NetworkWordCount?

In my case, I have a directory with several files.

Then,
I include this line:
  StreamingDataGenerator.streamingGenerator(NetPort, BytesSecond, DirFiles)

But the program stays in my file loop and only returns to NetworkWordCount afterwards.

Can you suggest something to start these tasks in parallel?
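One way to start them in parallel is to run the generator on its own thread, so that the blocking
accept() happens off the main program. A minimal sketch, assuming StreamingDataGenerator.streamingGenerator
and its arguments are the ones shown above:

    // run the generator on a daemon thread; listener.accept() then blocks
    // this thread instead of the main program
    val generatorThread = new Thread(new Runnable {
      def run(): Unit =
        StreamingDataGenerator.streamingGenerator(NetPort, BytesSecond, DirFiles)
    })
    generatorThread.setDaemon(true)
    generatorThread.start()

    // ...then build the StreamingContext / NetworkWordCount part as usual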
Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/streaming-code-to-simulate-a-network-socket-data-source-tp3431p13814.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: PySpark on Yarn - how group by data properly

2014-09-09 Thread Davies Liu
On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets  wrote:
> Hi ,
>
>I came from map/reduce background and try to do quite trivial thing:
>
> I have a lot of files ( on hdfs ) - format is :
>
>1 , 2 , 3
>2 , 3 , 5
>1 , 3,  5
> 2, 3 , 4
> 2 , 5, 1
>
>   I am actually need to group by key (first column) :
>   key   values
>   1 --> (2,3),(3,5)
>   2 --> (3,5),(3,4),(5,1)
>
>   and I need to process (pass)  values to the function f ( my custom
> function)
>   outcome of  function f()  should be  to hdfs with corresponding key:
> 1 --> f() outcome
> 2 --> f() outcome.
>
> My code is :
>
>   def doSplit(x):
> y = x.split(',')
> if(len(y)==3):
>return  y[0],(y[1],y[2])
>
>
> lines = sc.textFile(filename,1)
> counts = lines.map(doSplit).groupByKey()
> output = counts.collect()
>
> for (key, value) in output:
> print 'build model for key ->' , key
> print value
> f(str(key) , value))
>
>
> Questions:
>1) lines.map(doSplit).groupByKey() - I didn't  find the option to use
> groupByKey( f() ) to process grouped values? how can I process grouped keys
> by custom function? function f has some not trivial logic.

The result of groupByKey() is still RDD with (key, ResultIterable(values)),
so you can continue to call map() or mapValues() on it:

lines.map(doSplit).groupByKey().map(f)

But your `f` need two parameters, the map() will assume that `f`
take one parameter, so you need to build a wrapper for `f`:

lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))

If the `f` only accept values as list, then you need to convert `vs` into list:

result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs)))

finally, you could save the `result` into HDFS:

result.saveAsPickleFile(path, batch=1024)

> 2) Using output ( I really don't like this approach )  to pass to
> function looks like not scalable and executed only on one machine?  What is
> the way using PySpark process grouped keys in distributed fashion.
> Multiprocessing and on different machine of the cluster.
>
> 3)In case of  processing output how data can be stored on hdfs?

Currently, it's not easy to access files in HDFS, you could do it by

sc.parallelize(local_data).map(str).saveAsTextFile()

> Thanks
> Oleg.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accuracy hit in classification with Spark

2014-09-09 Thread jatinpreet
I have also run some tests on the other algorithms available in MLlib but
got dismal accuracy. Is the method of creating the LabeledPoint RDD different
for other algorithms, such as LinearRegressionWithSGD?

Any help is appreciated.



-
Novice Big Data Programmer
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p13812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accuracy hit in classification with Spark

2014-09-09 Thread jatinpreet
Thanks for the information Xiangrui. I am using the following example to
classify documents.

http://chimpler.wordpress.com/2014/06/11/classifiying-documents-using-naive-bayes-on-apache-spark-mllib/

I am not sure if this is the best way to convert textual data into vectors.
Can you please confirm whether this is the ideal solution, as I could not identify
any shortcomings?

Also, I am splitting the data into 70/30 sets, which is the same for Mahout, so
it should not have an impact on accuracy.

Thanks,
Jatin




-
Novice Big Data Programmer
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p13811.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RDD memory questions

2014-09-09 Thread Davies Liu
On Tue, Sep 9, 2014 at 10:07 AM, Boxian Dong  wrote:
> I currently working on a machine learning project, which require the RDDs'
> content to be (mostly partially) updated during each iteration. Because the
> program will be converted directly from "traditional" python object-oriented
> code, the content of the RDD will be modified in the mapping function. To
> test the functionality and memory , I writed a testing program:
>
>class TestClass(object):
> def __init__(self):
> self.data = []
>
> def setup(self):
> self.data = range(2)
> return self
>
> def addNumber(self, number):
> length = len(self.data)
> for i in range(length):
> self.data[i] += number
> return self
>
> def sumUp(self):
> totoal = 0
> for n in self.data:
> totoal += n
> return totoal
>
> and Spark main:
>
> origData = []
> for i in range(50):
> origData.append((i, TestClass()))
> # create the RDD and cache it
> rddData = sc.parallelize(origData).mapValues(lambda v : v.setup())
> rddData.cache()
>
> # modifying the content of RDD in map function
> scD = rddData
> for i in range(100):
>  scD = scD.mapValues(lambda v : v.addNumber(10))
>
> scDSum = scD.map(lambda (k, v) : v.sumUp())
> v = scDSum.reduce(lambda a, b: a + b)
>
> print " -- after the transfermation, the value is ", v
>
> scDSum = rddData .map(lambda (k, v) : v.sumUp())
> v = scDSum.reduce(lambda a, b: a + b)
>
> print " -- after the transformation, the cached value is ", v
>
>   - By judging the results, it seems to me that when the RDDs is cached, the
> directed modification doesn't affect the content
>   - By the monitoring of the memory usage, it seems to me that the memory
> are not duplicated during each RDD (narrow dependence) transformation (or I
> am wrong)
>
> therefore, my question is:
>   - how the cache works, does it make a copy of the data separately ?
>   - How the memory is managed in the MAP function? (in narrow dependence)
> Are the entire RDDs first duplicated, modified and then assigned to the new
> RDDs, and afterward the old RDDs will be deleted from the memory. Or the new
> RDDs will reuse the same memory of the old RDDs, without the
> duplication/copy of the memory?

I'm trying to answer some of your questions:

The RDD is cached in the JVM (after being serialized by pickle). On the Python side,
the worker reads the serialized data from a socket, deserializes it into Python objects,
calls the mapper or reducer on them, and finally sends the results back to the JVM via
the socket. The Python process only holds a batch of objects at a time, so its memory
usage will be smaller than the whole partition.

The cache in the JVM is created after the first iteration over the data. So when you
process the data a second time (or more), it will be read from the cache in the JVM.

RDDs are read-only: you cannot modify them, and each transformation creates new RDDs.
During a map function, the objects in an RDD are thrown away after being accessed, so
any modification to them will be lost.

>   - If the new generated RDDs directly use the memory of the old RDDs (in
> narrow dependence) , why the cached RDDs still reserve old content. Are the
> cached RDDs treated differently from uncached RDDs in memory management.

No two RDDs share any memory; they are totally separate.

>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupBy gives non deterministic results

2014-09-09 Thread Davies Liu
Which version of Spark are you using?

This bug has been fixed in 0.9.2, 1.0.2 and 1.1; could you upgrade to
one of these versions to verify it?

Davies

On Tue, Sep 9, 2014 at 7:03 AM, redocpot  wrote:
> Thank you for your replies.
>
> More details here:
>
> The prog is executed on local mode (single node). Default env params are
> used.
>
> The test code and the result are in this gist:
> https://gist.github.com/coderh/0147467f0b185462048c
>
> Here is 10 first lines of the data: 3 fields each row, the delimiter is ";"
>
> 3801959;11775022;118
> 3801960;14543202;118
> 3801984;11781380;20
> 3801984;13255417;20
> 3802003;11777557;91
> 3802055;11781159;26
> 3802076;11782793;102
> 3802086;17881551;102
> 3802087;19064728;99
> 3802105;12760994;99
> ...
>
> There are 27 partitions(small files). Total size is about 100 Mb.
>
> We find that this problem is highly probably caused by the bug SPARK-2043:
> https://issues.apache.org/jira/browse/SPARK-2043
>
> Could someone give more details on this bug ?
>
> The pull request say:
>
> The current implementation reads one key with the next hash code as it
> finishes reading the keys with the current hash code, which may cause it to
> miss some matches of the next key. This can cause operations like join to
> give the wrong result when reduce tasks spill to disk and there are hash
> collisions, as values won't be matched together. This PR fixes it by not
> reading in that next key, using a peeking iterator instead.
>
> I don't understand why reading a key with the next hash code will cause it
> to miss some matches of the next key. If someone could show me some code to
> dig in, it's highly appreciated. =)
>
> Thank you.
>
> Hao.
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13797.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Filter function problem

2014-09-09 Thread Burak Yavuz
Hi,

val test = persons.value
  .map{tuple => (tuple._1, tuple._2
  .filter{event => *inactiveIDs.filter(event2 => event2._1 == 
tuple._1).count() != 0})}

Your problem is right between the asterisks. You can't perform an RDD operation
inside another RDD operation, because RDDs can't be serialized.
That is why you are receiving the NullPointerException. Try joining the RDDs
based on `event` and then filtering based on that.
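A rough sketch of what that could look like (the shapes are assumptions based on the snippet above:
personsRdd is the un-broadcast RDD of (id, events) pairs, and inactiveIDs is keyed by the same id):

    // the ids that appear in inactiveIDs, as a keyed RDD we can join against
    val inactiveKeys = inactiveIDs.keys.distinct().map(k => (k, ()))

    // keep only the person entries whose id also occurs in inactiveIDs
    // (note: ids that never appear in inactiveIDs are dropped here, rather
    // than kept with an empty event list as in the original filter)
    val test = personsRdd.join(inactiveKeys).mapValues { case (events, _) => events }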

Best,
Burak

- Original Message -
From: "Blackeye" 
To: u...@spark.incubator.apache.org
Sent: Tuesday, September 9, 2014 3:34:58 AM
Subject: Re: Filter function problem

In order to help anyone answer, I can say that I checked the
inactiveIDs.filter operation separately, and I found that it doesn't return
null in any case. In addition, I don't know how to handle (or check) whether an RDD
is null. I find the debugging too complicated to pinpoint the error. Any ideas
how to find the null pointer?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Filter-function-problem-tp13787p13789.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Filter function problem

2014-09-09 Thread Daniel Siegmann
You should not be broadcasting an RDD. You also should not be passing an
RDD in a lambda to another RDD. If you want, you can call RDD.collect and then
broadcast those values (of course, you must be able to fit all those values
in memory).
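A minimal sketch of that pattern (shapes assumed from the earlier snippet, with personsRdd as the
un-broadcast RDD; the collected ids must fit in driver memory):

    // collect the inactive ids once and broadcast them as a plain Set
    val inactiveIdSet = sc.broadcast(inactiveIDs.keys.distinct().collect().toSet)

    // the lambda now references only the broadcast value, not another RDD
    val test = personsRdd.filter { case (id, _) => inactiveIdSet.value.contains(id) }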

On Tue, Sep 9, 2014 at 6:34 AM, Blackeye  wrote:

> In order to help anyone to answer i could say that i checked the
> inactiveIDs.filter operation seperated, and I found that it doesn't return
> null in any case. In addition i don't how to handle (or check) whether a
> RDD
> is null. I find the debugging to complicated to point the error. Any ideas
> how to find the null pointer?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Filter-function-problem-tp13787p13789.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegm...@velos.io W: www.velos.io


Re: If for YARN you use 'spark.yarn.jar', what is the LOCAL equivalent to that property ...

2014-09-09 Thread Marcelo Vanzin
Yes, that's how file: URLs are interpreted everywhere in Spark. (It's also
explained in the link to the docs I posted earlier.)

The second interpretation below is "local:" URLs in Spark, but that doesn't
work with Yarn on Spark 1.0 (so it won't work with CDH 5.1 and older
either).

On Mon, Sep 8, 2014 at 6:00 PM, Dimension Data, LLC. <
subscripti...@didata.us> wrote:

>  Even when using 'file:///...' nomenclature in SPARK_JAR (instead of
> through the yet-to-be implemented 'spark.yarn.jar'\
> property), it's interpretation still seems to be:
>
>'Tell me where the local spark jar is located so that I can upload it
> (i.e. hdfs dfs -put) to a HDFS staging area for you'.
>   -as oppoese to-
>'Tell me where the local spark jar is located on the NMs, and I will
> look for it at that UNIX path'.
>

-- 
Marcelo


RDD memory questions

2014-09-09 Thread Boxian Dong
I am currently working on a machine learning project which requires the RDDs'
content to be (mostly only partially) updated during each iteration. Because the
program will be converted directly from "traditional" Python object-oriented
code, the content of the RDD will be modified in the mapping function. To
test the functionality and memory behaviour, I wrote a testing program:

   class TestClass(object):
def __init__(self):
self.data = []

def setup(self):
self.data = range(2)
return self

def addNumber(self, number):
length = len(self.data)
for i in range(length):
self.data[i] += number
return self

def sumUp(self):
totoal = 0
for n in self.data:
totoal += n
return totoal

and Spark main:

origData = []
for i in range(50):
origData.append((i, TestClass()))
# create the RDD and cache it   
rddData = sc.parallelize(origData).mapValues(lambda v : v.setup())
rddData.cache()

# modifying the content of RDD in map function 
scD = rddData
for i in range(100):
 scD = scD.mapValues(lambda v : v.addNumber(10))

scDSum = scD.map(lambda (k, v) : v.sumUp())
v = scDSum.reduce(lambda a, b: a + b)

print " -- after the transfermation, the value is ", v

scDSum = rddData .map(lambda (k, v) : v.sumUp())
v = scDSum.reduce(lambda a, b: a + b)

print " -- after the transformation, the cached value is ", v

  - Judging by the results, it seems to me that when the RDD is cached, the
direct modification doesn't affect its content
  - Monitoring the memory usage, it seems to me that the memory
is not duplicated during each RDD (narrow dependency) transformation (or I
am wrong)

therefore, my questions are:
  - How does the cache work? Does it make a separate copy of the data?
  - How is the memory managed in the MAP function (in a narrow dependency)?
Is the entire RDD first duplicated, modified and then assigned to the new
RDD, with the old RDD afterwards deleted from memory? Or does the new
RDD reuse the same memory as the old RDD, without duplicating/copying the memory?
  - If the newly generated RDDs directly reuse the memory of the old RDDs (in
a narrow dependency), why do the cached RDDs still keep their old content? Are
cached RDDs treated differently from uncached RDDs in memory management?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-memory-questions-tp13805.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PySpark on Yarn - how group by data properly

2014-09-09 Thread Oleg Ruchovets
Hi ,

   I come from a map/reduce background and am trying to do a quite trivial thing:

I have a lot of files ( on hdfs ) - format is :

   1 , 2 , 3
   2 , 3 , 5
   1 , 3,  5
2, 3 , 4
2 , 5, 1

  I actually need to group by key (first column):
  key   values
  1 --> (2,3),(3,5)
  2 --> (3,5),(3,4),(5,1)

  and I need to process (pass) the values to the function f (my custom function);
  the outcome of f() should be written to hdfs with the corresponding key:
1 --> f() outcome
2 --> f() outcome.

My code is :

  def doSplit(x):
y = x.split(',')
if(len(y)==3):
   return  y[0],(y[1],y[2])


lines = sc.textFile(filename,1)
counts = lines.map(doSplit).groupByKey()
output = counts.collect()

for (key, value) in output:
print 'build model for key ->' , key
print value
f(str(key) , value))


Questions:
   1) lines.map(doSplit).groupByKey() - I didn't find an option like
groupByKey(f()) to process the grouped values. How can I process the grouped keys
with a custom function? Function f has some non-trivial logic.

2) Using output (I really don't like this approach) to pass values to the
function looks not scalable and is executed only on one machine. What is
the way, using PySpark, to process the grouped keys in a distributed fashion,
in parallel on different machines of the cluster?

3) In case of processing the output, how can the data be stored on hdfs?

Thanks
Oleg.


Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll
Okay,

This seems to be either a code version issue or a communication issue. It
works if I execute the spark shell from the master node. It doesn't work if
I run it from my laptop and connect to the master node. 

I had opened the ports for the WebUI (8080) and the cluster manager (7077)
on the master node, otherwise it fails much sooner. Do I need to open up the ports
for the workers as well?

I used the spark-ec2 install script with --spark-version using both 1.0.2
and then again with the git hash tag that corresponds to 1.1.0rc4
(2f9b2bd7844ee8393dc9c319f4fefedf95f5e460). In both cases I rebuilt from
source using the same codebase on my machine and moved the entire project
into /root/spark (since to run the spark-shell it needs to match the same
path as the install on ec2). Could I have missed something here?

Thanks.
Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13802.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Huge matrix

2014-09-09 Thread Reza Zadeh
Better to do it in a PR of your own, it's not sufficiently related to dimsum

On Tue, Sep 9, 2014 at 7:03 AM, Debasish Das 
wrote:

> Cool...can I add loadRowMatrix in your PR ?
>
> Thanks.
> Deb
>
> On Tue, Sep 9, 2014 at 1:14 AM, Reza Zadeh  wrote:
>
>> Hi Deb,
>>
>> Did you mean to message me instead of Xiangrui?
>>
>> For TS matrices, dimsum with positiveinfinity and computeGramian have the
>> same cost, so you can do either one. For dense matrices with say, 1m
>> columns this won't be computationally feasible and you'll want to start
>> sampling with dimsum.
>>
>> It would be helpful to have a loadRowMatrix function, I would use it.
>>
>> Best,
>> Reza
>>
>> On Tue, Sep 9, 2014 at 12:05 AM, Debasish Das 
>> wrote:
>>
>>> Hi Xiangrui,
>>>
>>> For tall skinny matrices, if I can pass a similarityMeasure to
>>> computeGrammian, I could re-use the SVD's computeGrammian for similarity
>>> computation as well...
>>>
>>> Do you recommend using this approach for tall skinny matrices or just
>>> use the dimsum's routines ?
>>>
>>> Right now RowMatrix does not have a loadRowMatrix function like the one
>>> available in LabeledPoint...should I add one ? I want to export the matrix
>>> out from my stable code and then test dimsum...
>>>
>>> Thanks.
>>> Deb
>>>
>>>
>>>
>>> On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh  wrote:
>>>
 I will add dice, overlap, and jaccard similarity in a future PR,
 probably still for 1.2


 On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das 
 wrote:

> Awesome...Let me try it out...
>
> Any plans of putting other similarity measures in future (jaccard is
> something that will be useful) ? I guess it makes sense to add some
> similarity measures in mllib...
>
>
> On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh 
> wrote:
>
>> Yes you're right, calling dimsum with gamma as PositiveInfinity turns
>> it into the usual brute force algorithm for cosine similarity, there is 
>> no
>> sampling. This is by design.
>>
>>
>> On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das <
>> debasish.da...@gmail.com> wrote:
>>
>>> I looked at the code: similarColumns(Double.posInf) is generating
>>> the brute force...
>>>
>>> Basically dimsum with gamma as PositiveInfinity will produce the
>>> exact same result as doing catesian products of RDD[(product, vector)] 
>>> and
>>> computing similarities or there will be some approximation ?
>>>
>>> Sorry I have not read your paper yet. Will read it over the weekend.
>>>
>>>
>>>
>>> On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh 
>>> wrote:
>>>
 For 60M x 10K brute force and dimsum thresholding should be fine.

 For 60M x 10M probably brute force won't work depending on the
 cluster's power, and dimsum thresholding should work with appropriate
 threshold.

 Dimensionality reduction should help, and how effective it is will
 depend on your application and domain, it's worth trying if the direct
 computation doesn't work.

 You can also try running KMeans clustering (perhaps after
 dimensionality reduction) if your goal is to find batches of similar 
 points
 instead of all pairs above a threshold.




 On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das <
 debasish.da...@gmail.com> wrote:

> Also for tall and wide (rows ~60M, columns 10M), I am considering
> running a matrix factorization to reduce the dimension to say ~60M x 
> 50 and
> then run all pair similarity...
>
> Did you also try similar ideas and saw positive results ?
>
>
>
> On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das <
> debasish.da...@gmail.com> wrote:
>
>> Ok...just to make sure I have RowMatrix[SparseVector] where rows
>> are ~ 60M and columns are 10M say with billion data points...
>>
>> I have another version that's around 60M and ~ 10K...
>>
>> I guess for the second one both all pair and dimsum will run
>> fine...
>>
>> But for tall and wide, what do you suggest ? can dimsum handle it
>> ?
>>
>> I might need jaccard as well...can I plug that in the PR ?
>>
>>
>>
>> On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh 
>> wrote:
>>
>>> You might want to wait until Wednesday since the interface will
>>> be changing in that PR before Wednesday, probably over the weekend, 
>>> so that
>>> you don't have to redo your code. Your call if you need it before a 
>>> week.
>>> Reza
>>>
>>>
>>> On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das <
>>> debasish.da...@gmail.com> wrote:
>>>
 Ohh 

Re: Accuracy hit in classification with Spark

2014-09-09 Thread Xiangrui Meng
If you are using the Mahout's Multinomial Naive Bayes, it should be
the same as MLlib's. I tried MLlib with news20.scale downloaded from
http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass.html
and the test accuracy is 82.4%. -Xiangrui
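For reference, a minimal sketch of such a test (the HDFS path and the 80/20 split are assumptions,
not the exact setup above):

    import org.apache.spark.mllib.classification.NaiveBayes
    import org.apache.spark.mllib.util.MLUtils

    // news20.scale is in libsvm format; the path is a placeholder
    val data = MLUtils.loadLibSVMFile(sc, "hdfs:///data/news20.scale")
    val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 42L)

    val model = NaiveBayes.train(training, lambda = 1.0)
    val accuracy = test.map(p => if (model.predict(p.features) == p.label) 1.0 else 0.0).mean()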

On Tue, Sep 9, 2014 at 4:58 AM, jatinpreet  wrote:
> Hi,
>
> I tried running the classification program on the famous newsgroup data.
> This had an even more drastic effect on the accuracy, as it dropped from
> ~82% in Mahout to ~72% in Spark MLlib.
>
> Please help me in this regard as I have to use Spark in a production system
> very soon and this is a blocker for me.
>
> Thanks,
> Jatin
>
>
>
> -
> Novice Big Data Programmer
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p13793.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Problem in running mosek in spark cluster - java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1738)

2014-09-09 Thread ayandas84
 We have a small Apache Spark cluster of 6 computers. We are trying to solve
a distributed problem which requires solving an optimization problem on each
machine during a Spark map operation.

We decided to use mosek as the solver, and I obtained an academic license to
this end. We observed that mosek works fine on a single system. However,
when we prepare a jar file, include mosek.jar in the libraries and try
to run the jar on the cluster as a Spark job, it gives errors:

java.lang.UnsatisfiedLinkError: no mosekjava7_0 in java.library.path

Does this problem have anything to do with the license? We have set the
necessary path variables in the profile of the user on the master machine,
but we are not sure what changes need to be made to the other
machines in the cluster.

We shall be greatly obliged if you could suggest the necessary solution and
help us out.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-running-mosek-in-spark-cluster-java-lang-UnsatisfiedLinkError-no-mosekjava7-0-in-java-lib-tp13799.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Huge matrix

2014-09-09 Thread Debasish Das
Cool...can I add loadRowMatrix in your PR ?

Thanks.
Deb
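For concreteness, a rough sketch of the kind of loader I have in mind (the helper is hypothetical, not
existing MLlib API; format assumption: one dense row per line, space-separated values):

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    def loadRowMatrix(sc: SparkContext, path: String): RowMatrix = {
      val rows = sc.textFile(path)
        .map(line => Vectors.dense(line.trim.split("\\s+").map(_.toDouble)))
      new RowMatrix(rows)
    }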

On Tue, Sep 9, 2014 at 1:14 AM, Reza Zadeh  wrote:

> Hi Deb,
>
> Did you mean to message me instead of Xiangrui?
>
> For TS matrices, dimsum with positiveinfinity and computeGramian have the
> same cost, so you can do either one. For dense matrices with say, 1m
> columns this won't be computationally feasible and you'll want to start
> sampling with dimsum.
>
> It would be helpful to have a loadRowMatrix function, I would use it.
>
> Best,
> Reza
>
> On Tue, Sep 9, 2014 at 12:05 AM, Debasish Das 
> wrote:
>
>> Hi Xiangrui,
>>
>> For tall skinny matrices, if I can pass a similarityMeasure to
>> computeGrammian, I could re-use the SVD's computeGrammian for similarity
>> computation as well...
>>
>> Do you recommend using this approach for tall skinny matrices or just use
>> the dimsum's routines ?
>>
>> Right now RowMatrix does not have a loadRowMatrix function like the one
>> available in LabeledPoint...should I add one ? I want to export the matrix
>> out from my stable code and then test dimsum...
>>
>> Thanks.
>> Deb
>>
>>
>>
>> On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh  wrote:
>>
>>> I will add dice, overlap, and jaccard similarity in a future PR,
>>> probably still for 1.2
>>>
>>>
>>> On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das 
>>> wrote:
>>>
 Awesome...Let me try it out...

 Any plans of putting other similarity measures in future (jaccard is
 something that will be useful) ? I guess it makes sense to add some
 similarity measures in mllib...


 On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh  wrote:

> Yes you're right, calling dimsum with gamma as PositiveInfinity turns
> it into the usual brute force algorithm for cosine similarity, there is no
> sampling. This is by design.
>
>
> On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das  > wrote:
>
>> I looked at the code: similarColumns(Double.posInf) is generating the
>> brute force...
>>
>> Basically dimsum with gamma as PositiveInfinity will produce the
>> exact same result as doing catesian products of RDD[(product, vector)] 
>> and
>> computing similarities or there will be some approximation ?
>>
>> Sorry I have not read your paper yet. Will read it over the weekend.
>>
>>
>>
>> On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh 
>> wrote:
>>
>>> For 60M x 10K brute force and dimsum thresholding should be fine.
>>>
>>> For 60M x 10M probably brute force won't work depending on the
>>> cluster's power, and dimsum thresholding should work with appropriate
>>> threshold.
>>>
>>> Dimensionality reduction should help, and how effective it is will
>>> depend on your application and domain, it's worth trying if the direct
>>> computation doesn't work.
>>>
>>> You can also try running KMeans clustering (perhaps after
>>> dimensionality reduction) if your goal is to find batches of similar 
>>> points
>>> instead of all pairs above a threshold.
>>>
>>>
>>>
>>>
>>> On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das <
>>> debasish.da...@gmail.com> wrote:
>>>
 Also for tall and wide (rows ~60M, columns 10M), I am considering
 running a matrix factorization to reduce the dimension to say ~60M x 
 50 and
 then run all pair similarity...

 Did you also try similar ideas and saw positive results ?



 On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das <
 debasish.da...@gmail.com> wrote:

> Ok...just to make sure I have RowMatrix[SparseVector] where rows
> are ~ 60M and columns are 10M say with billion data points...
>
> I have another version that's around 60M and ~ 10K...
>
> I guess for the second one both all pair and dimsum will run
> fine...
>
> But for tall and wide, what do you suggest ? can dimsum handle it ?
>
> I might need jaccard as well...can I plug that in the PR ?
>
>
>
> On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh 
> wrote:
>
>> You might want to wait until Wednesday since the interface will
>> be changing in that PR before Wednesday, probably over the weekend, 
>> so that
>> you don't have to redo your code. Your call if you need it before a 
>> week.
>> Reza
>>
>>
>> On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das <
>> debasish.da...@gmail.com> wrote:
>>
>>> Ohh coolall-pairs brute force is also part of this PR ? Let
>>> me pull it in and test on our dataset...
>>>
>>> Thanks.
>>> Deb
>>>
>>>
>>> On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh 
>>> wrote:
>>>
 Hi Deb,

 We are adding 

Re: groupBy gives non deterministic results

2014-09-09 Thread redocpot
Thank you for your replies.

More details here:

The prog is executed on local mode (single node). Default env params are
used.

The test code and the result are in this gist:
https://gist.github.com/coderh/0147467f0b185462048c

Here are the first 10 lines of the data: 3 fields per row, the delimiter is ";"

3801959;11775022;118
3801960;14543202;118
3801984;11781380;20
3801984;13255417;20
3802003;11777557;91
3802055;11781159;26
3802076;11782793;102
3802086;17881551;102
3802087;19064728;99
3802105;12760994;99
...

There are 27 partitions(small files). Total size is about 100 Mb.

We find that this problem is most probably caused by the bug SPARK-2043:
https://issues.apache.org/jira/browse/SPARK-2043

Could someone give more details on this bug ?

The pull request says:

The current implementation reads one key with the next hash code as it
finishes reading the keys with the current hash code, which may cause it to
miss some matches of the next key. This can cause operations like join to
give the wrong result when reduce tasks spill to disk and there are hash
collisions, as values won't be matched together. This PR fixes it by not
reading in that next key, using a peeking iterator instead.

I don't understand why reading a key with the next hash code would cause it
to miss some matches of the next key. If someone could show me some code to
dig into, it would be highly appreciated. =)
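(For reference, Scala's Iterator.buffered gives the "peek without consuming" behaviour the PR
describes -- a small illustration, not the actual Spark internals:)

    // head inspects the next element without consuming it, so the loop below
    // stops *before* swallowing the first element of the next group
    val it: BufferedIterator[Int] = Iterator(1, 1, 2, 2).buffered
    while (it.hasNext && it.head == 1) { it.next() }
    // it.head is now 2 -- nothing from the next group has been lost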

Thank you.

Hao.

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13797.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark functionality similar to hadoop's RecordWriter close method

2014-09-09 Thread Sean Owen
You're mixing the Java and Scala APIs here. Your call to foreach() is
expecting a Scala function and you're giving it a Java Function.
Ideally you just use the Scala API, of course. Before explaining how
to actually use a Java function here, could you clarify why you have to
do it and can't use Scala? Your declaration of the Java function also
seems unintentional -- Unit and Seq are not Java types.

There is no callback, and no direct analog of RecordWriter.close(). If
you are writing a "foreach" function, then you want to use
"foreachPartition", and then after writing a partition's worth of
records, you could do something at the end of the function. This may
or may not suit your purpose as it is not necessarily called in the
same way that RecordWriter.close() was. But ideally you're not relying
on that kind of thing anyway.
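For illustration, a minimal sketch of that foreachPartition pattern in the Scala API (PrintWriter and
the temp file stand in for whatever per-partition resource you actually need; rdd is the RDD[String]
from your snippet):

    import java.io.{File, PrintWriter}

    rdd.foreachPartition { records =>
      // one writer per partition, opened on the executor that processes it
      val writer = new PrintWriter(File.createTempFile("batch-", ".txt"))
      try {
        records.grouped(5).foreach(batch => writer.println(batch.mkString(",")))
      } finally {
        writer.close()   // the closest analog to RecordWriter.close()
      }
    }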

On Tue, Sep 9, 2014 at 1:39 PM, robertberta  wrote:
> I want to call a function for batches of elements from an rdd
>
> val javaClass:org.apache.spark.api.java.function.Function[Seq[String],Unit]
> = new JavaClass()
> rdd.mapPartitions(_.grouped(5)).foreach(javaClass)
>
> 1.This worked fine in spark 0.9.1 , when we upgrade to spark 1.0.2 ,
> Function changed from class to interface and we get :
>
> type mismatch;
> found   : org.apache.spark.api.java.function.Function[Seq[String],Unit]
> required: Seq[String] => Unit
>
> We are using Java 1.7
> We use that class for one time initialization method call on each executor
> and for batch processing .
>
> 2. Previously on hadoop by RecordWriter.close() we get a callback method for
> every executor that processed map/reduce operations. We would like this in
> spark too , is it possible?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-functionality-similar-to-hadoop-s-RecordWriter-close-method-tp13795.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark functionality similar to hadoop's RecordWriter close method

2014-09-09 Thread robertberta
I want to call a function for batches of elements from an rdd

val javaClass:org.apache.spark.api.java.function.Function[Seq[String],Unit]
= new JavaClass()
rdd.mapPartitions(_.grouped(5)).foreach(javaClass)

1. This worked fine in Spark 0.9.1; when we upgraded to Spark 1.0.2,
Function changed from a class to an interface and we get:

type mismatch;
found   : org.apache.spark.api.java.function.Function[Seq[String],Unit]
required: Seq[String] => Unit

We are using Java 1.7.
We use that class for a one-time initialization call on each executor
and for batch processing.

2. Previously on Hadoop, RecordWriter.close() gave us a callback on
every executor that processed map/reduce operations. We would like this in
Spark too; is it possible?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-functionality-similar-to-hadoop-s-RecordWriter-close-method-tp13795.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Accuracy hit in classification with Spark

2014-09-09 Thread jatinpreet
Hi,

I tried running the classification program on the famous newsgroup data.
This had an even more drastic effect on the accuracy, as it dropped from
~82% in Mahout to ~72% in Spark MLlib.

Please help me in this regard as I have to use Spark in a production system
very soon and this is a blocker for me.

Thanks,
Jatin 



-
Novice Big Data Programmer
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773p13793.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll

> Why I think it's the number of files is that I believe all of those, or a
> large part of those files, are read when you run sqlContext.parquetFile(),
> and the time it would take in s3 for that to happen is a lot, so something
> internally is timing out.

I'll create the parquet files with Drill instead of Spark which will give me
(somewhat) better control over the slice sizes and see what happens.
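
For what it's worth, the number of parquet part files Spark writes follows the
RDD's partition count, so some of that control is available from Spark too. A
rough sketch (the input path, output path, and Record schema below are all made
up for illustration):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

case class Record(id: Int, value: String)

object ConsolidateParquet {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("ConsolidateParquet"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD // implicit conversion RDD[Product] -> SchemaRDD

    val records = sc.textFile("s3n://my-bucket/raw/*.csv") // made-up input
      .map(_.split(";"))
      .map(a => Record(a(0).toInt, a(1)))
      .coalesce(64) // ~64 larger part files instead of thousands

    records.saveAsParquetFile("s3n://my-bucket/consolidated.parquet") // made-up output
  }
}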

That said, this behavior seems wrong to me. First, exiting a job due to
inactivity seems like (perhaps?) the wrong fix to an earlier problem. Second,
there IS activity if it's reading the slice headers, yet the job is exiting
anyway. So if this fixes the problem, the measure of "activity" seems wrong.

Ian and Manu, thanks for your help. I'll post back and let you know if that
fixes it.

Jim




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13791.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Querying a parquet file in s3 with an ec2 install

2014-09-09 Thread Jim Carroll
My apologies to the list. I replied to Manu's question and it went directly
to him rather than the list.

In case anyone else has this issue here is my reply and Manu's reply to me.
This also answers Ian's question.

---

Hi Manu,

The dataset is 7.5 million rows and 500 columns. In parquet form it's about
1.1 Gig. It was created with Spark and copied up to s3. It has about 4600
parts (which I'd also like to gain some control over). I can try a smaller
dataset, however it works when I run it locally, even with the file out on
s3. It just takes a while.

I can try copying it to HDFS first, but that won't help longer term.

Thanks
Jim

-
Manu's response:
-

I am pretty sure it is due to the number of parts you have.. I have a
parquet data set that is 250M rows and 924 columns and it is ~2500 files...

I recommend creating a table in Hive with that data set and doing an insert
overwrite so you can get a data set with more manageable files..

Why I think it's the number of files is that I believe all of those, or a
large part of those files, are read when you run sqlContext.parquetFile(), and
the time it would take in s3 for that to happen is a lot, so something
internally is timing out..

-Manu



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Querying-a-parquet-file-in-s3-with-an-ec2-install-tp13737p13790.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Filter function problem

2014-09-09 Thread Blackeye
In order to help anyone answer, I should say that I checked the
inactiveIDs.filter operation separately, and I found that it doesn't return
null in any case. In addition, I don't know how to handle (or check) whether
an RDD is null. I find the debugging too complicated to pinpoint the error.
Any ideas how to find the null pointer?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Filter-function-problem-tp13787p13789.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Filter function problem

2014-09-09 Thread Blackeye
I have the following code written in scala in Spark:

(inactiveIDs is an RDD[(Int, Seq[String])], persons is a Broadcast[RDD[(Int,
Seq[Event])]], and Event is a class that I have created)

val test = persons.value.map { tuple =>
  (tuple._1, tuple._2.filter { event =>
    inactiveIDs.filter(event2 => event2._1 == tuple._1).count() != 0
  })
}

and the following error:

java.lang.NullPointerException

Any ideas?
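
One pattern I have seen suggested for this kind of NullPointerException (an
RDD referenced inside another RDD's closure is not supported) is to collect
the small RDD and broadcast a plain Set instead. A rough sketch, assuming the
inactive IDs fit in driver memory (sc is the SparkContext):

val inactiveIdSet = sc.broadcast(inactiveIDs.keys.collect().toSet)

val test = persons.value.map { case (id, events) =>
  // keep a person's events only if that person's id is in the inactive set,
  // mirroring the original count() != 0 condition
  (id, events.filter(_ => inactiveIdSet.value.contains(id)))
}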




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Filter-function-problem-tp13787.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread julyfire
Yes, I agree that I can transform the DStream directly even when there is no
data injected in a batch duration.

My situation is this: Spark receives the flume stream continuously, and I use
the updateStateByKey function to collect data for a key across several
batches. I then handle the collected data after waiting a specified time
(which I measure with a counter) since the last batch in which that key was
updated in the updateStateByKey operation. Normally, when the waiting time is
up, I should have collected all the data for a key. But if the flume data
source is broken for a while, and that outage is longer than the waiting time,
then I will only get partial data for a key. So I need a way to determine
whether the current flume batch contains data; if it does not, it means the
flume data source is broken, and I can skip the updateStateByKey operation
until the flume source is reconnected, so the counter in the updateStateByKey
function can count again. In this way I would get the intact data.
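
A rough sketch of the per-key state I mean (the pairs DStream, the String
values and the KeyState type below are only illustrative; updateStateByKey
also requires ssc.checkpoint(...) to be set):

case class KeyState(values: Seq[String], batchesSinceLastUpdate: Int)

val updated = pairs.updateStateByKey[KeyState] { (newValues: Seq[String], state: Option[KeyState]) =>
  val prev = state.getOrElse(KeyState(Seq.empty, 0))
  if (newValues.nonEmpty)
    Some(KeyState(prev.values ++ newValues, 0)) // new data arrived, reset the waiting counter
  else
    Some(KeyState(prev.values, prev.batchesSinceLastUpdate + 1)) // another batch with no data for this key
}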

Another question: why does the count variable not work in map, but does take
effect in foreachRDD in my previous code?

thanks :P 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13785.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
I think you should clarify some things in Spark Streaming:

1. The closure passed to map runs on the remote side, so modifying the count 
var only takes effect on the remote side; you will always see -1 on the driver 
side.
2. The code in the closure passed to foreachRDD is lazily executed in each 
batch duration, while the if (...) code outside the closure is executed once, 
immediately, and never executed again, so your code logic does not behave as 
you expect.
3. I don't think you need to judge whether any data was fed in before doing 
some transformations; you can transform the DStream directly even when there 
is no data injected in this batch duration. It is only an empty transformation, 
with no extra overhead.

Thanks
Jerry

-Original Message-
From: julyfire [mailto:hellowe...@gmail.com] 
Sent: Tuesday, September 09, 2014 4:20 PM
To: u...@spark.incubator.apache.org
Subject: RE: Spark streaming: size of DStream

i'm sorry I have some error in my code, update here:

var count = -1L // a global variable in the main object 

val currentBatch = some_DStream
val countDStream = currentBatch.map(o=>{ 
  count = 0L  // reset the count variable in each batch 
  o 
})
countDStream.foreachRDD(rdd=> count += rdd.count())

if (count > 0) {
  currentBatch.map(...).someOtherTransformation
}

two problems:
1. the variable count just go on accumulate and no reset in each batch 2. 
if(count > 0) only evaluate in the beginning of running the program, so the 
next statement will never run

Can you all give me some suggestion? thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming: size of DStream

2014-09-09 Thread Luis Ángel Vicente Sánchez
If you take into account what streaming means in Spark, your goal doesn't
really make sense; you have to assume that your streams are infinite and
that you will have to process them until the end of days. Operations on a
DStream define what you want to do with each element of each RDD, but Spark
Streaming is smart enough not to apply the transformations if the RDDs are empty.

The only time where you probably want to know the size of the RDD is when
you are going to perform a side-effect like storing something in a
database, using foreachRDD, i.e:

val flumeStream = ...

val transformedStream = flumeStream.map(... some transformation
...).flatMap(... some other transformation).distinct().

transformedStream.foreachRDD { rdd =>
  if (rdd.count() != 0) {
// perform some side effect that shouldn't be done if a transformed
batch is empty
  }
}

2014-09-09 9:20 GMT+01:00 julyfire :

> i'm sorry I have some error in my code, update here:
>
> var count = -1L // a global variable in the main object
>
> val currentBatch = some_DStream
> val countDStream = currentBatch.map(o=>{
>   count = 0L  // reset the count variable in each batch
>   o
> })
> countDStream.foreachRDD(rdd=> count += rdd.count())
>
> if (count > 0) {
>   currentBatch.map(...).someOtherTransformation
> }
>
> two problems:
> 1. the variable count just go on accumulate and no reset in each batch
> 2. if(count > 0) only evaluate in the beginning of running the program, so
> the next statement will never run
>
> Can you all give me some suggestion? thanks
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: groupBy gives non deterministic results

2014-09-09 Thread Ye Xianjin
Can you provide a small sample or test data that reproduces this problem? And 
what's your env setup: single node or cluster?

Sent from my iPhone

> On Sep 8, 2014, at 22:29, redocpot  wrote:
> 
> Hi,
> 
> I have a key-value RDD called rdd below. After a groupBy, I tried to count
> rows.
> But the result is not unique, somehow non deterministic.
> 
> Here is the test code:
> 
>  val step1 = ligneReceipt_cleTable.persist
>  val step2 = step1.groupByKey
> 
>  val s1size = step1.count
>  val s2size = step2.count
> 
>  val t = step2 // rdd after groupBy
> 
>  val t1 = t.count
>  val t2 = t.count
>  val t3 = t.count
>  val t4 = t.count
>  val t5 = t.count
>  val t6 = t.count
>  val t7 = t.count
>  val t8 = t.count
> 
>  println("s1size = " + s1size)
>  println("s2size = " + s2size)
>  println("1 => " + t1)
>  println("2 => " + t2)
>  println("3 => " + t3)
>  println("4 => " + t4)
>  println("5 => " + t5)
>  println("6 => " + t6)
>  println("7 => " + t7)
>  println("8 => " + t8)
> 
> Here are the results:
> 
> s1size = 5338864
> s2size = 5268001
> 1 => 5268002
> 2 => 5268001
> 3 => 5268001
> 4 => 5268002
> 5 => 5268001
> 6 => 5268002
> 7 => 5268002
> 8 => 5268001
> 
> Even if the difference is just one row, that's annoying.  
> 
> Any idea ?
> 
> Thank you.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread julyfire
I'm sorry, I had some errors in my code; updated here:

var count = -1L // a global variable in the main object 

val currentBatch = some_DStream 
val countDStream = currentBatch.map(o=>{ 
  count = 0L  // reset the count variable in each batch 
  o 
}) 
countDStream.foreachRDD(rdd=> count += rdd.count())

if (count > 0) {
  currentBatch.map(...).someOtherTransformation
}

Two problems:
1. the variable count just keeps accumulating and is never reset in each batch
2. if (count > 0) is only evaluated at the beginning of the program, so
the next statement will never run

Can you all give me some suggestions? Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13781.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Huge matrix

2014-09-09 Thread Reza Zadeh
Hi Deb,

Did you mean to message me instead of Xiangrui?

For TS matrices, dimsum with positiveinfinity and computeGramian have the
same cost, so you can do either one. For dense matrices with say, 1m
columns this won't be computationally feasible and you'll want to start
sampling with dimsum.
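
For reference, a tiny sketch of the Gramian route for cosine similarities
(only sensible when the number of columns is small, as in your 10K case; the
example matrix below is made up):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 0.0),
  Vectors.dense(0.0, 1.0, 3.0)
))
val mat = new RowMatrix(rows)

val g = mat.computeGramianMatrix() // A^T A, a local n x n matrix (n = number of columns)
val ga = g.toArray                 // column-major layout
val n = g.numCols
def gram(i: Int, j: Int): Double = ga(i + j * n)
def cosine(i: Int, j: Int): Double = gram(i, j) / math.sqrt(gram(i, i) * gram(j, j))

cosine(0, 1) // cosine similarity between columns 0 and 1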

It would be helpful to have a loadRowMatrix function, I would use it.

Best,
Reza

On Tue, Sep 9, 2014 at 12:05 AM, Debasish Das 
wrote:

> Hi Xiangrui,
>
> For tall skinny matrices, if I can pass a similarityMeasure to
> computeGrammian, I could re-use the SVD's computeGrammian for similarity
> computation as well...
>
> Do you recommend using this approach for tall skinny matrices or just use
> the dimsum's routines ?
>
> Right now RowMatrix does not have a loadRowMatrix function like the one
> available in LabeledPoint...should I add one ? I want to export the matrix
> out from my stable code and then test dimsum...
>
> Thanks.
> Deb
>
>
>
> On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh  wrote:
>
>> I will add dice, overlap, and jaccard similarity in a future PR, probably
>> still for 1.2
>>
>>
>> On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das 
>> wrote:
>>
>>> Awesome...Let me try it out...
>>>
>>> Any plans of putting other similarity measures in future (jaccard is
>>> something that will be useful) ? I guess it makes sense to add some
>>> similarity measures in mllib...
>>>
>>>
>>> On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh  wrote:
>>>
 Yes you're right, calling dimsum with gamma as PositiveInfinity turns
 it into the usual brute force algorithm for cosine similarity, there is no
 sampling. This is by design.


 On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das 
 wrote:

> I looked at the code: similarColumns(Double.posInf) is generating the
> brute force...
>
> Basically dimsum with gamma as PositiveInfinity will produce the exact
> same result as doing catesian products of RDD[(product, vector)] and
> computing similarities or there will be some approximation ?
>
> Sorry I have not read your paper yet. Will read it over the weekend.
>
>
>
> On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh 
> wrote:
>
>> For 60M x 10K brute force and dimsum thresholding should be fine.
>>
>> For 60M x 10M probably brute force won't work depending on the
>> cluster's power, and dimsum thresholding should work with appropriate
>> threshold.
>>
>> Dimensionality reduction should help, and how effective it is will
>> depend on your application and domain, it's worth trying if the direct
>> computation doesn't work.
>>
>> You can also try running KMeans clustering (perhaps after
>> dimensionality reduction) if your goal is to find batches of similar 
>> points
>> instead of all pairs above a threshold.
>>
>>
>>
>>
>> On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das <
>> debasish.da...@gmail.com> wrote:
>>
>>> Also for tall and wide (rows ~60M, columns 10M), I am considering
>>> running a matrix factorization to reduce the dimension to say ~60M x 50 
>>> and
>>> then run all pair similarity...
>>>
>>> Did you also try similar ideas and saw positive results ?
>>>
>>>
>>>
>>> On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das <
>>> debasish.da...@gmail.com> wrote:
>>>
 Ok...just to make sure I have RowMatrix[SparseVector] where rows
 are ~ 60M and columns are 10M say with billion data points...

 I have another version that's around 60M and ~ 10K...

 I guess for the second one both all pair and dimsum will run fine...

 But for tall and wide, what do you suggest ? can dimsum handle it ?

 I might need jaccard as well...can I plug that in the PR ?



 On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh 
 wrote:

> You might want to wait until Wednesday since the interface will be
> changing in that PR before Wednesday, probably over the weekend, so 
> that
> you don't have to redo your code. Your call if you need it before a 
> week.
> Reza
>
>
> On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das <
> debasish.da...@gmail.com> wrote:
>
>> Ohh coolall-pairs brute force is also part of this PR ? Let
>> me pull it in and test on our dataset...
>>
>> Thanks.
>> Deb
>>
>>
>> On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh 
>> wrote:
>>
>>> Hi Deb,
>>>
>>> We are adding all-pairs and thresholded all-pairs via dimsum in
>>> this PR: https://github.com/apache/spark/pull/1778
>>>
>>> Your question wasn't entirely clear - does this answer it?
>>>
>>> Best,
>>> Reza
>>>
>>>
>>> On Fri, 

Re: Spark driver application can not connect to Spark-Master

2014-09-09 Thread niranda
Hi,

I had the same issue in my Java code while I was trying to connect to a
locally hosted spark server (using sbin/start-all.sh etc) using an IDE
(IntelliJ). 

I packaged my app into a jar and used spark-submit (in bin/) and it worked! 

Hope this helps

Rgds




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-driver-application-can-not-connect-to-Spark-Master-tp13226p13779.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread julyfire
Thanks all,

yes, i did using foreachRDD, the following is my code:

var count = -1L // a global variable in the main object

val currentBatch = some_DStream
val countDStream = currentBatch.map(o=>{
  count = 0L  // reset the count variable in each batch
  o
})
countDStream.foreachRDD(rdd=>{println(s); s += rdd.count()})

The variable count stores the number of records of each batch, but it can't
be reset to 0.
I mean this statement, count = 0L, does not work. Is my code right?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13778.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
Hi,

I think every received stream will generate an RDD in each batch duration even 
if there is no data fed in (an empty RDD will be generated). So you cannot use 
the number of RDDs to judge whether any data was received.

One way is to do this in DStream/foreachRDD(), like

a.foreachRDD { r =>
  if (r.count() == 0) {
    // do something
  } else {
    // do some other things
  }
}

You can try it.

Thanks
Jerry


-Original Message-
From: julyfire [mailto:hellowe...@gmail.com] 
Sent: Tuesday, September 09, 2014 3:42 PM
To: u...@spark.incubator.apache.org
Subject: RE: Spark streaming: size of DStream

Hi Jerry,

Thanks for your reply.
I use spark streaming to receive the flume stream, then I need to do a 
judgement, in each batchDuration, if the received stream has data, then I 
should do something, if no data, do the other thing. Then I thought the
count() can give me the measure, but it returns a DStream, not a number.
so is there a way to achieve this case?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming: size of DStream

2014-09-09 Thread Sean Owen
How about calling foreachRDD, processing whatever data is in each
RDD normally, and also keeping track within the foreachRDD function of
whether any RDD had a count() > 0? If not, then at the end you can
execute your alternate logic for the no-data case. I don't think you
want to operate at the DStream level.

On Tue, Sep 9, 2014 at 8:41 AM, julyfire  wrote:
> Hi Jerry,
>
> Thanks for your reply.
> I use spark streaming to receive the flume stream, then I need to do a
> judgement, in each batchDuration, if the received stream has data, then I
> should do something, if no data, do the other thing. Then I thought the
> count() can give me the measure, but it returns a DStream, not a number.
> so is there a way to achieve this case?
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread julyfire
Hi Jerry,

Thanks for your reply.
I use Spark Streaming to receive the flume stream, and I need to make a
judgement in each batchDuration: if the received stream has data, then I
should do something; if there is no data, do the other thing. I thought
count() could give me the measure, but it returns a DStream, not a number.
So is there a way to achieve this?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769p13775.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming: size of DStream

2014-09-09 Thread Shao, Saisai
Hi, 

Is there any specific scenario in which you need to know the number of RDDs in 
the DStream? To my knowledge, a DStream will generate one RDD in each 
batchDuration; some old RDDs will be remembered for windowing-like functions and 
will be removed when no longer needed. The hashmap generatedRDDs in DStream.scala 
contains the RDDs you want, though you cannot access it from the app.

Besides, the count() API returns the number of records in each of this DStream's 
RDDs, not the number of RDDs; the number of RDDs per batch should always be 1 as 
I understand it.

Thanks
Jerry

-Original Message-
From: julyfire [mailto:hellowe...@gmail.com] 
Sent: Tuesday, September 09, 2014 2:42 PM
To: u...@spark.incubator.apache.org
Subject: Spark streaming: size of DStream

I want to implement the following logic:

val stream = getFlumeStream() // a DStream

if(size_of_stream > 0)  // if the DStream contains some RDD

  stream.someTransformation

stream.count() can figure out the number of RDDs in a DStream, but it returns a 
DStream[Long] and can't be compared with a number.

Does anyone know how to get the number of RDDs in a DStream?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-size-of-DStream-tp13769.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Accuracy hit in classification with Spark

2014-09-09 Thread jatinpreet
Hi,

I had been using Mahout's Naive Bayes algorithm to classify document data.
For a specific train and test set, I was getting accuracy in the range of
86%. When I shifted to Spark's MLlib, the accuracy dropped to the vicinity
of 82%.

I am using the same version of Lucene and the same logic to generate TF-IDF
vectors. I tried fiddling with the smoothing parameter, but to no avail.

My question is: if the underlying algorithm is the same in both Mahout and
MLlib, why is this accuracy dip being observed?
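
For anyone trying to reproduce the setup, a minimal MLlib Naive Bayes call
with an explicit smoothing value looks roughly like this (the vectors below
are placeholders for the real TF-IDF features):

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(1.0, 0.0, 0.0)),
  LabeledPoint(1.0, Vectors.dense(0.0, 2.0, 1.0))
))
val model = NaiveBayes.train(training, lambda = 1.0) // lambda is the additive-smoothing parameter (default 1.0)
val prediction = model.predict(Vectors.dense(0.0, 1.0, 1.0))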



-
Novice Big Data Programmer
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Accuracy-hit-in-classification-with-Spark-tp13773.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: groupBy gives non deterministic results

2014-09-09 Thread Davies Liu
What's the type of the key?

If the hash of the key is different across slaves, then you could get these
confusing results. We have seen similar results in Python, because the hash
of None is different across machines.
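
As an aside, a tiny illustration of how an unstable (identity-based) hashCode
breaks grouping; Key below is a deliberately broken, made-up class:

class Key(val id: Int) extends Serializable // no equals/hashCode overrides, so hashing is identity-based

val pairs = sc.parallelize(Seq((new Key(1), "a"), (new Key(1), "b")))
pairs.groupByKey().count() // 2, not 1: the two Key(1) objects hash and compare as different keys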

Davies

On Mon, Sep 8, 2014 at 8:16 AM, redocpot  wrote:
> Update:
>
> Just test with HashPartitioner(8) and count on each partition:
>
> List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
> *(5,657591*), (*6,658327*), (*7,658434*)),
> List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
> *(5,657594)*, (6,658326), (*7,658434*)),
> List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
> *(5,657592)*, (6,658326), (*7,658435*)),
> List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
> *(5,657591)*, (6,658326), (7,658434)),
> List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
> *(5,657592)*, (6,658326), (7,658435)),
> List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
> *(5,657592)*, (6,658326), (7,658435)),
> List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
> *(5,657592)*, (6,658326), (7,658435)),
> List((0,657824), (1,658549), (2,659199), (3,658684), (4,659394),
> *(5,657591)*, (6,658326), (7,658435))
>
> The result is not identical for each execution.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/groupBy-gives-non-deterministic-results-tp13698p13702.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Huge matrix

2014-09-09 Thread Debasish Das
Hi Xiangrui,

For tall skinny matrices, if I can pass a similarityMeasure to
computeGrammian, I could re-use the SVD's computeGrammian for similarity
computation as well...

Do you recommend using this approach for tall skinny matrices or just use
the dimsum's routines ?

Right now RowMatrix does not have a loadRowMatrix function like the one
available in LabeledPoint...should I add one ? I want to export the matrix
out from my stable code and then test dimsum...
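
Something as simple as the sketch below would cover my case (assuming one row
per line with space-separated doubles; names are only illustrative):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

def loadRowMatrix(sc: SparkContext, path: String): RowMatrix = {
  val rows = sc.textFile(path)
    .map(line => Vectors.dense(line.trim.split("\\s+").map(_.toDouble)))
  new RowMatrix(rows)
}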

Thanks.
Deb



On Fri, Sep 5, 2014 at 9:43 PM, Reza Zadeh  wrote:

> I will add dice, overlap, and jaccard similarity in a future PR, probably
> still for 1.2
>
>
> On Fri, Sep 5, 2014 at 9:15 PM, Debasish Das 
> wrote:
>
>> Awesome...Let me try it out...
>>
>> Any plans of putting other similarity measures in future (jaccard is
>> something that will be useful) ? I guess it makes sense to add some
>> similarity measures in mllib...
>>
>>
>> On Fri, Sep 5, 2014 at 8:55 PM, Reza Zadeh  wrote:
>>
>>> Yes you're right, calling dimsum with gamma as PositiveInfinity turns it
>>> into the usual brute force algorithm for cosine similarity, there is no
>>> sampling. This is by design.
>>>
>>>
>>> On Fri, Sep 5, 2014 at 8:20 PM, Debasish Das 
>>> wrote:
>>>
 I looked at the code: similarColumns(Double.posInf) is generating the
 brute force...

 Basically dimsum with gamma as PositiveInfinity will produce the exact
 same result as doing catesian products of RDD[(product, vector)] and
 computing similarities or there will be some approximation ?

 Sorry I have not read your paper yet. Will read it over the weekend.



 On Fri, Sep 5, 2014 at 8:13 PM, Reza Zadeh  wrote:

> For 60M x 10K brute force and dimsum thresholding should be fine.
>
> For 60M x 10M probably brute force won't work depending on the
> cluster's power, and dimsum thresholding should work with appropriate
> threshold.
>
> Dimensionality reduction should help, and how effective it is will
> depend on your application and domain, it's worth trying if the direct
> computation doesn't work.
>
> You can also try running KMeans clustering (perhaps after
> dimensionality reduction) if your goal is to find batches of similar 
> points
> instead of all pairs above a threshold.
>
>
>
>
> On Fri, Sep 5, 2014 at 8:02 PM, Debasish Das  > wrote:
>
>> Also for tall and wide (rows ~60M, columns 10M), I am considering
>> running a matrix factorization to reduce the dimension to say ~60M x 50 
>> and
>> then run all pair similarity...
>>
>> Did you also try similar ideas and saw positive results ?
>>
>>
>>
>> On Fri, Sep 5, 2014 at 7:54 PM, Debasish Das <
>> debasish.da...@gmail.com> wrote:
>>
>>> Ok...just to make sure I have RowMatrix[SparseVector] where rows are
>>> ~ 60M and columns are 10M say with billion data points...
>>>
>>> I have another version that's around 60M and ~ 10K...
>>>
>>> I guess for the second one both all pair and dimsum will run fine...
>>>
>>> But for tall and wide, what do you suggest ? can dimsum handle it ?
>>>
>>> I might need jaccard as well...can I plug that in the PR ?
>>>
>>>
>>>
>>> On Fri, Sep 5, 2014 at 7:48 PM, Reza Zadeh 
>>> wrote:
>>>
 You might want to wait until Wednesday since the interface will be
 changing in that PR before Wednesday, probably over the weekend, so 
 that
 you don't have to redo your code. Your call if you need it before a 
 week.
 Reza


 On Fri, Sep 5, 2014 at 7:43 PM, Debasish Das <
 debasish.da...@gmail.com> wrote:

> Ohh coolall-pairs brute force is also part of this PR ? Let me
> pull it in and test on our dataset...
>
> Thanks.
> Deb
>
>
> On Fri, Sep 5, 2014 at 7:40 PM, Reza Zadeh 
> wrote:
>
>> Hi Deb,
>>
>> We are adding all-pairs and thresholded all-pairs via dimsum in
>> this PR: https://github.com/apache/spark/pull/1778
>>
>> Your question wasn't entirely clear - does this answer it?
>>
>> Best,
>> Reza
>>
>>
>> On Fri, Sep 5, 2014 at 6:14 PM, Debasish Das <
>> debasish.da...@gmail.com> wrote:
>>
>>> Hi Reza,
>>>
>>> Have you compared with the brute force algorithm for similarity
>>> computation with something like the following in Spark ?
>>>
>>> https://github.com/echen/scaldingale
>>>
>>> I am adding cosine similarity computation but I do want to
>>> compute an all pair similarities...
>>>
>>> Note that the data is sparse for me (the data that goes to
>>> matrix factorization) so I don't think joining and group-by