Job Error: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@130.1.10.108:23600/)

2015-12-24 Thread donhoff_h
Hi, folks


I wrote some Spark jobs and they run successfully when I run them one by one.
But when I run them concurrently, for example 12 jobs in parallel, I get the
following error. Could anybody tell me what causes this and how to solve it?
Many thanks!


Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: 
ActorSelection[Anchor(akka.tcp://sparkDriver@130.1.10.108:23600/), 
Path(/user/MapOutputTracker)]
at 
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at 
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at 
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at 
akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at 
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:89)
at 
akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Spark Streaming - print accumulators value every period as logs

2015-12-24 Thread Roberto Coluccio
Hello,

I have a batch driver and a streaming driver using the same functions (Scala).
I use accumulators (passed to the functions' constructors) to count stuff.

In the batch driver, doing so at the right point of the pipeline, I'm able
to retrieve the accumulator value and print it as a log4j log.

In the streaming driver, doing the same yields nothing. That's probably
because accumulators in the streaming driver are created empty and the code
that prints them is executed only once at the driver (while they are still
empty), when the StreamingContext is started and the DAG is created.

I'm looking for a way to log the current value of my accumulators at every
batch period of my Spark Streaming driver. Indeed, I wish to reset those
accumulators at each period so as to keep only the counts related to that
period.
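
A minimal sketch of the kind of thing I'm after, assuming the Spark 1.x
accumulator API; ssc, stream, log and the other names are illustrative only,
not my actual code:

import org.apache.log4j.Logger

val log = Logger.getLogger("perBatch")
val counter = ssc.sparkContext.accumulator(0L, "perBatchCounter")

val counted = stream.map { record =>
  counter += 1L              // incremented on the executors
  record
}

counted.foreachRDD { (rdd, time) =>
  rdd.foreach(_ => ())       // force evaluation so the accumulator gets populated
  log.info(s"Batch $time: counter = ${counter.value}")
  counter.setValue(0L)       // reset so the next period logs only its own count
}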

Any advice would be really appreciated.

Thanks,
Roberto


How can I get the column data based on a specific column name and then store the data in an array or list?

2015-12-24 Thread zml张明磊
Hi,

   I am new to Scala and Spark and I am trying to find an API on DataFrame to
solve the problem described in the title. However, I have only found
DataFrame.col(colName: String): Column, which returns a Column object rather
than its contents. An API like Column.toArray: Type would be enough for me,
but DataFrame doesn't seem to support one. How can I achieve this?
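
For illustration, one way to pull a column's contents into a local collection
(a sketch only; df, the column name, and the String element type are
placeholders):

// select the column, then map each Row to its value and collect
val values: Array[String] =
  df.select("colName").rdd.map(_.getString(0)).collect()

// or, staying on the Row API
val values2: Array[String] =
  df.select("colName").collect().map(_.getAs[String](0))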

Thanks,
Minglei.


Re: How to ignore case in dataframe groupby?

2015-12-24 Thread Yanbo Liang
You can use DF.groupBy(upper(col("a"))).agg(sum(col("b"))).
The DataFrame functions object provides an "upper" function to convert a column to uppercase.
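
Applied to the countrycode example from this thread, a minimal sketch (df is
assumed to be the DataFrame in question):

import org.apache.spark.sql.functions.{col, upper}

// normalize the code to uppercase before grouping, so "US" and "us" land in one group
val counts = df.groupBy(upper(col("countrycode")).as("countrycode")).count()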

2015-12-24 20:47 GMT+08:00 Eran Witkon :

> Use DF.withColumn("upper-code", upper(df("countrycode")))
> or just run a map function that does the same
>
> On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja 
> wrote:
>
>> Hi,
>> Values in a dataframe column named countrycode are in different cases.
>> Eg: (US, us).  groupBy & count gives two rows but the requirement is to
>> ignore case for this operation.
>> 1) Is there a way to ignore case in groupBy? Or
>> 2) Is there a way to update the dataframe column countrycode to uppercase?
>>
>> Thanks in advance.
>>
>> Regards,
>> Raja
>>
>


Re: Hive error when starting up spark-shell in 1.5.2

2015-12-24 Thread Marco Mistroni
No luck.
But two updates:
1. I have downloaded spark-1.4.1 and everything works fine; I don't see any
error.
2. I have added the following XML file to Spark 1.5.2's conf directory and
now I get the following error:

Caused by: java.lang.RuntimeException: The root scratch dir:
c:/Users/marco/tmp on HDFS should be writable. Current permissions are:
rwx---rwx

I will have to play around with Windows permissions to allow Spark to use
that directory
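
For reference, a minimal hive-site.xml sketch that points the Hive scratch dir
at a writable location (the path below is only an example):

<configuration>
  <property>
    <name>hive.exec.scratchdir</name>
    <value>C:/Users/marco/tmp/hive</value>
  </property>
</configuration>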

kr
 marco




On Sun, Dec 20, 2015 at 5:15 PM, Marco Mistroni  wrote:

> Thanks Chris will give it a go and report back.
> Bizarrely if I start the pyspark shell I don't see any issues
> Kr
> Marco
> On 20 Dec 2015 5:02 pm, "Chris Fregly"  wrote:
>
>> hopping on a plane, but check the hive-site.xml that's in your spark/conf
>> directory (or should be, anyway).  I believe you can change the root path
>> thru this mechanism.
>>
>> if not, this should give you more info to google on.
>>
>> let me know as this comes up a fair amount.
>>
>> > On Dec 19, 2015, at 4:58 PM, Marco Mistroni 
>> wrote:
>> >
>> > Hi all
>> >  posting this again as I was experiencing this error also under 1.5.1
>> > I am running Spark 1.5.2 on a Windows 10 laptop (upgraded from Windows
>> 8)
>> > When I launch spark-shell I am getting this exception, presumably
>> because I have no
>> > admin rights to the /tmp directory on my laptop (Windows 8-10 seems very
>> restrictive)
>> >
>> > java.lang.RuntimeException: java.lang.RuntimeException: The root
>> scratch dir: /tmp/hive on HDFS should be writable. Current permissions are:
>> -
>> > at
>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>> > at
>> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
>> > at
>> org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
>> > at
>> org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
>> > at
>> org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:167)
>> > at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> > at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> > at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> > at
>> java.lang.reflect.Constructor.newInstance(Constructor.java:422)
>> > at
>> org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
>> > at $iwC$$iwC.(:9)
>> > at $iwC.(:18)
>> > at (:20)
>> > at .(:24)
>> > at .()
>> > at .(:7)
>> > at .()
>> > at $print()
>> > ..
>> > Caused by: java.lang.RuntimeException: The root scratch dir: /tmp/hive
>> on HDFS should be writable. Current permissions are: -
>> > at
>> org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:612)
>> > at
>> org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
>> > at
>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
>> > ... 56 more
>> >
>> > :10: error: not found: value sqlContext
>> >import sqlContext.implicits._
>> >   ^
>> > :10: error: not found: value sqlContext
>> >import sqlContext.sql
>> >
>> > I was wondering how I can configure Hive to point to a different
>> directory where I have more permissions
>> >
>> > kr
>> >  marco
>> >
>>
>


how to debug java.lang.IllegalArgumentException: object is not an instance of declaring class

2015-12-24 Thread Andy Davidson
Hi 

Any idea how I can debug this problem? I suspect the problem has to do with
how I am converting a JavaRDD<Tuple2<Long, Double>> to a DataFrame.

Is it a boxing problem? I tried to use long and double instead of Long and
Double whenever possible.

Thanks in advance, Happy Holidays.

Andy

allData.printSchema()
root

 |-- label: string (nullable = true)

 |-- text: string (nullable = true)

 |-- id: long (nullable = true)

 |-- createdAt: long (nullable = true)

 |-- binomialLabel: string (nullable = true)

 |-- words: array (nullable = true)

 ||-- element: string (containsNull = true)

 |-- features: vector (nullable = true)

 |-- labelIndex: double (nullable = true)



//

// make predictions using all the data

// The real out of sample error will be higher

//

JavaRDD<Tuple2<Long, Double>> predictions =
    idLabeledPoingRDD.map((Tuple2<Long, LabeledPoint> t2) -> {
        Long id = t2._1();
        LabeledPoint lp = t2._2();
        double prediction = naiveBayesModel.predict(lp.features());
        return new Tuple2<Long, Double>(id, prediction);
    });



  

public class Prediction {
    double prediction;
    long id;
    // public getters and setters …
}

// note: createDataFrame(rdd, beanClass) expects the RDD elements to be
// instances of beanClass (here Prediction), not Tuple2
DataFrame predictionDF = sqlContext.createDataFrame(predictions,
    Prediction.class);


predictionDF.printSchema()
root

 |-- id: long (nullable = false)

 |-- prediction: double (nullable = false)



DataFrame results = allData.join(predictionDF, "id");

results.show()

Here is the top of a long stack trace. I do not know how it relates back to my
code. I do not see any of my classes, column names, or function names …

java.lang.IllegalArgumentException: object is not an instance of declaring
class

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[na:1.8.0_66]

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
) ~[na:1.8.0_66]

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43) ~[na:1.8.0_66]

at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_66]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2
.apply(SQLContext.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2
.apply(SQLContext.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:
244) ~[scala-library-2.10.5.jar:na]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:
244) ~[scala-library-2.10.5.jar:na]

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala
:33) ~[scala-library-2.10.5.jar:na]

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:na]

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
~[scala-library-2.10.5.jar:na]

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:na]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext
.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1.apply(SQLContext
.scala:498) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
~[scala-library-2.10.5.jar:na]

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
~[scala-library-2.10.5.jar:na]

at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
~[scala-library-2.10.5.jar:na]

at 
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassM
ergeSortShuffleWriter.java:119) ~[spark-core_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scal
a:73) ~[spark-core_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
~[spark-core_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
~[spark-core_2.10-1.5.2.jar:1.5.2]

at org.apache.spark.scheduler.Task.run(Task.scala:88)
~[spark-core_2.10-1.5.2.jar:1.5.2]

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
~[spark-core_2.10-1.5.2.jar:1.5.2]

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:11
42) [na:1.8.0_66]

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:6
17) [na:1.8.0_66]

at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]




Re: how to debug java.lang.IllegalArgumentException: object is not an instance of declaring class

2015-12-24 Thread Andy Davidson
The problem must be with how I am converting a JavaRDD<Tuple2<Long, Double>>
to a DataFrame.

Any suggestions? Most of my work has been done using pySpark. Tuples are a
lot harder to work with in Java.

  JavaRDD<Tuple2<Long, Double>> predictions =
      idLabeledPoingRDD.map((Tuple2<Long, LabeledPoint> t2) -> {
          Long id = t2._1();
          LabeledPoint lp = t2._2();
          double prediction = naiveBayesModel.predict(lp.features());
          return new Tuple2<Long, Double>(id, prediction);
      });



List<Tuple2<Long, Double>> debug = predictions.take(3);

for (Tuple2<Long, Double> t : debug) {
    logger.warn("prediction: {}", t.toString());
}



//

// evaluate

//

DataFrame predictionDF = sqlContext.createDataFrame(predictions,
Prediction.class);

predictionDF.printSchema();

predictionDF.show();

   



java.lang.IllegalArgumentException: object is not an instance of declaring
class

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[na:1.8.0_66]

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
) ~[na:1.8.0_66]

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl
.java:43) ~[na:1.8.0_66]

at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_66]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2
.apply(SQLContext.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2
.apply(SQLContext.scala:500) ~[spark-sql_2.10-1.5.2.jar:1.5.2]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:
244) ~[scala-library-2.10.5.jar:na]

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:
244) ~[scala-library-2.10.5.jar:na]

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala
:33) ~[scala-library-2.10.5.jar:na]

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:na]

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
~[scala-library-2.10.5.jar:na]

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
~[scala-library-2.10.5.jar:na]


From:  Andrew Davidson 
Date:  Thursday, December 24, 2015 at 9:55 AM
To:  "user @spark" 
Subject:  how to debug java.lang.IllegalArgumentException: object is not an
instance of declaring class

> Hi 
> 
> Any idea how I can debug this problem? I suspect the problem has to do with
> how I am converting a JavaRDD<Tuple2<Long, Double>> to a DataFrame.
> 
> Is it a boxing problem? I tried to use long and double instead of Long and
> Double whenever possible.
> 
> Thanks in advance, Happy Holidays.
> 
> Andy
> 
> allData.printSchema()
> root
> 
>  |-- label: string (nullable = true)
> 
>  |-- text: string (nullable = true)
> 
>  |-- id: long (nullable = true)
> 
>  |-- createdAt: long (nullable = true)
> 
>  |-- binomialLabel: string (nullable = true)
> 
>  |-- words: array (nullable = true)
> 
>  ||-- element: string (containsNull = true)
> 
>  |-- features: vector (nullable = true)
> 
>  |-- labelIndex: double (nullable = true)
> 
> 
> 
> //
> 
> // make predictions using all the data
> 
> // The real out of sample error will be higher
> 
> //
> 
> JavaRDD<Tuple2<Long, Double>> predictions =
>     idLabeledPoingRDD.map((Tuple2<Long, LabeledPoint> t2) -> {
>         Long id = t2._1();
>         LabeledPoint lp = t2._2();
>         double prediction = naiveBayesModel.predict(lp.features());
>         return new Tuple2<Long, Double>(id, prediction);
>     });
> 
> 
> 
>   
> 
> public class Prediction {
>     double prediction;
>     long id;
>     // public getters and setters …
> }
> 
> DataFrame predictionDF = sqlContext.createDataFrame(predictions,
> Prediction.class);
> 
> 
> predictionDF.printSchema()
> root
> 
>  |-- id: long (nullable = false)
> 
>  |-- prediction: double (nullable = false)
> 
> 
> 
> DataFrame results = allData.join(predictionDF, "id");
> 
> results.show()
> 
> Here is the top of a long stack trace. I do not know how it relates back to my
> code. I do not see any of my classes, column names, or function names …
> 
> java.lang.IllegalArgumentException: object is not an instance of declaring
> class
> 
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_66]
> 
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[na:1.8.0_66]
> 
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.j
> ava:43) ~[na:1.8.0_66]
> 
> at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_66]
> 
> at 
> org.apache.spark.sql.SQLContext$$anonfun$9$$anonfun$apply$1$$anonfun$apply$2.a
> 

Extract compressed JSON within JSON

2015-12-24 Thread Eran Witkon
Hi,

I have a JSON file with the following row format:
{"cty":"United
Kingdom","gzip":"H4sIAKtWystVslJQcs4rLVHSUUouqQTxQvMyS1JTFLwz89JT8nOB4hnFqSBxj/zS4lSF/DQFl9S83MSibKBMZVExSMbQwNBM19DA2FSpFgDvJUGVUw==","nm":"Edmund
lronside","yrs":"1016"}

The gzip field is itself a gzip-compressed JSON object.

I want to read the file and build the full nested JSON as a row:

{"cty":"United Kingdom","hse":{"nm": "Cnut","cty": "United
Kingdom","hse": "House of Denmark","yrs": "1016-1035"},"nm":"Edmund
lronside","yrs":"1016"}

I already have a function which extracts the compressed field to a string.
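
For reference, a minimal sketch of what such a helper could look like; the
actual GZipHelper is not shown here, and base64 + gzip encoding is an
assumption based on the sample value above:

import java.io.ByteArrayInputStream
import java.util.Base64
import java.util.zip.GZIPInputStream
import scala.io.Source
import scala.util.Try

object GZipHelper {
  // base64-decode the field, gunzip it, and read the result as a UTF-8 string
  def unCompress(b64: String): Try[String] = Try {
    val in = new GZIPInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(b64)))
    try Source.fromInputStream(in, "UTF-8").mkString finally in.close()
  }
}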

Questions:

*If I use the following code to build the RDD:*

val jsonData = sqlContext.read.json(sourceFilesPath)
//
//loop through the DataFrame and manipulate the gzip Filed

val jsonUnGzip = jsonData.map(r => Row(r.getString(0),
GZipHelper.unCompress(r.getString(1)).get, r.getString(2),
r.getString(3)))

*I get a row with 4 columns (String,String,String,String)*

 org.apache.spark.sql.Row = [United Kingdom,{"nm": "Cnut","cty":
"United Kingdom","hse": "House of Denmark","yrs": "1016-1035"},Edmund
lronside,1016]

*Now, I can't tell Spark to "re-parse" Col(1) as JSON, right?*

I've seen some posts about using case classes or explode, but I don't
understand how they can help here.

Eran


Re: Using Java Function API with Java 8

2015-12-24 Thread Sean Owen
You forgot a return statement in the 'else' clause, which is what the
compiler is telling you. There's nothing more to it here. Your
function is much simpler, however, as:

Function<String, Boolean> checkHeaders2 =
    x -> x.startsWith("npi") || x.startsWith("CPT");

On Thu, Dec 24, 2015 at 1:13 AM, rdpratti  wrote:
> I am trying to pass lambda expressions to Spark JavaRDD methods.
>
> Having using lambda expressions in Java, in general, I was hoping for
> similar behavour and coding patterns, but am finding confusing compile
> errors.
>
> The use case is a lambda expression that has a number of statements,
> returning a boolean from various points in the logic.
>
> I have tried both inline, as well as defining a Function functional type
> with no luck.
>
> Here is an example:
>
> Function<String, Boolean> checkHeaders2 = x -> {
>     if (x.startsWith("npi") || x.startsWith("CPT"))
>         return new Boolean(false);
>     else
>         new Boolean(true);    // <- no return here, hence the compile error
> };
>
> This code gets an error stating that method must return a Boolean.
>
> I know that the lambda expression can be shortened and included as a simple
> one statement return, but using non-Spark Java 8 and a Predicate functional
> type this would compile and be usable.
>
> What am I missing and how to use the Spark Function to define lambda
> exressions made up of mutliple Java statements.
>
> Thanks
>
> rd
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Java-Function-API-with-Java-8-tp25794.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming 1.5.2+Kafka+Python. Strange reading

2015-12-24 Thread Akhil Das
Would you mind posting the relevant code snippet?

Thanks
Best Regards

On Wed, Dec 23, 2015 at 7:33 PM, Vyacheslav Yanuk 
wrote:

> Hi.
> I have very strange situation with direct reading from Kafka.
> For example.
> I have 1000 messages in Kafka.
> After submitting my application I read this data and process it.
> As I process the data I have accumulated 10 new entries.
> In next reading from Kafka I read only 3 records, but not 10!!!
> Why???
> I don't understand...
> Explain to me please!
>
> --
> WBR, Vyacheslav Yanuk
> Codeminders.com
>


Re: How to Parse & flatten JSON object in a text file using Spark into Dataframe

2015-12-24 Thread Eran Witkon
Raja! I found the answer to your question!
Look at
http://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes
This is what you (and I) were looking for.
The general idea: you read the file as text, where ProjectDetails is just a
string field, then you build the JSON string representation of the whole
line, and you end up with a nested JSON schema which SparkSQL can read.
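
A rough sketch of that idea (the line format, column names, and file path are
assumptions, not taken from the thread): read each line as text, rebuild it as
one JSON string, and let read.json infer the nested schema.

val lines = sc.textFile("employees.txt")
val asJson = lines.map { line =>
  // assume each line is "employeeID, Name, {ProjectDetails json}"; split into 3 parts only
  val Array(id, name, details) = line.split(",", 3)
  s"""{"employeeID": ${id.trim}, "Name": "${name.trim}", "ProjectDetails": ${details.trim}}"""
}
val employees = sqlContext.read.json(asJson)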

Eran

On Thu, Dec 24, 2015 at 10:26 AM Eran Witkon  wrote:

> I don't have the exact answer for you but I would look for something using
> explode method on DataFrame
>
> On Thu, Dec 24, 2015 at 7:34 AM Bharathi Raja  wrote:
>
>> Thanks Gokul, but the file I have has the same format as I mentioned. The
>> first two columns are not in JSON format.
>>
>> Thanks,
>> Raja
>> --
>> From: Gokula Krishnan D 
>> Sent: 12/24/2015 2:44 AM
>> To: Eran Witkon 
>> Cc: raja kbv ; user@spark.apache.org
>>
>> Subject: Re: How to Parse & flatten JSON object in a text file using
>> Spark  into Dataframe
>>
>> You can try this, but I slightly modified the input structure since the
>> first two columns were not in JSON format.
>>
>> [image: Inline image 1]
>>
>> Thanks & Regards,
>> Gokula Krishnan* (Gokul)*
>>
>> On Wed, Dec 23, 2015 at 9:46 AM, Eran Witkon 
>> wrote:
>>
>>> Did you get a solution for this?
>>>
>>> On Tue, 22 Dec 2015 at 20:24 raja kbv  wrote:
>>>
 Hi,

 I am new to spark.

 I have a text file with below structure.


 (employeeID: Int, Name: String, ProjectDetails:
 JsonObject{[{ProjectName, Description, Duriation, Role}]})
 Eg:
 (123456, Employee1, {“ProjectDetails”:[
  {
 “ProjectName”: “Web Develoement”, “Description” : “Online Sales website”,
 “Duration” : “6 Months” , “Role” : “Developer”}
  {
 “ProjectName”: “Spark Develoement”, “Description” : “Online Sales
 Analysis”, “Duration” : “6 Months” , “Role” : “Data Engineer”}
  {
 “ProjectName”: “Scala Training”, “Description” : “Training”, “Duration” :
 “1 Month” }
   ]
 }


 Could someone help me to parse & flatten the record as below dataframe
 using scala?

 employeeID,Name, ProjectName, Description, Duration, Role
 123456, Employee1, Web Develoement, Online Sales website, 6 Months ,
 Developer
 123456, Employee1, Spark Develoement, Online Sales Analysis, 6 Months,
 Data Engineer
 123456, Employee1, Scala Training, Training, 1 Month, null


 Thank you in advance.

 Regards,
 Raja

>>>
>>


Re: error in spark cassandra connector

2015-12-24 Thread Ted Yu
Mind providing a bit more detail?

Release of Spark
Version of the Cassandra connector
How the job was submitted
Complete stack trace

Thanks

On Thu, Dec 24, 2015 at 2:06 AM, Vijay Kandiboyina  wrote:

> java.lang.NoClassDefFoundError:
> com/datastax/spark/connector/rdd/CassandraTableScanRDD
>
>


Re: Extract compressed JSON within JSON

2015-12-24 Thread Eran Witkon
Answered via Stack Overflow. If you are looking for the solution, this is the
trick:

val jsonNested = sqlContext.read.json(jsonUnGzip.map {
  case Row(cty: String, json: String, nm: String, yrs: String) =>
    s"""{"cty": \"$cty\", "extractedJson": $json, "nm": \"$nm\", "yrs": \"$yrs\"}"""
})

See this link for source
http://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes

Eran


On Thu, Dec 24, 2015 at 11:42 AM Eran Witkon  wrote:

> Hi,
>
> I have a JSON file with the following row format:
> {"cty":"United
> Kingdom","gzip":"H4sIAKtWystVslJQcs4rLVHSUUouqQTxQvMyS1JTFLwz89JT8nOB4hnFqSBxj/zS4lSF/DQFl9S83MSibKBMZVExSMbQwNBM19DA2FSpFgDvJUGVUw==","nm":"Edmund
> lronside","yrs":"1016"}
>
> The gzip field is itself a gzip-compressed JSON object.
>
> I want to read the file and build the full nested JSON as a row:
>
> {"cty":"United Kingdom","hse":{"nm": "Cnut","cty": "United Kingdom","hse": 
> "House of Denmark","yrs": "1016-1035"},"nm":"Edmund lronside","yrs":"1016"}
>
> I already have a function which extracts the compressed field to a string.
>
> Questions:
>
> *If I use the following code to build the RDD:*
>
> val jsonData = sqlContext.read.json(sourceFilesPath)
> //
> //loop through the DataFrame and manipulate the gzip Filed
>
> val jsonUnGzip = jsonData.map(r => Row(r.getString(0), 
> GZipHelper.unCompress(r.getString(1)).get, r.getString(2), r.getString(3)))
>
> *I get a row with 4 columns (String,String,String,String)*
>
>  org.apache.spark.sql.Row = [United Kingdom,{"nm": "Cnut","cty": "United 
> Kingdom","hse": "House of Denmark","yrs": "1016-1035"},Edmund lronside,1016]
>
> *Now, I can't tell Spark to "re-parse" Col(1) as JSON, right?*
>
> I've seen some posts about using case classes or explode, but I don't understand
> how they can help here.
>
> Eran
>
>


Re: How to contribute by picking up starter bugs

2015-12-24 Thread Ted Yu
You can send out a pull request for the JIRA you're interested in.

Start the title of the pull request with:
[SPARK-XYZ] ...

where XYZ is the JIRA number.

The pull request will be posted on the JIRA.
After the pull request is reviewed, tested by QA, and merged, the committer
will assign your name to the JIRA.

Cheers

On Thu, Dec 24, 2015 at 2:44 AM, lokeshkumar  wrote:

> Hi
>
> From the how to contribute page of spark jira project I came to know that I
> can start by picking up the starter label bugs.
> But who will assign me these bugs? Or should I just fix them and create a
> pull request.
>
> Will be glad to help the project.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-contribute-by-picking-up-starter-bugs-tp25795.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Spark Streaming + Kafka + scala job message read issue

2015-12-24 Thread Bryan
Are you using a direct stream consumer, or the older receiver-based consumer?
If the latter, does the number of partitions you've specified for your topic
match the number of partitions in the topic on Kafka?

That would be a possible cause, as you might receive all data from a given
partition while missing data from other partitions.

Regards,

Bryan Jeffrey

Sent from Outlook Mail for Windows 10 phone


From: vivek.meghanat...@wipro.com
Sent: Thursday, December 24, 2015 5:22 AM
To: user@spark.apache.org
Subject: Spark Streaming + Kafka + scala job message read issue

Hi All,


We are using Bitnami Kafka 0.8.2 + Spark 1.5.2 on the Google Cloud platform. Our
Spark Streaming job (consumer) is not receiving all the messages sent to the
specific topic. It receives roughly 1 out of ~50 messages (we added a log in the
job stream and identified this). We are not seeing any errors in the Kafka logs
and are unable to debug further from the Kafka layer. The console consumer shows
the input topic is received in the console; it is just not reaching the
Spark-Kafka integration stream. Any thoughts on how to debug this issue? Another
topic is working fine in the same setup.
We also tried with Spark 1.3.0 and Kafka 0.8.1.1, which has the same issue. All
these jobs work fine on our local lab servers.
Regards,
Vivek M



How to ignore case in dataframe groupby?

2015-12-24 Thread Bharathi Raja
Hi,
Values in a dataframe column named countrycode are in different cases. Eg: (US, 
us).  groupBy & count gives two rows but the requirement is to ignore case for 
this operation.
1) Is there a way to ignore case in groupBy? Or
2) Is there a way to update the dataframe column countrycode to uppercase?

Thanks in advance.

Regards,
Raja

How to contribute by picking up starter bugs

2015-12-24 Thread lokeshkumar
Hi 

From the "how to contribute" page of the Spark JIRA project I came to know that I
can start by picking up the starter-label bugs.
But who will assign me these bugs? Or should I just fix them and create a
pull request?

Will be glad to help the project.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-contribute-by-picking-up-starter-bugs-tp25795.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



error in spark cassandra connector

2015-12-24 Thread Vijay Kandiboyina
java.lang.NoClassDefFoundError:
com/datastax/spark/connector/rdd/CassandraTableScanRDD


Newbie Help for spark's not finding native hadoop warning

2015-12-24 Thread Bilinmek Istemiyor
Hello,

I have Apache Spark 1.5.1 installed with the help of this user group. I
receive the following warning when I start the pyspark shell:

WARN NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable

Later I downloaded the native binary from the Hadoop site and defined the
environment variable HADOOP_HOME.

How can I make Spark use the native Hadoop libraries?


Spark Streaming + Kafka + scala job message read issue

2015-12-24 Thread vivek.meghanathan
Hi All,




We are using Bitnami Kafka 0.8.2 + Spark 1.5.2 on the Google Cloud platform. Our
Spark Streaming job (consumer) is not receiving all the messages sent to the
specific topic. It receives roughly 1 out of ~50 messages (we added a log in the
job stream and identified this). We are not seeing any errors in the Kafka logs
and are unable to debug further from the Kafka layer. The console consumer shows
the input topic is received in the console; it is just not reaching the
Spark-Kafka integration stream. Any thoughts on how to debug this issue? Another
topic is working fine in the same setup.

We also tried with Spark 1.3.0 and Kafka 0.8.1.1, which has the same issue. All
these jobs work fine on our local lab servers.

Regards,
Vivek M


RE: How to Parse & flatten JSON object in a text file using Spark into Dataframe

2015-12-24 Thread Bharathi Raja
Thanks Eran, I'll check the solution.

Regards,
Raja

-Original Message-
From: "Eran Witkon" 
Sent: 12/24/2015 4:07 PM
To: "Bharathi Raja" ; "Gokula Krishnan D" 

Cc: "user@spark.apache.org" 
Subject: Re: How to Parse & flatten JSON object in a text file using 
Spark into Dataframe

Raja! I found the answer to your question!
Look at
http://stackoverflow.com/questions/34069282/how-to-query-json-data-column-using-spark-dataframes
This is what you (and I) were looking for.
The general idea: you read the file as text, where ProjectDetails is just a string
field, then you build the JSON string representation of the whole line, and you
end up with a nested JSON schema which SparkSQL can read.


Eran


On Thu, Dec 24, 2015 at 10:26 AM Eran Witkon  wrote:

I don't have the exact answer for you but I would look for something using 
explode method on DataFrame  


On Thu, Dec 24, 2015 at 7:34 AM Bharathi Raja  wrote:

Thanks Gokul, but the file I have has the same format as I mentioned. The
first two columns are not in JSON format.

Thanks,
Raja


From: Gokula Krishnan D
Sent: 12/24/2015 2:44 AM
To: Eran Witkon
Cc: raja kbv; user@spark.apache.org

Subject: Re: How to Parse & flatten JSON object in a text file using Spark 
 into Dataframe


You can try this, but I slightly modified the input structure since the first
two columns were not in JSON format.






Thanks & Regards, 
Gokula Krishnan (Gokul)


On Wed, Dec 23, 2015 at 9:46 AM, Eran Witkon  wrote:

Did you get a solution for this?


On Tue, 22 Dec 2015 at 20:24 raja kbv  wrote:

Hi,


I am new to spark.


I have a text file with below structure.


 
(employeeID: Int, Name: String, ProjectDetails: JsonObject{[{ProjectName, 
Description, Duriation, Role}]})
Eg:
(123456, Employee1, {“ProjectDetails”:[
 { “ProjectName”: “Web 
Develoement”, “Description” : “Online Sales website”, “Duration” : “6 Months” , 
“Role” : “Developer”}
 { “ProjectName”: 
“Spark Develoement”, “Description” : “Online Sales Analysis”, “Duration” : “6 
Months” , “Role” : “Data Engineer”}
 { “ProjectName”: 
“Scala Training”, “Description” : “Training”, “Duration” : “1 Month” }
  ]
}
 
 
Could someone help me to parse & flatten the record as below dataframe using 
scala?
 
employeeID,Name, ProjectName, Description, Duration, Role
123456, Employee1, Web Develoement, Online Sales website, 6 Months , Developer
123456, Employee1, Spark Develoement, Online Sales Analysis, 6 Months, Data 
Engineer
123456, Employee1, Scala Training, Training, 1 Month, null
 


Thank you in advance.


Regards,
Raja

Re: How to ignore case in dataframe groupby?

2015-12-24 Thread Eran Witkon
Use DF.withColumn("upper-code", upper(df("countrycode")))
or just run a map function that does the same

On Thu, Dec 24, 2015 at 2:05 PM Bharathi Raja 
wrote:

> Hi,
> Values in a dataframe column named countrycode are in different cases. Eg:
> (US, us).  groupBy & count gives two rows but the requirement is to ignore
> case for this operation.
> 1) Is there a way to ignore case in groupBy? Or
> 2) Is there a way to update the dataframe column countrycode to uppercase?
>
> Thanks in advance.
>
> Regards,
> Raja
>


Re: running lda in spark throws exception

2015-12-24 Thread Li Li
Could anyone help?

On Wed, Dec 23, 2015 at 1:40 PM, Li Li  wrote:
> I ran my lda example in a yarn 2.6.2 cluster with spark 1.5.2.
> it throws exception in line:   Matrix topics = ldaModel.topicsMatrix();
> But in yarn job history ui, it's successful. What's wrong with it?
> I submit job with
> .bin/spark-submit --class Myclass \
> --master yarn-client \
> --num-executors 2 \
> --driver-memory 4g \
> --executor-memory 4g \
> --executor-cores 1 \
>
>
> My codes:
>
>corpus.cache();
>
>
> // Cluster the documents into three topics using LDA
>
> DistributedLDAModel ldaModel = (DistributedLDAModel) new
> LDA().setOptimizer("em").setMaxIterations(iterNumber).setK(topicNumber).run(corpus);
>
>
> // Output topics. Each is a distribution over words (matching word
> count vectors)
>
> System.out.println("Learned topics (as distributions over vocab of
> " + ldaModel.vocabSize()
>
> + " words):");
>
>//Line81, exception here:Matrix topics = ldaModel.topicsMatrix();
>
> for (int topic = 0; topic < topicNumber; topic++) {
>
>   System.out.print("Topic " + topic + ":");
>
>   for (int word = 0; word < ldaModel.vocabSize(); word++) {
>
> System.out.print(" " + topics.apply(word, topic));
>
>   }
>
>   System.out.println();
>
> }
>
>
> ldaModel.save(sc.sc(), modelPath);
>
>
> Exception in thread "main" java.lang.IndexOutOfBoundsException:
> (1025,0) not in [-58,58) x [-100,100)
>
> at 
> breeze.linalg.DenseMatrix$mcD$sp.update$mcD$sp(DenseMatrix.scala:112)
>
> at 
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:534)
>
> at 
> org.apache.spark.mllib.clustering.DistributedLDAModel$$anonfun$topicsMatrix$1.apply(LDAModel.scala:531)
>
> at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
> at 
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix$lzycompute(LDAModel.scala:531)
>
> at 
> org.apache.spark.mllib.clustering.DistributedLDAModel.topicsMatrix(LDAModel.scala:523)
>
> at 
> com.mobvoi.knowledgegraph.textmining.lda.ReviewLDA.main(ReviewLDA.java:81)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:606)
>
> at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
>
> at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> 15/12/23 00:01:16 INFO spark.SparkContext: Invoking stop() from shutdown hook

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Newbie Help for spark's not finding native hadoop warning

2015-12-24 Thread Sean Owen
You can safely ignore it. Native libs aren't set with HADOOP_HOME. See
Hadoop docs on how to configure this if you're curious, but you really
don't need to.

On Thu, Dec 24, 2015 at 12:19 PM, Bilinmek Istemiyor
 wrote:
> Hello,
>
> I have apache spark 1.5.1 installed with the help of  this user group. I
> receive following error when I start pyshell
>
> WARN NativeCodeLoader: Unable to load native-hadoop library for your
> platform... using builtin-java classes where applicable
>
> Later I have downloaded native binary from hadoop site and defined
> environment variabla HADOOP_HOME
>
> How can I make spark use hadoop?
>
>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Newbie Help for spark's not finding native hadoop warning

2015-12-24 Thread Jacek Laskowski
Hi,

To add to it, you can read about the native libs in
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/NativeLibraries.html.

Pozdrawiam,
Jacek

Jacek Laskowski | https://medium.com/@jaceklaskowski/
Mastering Apache Spark
==> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski


On Thu, Dec 24, 2015 at 2:30 PM, Sean Owen  wrote:
> You can safely ignore it. Native libs aren't set with HADOOP_HOME. See
> Hadoop docs on how to configure this if you're curious, but you really
> don't need to.
>
> On Thu, Dec 24, 2015 at 12:19 PM, Bilinmek Istemiyor
>  wrote:
>> Hello,
>>
>> I have apache spark 1.5.1 installed with the help of  this user group. I
>> receive following error when I start pyshell
>>
>> WARN NativeCodeLoader: Unable to load native-hadoop library for your
>> platform... using builtin-java classes where applicable
>>
>> Later I have downloaded native binary from hadoop site and defined
>> environment variabla HADOOP_HOME
>>
>> How can I make spark use hadoop?
>>
>>
>>
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark Streaming + Kafka + scala job message read issue

2015-12-24 Thread vivek.meghanathan
We are using the older receiver-based approach. The number of partitions is 1
(we have a single-node Kafka), and we use a single thread per topic; still we
have the problem. Please see the API we use. All 8 Spark jobs use the same
group name – is that a problem?

// the number of threads used here is 1
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val searches = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
  .map(line => parse(line._2).extract[Search])
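
For comparison, a rough sketch of the direct-stream consumer mentioned in this
thread (the broker address and topic name are placeholders; ssc is the
StreamingContext from the snippet above). The direct API reads every partition
of the topic itself and does not go through a consumer group:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val directStream = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("searches"))
// each record is a (key, message) pair; keep just the message payloads
val messages = directStream.map(_._2)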


Regards,
Vivek M
From: Bryan [mailto:bryan.jeff...@gmail.com]
Sent: 24 December 2015 17:20
To: Vivek Meghanathan (WT01 - NEP) ; 
user@spark.apache.org
Subject: RE: Spark Streaming + Kafka + scala job message read issue

Are you using a direct stream consumer, or the older receiver-based consumer?
If the latter, does the number of partitions you've specified for your topic
match the number of partitions in the topic on Kafka?

That would be a possible cause, as you might receive all data from a given
partition while missing data from other partitions.

Regards,

Bryan Jeffrey

Sent from Outlook Mail for 
Windows 10 phone


From: vivek.meghanat...@wipro.com
Sent: Thursday, December 24, 2015 5:22 AM
To: user@spark.apache.org
Subject: Spark Streaming + Kafka + scala job message read issue

Hi All,



We are using Bitnami Kafka 0.8.2 + Spark 1.5.2 on the Google Cloud platform. Our
Spark Streaming job (consumer) is not receiving all the messages sent to the
specific topic. It receives roughly 1 out of ~50 messages (we added a log in the
job stream and identified this). We are not seeing any errors in the Kafka logs
and are unable to debug further from the Kafka layer. The console consumer shows
the input topic is received in the console; it is just not reaching the
Spark-Kafka integration stream. Any thoughts on how to debug this issue? Another
topic is working fine in the same setup.

We also tried with Spark 1.3.0 and Kafka 0.8.1.1, which has the same issue. All
these jobs work fine on our local lab servers.

Regards,
Vivek M