Re: Spark Streaming Application Got killed after 2 hours

2014-11-16 Thread Prannoy
Hi Saj,

What is the size of the input data that you are putting on the stream?
Have you tried running the same application with a different set of data?
It's weird that the streaming stops after exactly 2 hours. Try running the
same application with data of different sizes to see whether it has
something to do with a memory issue. Can you also provide a detailed error
log?

Thanks.

On Sun, Nov 16, 2014 at 11:49 AM, SAJ [via Apache Spark User List] 
ml-node+s1001560n19021...@n3.nabble.com wrote:

 Hi All,
 I am trying to run a Spark Streaming application 24/7, but it got killed after
 exactly 2 hours. I tried again and it was killed again after 2 hours.
 Following is the error log from the worker:

 14/11/15 13:53:24 INFO network.ConnectionManager: Removing
 ReceivingConnection to ConnectionManagerId(ip-x,38863)
 14/11/15 13:53:24 INFO network.ConnectionManager: Removing
 SendingConnection to ConnectionManagerId(ip-x,38863)
 14/11/15 13:53:24 ERROR network.SendingConnection: Exception while reading
 SendingConnection to ConnectionManagerI

 Has anybody faced this same issue?
 Thanks & Regards,
 SAJ







How to kill/upgrade/restart driver launched in Spark standalone cluster+supervised mode?

2014-11-16 Thread Jesper Lundgren
Hello,

I have a Spark Standalone cluster running in HA mode. I launched an
application using spark-submit with cluster and supervised mode enabled, and
it launched successfully on one of the worker nodes.

How can I stop/restart/kill or otherwise manage such a task running in a
standalone cluster? There seem to be no options in the web interface. I
wonder how I can upgrade my driver in the future.

Also, does supervised mode work across worker nodes? I.e., will it relaunch on
another node if the current one dies, or does it only handle a restart on the
same node after a driver crash?

I would love to hear others experience with this :)

Thanks!

(PS: I am launching a Spark Streaming application)

// Jesper Lundgren
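
A sketch of one way to kill such a driver via the standalone deploy client
(assuming a Spark 1.x standalone cluster; the master URL and driver ID below
are placeholders, the real driver ID is shown on the master web UI and in the
spark-submit output):

  # Sketch: killing a driver that was submitted in standalone cluster mode
  ./bin/spark-class org.apache.spark.deploy.Client kill \
    spark://master-host:7077 driver-20141116000000-0000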


Re: SparkSQL exception on cached parquet table

2014-11-16 Thread Cheng Lian

(Forgot to cc user mail list)

On 11/16/14 4:59 PM, Cheng Lian wrote:

Hey Sadhan,

Thanks for the additional information, this is helpful. It seems that
some Parquet internal contract was broken, but I'm not sure whether
it's caused by Spark SQL or Parquet, or whether the Parquet file
itself was somehow damaged. I'm investigating this. In the meantime,
would you mind helping to narrow down the problem by trying to scan
exactly the same Parquet file with some other system (e.g. Hive or
Impala)? If other systems work, then there must be something wrong
with Spark SQL.
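
If parquet-tools is installed, two commands that may help with that check
(the file path is a placeholder):

  parquet-tools meta /path/to/part-xxxxx.lzo.parquet   # read the footer metadata
  parquet-tools dump /path/to/part-xxxxx.lzo.parquet   # try decoding the column data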


Cheng

On Sun, Nov 16, 2014 at 1:19 PM, Sadhan Sood sadhan.s...@gmail.com wrote:


Hi Cheng,

Thanks for your response. Here is the stack trace from yarn logs:

Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
 at java.util.ArrayList.elementData(ArrayList.java:418)
 at java.util.ArrayList.get(ArrayList.java:431)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.GroupColumnIO.getLast(GroupColumnIO.java:95)
 at parquet.io.PrimitiveColumnIO.getLast(PrimitiveColumnIO.java:80)
 at parquet.io.PrimitiveColumnIO.isLast(PrimitiveColumnIO.java:74)
 at 
parquet.io.RecordReaderImplementation.init(RecordReaderImplementation.java:282)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:131)
 at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:96)
 at 
parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:136)
 at 
parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:96)
 at 
parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:126)
 at 
parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:193)
 ... 26 more


On Sat, Nov 15, 2014 at 9:28 AM, Cheng Lian lian.cs@gmail.com wrote:

Hi Sadhan,

Could you please provide the stack trace of the
ArrayIndexOutOfBoundsException (if any)? The reason why the
first query succeeds is that Spark SQL doesn't bother reading
all data from the table to give COUNT(*). In the second
case, however, the whole table is asked to be cached lazily
via the cacheTable call, so it's scanned to build the
in-memory columnar cache. Then things went wrong while scanning
this LZO-compressed Parquet file. Unfortunately, the stack
trace at hand doesn't indicate the root cause.

Cheng

On 11/15/14 5:28 AM, Sadhan Sood wrote:


While testing SparkSQL on a bunch of parquet files (basically
used to be a partition for one of our hive tables), I
encountered this error:

import org.apache.spark.sql.SchemaRDD
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val parquetFileRDD = sqlContext.parquetFile(parquetFile)
parquetFileRDD.registerTempTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() -- works fine
sqlContext.cacheTable("xyz_20141109")
sqlContext.sql("SELECT count(*) FROM xyz_20141109").collect() -- fails with an exception

parquet.io.ParquetDecodingException: Can not read value at 0
in block -1 in file

hdfs://::9000/event_logs/xyz/20141109/part-9359b87ae-a949-3ded-ac3e-3a6bda3a4f3a-r-9.lzo.parquet

at

parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:213)

at

parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:204)

at
org.apache.spark.rdd.NewHadoopRDD$anon$1.hasNext(NewHadoopRDD.scala:145)

at

org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

at
scala.collection.Iterator$anon$11.hasNext(Iterator.scala:327)

at
scala.collection.Iterator$anon$14.hasNext(Iterator.scala:388)

at

org.apache.spark.sql.columnar.InMemoryRelation$anonfun$3$anon$1.hasNext(InMemoryColumnarTableScan.scala:136)

at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248)

at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)

at
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)

at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)

at

Re: Help with Spark Streaming

2014-11-16 Thread Bahubali Jain
Hi,
Can anybody help me on this please, haven't been  able to find the problem
:(

Thanks.
On Nov 15, 2014 4:48 PM, Bahubali Jain bahub...@gmail.com wrote:

 Hi,
 Trying to use Spark Streaming, but I am struggling with word count :(
 I want a consolidated output of the word count (not on a per-window basis),
 so I am using updateStateByKey(), but for some reason this is not working.
 The function itself is not being invoked (I do not see the sysout output on
 the console).


 public final class WordCount {
   private static final Pattern SPACE = Pattern.compile(" ");

   public static void main(String[] args) {
     if (args.length < 2) {
       System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
       System.exit(1);
     }

     // Create the context with a 1 second batch size
     SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
     ssc.checkpoint("/tmp/worcount");
     // Create a JavaReceiverInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
     // Note that no duplication in storage level only for running locally.
     // Replication necessary in distributed scenario for fault tolerance.
     JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
         args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
       @Override
       public Iterable<String> call(String x) {
         return Lists.newArrayList(SPACE.split(x));
       }
     });

     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
         new PairFunction<String, String, Integer>() {
           @Override
           public Tuple2<String, Integer> call(String s) {
             System.err.println("Got " + s);
             return new Tuple2<String, Integer>(s, 1);
           }
         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
           @Override
           public Integer call(Integer i1, Integer i2) {
             return i1 + i2;
           }
         });

     wordCounts.print();

     *wordCounts.updateStateByKey(new updateFunction());*
     ssc.start();
     ssc.awaitTermination();
   }
 }

 class updateFunction implements Function2<List<Integer>, Optional<Integer>, Optional<Integer>>
 {

   @Override
   public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
     Integer x = new Integer(0);
     for (Integer i : values)
       x = x + i;
     Integer newSum = state.or(0) + x;  // add the new values with the previous running count to get the new count
     System.out.println("Newsum is " + newSum);
     return Optional.of(newSum);
   }
 }



Re: Help with Spark Streaming

2014-11-16 Thread ZhangYi
I guess maybe you don't need to invoke reduceByKey() after mapToPair(), because
updateStateByKey() already covers it. For your reference, here is a sample written
in Scala, using a text file stream instead of a socket:

object LocalStatefulWordCount extends App {
  val sparkConf = new SparkConf().setAppName("HdfsWordCount")
  val ssc = new StreamingContext(sparkConf, Seconds(2))

  // must set checkpoint for updateStateByKey
  // note: the checkpoint directory can not be the source directory
  ssc.checkpoint("./checkpoint")

  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  val lines = ssc.textFileStream("/Users/twer/workspace/scala101/data")   // local directory
  val wordDstream = lines.flatMap(_.split(" ")).map(x => (x, 1))
  val statefulWordCount = wordDstream.updateStateByKey[Int](updateFunc)
  statefulWordCount.print()

  ssc.start()
  ssc.awaitTermination()
}
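
And for the Java code earlier in this thread, a minimal sketch of the same idea
(class and method names here are hypothetical; the Function2 signature with
Guava's Optional is the one used by the Spark 1.x Java API). Note that
updateStateByKey returns a new DStream, which needs to be kept and given its own
output operation (e.g. print()), otherwise the state update function is never
invoked:

  import java.util.List;
  import com.google.common.base.Optional;
  import org.apache.spark.api.java.function.Function2;
  import org.apache.spark.streaming.api.java.JavaPairDStream;

  public final class StatefulCounts {
    // Attach a running total to an existing (word, count-per-batch) DStream.
    public static JavaPairDStream<String, Integer> attach(JavaPairDStream<String, Integer> wordCounts) {
      Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
          new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
            @Override
            public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
              int sum = state.or(0);          // previous running count, 0 if absent
              for (Integer v : values) {
                sum += v;                     // add this batch's counts
              }
              return Optional.of(sum);
            }
          };

      JavaPairDStream<String, Integer> runningCounts = wordCounts.updateStateByKey(updateFunction);
      runningCounts.print();                  // output operation on the *returned* DStream
      return runningCounts;
    }
  }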




Zhang Yi / 张逸
Lead Consultant

Email: yizh...@thoughtworks.com
Telephone: +86 15023157626











Re: Using data in RDD to specify HDFS directory to write to

2014-11-16 Thread Akhil Das
Can you check in the worker logs what exactly is happening?

Thanks
Best Regards

On Sun, Nov 16, 2014 at 2:54 AM, jschindler john.schind...@utexas.edu
wrote:

 UPDATE

 I have removed and added things systematically to the job and have figured
 out that the inclusion of the construction of the SparkContext object is what
 is causing it to fail.

 The last run contained the code below.

 I apparently keep losing executors and I'm not sure why. Some of the
 relevant Spark output is below; I will add more on Monday as I must go
 participate in weekend activities.

  14/11/15 14:53:43 INFO SparkDeploySchedulerBackend: Granted executor ID
 app-20141115145328-0025/3 on hostPort cloudera01.local.company.com:7078
 with
 8 cores, 512.0 MB RAM
 14/11/15 14:53:43 INFO AppClient$ClientActor: Executor updated:
 app-20141115145328-0025/3 is now RUNNING
 14/11/15 14:53:46 INFO MemoryStore: ensureFreeSpace(1063) called with
 curMem=1063, maxMem=309225062
 14/11/15 14:53:46 INFO MemoryStore: Block input-0-1416084826000 stored as
 bytes to memory (size 1063.0 B, free 294.9 MB)
 14/11/15 14:53:46 INFO BlockManagerInfo: Added input-0-1416084826000 in
 memory on cloudera01.local.company.com:49902 (size: 1063.0 B, free: 294.9
 MB)
 14/11/15 14:53:46 INFO BlockManagerMaster: Updated info of block
 input-0-1416084826000
 14/11/15 14:53:46 WARN BlockManager: Block input-0-1416084826000 already
 exists on this machine; not re-adding it
 14/11/15 14:53:46 INFO BlockGenerator: Pushed block input-0-1416084826000
 14/11/15 14:53:46 INFO SparkDeploySchedulerBackend: Registered executor:
 Actor[akka.tcp://
 sparkexecu...@cloudera01.local.company.com:52715/user/Executor#-1518587721
 ]
 with ID 3
 14/11/15 14:53:47 INFO BlockManagerInfo: Registering block manager
 cloudera01.local.company.com:46926 with 294.9 MB RAM
 14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor 3
 disconnected,
 so removing it
 14/11/15 14:53:47 ERROR TaskSchedulerImpl: Lost an executor 3 (already
 removed): remote Akka client disassociated
 14/11/15 14:53:47 INFO AppClient$ClientActor: Executor updated:
 app-20141115145328-0025/3 is now EXITED (Command exited with code 1)
 14/11/15 14:53:47 INFO SparkDeploySchedulerBackend: Executor
 app-20141115145328-0025/3 removed: Command exited with code 1
 14/11/15 14:53:47 INFO AppClient$ClientActor: Executor added:
 app-20141115145328-0025/4 on
 worker-20141114114152-cloudera01.local.company.com-7078
 (cloudera01.local.company.com:7078) with 8 cores

 BLOCK 2 - last block before app fails:

 14/11/15 14:54:15 INFO BlockManagerInfo: Registering block manager
 cloudera01.local.uship.com:34335 with 294.9 MB RAM
 14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor 9
 disconnected,
 so removing it
 14/11/15 14:54:16 ERROR TaskSchedulerImpl: Lost an executor 9 (already
 removed): remote Akka client disassociated
 14/11/15 14:54:16 INFO AppClient$ClientActor: Executor updated:
 app-20141115145328-0025/9 is now EXITED (Command exited with code 1)
 14/11/15 14:54:16 INFO SparkDeploySchedulerBackend: Executor
 app-20141115145328-0025/9 removed: Command exited with code 1
 14/11/15 14:54:16 ERROR SparkDeploySchedulerBackend: Application has been
 killed. Reason: Master removed our application: FAILED
 14/11/15 14:54:16 ERROR TaskSchedulerImpl: Exiting due to error from
 cluster
 scheduler: Master removed our application: FAILED
 [hdfs@cloudera01 root]$



 import kafka.producer._
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.kafka._
 import org.apache.spark.SparkConf
 import org.apache.spark._

 import org.json4s._
 import org.json4s.native.JsonMethods._

 import scala.collection.mutable.Map
 import scala.collection.mutable.MutableList

 case class Event(EventName: String, Payload: org.json4s.JValue)

 object App {

   def main(args: Array[String]) {

 val ssc = new StreamingContext("local[2]", "Data", Seconds(20))
 ssc.checkpoint("checkpoint")

   val conf = new
 SparkConf().setMaster("spark://cloudera01.local.company.com:7077")
   val sc = new SparkContext(conf)

 val eventMap = scala.collection.immutable.Map("Events" -> 1)
 val pipe = KafkaUtils.createStream(ssc,
 "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)

 val eventStream = pipe.map(data => {
   parse(data)
 }).map(json => {

   implicit val formats = DefaultFormats
   val eventName = (json \ "event").extractOpt[String]
   Event(eventName.getOrElse("*** NO EVENT NAME ***"), json)

 })

 eventStream.foreach(x => {
   var arr = x.toArray
   x.foreachPartition(y => {
     y.foreach(z => { print(z) })
   })
 })


 ssc.start()
 ssc.awaitTermination()

   }

 }
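
One thing that stands out in the snippet above (an observation only, not a
confirmed diagnosis): it builds a StreamingContext with its own local[2] master
and then a second, separate SparkContext pointing at the standalone master. A
minimal sketch of driving everything from a single SparkConf and
StreamingContext, with names and the batch interval taken from the code above
and a placeholder output operation:

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  object App {
    def main(args: Array[String]) {
      val conf = new SparkConf()
        .setAppName("Data")
        .setMaster("spark://cloudera01.local.company.com:7077")
      val ssc = new StreamingContext(conf, Seconds(20))   // StreamingContext owns the SparkContext
      ssc.checkpoint("checkpoint")

      val eventMap = Map("Events" -> 1)
      val pipe = KafkaUtils.createStream(
        ssc, "dockerrepo,dockerrepo,dockerrepo", "Cons1", eventMap).map(_._2)

      pipe.print()   // placeholder output operation

      ssc.start()
      ssc.awaitTermination()
    }
  }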






Re: Cancelled Key Exceptions on Massive Join

2014-11-16 Thread Akhil Das
This usually happens when one of the workers is stuck on a GC pause and it
times out. Enable the following configurations while creating the SparkContext:

 sc.set("spark.rdd.compress", "true")
 sc.set("spark.storage.memoryFraction", "1")
 sc.set("spark.core.connection.ack.wait.timeout", "6000")
 sc.set("spark.akka.frameSize", "100")
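
A minimal sketch of applying those settings on a SparkConf before the context is
created (the values are the ones suggested above, not universally recommended
defaults; the app name is a placeholder):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("massive-join")                              // placeholder
    .set("spark.rdd.compress", "true")
    .set("spark.storage.memoryFraction", "1")                // default in Spark 1.x is 0.6
    .set("spark.core.connection.ack.wait.timeout", "6000")
    .set("spark.akka.frameSize", "100")
  val sc = new SparkContext(conf)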



Thanks
Best Regards

On Sat, Nov 15, 2014 at 12:46 AM, Ganelin, Ilya ilya.gane...@capitalone.com
 wrote:

 Hello all. I have been running a Spark Job that eventually needs to do a
 large join.

 24 million x 150 million

 A broadcast join is infeasible in this instance clearly, so I am instead
 attempting to do it with Hash Partitioning by defining a custom partitioner
 as:


 class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {

   override def getPartition(key: Any): Int = key match {
  case k: Tuple2[Int, String] => super.getPartition(k._1)
  case _ => super.getPartition(key)
   }

 }

 I then partition both arrays using this partitioner. However, the job
 eventually fails with the following exception, which, if I had to guess,
 indicates that a network connection was interrupted during the shuffle stage,
 causing things to get lost and ultimately resulting in a fetch failure:

 14/11/14 12:56:21 INFO ConnectionManager: Removing ReceivingConnection to 
 ConnectionManagerId(innovationdatanode08.cof.ds.capitalone.com,37590)
 14/11/14 12:56:21 INFO ConnectionManager: Key not valid ? 
 sun.nio.ch.SelectionKeyImpl@7369b398
 14/11/14 12:56:21 INFO ConnectionManager: key already cancelled ? 
 sun.nio.ch.SelectionKeyImpl@7369b398
 java.nio.channels.CancelledKeyException
   at 
 org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
   at 
 org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)


 In the spark UI, I still see a substantial amount of shuffling going on at 
 this stage, I am wondering if I’m perhaps using the partitioner incorrectly?





Re: User Authn and Authz in Spark missing ?

2014-11-16 Thread Akhil Das
Have a look at this doc http://spark.apache.org/docs/latest/security.html

You can configure your network to only accept connections from a trusted
CIDR. If you are using cloud services like EC2/Azure/GCE etc., it is
straightforward from their web portal. If you have a bunch of custom
VPSs, you might want to configure the iptables entries.
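
Beyond network-level restrictions, the security page above also describes
shared-secret authentication; a minimal sketch (the secret value is a
placeholder and, for standalone deployments, must be configured identically on
all daemons):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .set("spark.authenticate", "true")
    .set("spark.authenticate.secret", "some-shared-secret")   // placeholder secret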

Thanks
Best Regards

On Fri, Nov 14, 2014 at 9:20 PM, Zeeshan Ali Shah zas...@pdc.kth.se wrote:

 Hi,  I am facing an issue as a Cloud Sysadmin , when Spark master launched
 on public IPs any one who knows the URL of spark can submit the jobs to it
 .

 Any way/hack to have a Authn and Authz in spark . i tried to look into it
 but could not find ..

 any hint ?

 --

 Regards

 Zeeshan Ali Shah
 System Administrator - PDC HPC
 PhD researcher (IT security)
 Kungliga Tekniska Hogskolan
 +46 8 790 9115
 http://www.pdc.kth.se/members/zashah



Re: Status of MLLib exporting models to PMML

2014-11-16 Thread Charles Earl
Manish and others,
A follow-up question on my mind is whether there are protobuf (or other
binary-format) frameworks in the vein of PMML. Perhaps scientific data
storage formats like NetCDF or ROOT are possible as well.
I like the comprehensiveness of PMML, but as you mention, the complexity of
managing large models is a concern.
Cheers

On Fri, Nov 14, 2014 at 1:35 AM, Manish Amde manish...@gmail.com wrote:

 @Aris, we are closely following the PMML work that is going on and as
 Xiangrui mentioned, it might be easier to migrate models such as logistic
 regression and then migrate trees. Some of the models get fairly large (as
 pointed out by Sung Chung) with deep trees as building blocks and we might
 have to consider a distributed storage and prediction strategy.


 On Tuesday, November 11, 2014, Xiangrui Meng men...@gmail.com wrote:

 Vincenzo sent a PR and included k-means as an example. Sean is helping
 review it. PMML standard is quite large. So we may start with simple
 model export, like linear methods, then move forward to tree-based.
 -Xiangrui

 On Mon, Nov 10, 2014 at 11:27 AM, Aris arisofala...@gmail.com wrote:
  Hello Spark and MLLib folks,
 
  So a common problem in the real world of using machine learning is that
 some data analysts use tools like R, while data engineers will use more
 advanced systems like Spark MLlib or even Python scikit-learn.
 
  In the real world, I want to have a system where multiple different
  modeling environments can learn from data / build models, represent the
  models in a common language, and then have a layer which just takes the
  model and run model.predict() all day long -- scores the models in other
  words.
 
  It looks like the project openscoring.io and jpmml-evaluator are some
  amazing systems for this, but they fundamentally use PMML as the model
  representation here.
 
  I have read some JIRA tickets that Xiangrui Meng is interested in
 getting
  PMML implemented to export MLLib models, is that happening? Further,
 would
  something like Manish Amde's boosted ensemble tree methods be
 representable
  in PMML?
 
  Thank you!!
  Aris





-- 
- Charles


RE: filtering a SchemaRDD

2014-11-16 Thread Daniel, Ronald (ELS-SDG)
Indeed it did. Thanks!

Ron


From: Michael Armbrust [mailto:mich...@databricks.com]
Sent: Friday, November 14, 2014 9:53 PM
To: Daniel, Ronald (ELS-SDG)
Cc: user@spark.apache.org
Subject: Re: filtering a SchemaRDD


If I use row[6] instead of row['text'] I get what I am looking for. However,
finding the right numeric index could be a pain.

Can I access the fields in a Row of a SchemaRDD by name, so that I can map,
filter, etc. without a trial-and-error process of finding the right int for the
field name?

row.text should work.

More examples here: 
http://spark.apache.org/docs/1.1.0/sql-programming-guide.html#tab_python_2
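
A minimal sketch of the name-based access (PySpark 1.1-era API assumed; the
input file and the `text` column are placeholders matching the question):

  from pyspark.sql import SQLContext

  sqlContext = SQLContext(sc)                      # sc: an existing SparkContext
  docs = sqlContext.jsonFile("docs.json")          # placeholder input; any SchemaRDD works

  # Rows behave like named tuples, so fields can be read by name in map/filter.
  matching = docs.filter(lambda row: "spark" in row.text)
  print(matching.count())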

Michael


Returning breeze.linalg.DenseMatrix from method

2014-11-16 Thread Ritesh Kumar Singh
Hi,

I have a method that returns DenseMatrix:
def func(str: String): DenseMatrix = {
...
...
}

But I keep getting this error:
*class DenseMatrix takes type parameters*

I tried this too:
def func(str: String): DenseMatrix(Int, Int, Array[Double]) = {
...
...
}
But this gives me this error:
*'=' expected but '(' found*

Any possible fixes?
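
A minimal sketch of what the compiler is asking for: breeze's DenseMatrix is
generic in its element type, so the return type needs a type argument (Double
below is an assumption about the intended element type, and the body is a
placeholder):

  import breeze.linalg.DenseMatrix

  def func(str: String): DenseMatrix[Double] = {
    // placeholder body: a 2x2 zero matrix
    DenseMatrix.zeros[Double](2, 2)
  }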


Re: Communication between Driver and Executors

2014-11-16 Thread Tobias Pfeiffer
Hi,

On Fri, Nov 14, 2014 at 3:20 PM, Mayur Rustagi mayur.rust...@gmail.com
wrote:

 I wonder if SparkConf is dynamically updated on all worker nodes or only
 during initialization. It can be used to piggyback information.
 Otherwise I guess you are stuck with Broadcast.
 Primarily I have had these issues moving legacy MR operators to Spark,
 where MR piggybacks on the Hadoop conf pretty heavily; in native Spark
 applications it's rarely required. Do you have a use case like that?


My use case is
http://apache-spark-user-list.1001560.n3.nabble.com/StreamingContext-does-not-stop-td18826.html
– that is, notifying my Spark executors that the StreamingContext has been
shut down. (Even with a non-graceful shutdown, Spark doesn't seem to end the
actual execution, just all the Spark-internal timers etc.) I need to do
this properly or processing will go on for a very long time.

I have been trying to mis-use broadcast as in
- create a class with a boolean var, set to true
- query this boolean on the executors as a prerequisite to process the next
item
- when I want to shutdown, I set the boolean to false and unpersist the
broadcast variable (which will trigger re-delivery).
This is very dirty, but it works with a local[*] master. Unfortunately,
when deployed on YARN, the new value will never arrive at my executors.

Any idea what could go wrong on YARN with this approach – or what is a
good way to do this?

Thanks
Tobias


Interoperability between ScalaRDD, JavaRDD and PythonRDD

2014-11-16 Thread Nam Nguyen
Hello,

Is it possible to reuse RDD implementations written in Scala/Java with PySpark?

Thanks,
Nam




re: How to incrementally compile spark examples using mvn

2014-11-16 Thread Yiming (John) Zhang
Thank you Marcelo. I tried your suggestion (# mvn -pl :spark-examples_2.10
compile), but it required downloading many Spark components (as listed below),
which I have already compiled on my server.

Downloading: 
https://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.1.0/spark-core_2.10-1.1.0.pom
...
Downloading: 
https://repo1.maven.org/maven2/org/apache/spark/spark-streaming_2.10/1.1.0/spark-streaming_2.10-1.1.0.pom
...
Downloading: 
https://repository.jboss.org/nexus/content/repositories/releases/org/apache/spark/spark-hive_2.10/1.1.0/spark-hive_2.10-1.1.0.pom
...

This problem didn't happen when I compiled the whole project using ``mvn
-DskipTests package''. I guess some configuration has to be made to tell mvn
that the dependencies are local. Any ideas?
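
One approach that may help (an assumption on my part, not verified against the
Spark build): install the modules you already built into the local Maven
repository once, then rebuild only the examples module, optionally in offline
mode:

  mvn -DskipTests install                      # publish the locally built modules to ~/.m2 once
  mvn -pl :spark-examples_2.10 -o compile      # then rebuild just the examples, offline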

Thank you for your help!

Cheers,
Yiming

-----Original Message-----
From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: November 16, 2014 10:26
To: sdi...@gmail.com
Cc: user@spark.apache.org
Subject: Re: How to incrementally compile spark examples using mvn

I haven't tried scala:cc, but you can ask maven to just build a particular 
sub-project. For example:

  mvn -pl :spark-examples_2.10 compile

On Sat, Nov 15, 2014 at 5:31 PM, Yiming (John) Zhang sdi...@gmail.com wrote:
 Hi,



 I have already successfully compile and run spark examples. My problem 
 is that if I make some modifications (e.g., on SparkPi.scala or 
 LogQuery.scala) I have to use “mvn -DskipTests package” to rebuild the 
 whole spark project and wait a relatively long time.



 I also tried “mvn scala:cc” as described in
 http://spark.apache.org/docs/latest/building-with-maven.html, but it
 just waits indefinitely, like:

 [INFO] --- scala-maven-plugin:3.2.0:cc (default-cli) @ spark-parent 
 ---

 [INFO] wait for files to compile...



 Is there any method to incrementally compile the examples using mvn? 
 Thank you!



 Cheers,

 Yiming



--
Marcelo





Re: SparkSQL exception on cached parquet table

2014-11-16 Thread Sadhan Sood
Hi Cheng,

I tried reading the parquet file (on which we were getting the exception)
through parquet-tools, and it is able to dump the file and I can read the
metadata, etc. I also loaded the file through a Hive table and can run a
table scan query on it as well. Let me know if I can do more to help
resolve the problem; I'll run it through a debugger and see if I can get
more information on it in the meantime.

Thanks,
Sadhan


Iterative changes to RDD and broadcast variables

2014-11-16 Thread Shannon Quinn

Hi all,

I'm iterating over an RDD (representing a distributed matrix...have to 
roll my own in Python) and making changes to different submatrices at 
each iteration. The loop structure looks something like:


for i in range(x):
  VAR = sc.broadcast(i)
  rdd.map(func1).reduceByKey(func2)
M = rdd.collect()

where func1 and func2 use the current value of VAR for that iteration.

Because there aren't any actions in the main loop, nothing actually
happens until the collect() method is called. I'm running into problems
I can't diagnose (*extremely* long execution time for no particular
reason, among others); is this code even valid? If not, how should I make
in-place iterative edits to different portions of a matrix, where each
subsequent edit depends on the edits from the previous iteration?
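
For what it's worth, a sketch of one common way to structure such a loop (this
assumes the intent is for each iteration to build on the previous one, since
the snippet above discards the result of the map/reduceByKey; the interval of
10 is arbitrary):

  for i in range(x):
      VAR = sc.broadcast(i)                       # per-iteration value, as in the original
      rdd = rdd.map(func1).reduceByKey(func2)     # reassign, otherwise the transformation is dropped
      if i % 10 == 0:                             # arbitrary interval (an assumption)
          rdd.cache()                             # keep this iteration's result in memory
          rdd.checkpoint()                        # optional: truncates the lineage (needs sc.setCheckpointDir)
          rdd.count()                             # action: force evaluation now rather than at collect()
  M = rdd.collect()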


Thanks in advance!




Re: Communication between Driver and Executors

2014-11-16 Thread Tobias Pfeiffer
Hi again,

On Mon, Nov 17, 2014 at 8:16 AM, Tobias Pfeiffer t...@preferred.jp wrote:

 I have been trying to mis-use broadcast as in
 - create a class with a boolean var, set to true
 - query this boolean on the executors as a prerequisite to process the
 next item
 - when I want to shutdown, I set the boolean to false and unpersist the
 broadcast variable (which will trigger re-delivery).
 This is very dirty, but it works with a local[*] master. Unfortunately,
 when deployed on YARN, the new value will never arrive at my executors.


In fact, it seems as if "change a mutable object (like a mutable list) and
unpersist in order to trigger a redeploy" only works locally. When running on
YARN, even after an unpersist, the value will always be identical to what I
shipped first. Now I wonder what unpersist actually does in that case. Must
I call unpersist from an executor or from the driver?

Thanks
Tobias


RDD.aggregate versus accumulables...

2014-11-16 Thread Segerlind, Nathan L
Hi All.

I am trying to get my head around why using accumulators and accumulables seems
to be the most recommended method for accumulating running sums, averages,
variances and the like, whereas the aggregate method seems to me to be the
right one. I have no performance measurements as of yet, but it seems that
aggregate is simpler and more intuitive (and it does what one might expect an
accumulator to do), whereas accumulators and accumulables seem to have some
extra complications and overhead.

So...

What's the real difference between an accumulator/accumulable and aggregating 
an RDD? When is one method of aggregation preferred over the other?
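
For concreteness, a small sketch of the two approaches for a plain sum (my own
illustration, not taken from the thread): aggregate returns the result as the
value of an action, whereas an accumulator is updated as a side effect and read
back only on the driver, with updates guaranteed to be applied exactly once
only when they happen inside actions.

  val data = sc.parallelize(1 to 100)

  // 1) aggregate: the sum is the return value of the action itself.
  val total = data.aggregate(0L)((acc, x) => acc + x, _ + _)

  // 2) accumulator: updated as a side effect of another action, read on the driver.
  val acc = sc.accumulator(0L)
  data.foreach(x => acc += x.toLong)

  println(s"aggregate: $total, accumulator: ${acc.value}")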

Thanks,
Nate


Load json format dataset as RDD

2014-11-16 Thread J
Hi,

I am new to spark. I met a problem when I intended to load one dataset.

I have a dataset in JSON format and I'd like to load it as an RDD.

As one record may span multiple lines, SparkContext.textFile() is not
usable. I also tried to use json4s to parse the JSON manually and then
merge the records into an RDD one by one, but this solution is not
convenient and is inefficient.

It seems that there is JsonRDD in Spark SQL, but it appears to be for
queries only.

Could anyone suggest how to load JSON-format data as an RDD? For example,
given the file path, load the dataset as RDD[JObject].

Thank you very much!

Regards,
J


Functions in Spark

2014-11-16 Thread Deep Pradhan
Hi,
Is there any way to know which of my functions perform better in Spark? In
other words, say I have achieved same thing using two different
implementations. How do I judge as to which implementation is better than
the other. Is processing time the only metric that we can use to claim the
goodness of one implementation to the other?
Can anyone please share some thoughts on this?

Thank You


Re: Functions in Spark

2014-11-16 Thread Samarth Mailinglist
Check this video out:
https://www.youtube.com/watch?v=dmL0N3qfSc8list=UURzsq7k4-kT-h3TDUBQ82-w

On Mon, Nov 17, 2014 at 9:43 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 Is there any way to know which of my functions perform better in Spark? In
 other words, say I have achieved same thing using two different
 implementations. How do I judge as to which implementation is better than
 the other. Is processing time the only metric that we can use to claim the
 goodness of one implementation to the other?
 Can anyone please share some thoughts on this?

 Thank You



Re: Load json format dataset as RDD

2014-11-16 Thread Cheng Lian
SQLContext.jsonFile assumes one JSON record per line. Although I
haven't tried it yet, it seems that this JsonInputFormat [1] can be
helpful. You may read your original data set with
SparkContext.hadoopFile and JsonInputFormat, then transform the
resulting RDD[String] into a JsonRDD via SQLContext.jsonRDD.


[1] 
http://pivotal-field-engineering.github.io/pmr-common/pmr/apidocs/com/gopivotal/mapreduce/lib/input/JsonInputFormat.html
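
Alternatively, if each input file is small enough to be read whole, a sketch
using wholeTextFiles plus json4s (which the original question already uses)
might look like this; it assumes each file contains either a JSON array of
records or a single top-level object:

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD
  import org.json4s._
  import org.json4s.native.JsonMethods._

  def loadJson(sc: SparkContext, path: String): RDD[JValue] = {
    sc.wholeTextFiles(path)                          // (fileName, fileContent) pairs
      .map { case (_, content) => parse(content) }   // parse each whole file with json4s
      .flatMap {
        case JArray(records) => records              // one element per record in the array
        case other           => Seq(other)           // a single top-level object
      }
  }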




Questions Regarding to MPI Program Migration to Spark

2014-11-16 Thread Jun Yang
Guys,

Recently we have been migrating our backend pipeline to Spark.

In our pipeline, we have an MPI-based HAC (hierarchical agglomerative
clustering) implementation. To ensure result consistency across the migration,
we also want to port this MPI code to Spark.

However, during the migration process, I found some possible limitations
with Spark.

In the original MPI implementation, the logic looks like the following:

Node 0 (master node):
  Get the complete document data, store in g_doc_data
  Get the document sub-set for which this node needs to calculate the
  distance metrics, store in l_dist_metric_data
  while ( exit condition is not met ) {
    Find the locally closest node pair, notated as l_closest_pair
    Get the globally closest node pair from the other nodes via MPI's
    MPI_AllReduce, notated as g_closest_pair
    Merge the globally closest node pair and update the document data
    g_doc_data.
    Re-calculate the distance metrics for those node pairs which will be
    impacted by the above merge operation, update l_dist_metric_data.
  }

Node 1/2/.../P (slave nodes):
  Get the complete document data, store in g_doc_data
  Get the document sub-set for which this node needs to calculate the
  distance metrics, store in l_dist_metric_data
  while ( exit condition is not met ) {
    Find the locally closest node pair, notated as l_closest_pair
    Get the globally closest node pair from the other nodes via MPI's
    MPI_AllReduce, notated as g_closest_pair
    Merge the globally closest node pair and update the document data
    g_doc_data.
    Re-calculate the distance metrics for those node pairs which will be
    impacted by the above merge operation, update l_dist_metric_data.
  }

The essential difficulty in migrating the above logic to Spark is:
In the original implementation, between iterations the computation
nodes need to hold local state (namely g_doc_data and
l_dist_metric_data).
In Spark, it looks like there isn't any effective way to keep
intermediate local state between iterations. Usually in Spark, we use
either a broadcast variable or a closure to pass state to the operations of
each iteration.

Of course, after each iteration we could summarize the change effects from
all the worker nodes via reduce and then broadcast this summary back to
them again. But this operation involves significant data transfer when the
data size is large (e.g. 100 thousand documents with 500-dimensional
feature vectors), and the performance penalty is non-negligible.
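
For illustration only, a sketch of the reduce-then-broadcast pattern described
above (all names, types and the update rule are placeholders, not the actual
pipeline):

  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  case class PairDist(a: Int, b: Int, dist: Double)

  def cluster(sc: SparkContext, initial: RDD[PairDist], iterations: Int): RDD[PairDist] = {
    var distances = initial.cache()
    for (_ <- 1 to iterations) {
      // globally closest pair = reduce over each partition's locally closest pair
      val closest = distances.reduce((p1, p2) => if (p1.dist <= p2.dist) p1 else p2)
      val merged = sc.broadcast(closest)
      // every partition updates the distances affected by the merge (placeholder rule)
      distances = distances.filter(p => p.a != merged.value.b && p.b != merged.value.b).cache()
    }
    distances
  }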

So my questions are:
1. Is the difficulty I mentioned above a limitation imposed by the
computation paradigm of Spark?
2. Are there any possible ways of implementing bottom-up agglomerative
hierarchical clustering algorithms in Spark?

BTW, I know there are some top-down divisive hierarchical clustering
algorithms in the upcoming 1.2 release; I will also give them a try.

Thanks.
-- 
yangjun...@gmail.com
http://hi.baidu.com/yjpro


Re: Functions in Spark

2014-11-16 Thread Mukesh Jha
Thanks, I did go through the video and it was very informative, but I think I
was looking for the Transformations section on the page
https://spark.apache.org/docs/0.9.1/scala-programming-guide.html.


On Mon, Nov 17, 2014 at 10:31 AM, Samarth Mailinglist 
mailinglistsama...@gmail.com wrote:

 Check this video out:
 https://www.youtube.com/watch?v=dmL0N3qfSc8list=UURzsq7k4-kT-h3TDUBQ82-w

 On Mon, Nov 17, 2014 at 9:43 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 Hi,
 Is there any way to know which of my functions perform better in Spark?
 In other words, say I have achieved same thing using two different
 implementations. How do I judge as to which implementation is better than
 the other. Is processing time the only metric that we can use to claim the
 goodness of one implementation to the other?
 Can anyone please share some thoughts on this?

 Thank You





-- 


Thanks & Regards,

*Mukesh Jha me.mukesh@gmail.com*


Re: spark-submit question

2014-11-16 Thread Sean Owen
You are changing these paths and filenames to match your own actual scripts
and file locations right?
On Nov 17, 2014 4:59 AM, Samarth Mailinglist mailinglistsama...@gmail.com
wrote:

 I am trying to run a job written in python with the following command:

 bin/spark-submit --master spark://localhost:7077 
 /path/spark_solution_basic.py --py-files /path/*.py --files 
 /path/config.properties

 I always get an exception that config.properties is not found:

 INFO - IOError: [Errno 2] No such file or directory: 'config.properties'

 Why isn't this working?
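
Two things that may be worth checking (assumptions on my part, not confirmed
from the thread): spark-submit treats everything after the primary .py file as
arguments to the application rather than as submit options, so --py-files and
--files may need to come before the script; and a file shipped with --files is
usually resolved on the workers via SparkFiles rather than by a bare relative
filename. A sketch, with hypothetical paths:

  # Submit options placed before the application script:
  #   bin/spark-submit --master spark://localhost:7077 \
  #     --py-files /path/deps.py --files /path/config.properties \
  #     /path/spark_solution_basic.py

  # Inside the job, resolve the shipped file via SparkFiles:
  from pyspark import SparkFiles
  config_path = SparkFiles.get("config.properties")   # absolute local path of the shipped copy
  with open(config_path) as f:
      config_text = f.read()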