Setting Optimal Number of Spark Executor Instances

2017-03-14 Thread kpeng1
Hi All,

I am currently on Spark 1.6 and I was doing a SQL join on two tables that
are over 100 million rows each, and I noticed that it was spawning 3+ tasks
(this is what the progress meter we see shows).  We tried coalesce,
repartition, and the shuffle partitions setting to bring the number of tasks
down, because we were getting timeouts due to the number of tasks being
spawned, but those operations did not seem to reduce the number of tasks.
The workaround we came up with was to set the number of executors to 50
(--num-executors=50); that spawned 200 active tasks, but the total number of
tasks remained the same.  Does anyone know what is going on?  Is there an
optimal number of executors?  I was under the impression that the default
dynamic allocation would pick the optimal number of executors for us and
that this situation wouldn't happen.  Is there something I am missing?
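
For reference, in Spark 1.6 the number of post-shuffle tasks for a SQL join is
driven by spark.sql.shuffle.partitions (default 200), while the map-side task
count follows the number of input splits; --num-executors (times cores per
executor) only controls how many tasks run concurrently, and dynamic allocation
sizes the executor pool, not the task count. A minimal sketch of adjusting the
shuffle partitions before the join (table names and values are illustrative,
not tuned for this workload):

// Hedged sketch for Spark 1.6 -- illustrative names and values only.
// spark.sql.shuffle.partitions sets how many tasks the join's shuffle stage
// produces; executors only determine how many of those run at the same time.
sqlContext.setConf("spark.sql.shuffle.partitions", "400")

val joined = sqlContext.sql(
  "SELECT a.id, b.value FROM big_table_a a JOIN big_table_b b ON a.id = b.id")
joined.count()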






Re: Weird results with Spark SQL Outer joins

2016-05-02 Thread kpeng1
Also, the inner join version of the query produced the same result:
sqlContext.sql("SELECT s.date AS edate, s.account AS s_acc, d.account AS
d_acc, s.ad AS s_ad, d.ad AS d_ad, s.spend AS s_spend,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s INNER JOIN
dps_pin_promo_lt d ON (s.date = d.date AND s.account = d.account AND s.ad =
d.ad) WHERE s.date >= '2016-01-03' AND d.date >= '2016-01-03'").count()
RESULT: 23747






Weird results with Spark SQL Outer joins

2016-05-02 Thread kpeng1
Hi All,

I am running into a weird result with Spark SQL outer joins.  The results
for all of them seem to be the same, which does not make sense given the
data.  Here are the queries that I am running, with their results:

sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account AS
d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s FULL OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
d.ad) WHERE s.date >= '2016-01-03'AND d.date >= '2016-01-03'").count()
RESULT:23747


sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account AS
d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s LEFT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
d.ad) WHERE s.date >= '2016-01-03'AND d.date >= '2016-01-03'").count()
RESULT:23747

sqlContext.sql("SELECT s.date AS edate  , s.account AS s_acc  , d.account AS
d_acc  , s.ad as s_ad  , d.ad as d_ad , s.spend AS s_spend  ,
d.spend_in_dollar AS d_spend FROM swig_pin_promo_lt s RIGHT OUTER JOIN
dps_pin_promo_lt d  ON (s.date = d.date AND s.account = d.account AND s.ad =
d.ad) WHERE s.date >= '2016-01-03'AND d.date >= '2016-01-03'").count()
RESULT: 23747

I was wondering if anyone has encountered this issue before.
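
One possible explanation (a guess based only on the queries above, not
confirmed in this thread): because the WHERE clause filters on columns from
both sides (s.date and d.date), any row where either side is NULL, which is
exactly what the outer joins add, gets filtered back out, so all three
variants collapse to the inner-join count. A hedged sketch of keeping
left-outer semantics by moving the right side's predicate into the join
condition:

// Hedged sketch -- d.date is filtered in the ON clause so that unmatched s
// rows survive with NULLs on the d side instead of being dropped by WHERE.
sqlContext.sql(
  """SELECT s.date AS edate, s.account AS s_acc, d.account AS d_acc,
    |       s.ad AS s_ad, d.ad AS d_ad, s.spend AS s_spend,
    |       d.spend_in_dollar AS d_spend
    |FROM swig_pin_promo_lt s
    |LEFT OUTER JOIN dps_pin_promo_lt d
    |  ON (s.date = d.date AND s.account = d.account AND s.ad = d.ad
    |      AND d.date >= '2016-01-03')
    |WHERE s.date >= '2016-01-03'""".stripMargin).count()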






println not appearing in libraries when running job using spark-submit --master local

2016-03-28 Thread kpeng1
Hi All,

I am currently trying to debug a spark application written in Scala.  I have
a main method:
  def main(args: Array[String]) {
    ...
    SocialUtil.triggerAndWait(triggerUrl)
    ...
  }

The SocialUtil object is included in a separate jar.  I launched the
spark-submit command using --jars, passing the SocialUtil jar.  Inside the
triggerAndWait function I have a println statement that is the first thing in
the method, but its output doesn't seem to be coming out.  All println calls
that happen directly inside the main function do appear, though.  I was
wondering if anyone knows what is going on in this situation and how I can go
about making the println in the SocialUtil object appear.

Thanks,

KP
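
A hedged sketch (not the original SocialUtil code; the object and method names
just mirror the ones above) of one way to rule out stdout capture: route the
message through log4j as well, which lands in the driver log regardless of how
console output is redirected:

// Hedged sketch -- illustrative body only; assumes log4j is on the classpath,
// which it normally is for a spark-submit run.
import org.apache.log4j.Logger

object SocialUtil {
  private val log = Logger.getLogger(getClass)

  def triggerAndWait(triggerUrl: String): Unit = {
    log.info("triggerAndWait entered with url=" + triggerUrl)
    println("triggerAndWait entered with url=" + triggerUrl)
    // ... actual trigger-and-wait logic ...
  }
}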







Using regular rdd transforms on schemaRDD

2015-03-17 Thread kpeng1
Hi All,

I was wondering how RDD transformations work on SchemaRDDs.  Is there a way
to force the RDD transform to keep the SchemaRDD type, or do I need to
recreate the SchemaRDD by applying the applySchema method?

Currently what I have is an array of SchemaRDDs and I just want to do a
union across them, i.e. I want the result to be one SchemaRDD with the union
of all the SchemaRDDs in the array.  This is what I currently have, which is
not working:
scala> z
res23: Array[org.apache.spark.sql.SchemaRDD]

scala> z.reduce((a, b) => a.union(b))
I get the following error:
 found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
 required: org.apache.spark.sql.SchemaRDD
  z.reduce((a, b) => a.union(b))

I also noticed that when I do a simple union, z(0).union(z(1)), the result I
get back is not a SchemaRDD but a normal RDD:
scala> z(0).union(z(1))
res22: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

Is there a simple way for me to convert back to a SchemaRDD, or do I need to
call HiveContext.applySchema(res22, myschema)?






Re: Using regular rdd transforms on schemaRDD

2015-03-17 Thread kpeng1
Looks like if I use unionAll this works.
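
For reference, a minimal sketch of what that looks like over the array from
the previous message (z is the Array[SchemaRDD] shown there); unionAll is
defined on SchemaRDD itself, so the result keeps its schema instead of
degrading to RDD[Row]:

// Hedged sketch -- unionAll returns a SchemaRDD, unlike RDD.union.
val combined = z.reduce((a, b) => a.unionAll(b))
combined.registerTempTable("combined")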






Creating a hive table on top of a parquet file written out by spark

2015-03-16 Thread kpeng1
Hi All,

I wrote out a complex parquet file from spark sql and now I am trying to put
a hive table on top.  I am running into issues with creating the hive table
itself.  Here is the json that I wrote out to parquet using spark sql:
{"user_id":"4513","providers":[{"id":"4220","name":"dbmvl","behaviors":{"b1":"gxybq","b2":"ntfmx"}},{"id":"4173","name":"dvjke","behaviors":{"b1":"sizow","b2":"knuuc"}}]}
{"user_id":"3960","providers":[{"id":"1859","name":"ponsv","behaviors":{"b1":"ahfgc","b2":"txpea"}},{"id":"103","name":"uhqqo","behaviors":{"b1":"lktyo","b2":"ituxy"}}]}
{"user_id":"567","providers":[{"id":"9622","name":"crjju","behaviors":{"b1":"rhaqc","b2":"npnot"}},{"id":"6965","name":"fnheh","behaviors":{"b1":"eipse","b2":"nvxqk"}}]}

I basically created a hive context and read in the json file using jsonFile
and then I wrote it back out using saveAsParquetFile.

Afterwards I was trying to create a hive table on top of the parquet file. 
Here is the hive hql that I have:
create table test (mycol STRUCT<user_id:String,
providers:ARRAY<STRUCT<id:String, name:String,
behaviors:MAP<String, String>>>>) stored as parquet;
Alter table test set location 'hdfs:///tmp/test.parquet';

I get errors when I try to do a select * on the table:
Failed with exception java.io.IOException:java.lang.IllegalStateException:
Column mycol at index 0 does not exist in {providers=providers,
user_id=user_id}
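
A hedged reading of that error: the parquet file written by saveAsParquetFile
has user_id and providers as top-level columns, so the table definition needs
to mirror that layout rather than wrap everything in a single mycol struct. A
sketch of what the DDL might look like under that assumption (column types are
inferred from the sample JSON above, not verified against the parquet footer):

-- Hedged sketch: matches the {user_id, providers} layout reported in the
-- error message; types are guesses based on the sample data.
create external table test (
  user_id string,
  providers array<struct<id:string, name:string, behaviors:map<string,string>>>
)
stored as parquet
location 'hdfs:///tmp/test.parquet';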








Loading in json with spark sql

2015-03-13 Thread kpeng1
Hi All,

I was noodling around with loading a json file into Spark SQL's hive
context and I noticed that I get the following message after loading the
json file:
PhysicalRDD [_corrupt_record#0], MappedRDD[5] at map at JsonRDD.scala:47

I am using the HiveContext to load the json file with the jsonFile command,
and I have one json object per line in the file.  Here is a sample of the
contents of the json file:
{"user_id":"7070","providers":{{"id":"8753","name":"pjfig","behaviors":{"b1":"erwxt","b2":"yjooj"}},{"id":"8329","name":"dfvhh","behaviors":{"b1":"pjjdn","b2":"ooqsh
{"user_id":"1615","providers":{{"id":"6105","name":"rsfon","behaviors":{"b1":"whlje","b2":"lpjnq"}},{"id":"6828","name":"pnmrb","behaviors":{"b1":"fjpmz","b2":"dxqxk
{"user_id":"5210","providers":{{"id":"9360","name":"xdylm","behaviors":{"b1":"gcdze","b2":"cndcs"}},{"id":"4812","name":"gxboh","behaviors":{"b1":"qsxao","b2":"ixdzq
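
When every record lands in _corrupt_record like this, the usual cause is that
the individual lines are not valid JSON on their own (here, providers is
written as {{...}} rather than as an array of objects; the same data appears
with providers as a JSON array in the 2015-03-16 message above, and that form
parses). A hedged sketch for pulling back what the parser actually saw (the
variable and path names are illustrative):

// Hedged sketch -- illustrative names and paths; registers the loaded data
// and inspects a few of the records the JSON parser rejected.
val raw = hiveContext.jsonFile("/tmp/users.json")
raw.registerTempTable("raw_json")
hiveContext.sql("SELECT _corrupt_record FROM raw_json LIMIT 3")
  .collect()
  .foreach(println)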







spark sql writing in avro

2015-03-12 Thread kpeng1
Hi All,

I am currently trying to write out a SchemaRDD to Avro.  I noticed that there
is a Databricks spark-avro library and I have included it in my
dependencies, but it looks like I am not able to access the AvroSaver
object.  On compilation of the job I get this:
error: not found: value AvroSaver
[ERROR] AvroSaver.save(resultRDD, args(4))

I also tried calling saveAsAvro on the resultRDD (the actual RDD with the
results) and that passes compilation, but when I run the code I get an error
saying that saveAsAvro is not implemented.  I am using version 0.1 of
spark-avro_2.10.
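
A hedged guess, not verified against the 0.1 release: "not found: value
AvroSaver" at compile time usually just means the object is not imported into
scope, so an explicit import of the spark-avro package may be all the compile
step needs. The save call below is copied from the error output above; the
import path is the assumption:

// Hedged sketch -- the import is an assumption about spark-avro 0.1's
// package layout; the call itself is the one from the build error above.
import com.databricks.spark.avro._

AvroSaver.save(resultRDD, args(4))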







Writing wide parquet file in Spark SQL

2015-03-10 Thread kpeng1
Hi All,

I am currently trying to write a very wide file into parquet using Spark
SQL.  I have records with 100K columns that I am trying to write out, but of
course I am running into memory issues (out of memory: Java heap space).  I
was wondering if there are any tweaks or workarounds for this.

I am basically calling saveAsParquetFile on the SchemaRDD.
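
A hedged sketch of knobs that are sometimes worth trying for very wide schemas
(values are illustrative, not tuned): smaller Parquet row groups mean less
column-chunk state buffered per writer, fewer output partitions mean fewer
writers open at once per executor, and more heap at submit time gives the
writers room to work:

// Hedged sketch -- illustrative values only.
// Smaller row groups reduce how much each Parquet writer buffers per column.
sc.hadoopConfiguration.setInt("parquet.block.size", 32 * 1024 * 1024)

// Fewer partitions means fewer Parquet writers open concurrently per executor.
val fewer = schemaRDD.coalesce(20)
fewer.saveAsParquetFile("/tmp/wide_table.parquet")

// Also consider more heap at submit time, e.g.
//   spark-submit --driver-memory 8g --conf spark.executor.memory=8g ...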








Passing around SparkContext with in the Driver

2015-03-04 Thread kpeng1
Hi All,

I am trying to create a class that wraps functionality that I need; some
of these functions require access to the SparkContext, which I would like to
pass in.  I know that the SparkContext is not serializable, and I am not
planning on passing it to worker nodes or anything; I just want to wrap some
functionality that requires the SparkContext's API.  As a preface, I am
basically using the spark shell to test the functionality of my code at the
moment, so I am not sure if that plays into any of the issues I am having.
Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
    "int" -> IntegerType,
    "double" -> DoubleType,
    "float" -> FloatType,
    "long" -> LongType,
    "short" -> ShortType,
    "binary" -> BinaryType,
    "bool" -> BooleanType,
    "byte" -> ByteType,
    "string" -> StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] = {
    //header line is first line in first partition
    if (partitionIdx == 0) {
      fileItr.drop(1)
    }
    fileItr
  }

  //returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType = {
    //returns back a StructField
    def getSchemaFieldHelper(schemaField: String): StructField = {
      val schemaParts = schemaField.split(' ')
      StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
    }

    val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
    StructType(structFields)
  }

  def getRow(strRow: String): Row = {
    val spRow = strRow.split(',')
    val tRow = spRow.map(_.trim)
    Row(tRow:_*)
  }

  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile: String): SchemaRDD = {
    //apply schema to rdd to create schemaRDD
    def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType): SchemaRDD = {
      val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
      schemaRDD
    }

    val rawSchema = sparkContext.textFile(schemaFile).collect
    val schema = getSchema(rawSchema)

    val rawCsvData = sparkContext.textFile(csvFile)

    //if we want to keep header from csv file
    if (includeHeader) {
      val rowRDD = rawCsvData.map(getRow)
      val schemaRDD = createSchemaRDD(rowRDD, schema)
      return schemaRDD
    }

    val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
    val rowRDD = csvData.map(getRow)
    val schemaRDD = createSchemaRDD(rowRDD, schema)
    schemaRDD
  }

}

So in the spark shell I am basically creating an instance of this class and
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I am getting is a task-not-serializable exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at
<console>:62
org.apache.spark.SparkException: Task not serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
   .
   .
   .
Caused by: java.io.NotSerializableException:


If I remove the class wrapper and make references to sc directly, everything
works.  I am basically wondering what is causing the serialization issue
and whether I can wrap a class around these functions.
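
A hedged reading of the trace: mapPartitionsWithIndex(removeHeader) passes a
method of MyClass, so the closure captures the enclosing instance, and that
instance holds the non-serializable SparkContext. A sketch of one common
workaround under that reading, marking the context-holding fields transient
and keeping the row-level helpers where they capture nothing (illustrative,
not the only fix):

// Hedged sketch -- keep non-serializable members out of shipped closures.
class MyClass(@transient val sparkContext: SparkContext) extends Serializable {
  @transient lazy val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
  // ... rest of the class as above, but calling MyClass.removeHeader ...
}

object MyClass {
  // A pure function with no reference to the class instance, so closures that
  // use it capture only the function itself.
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] =
    if (partitionIdx == 0) fileItr.drop(1) else fileItr
}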






Issues with maven dependencies for version 1.2.0 but not version 1.1.0

2015-03-04 Thread kpeng1
Hi All,

I am currently having problems with the maven dependencies for version 1.2.0
of spark-core and spark-hive.  Here are my dependencies:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.10</artifactId>
  <version>1.2.0</version>
</dependency>

When the dependencies are set to version 1.1.0, I do not get any errors. 
Here are the errors I am getting from artifactory for version 1.2.0 of
spark-core:
error=Could not transfer artifact
org.apache.spark\:spark-core_2.10\:pom\:1.2.0 from/to repo
(https\://m2.mines.com\:8443/artifactory/repo)\: Failed to transfer file\:
https\://m2.mines.com\:8443/artifactory/repo/org/apache/spark/spark-core_2.10/1.2.0/spark-core_2.10-1.2.0.pom.
Return code is\: 409 , ReasonPhrase\:Conflict.

The error is the same for spark-hive.







Issues reading in Json file with spark sql

2015-03-02 Thread kpeng1
Hi All,

I am currently having issues reading in a json file using spark sql's api. 
Here is what the json file looks like:
{
  "namespace": "spacey",
  "name": "namer",
  "type": "record",
  "fields": [
    {"name":"f1","type":["null","string"]},
    {"name":"f2","type":["null","string"]},
    {"name":"f3","type":["null","string"]},
    {"name":"f4","type":["null","string"]},
    {"name":"f5","type":["null","string"]},
    {"name":"f6","type":["null","string"]},
    {"name":"f7","type":["null","string"]},
    {"name":"f8","type":["null","string"]},
    {"name":"f9","type":["null","string"]},
    {"name":"f10","type":["null","string"]},
    {"name":"f11","type":["null","string"]},
    {"name":"f12","type":["null","string"]},
    {"name":"f13","type":["null","string"]},
    {"name":"f14","type":["null","string"]},
    {"name":"f15","type":["null","string"]}
  ]
}

This is what I am doing to read in the json file (using Spark SQL in the
spark shell on CDH 5.3):

val sqlsc = new org.apache.spark.sql.SQLContext(sc)
val j = sqlsc.jsonFile("/tmp/try.avsc")


This is what I am getting as an error:

15/03/02 11:23:45 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 12,
10.0.2.15): scala.MatchError: namespace (of class java.lang.String)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851)
at 
org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
at 
org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.1 in stage 3.0 (TID
14, 10.0.2.15, ANY, 1308 bytes)
15/03/02 11:23:45 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID
13) in 128 ms on 10.0.2.15 (1/2)
15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.1 in stage 3.0 (TID 14)
on executor 10.0.2.15: scala.MatchError (namespace (of class
java.lang.String)) [duplicate 1]
15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.2 in stage 3.0 (TID
15, 10.0.2.15, ANY, 1308 bytes)
15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.2 in stage 3.0 (TID 15)
on executor 10.0.2.15: scala.MatchError (namespace (of class
java.lang.String)) [duplicate 2]
15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.3 in stage 3.0 (TID
16, 10.0.2.15, ANY, 1308 bytes)
15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.3 in stage 3.0 (TID 16)
on executor 10.0.2.15: scala.MatchError (namespace (of class
java.lang.String)) [duplicate 3]
15/03/02 11:23:45 ERROR TaskSetManager: Task 0 in stage 3.0 failed 4 times;
aborting job
15/03/02 11:23:45 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks
have all completed, from pool 
15/03/02 11:23:45 INFO TaskSchedulerImpl: Cancelling stage 3
15/03/02 11:23:45 INFO DAGScheduler: Job 3 failed: reduce at
JsonRDD.scala:57, took 0.210707 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0
(TID 16, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305)
at
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853)
at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851)
at 
org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350)
at 
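
A hedged note on the failure above: jsonFile expects one complete JSON object
per line, and a pretty-printed .avsc file spreads a single object across many
lines, so the first token the parser sees ("namespace") is not a full record
on its own, which matches the MatchError. One workaround under that assumption
is to read the whole file as a single string first:

// Hedged sketch -- wholeTextFiles yields (path, contents) pairs, so the
// entire pretty-printed file becomes one JSON string that jsonRDD can parse.
val wholeFile = sc.wholeTextFiles("/tmp/try.avsc").map(_._2)
val j = sqlsc.jsonRDD(wholeFile)
j.printSchema()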

Spark SQL Converting RDD to SchemaRDD without hardcoding a case class in scala

2015-02-27 Thread kpeng1
Hi All,

I am currently trying to build out a spark job that would basically convert
a csv file into parquet.  From what I have seen it looks like Spark SQL is
the way to go, and the way I would go about this is to load the csv file
into an RDD and convert it into a SchemaRDD by injecting the schema via a
case class.

What I want to avoid is hard coding the case class itself.  I want to
reuse this job and pass in a file that contains the schema, i.e. an Avro avsc
file or something similar.  I was wondering if there is a way to do this,
since I couldn't figure out how to create a case class dynamically; if
there are ways around creating a case class, I am definitely open to trying
them as well.
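
A hedged sketch of the case-class-free route: build a StructType from the
schema file at runtime, map the csv lines to Row, and call applySchema (the
file paths and the "name type" schema-file format below are illustrative
assumptions):

// Hedged sketch -- Spark 1.2-era API; assumes a schema file with one
// "columnName typeName" pair per line, and treats every column as a string
// for simplicity.
import org.apache.spark.sql._

val sqlContext = new SQLContext(sc)

val fields = sc.textFile("/tmp/schema.txt").collect().map { line =>
  val name = line.split(' ')(0)
  StructField(name, StringType, nullable = true)
}
val schema = StructType(fields)

val rows = sc.textFile("/tmp/data.csv").map(line => Row(line.split(','): _*))
val schemaRDD = sqlContext.applySchema(rows, schema)
schemaRDD.saveAsParquetFile("/tmp/data.parquet")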






Spark streaming on Yarn

2014-11-17 Thread kpeng1
Hi,
 
I have been using spark streaming in standalone mode and now I want to
migrate to Spark running on YARN, but I am not sure how I would go about
designating a specific node in the cluster to act as an Avro listener, since
I am using the Flume push-based approach with Spark.
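
One hedged alternative worth mentioning, since it sidesteps pinning a listener
to a known node: the pull-based Flume integration, where Flume writes to a
sink on fixed Flume hosts and the Spark receiver polls those sinks, so it can
be scheduled on any YARN container. A sketch under that assumption (host and
port are illustrative, and the spark-streaming-flume-sink jar has to be on the
Flume agent's classpath):

// Hedged sketch -- pull-based Flume stream; host and port are illustrative.
import org.apache.spark.streaming.flume.FlumeUtils

val flumeStream = FlumeUtils.createPollingStream(ssc, "flume-agent-host", 4545)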







Is it possible to call a transform + action inside an action?

2014-10-28 Thread kpeng1
I am currently writing an application that uses spark streaming.  What I am
trying to do is basically read in a few files (I do this using the spark
context textFile) and then process those files inside an action that I apply
to a streaming RDD.  Here is the main code below:

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("EmailIngestion")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  val sc = new SparkContext(sparkConf)
  val badWords = sc.textFile("/filters/badwords.txt")
  val urlBlacklist = sc.textFile("/filters/source_url_blacklist.txt")
  val domainBlacklist = sc.textFile("/filters/domain_blacklist.txt")
  val emailBlacklist = sc.textFile("/filters/blacklist.txt")

  val lines = FlumeUtils.createStream(ssc, "localhost", 4545,
    StorageLevel.MEMORY_ONLY_SER_2)

  lines.foreachRDD(rdd => rdd.foreachPartition(json =>
    Processor.ProcessRecord(json, badWords, urlBlacklist, domainBlacklist,
      emailBlacklist)))
  ssc.start()
  ssc.awaitTermination()
}

Here is the code for processing the files found inside the ProcessRecord
method:
val emailBlacklistCnt = emailBlacklist.filter(black =>
  black.contains(email)).count

It looks like this throws an exception.  Is it possible to do this?









How to properly debug spark streaming?

2014-10-28 Thread kpeng1
I am still fairly new to spark and spark streaming.  I have been struggling
with how to properly debug spark streaming and I was wondering what the best
approach is.  I have basically been putting println statements everywhere,
but sometimes they show up when I run the job and sometimes they don't.  I
currently run the job using the following command:
/usr/bin/spark-submit --class project.TheMain --master local[2]
/home/cloudera/my.jar 100
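
A hedged note on why println output can be intermittent: statements that run
on the driver (setup code, the body of foreachRDD itself) print to the
submitting console, while statements inside RDD closures run on executors and
go to the executor's stdout/stderr logs; with --master local[2] both happen to
share one console, but timing and buffering can still make them interleave
oddly, and on a cluster they are separated completely. A small sketch that
tags each side so the output is attributable (lines stands in for whatever
DStream the job builds):

// Hedged sketch -- "lines" is a placeholder DStream.
println("[driver] streaming job configured")                         // driver console

lines.foreachRDD { rdd =>
  println("[driver] batch arrived with " + rdd.count() + " records") // driver console
  rdd.foreach(record => println("[executor] " + record))             // executor log
}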








RE: Is it possible to call a transform + action inside an action?

2014-10-28 Thread kpeng1
Ok cool.  So in that case the only way I can think of doing this would be
calling the toArray method on those RDDs, which would return Array[String],
and storing them as broadcast variables.  I read about broadcast
variables, but it is still fuzzy to me.  I assume that since broadcast
variables are available to all nodes in the cluster, programmatically I can
treat them as globals and use them in different objects from the ones they
are defined in?
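
A hedged sketch of that idea with the filter files from the original message:
collect each (small) RDD on the driver, broadcast the result, and read .value
wherever the processing code runs, including from other objects, since only
the lightweight broadcast handle is captured by the closure:

// Hedged sketch -- how ProcessRecord consumes the values is illustrative.
val badWordsB = sc.broadcast(badWords.collect().toSet)
val emailBlacklistB = sc.broadcast(emailBlacklist.collect().toSet)

lines.foreachRDD(rdd => rdd.foreachPartition { json =>
  // Inside ProcessRecord-style code, read the broadcast values:
  val badWords = badWordsB.value
  val emailBlacklist = emailBlacklistB.value
  // ... e.g. emailBlacklist.contains(someEmail) ...
})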






Invalid signature file digest for Manifest main attributes with spark job built using maven

2014-09-15 Thread kpeng1
Hi All,

I am trying to submit a spark job that I have built in maven using the
following command:
/usr/bin/spark-submit --deploy-mode client --class com.spark.TheMain
--master local[1] /home/cloudera/myjar.jar 100

But I seem to be getting the following error:
Exception in thread "main" java.lang.SecurityException: Invalid signature
file digest for Manifest main attributes
at
sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:286)
at
sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:239)
at java.util.jar.JarVerifier.processEntry(JarVerifier.java:307)
at java.util.jar.JarVerifier.update(JarVerifier.java:218)
at java.util.jar.JarFile.initializeVerifier(JarFile.java:345)
at java.util.jar.JarFile.getInputStream(JarFile.java:412)
at 
sun.misc.URLClassPath$JarLoader$2.getInputStream(URLClassPath.java:775)
at sun.misc.Resource.cachedInputStream(Resource.java:77)
at sun.misc.Resource.getByteBuffer(Resource.java:160)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:436)
at java.net.URLClassLoader.access$100(URLClassLoader.java:71)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:289)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
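
This exception typically means META-INF signature files (*.SF / *.DSA / *.RSA)
from a signed dependency were copied into the assembled jar. A hedged sketch
of the usual fix (this plugin block is not part of the pom below) is a
maven-shade-plugin filter that strips them while shading:

<!-- Hedged sketch: not in the original pom; excludes signature files from all
     shaded dependencies so the merged jar is no longer treated as signed. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <filters>
          <filter>
            <artifact>*:*</artifact>
            <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
            </excludes>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>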


Here is the pom file I am using to build the jar:
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
  http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.spark</groupId>
  <artifactId>myjar</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <name>${project.artifactId}</name>
  <description>My wonderfull scala app</description>
  <inceptionYear>2010</inceptionYear>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <cdh.version>cdh5.1.0</cdh.version>
    <maven.compiler.source>1.6</maven.compiler.source>
    <maven.compiler.target>1.6</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.tools.version>2.10</scala.tools.version>
    <scala.version>2.10.4</scala.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
    </repository>
    <repository>
      <id>maven-hadoop</id>
      <name>Hadoop Releases</name>
      <url>https://repository.cloudera.com/content/repositories/releases/</url>
    </repository>
    <repository>
      <id>cloudera-repos</id>
      <name>Cloudera Repos</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-tools Maven2 Repository</name>
      <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.0.0-${cdh.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tools_2.10</artifactId>
      <version>1.0.0-${cdh.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.10</artifactId>
      <version>1.0.0-${cdh.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.0.0-${cdh.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.0-${cdh.version}</version>
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.0-${cdh.version}</version>
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>

Spark Streaming into HBase

2014-09-03 Thread kpeng1
I have been trying to understand how spark streaming and hbase connect, but
have not been successful. What I am trying to do is given a spark stream,
process that stream and store the results in an hbase table. So far this is
what I have:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Get}
import org.apache.hadoop.hbase.util.Bytes

def blah(row: Array[String]) {
  val hConf = new HBaseConfiguration()
  val hTable = new HTable(hConf, "table")
  val thePut = new Put(Bytes.toBytes(row(0)))
  thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)),
    Bytes.toBytes(row(0)))
  hTable.put(thePut)
}

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", ,
  StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.map(_.split(","))
val store = words.foreachRDD(rdd => rdd.foreach(blah))
ssc.start()

I am currently running the above code in spark-shell. I am not sure what I
am doing wrong.
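
A hedged sketch of a variant that often avoids both the serialization trap and
per-record connection setup: create the HBase configuration and table inside
foreachPartition, so nothing non-serializable has to be shipped with the
closure and one connection serves the whole partition (illustrative, not a
drop-in replacement for the code above):

// Hedged sketch -- connection objects are created on the executor, once per
// partition, instead of being captured by the closure.
words.foreachRDD { rdd =>
  rdd.foreachPartition { rows =>
    val hConf = HBaseConfiguration.create()
    val hTable = new HTable(hConf, "table")
    rows.foreach { row =>
      val thePut = new Put(Bytes.toBytes(row(0)))
      thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0)))
      hTable.put(thePut)
    }
    hTable.close()
  }
}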






Re: Spark Streaming into HBase

2014-09-03 Thread kpeng1
Sean,

I create a streaming context near the bottom of the code (ssc) and
basically apply foreachRDD on the resulting DStream so that I can get
access to the underlying RDD, on which I then call foreach, passing in my
function that applies the storing logic.

Is there a different approach I should be using?

Thanks for the help.


On Wed, Sep 3, 2014 at 2:43 PM, Sean Owen-2 [via Apache Spark User List] 
ml-node+s1001560n13385...@n3.nabble.com wrote:

 This doesn't seem to have to do with HBase per se. Some function is
 getting the StreamingContext into the closure and that won't work. Is
 this exactly the code? since it doesn't reference a StreamingContext,
 but is there maybe a different version in reality that tries to use
 StreamingContext inside a function?

 On Wed, Sep 3, 2014 at 10:36 PM, Ted Yu [hidden email]
 http://user/SendEmail.jtp?type=nodenode=13385i=0 wrote:

  Adding back user@
 
  I am not familiar with the NotSerializableException. Can you show the
 full
  stack trace ?
 
  See SPARK-1297 for changes you need to make so that Spark works with
 hbase
  0.98
 
  Cheers
 
 
  On Wed, Sep 3, 2014 at 2:33 PM, Kevin Peng [hidden email]
 http://user/SendEmail.jtp?type=nodenode=13385i=1 wrote:
 
  Ted,
 
  The hbase-site.xml is in the classpath (had worse issues before...
 until I
  figured that it wasn't in the path).
 
  I get the following error in the spark-shell:
  org.apache.spark.SparkException: Job aborted due to stage failure: Task
  not serializable: java.io.NotSerializableException:
  org.apache.spark.streaming.StreamingContext
  at
  org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.sc

  ...
 
  I also double checked the hbase table, just in case, and nothing new is
  written in there.
 
  I am using hbase version: 0.98.1-cdh5.1.0 the default one with the
  CDH5.1.0 distro.
 
  Thank you for the help.
 
 
  On Wed, Sep 3, 2014 at 2:09 PM, Ted Yu [hidden email]
 http://user/SendEmail.jtp?type=nodenode=13385i=2 wrote:
 
  Is hbase-site.xml in the classpath ?
  Do you observe any exception from the code below or in region server
 log
  ?
 
  Which hbase release are you using ?
 
 
  On Wed, Sep 3, 2014 at 2:05 PM, kpeng1 [hidden email]
 http://user/SendEmail.jtp?type=nodenode=13385i=3 wrote:
 
  I have been trying to understand how spark streaming and hbase
 connect,
  but
  have not been successful. What I am trying to do is given a spark
  stream,
  process that stream and store the results in an hbase table. So far
 this
  is
  what I have:
 
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.storage.StorageLevel
  import org.apache.hadoop.hbase.HBaseConfiguration
  import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get}
  import org.apache.hadoop.hbase.util.Bytes
 
  def blah(row: Array[String]) {
val hConf = new HBaseConfiguration()
val hTable = new HTable(hConf, table)
val thePut = new Put(Bytes.toBytes(row(0)))
thePut.add(Bytes.toBytes(cf), Bytes.toBytes(row(0)),
  Bytes.toBytes(row(0)))
hTable.put(thePut)
  }
 
  val ssc = new StreamingContext(sc, Seconds(1))
  val lines = ssc.socketTextStream(localhost, ,
  StorageLevel.MEMORY_AND_DISK_SER)
  val words = lines.map(_.split(,))
  val store = words.foreachRDD(rdd = rdd.foreach(blah))
  ssc.start()
 
  I am currently running the above code in spark-shell. I am not sure
 what
  I
  am doing wrong.
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-into-HBase-tp13378.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
  -
  To unsubscribe, e-mail: [hidden email]
 http://user/SendEmail.jtp?type=nodenode=13385i=4
  For additional commands, e-mail: [hidden email]
 http://user/SendEmail.jtp?type=nodenode=13385i=5
 
 
 
 

 -
 To unsubscribe, e-mail: [hidden email]
 http://user/SendEmail.jtp?type=nodenode=13385i=6
 For additional commands, e-mail: [hidden email]
 http://user/SendEmail.jtp?type=nodenode=13385i=7




Re: Issue Connecting to HBase in spark shell

2014-08-27 Thread kpeng1
It looks like the issue I had is that I didn't pull the htrace-core jar into
the Spark classpath.


