Re: Problem in Spark Streaming

2014-06-11 Thread nilmish

I used these JVM flags to show the GC timings: -verbose:gc
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps
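
(For context: a minimal sketch of one way such GC flags can be passed to the
executor JVMs through the standard spark.executor.extraJavaOptions setting.
The original message does not say how the flags were actually supplied, and
the application name below is illustrative.)

// Sketch: forward the GC-logging flags to every executor JVM.
// Needs: org.apache.spark.SparkConf
SparkConf conf = new SparkConf()
    .setAppName("StreamingTopHashtags")   // illustrative name
    .set("spark.executor.extraJavaOptions",
         "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps");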

Following is the output I got on the standard output :

4.092: [GC 4.092: [ParNew: 274752K->27199K(309056K), 0.0421460 secs]
274752K->27199K(995776K), 0.0422720 secs] [Times: user=0.33 sys=0.11,
real=0.04 secs]

16.630: [GC 16.630: [ParNew: 301951K->17854K(309056K), 0.0686940 secs]
301951K->23624K(995776K), 0.0689110 secs] [Times: user=0.36 sys=0.05,
real=0.07 secs]

32.440: [GC 32.441: [ParNew: 292606K->14985K(309056K), 0.0206040 secs]
298376K->20755K(995776K), 0.0208320 secs] [Times: user=0.20 sys=0.00,
real=0.02 secs]

42.626: [GC 42.626: [ParNew: 289737K->15467K(309056K), 0.0138100 secs]
295507K->21237K(995776K), 0.0139830 secs] [Times: user=0.10 sys=0.00,
real=0.01 secs]

56.633: [GC 56.633: [ParNew: 290219K->17334K(309056K), 0.0170930 secs]
295989K->23105K(995776K), 0.0173130 secs] [Times: user=0.12 sys=0.01,
real=0.02 secs]

Can anyone help me understand these messages related to GC?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Problem in Spark Streaming

2014-06-10 Thread nilmish
I am running a Spark Streaming job to count the top 10 hashtags over the last
5-minute window, querying every 1 second.

Most queries are answered in approximately 1.4 seconds (end-to-end delay), but
there are a few instances in between when a query takes considerably longer
(around 15 seconds), which also increases the response time of subsequent
queries. I have not been able to find the reason for these spikes. The data
rate is nearly constant, so the spikes are not due to a sudden increase in the
data rate.

Also, is there any way to put a bound on the time taken by a particular query?
For example, if a query takes more than, say, 2 seconds, it should be killed
so that the job can move on to the next query, and a slow query does not
affect future queries.

Thanx,
Nilesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problem in Spark Streaming

2014-06-10 Thread nilmish
You can measure the latency from the logs. Search for the phrase "Total delay"
in the logs; it denotes the total end-to-end delay for a particular query.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7312.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Problem in Spark Streaming

2014-06-10 Thread nilmish
How can I measure the data rate per node?

I am feeding the data through the Kafka API. I only know the total inflow data
rate, which remains almost constant. How can I figure out how much data is
distributed to each node in my cluster?

Latency does not keep increasing indefinitely. It goes up for a moment and
then drops back down to the normal level. I want to get rid of these spikes.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-in-Spark-Streaming-tp7310p7325.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error related to serialisation in spark streaming

2014-06-05 Thread nilmish
Thanx a lot for your reply. I can see the Kryo serialiser in the UI.

I have another query:

I wanted to know the meaning of the following log message when running a
Spark Streaming job:

[spark-akka.actor.default-dispatcher-18] INFO 
org.apache.spark.streaming.scheduler.JobScheduler - Total delay: 5.432 s for
time 1401870454500 ms (execution: 0.593 s) 

According to my understanding, total delay here means the total end-to-end
delay, which is 5.432 seconds here.

What is the meaning of execution: 0.593?

Is it the time taken to execute this particular query?

PS: I am running a streaming job over a window of 5 minutes and querying every
1.5 seconds.
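
(Aside, as a worked reading of the numbers: if total delay is the end-to-end
figure and execution is the time the batch's jobs actually ran, then
5.432 s - 0.593 s ≈ 4.84 s would be the time this batch spent waiting before
execution started, for example behind earlier batches. That reading is an
assumption, not something confirmed in this thread.)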



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801p7039.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Problem understanding log message in SparkStreaming

2014-06-04 Thread nilmish
I wanted to know the meaning of the following log message when running a
Spark Streaming job:

[spark-akka.actor.default-dispatcher-18] INFO 
org.apache.spark.streaming.scheduler.JobScheduler - Total delay: 5.432 s for
time 1401870454500 ms (execution: 0.593 s)

According to my understanding, total delay here means the total end-to-end
delay, which is 5.432 seconds here.

What is the meaning of execution: 0.593?

Is it the time taken to execute this particular query?

PS: I am running a streaming job over a window of 5 minutes and querying every
1.5 seconds.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Problem-understanding-log-message-in-SparkStreaming-tp6893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Error related to serialisation in spark streaming

2014-06-04 Thread nilmish
The error is resolved. I was using a comparator that was not serialisable,
which was causing the error.

I have now switched to the Kryo serializer as it is faster than the Java
serializer. I have set the required config:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "MyRegistrator");

and in the MyRegistrator class I have registered all the classes I am
serialising.
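
For reference, a minimal sketch of what such a registrator can look like (the
registered classes here are illustrative, not taken from the original code):

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;
import scala.Tuple2;

// Registers with Kryo every class that ends up in shuffled data or closures.
public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(Tuple2.class);    // illustrative registrations
        kryo.register(String.class);
    }
}

One way to check that Kryo is really in use is the Environment tab of the web
UI (spark.serializer should show the Kryo class); later Spark versions also
support spark.kryo.registrationRequired=true, which makes jobs fail fast when
an unregistered class is serialised.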

How can I confirm that my code is now actually using the Kryo serialiser and
not the Java serialiser?

PS: It seems like my code is still not using the Kryo serialiser.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801p6904.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Error related to serialisation in spark streaming

2014-06-03 Thread nilmish
I am using the following code segment:

countPerWindow.foreachRDD(new Function<JavaPairRDD<String, Long>, Void>()
{
    @Override
    public Void call(JavaPairRDD<String, Long> rdd) throws Exception
    {
        Comparator<Tuple2<String, Long>> comp =
            new Comparator<Tuple2<String, Long>>()
        {
            public int compare(Tuple2<String, Long> tupleA,
                               Tuple2<String, Long> tupleB)
            {
                return 1 - tupleA._2.compareTo(tupleB._2);
            }
        };

        List<Tuple2<String, Long>> top = rdd.top(5, comp); // creating error

        System.out.println("Top 5 are : ");
        for (int i = 0; i < top.size(); ++i)
        {
            System.out.println(top.get(i)._2 + " " + top.get(i)._1);
        }
        return null;
    }
});




I am getting the following error related to serialisation:

org.apache.spark.SparkException: Job aborted: Task not serializable:
java.io.NotSerializableException

Detailed Error :

 INFO  org.apache.spark.scheduler.DAGScheduler - Failed to run top at
OptimisingSort.java:173
2014-06-03 13:10:57,180 [spark-akka.actor.default-dispatcher-14] ERROR
org.apache.spark.streaming.scheduler.JobScheduler - Error running job
streaming job 1401801057000 ms.2
org.apache.spark.SparkException: Job aborted: Task not serializable:
java.io.NotSerializableException: OptimisingSort$6$1
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)

How can I fix this error?
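
As the reply further up this digest notes, the error was resolved by making
the comparator serialisable. A minimal sketch of that kind of fix (the class
name is illustrative):

import java.io.Serializable;
import java.util.Comparator;
import scala.Tuple2;

// A named comparator that implements Serializable can be shipped to the executors.
// The anonymous comparator above does not implement Serializable (and also captures
// its enclosing class), which is what triggers the NotSerializableException.
public class CountComparator implements Comparator<Tuple2<String, Long>>, Serializable {
    @Override
    public int compare(Tuple2<String, Long> a, Tuple2<String, Long> b) {
        // Natural ordering on the count, so rdd.top(5, new CountComparator())
        // returns the five tuples with the largest counts.
        return a._2.compareTo(b._2);
    }
}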





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Error-related-to-serialisation-in-spark-streaming-tp6801.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Selecting first ten values in a RDD/partition

2014-05-30 Thread nilmish
My primary goal: to get the top 10 hashtags for every 5-minute interval.

I want to do this efficiently. I have already done it by using
reduceByKeyAndWindow() and then sorting all hashtags in the 5-minute interval,
taking only the top 10 elements. But this is very slow.

So now I am thinking of retaining only the top 10 hashtags in each RDD,
because only these can appear in the final answer.

I am stuck at how to retain only the top 10 hashtags in each RDD of my
DStream. Basically, I need to transform my DStream so that each RDD contains
only the top 10 hashtags, keeping the number of hashtags in the 5-minute
interval low.

If there is a more efficient way of doing this, please let me know that as
well.
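
One possible shape for this, sketched under the assumption of the Spark 1.0
Java streaming API (transformToPair) and reusing the serialisable
CountComparator sketched in the serialisation thread above: compute the top 10
of each windowed RDD on the driver with rdd.top, then keep only those keys.

// Sketch: prune each windowed RDD down to the hashtags with the 10 largest counts.
// Needs: org.apache.spark.api.java.JavaPairRDD, org.apache.spark.api.java.function.Function,
//        org.apache.spark.streaming.api.java.JavaPairDStream, scala.Tuple2, java.util.*
JavaPairDStream<String, Long> top10PerWindow = countPerWindow.transformToPair(
    new Function<JavaPairRDD<String, Long>, JavaPairRDD<String, Long>>() {
        @Override
        public JavaPairRDD<String, Long> call(JavaPairRDD<String, Long> rdd) {
            // rdd.top is an action: the 10 largest (hashtag, count) pairs come back
            // to the driver; the filter then keeps only those hashtags in the RDD.
            final Set<String> keep = new HashSet<String>();
            for (Tuple2<String, Long> t : rdd.top(10, new CountComparator())) {
                keep.add(t._1);
            }
            return rdd.filter(new Function<Tuple2<String, Long>, Boolean>() {
                @Override
                public Boolean call(Tuple2<String, Long> t) {
                    return keep.contains(t._1);
                }
            });
        }
    });

If these anonymous functions sit inside an instance method of a
non-serialisable class, declaring them as static nested classes avoids the
NotSerializableException discussed in the serialisation thread.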

Thanx,
Nilesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Selecting-first-ten-values-in-a-RDD-partition-tp6517p6577.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Selecting first ten values in a RDD/partition

2014-05-29 Thread nilmish
I have a DStream that consists of an RDD generated every 2 seconds. I have
sorted each RDD and want to retain only the top 10 values and discard the
rest. How can I retain only the top 10 values?

I am trying to get the top 10 hashtags. Instead of sorting the entire 5
minutes of counts (thereby incurring the cost of a data shuffle), I am trying
to get the top 10 hashtags in each partition. I am stuck at how to retain the
top 10 hashtags in each partition.
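
One way to keep the top 10 per partition without a full sort, sketched under
the assumption of the Spark 1.0 Java API (mapPartitionsToPair) and reusing the
serialisable CountComparator sketched earlier; the input variable name is
illustrative:

// Sketch: within each partition, keep only the 10 largest (hashtag, count) pairs
// using a bounded min-heap, so no global sort or shuffle is needed.
// Needs: org.apache.spark.api.java.JavaPairRDD,
//        org.apache.spark.api.java.function.PairFlatMapFunction,
//        scala.Tuple2, java.util.*
JavaPairRDD<String, Long> candidates = counts.mapPartitionsToPair(
    new PairFlatMapFunction<Iterator<Tuple2<String, Long>>, String, Long>() {
        @Override
        public Iterable<Tuple2<String, Long>> call(Iterator<Tuple2<String, Long>> it) {
            // Min-heap by count: the smallest of the current top 10 sits at the head.
            PriorityQueue<Tuple2<String, Long>> heap =
                new PriorityQueue<Tuple2<String, Long>>(10, new CountComparator());
            while (it.hasNext()) {
                Tuple2<String, Long> t = it.next();
                if (heap.size() < 10) {
                    heap.add(t);
                } else if (t._2.compareTo(heap.peek()._2) > 0) {
                    heap.poll();    // evict the current smallest
                    heap.add(t);
                }
            }
            return new ArrayList<Tuple2<String, Long>>(heap);
        }
    });

The result has at most 10 records per partition, and
candidates.top(10, new CountComparator()) then merges those per-partition
candidates into the global top 10 on the driver.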



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Selecting-first-ten-values-in-a-RDD-partition-tp6517.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Efficient implementation of getting top 10 hashtags in last 5 mins window

2014-05-16 Thread nilmish
I wanted to know how we can efficiently get the top 10 hashtags in the last
5-minute window. Currently I am using reduceByKeyAndWindow over a 5-minute
window and then sorting to get the top 10 hashtags, but it is taking a lot of
time. How can we do this efficiently?
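
Two things that may help, sketched under the assumption of the Spark 1.0 Java
API; the input DStream, streaming context, and checkpoint path below are
illustrative. With the inverse-function form of reduceByKeyAndWindow, each
slide only adds the newest batch and subtracts the oldest one instead of
re-reducing the full 5 minutes, and the top 10 can then be taken with rdd.top
instead of a full sort.

// Sketch: incremental windowed counts over a 5-minute window sliding every second.
// Needs: org.apache.spark.api.java.function.Function2,
//        org.apache.spark.streaming.Duration,
//        org.apache.spark.streaming.api.java.JavaPairDStream
JavaPairDStream<String, Long> countPerWindow = hashtagOnes.reduceByKeyAndWindow(
    new Function2<Long, Long, Long>() {             // add counts entering the window
        public Long call(Long a, Long b) { return a + b; }
    },
    new Function2<Long, Long, Long>() {             // subtract counts leaving the window
        public Long call(Long a, Long b) { return a - b; }
    },
    new Duration(5 * 60 * 1000),                    // window length: 5 minutes
    new Duration(1000));                            // slide interval: 1 second

// The inverse-function form requires checkpointing on the streaming context:
jssc.checkpoint("hdfs:///tmp/streaming-checkpoint");    // illustrative path

The per-batch top 10 can then be taken with rdd.top(10, comparator), as in the
other threads above, rather than sorting the whole window.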



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-implementation-of-getting-top-10-hashtags-in-last-5-mins-window-tp5741.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.