Re: spark metrics question

2016-02-05 Thread Matt K
Yes. And what I'm trying to figure out is whether there's a way to package the jar
in such a way that I don't have to install it on every Executor node.


On Wed, Feb 3, 2016 at 7:46 PM, Yiannis Gkoufas 
wrote:

> Hi Matt,
>
> does the custom class you want to package report metrics of each Executor?
>
> Thanks
>
> On 3 February 2016 at 15:56, Matt K  wrote:
>
>> Thanks for sharing Yiannis, looks very promising!
>>
>> Do you know if I can package a custom class with my application, or does
>> it have to be pre-deployed on all Executor nodes?
>>
>> On Wed, Feb 3, 2016 at 10:36 AM, Yiannis Gkoufas 
>> wrote:
>>
>>> Hi Matt,
>>>
>>> there is some related work I recently did in IBM Research for
>>> visualizing the metrics produced.
>>> You can read about it here
>>> http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
>>> We recently opensourced it if you are interested to have a deeper look
>>> to it: https://github.com/ibm-research-ireland/sparkoscope
>>>
>>> Thanks,
>>> Yiannis
>>>
>>> On 3 February 2016 at 13:32, Matt K  wrote:
>>>
 Hi guys,

 I'm looking to create a custom sink based on Spark's Metrics System:

 https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala

 If I want to collect metrics from the Driver, Master, and Executor
 nodes, should the jar with the custom class be installed on Driver, Master,
 and Executor nodes?

 Also, on Executor nodes, does the MetricsSystem run inside the
 Executor's JVM?

 Thanks,
 -Matt

>>>
>>>
>>
>>
>> --
>> www.calcmachine.com - easy online calculator.
>>
>
>


-- 
www.calcmachine.com - easy online calculator.


Re: Kafka directsream receiving rate

2016-02-05 Thread Cody Koeninger
If you're using the direct stream, you have 0 receivers.  Do you mean you
have 1 executor?

Can you post the relevant call to createDirectStream from your code, as
well as any relevant spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> Adding more info
>
> Batch  interval  is  2000ms.
> I expect all 100 messages  go thru one  dstream from  directsream but it
> receives at rate of 10 messages at time. Am  I missing  some
>  configurations here. Any help appreciated.
>
> Regards
> Diwakar.
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Diwakar Dhanuskodi 
> Date:05/02/2016 07:33 (GMT+05:30)
> To: user@spark.apache.org
> Cc:
> Subject: Kafka directsream receiving rate
>
> Hi,
> Using spark 1.5.1.
> I have a topic with 20 partitions.  When I publish 100 messages. Spark
> direct stream is receiving 10 messages per  dstream. I have  only  one
>  receiver . When I used createStream the  receiver  received  entire 100
> messages  at once.
>
> Appreciate  any  help .
>
> Regards
> Diwakar
>
>
> Sent from Samsung Mobile.
>


RE: Can't view executor logs in web UI on Windows

2016-02-05 Thread Mark Pavey
We have created JIRA ticket
https://issues.apache.org/jira/browse/SPARK-13142 and will submit a pull
request next week.

Mark


-Original Message-
From: Ted Yu [mailto:yuzhih...@gmail.com] 
Sent: 01 February 2016 14:24
To: Mark Pavey
Cc: user@spark.apache.org
Subject: Re: Can't view executor logs in web UI on Windows

I did a brief search but didn't find relevant JIRA either. 

You can create a JIRA and submit pull request for the fix. 

Cheers

> On Feb 1, 2016, at 5:13 AM, Mark Pavey  wrote:
> 
> I am running Spark on Windows. When I try to view the Executor logs in 
> the UI I get the following error:
> 
> HTTP ERROR 500
> 
> Problem accessing /logPage/. Reason:
> 
>Server Error
> Caused by:
> 
> java.net.URISyntaxException: Illegal character in path at index 1:
> .\work/app-20160129154716-0038/2/
> at java.net.URI$Parser.fail(Unknown Source)
> at java.net.URI$Parser.checkChars(Unknown Source)
> at java.net.URI$Parser.parseHierarchical(Unknown Source)
> at java.net.URI$Parser.parse(Unknown Source)
> at java.net.URI.<init>(Unknown Source)
> at org.apache.spark.deploy.worker.ui.LogPage.getLog(LogPage.scala:141)
> at org.apache.spark.deploy.worker.ui.LogPage.render(LogPage.scala:78)
> at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
> at org.apache.spark.ui.WebUI$$anonfun$2.apply(WebUI.scala:79)
> at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:69)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
> at org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
> at org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
> at org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
> at org.spark-project.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
> at org.spark-project.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
> at org.spark-project.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
> at org.spark-project.jetty.server.handler.GzipHandler.handle(GzipHandler.java:264)
> at org.spark-project.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
> at org.spark-project.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
> at org.spark-project.jetty.server.Server.handle(Server.java:370)
> at org.spark-project.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
> at org.spark-project.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
> at org.spark-project.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
> at org.spark-project.jetty.http.HttpParser.parseNext(HttpParser.java:644)
> at org.spark-project.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
> at org.spark-project.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
> at org.spark-project.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
> at org.spark-project.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
> at org.spark-project.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
> at org.spark-project.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
> at java.lang.Thread.run(Unknown Source)
> 
> 
> 
> Looking at the source code for
> org.apache.spark.deploy.worker.ui.LogPage.getLog reveals the following:
> - At line 141 the constructor of java.net.URI is called with the path 
> to the log directory as a String argument. This string 
> (".\work/app-20160129154716-0038/2/" in example above) contains a 
> backslash, which is an illegal character for the URI constructor.
> - The component of the path containing the backslash is created at 
> line 71 by calling the getPath method on a java.io.File object. 
> Because it is running on Windows it uses the default Windows file 
> separator, which is a backslash.
> 
> I am using Spark 1.5.1 but the source code appears unchanged in 1.6.0.
> 
> I haven't been able to find an open issue for this, but if there is one I
> could possibly submit a pull request for it.
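
A minimal sketch of the kind of change the two points above describe, i.e. normalising the path before it reaches java.net.URI. This is only an illustration, not the actual patch attached to the SPARK-13142 ticket mentioned at the top of this message:

```
import java.io.File
import java.net.URI

// Replace Windows-style separators with '/', which is legal in a URI path
// on every platform, before constructing the URI.
def toUriSafePath(logDir: File): String =
  logDir.getPath.replace('\\', '/')

// ".\work\app-20160129154716-0038\2" becomes "./work/app-20160129154716-0038/2",
// which the URI constructor accepts.
val uri = new URI(toUriSafePath(new File(".\\work\\app-20160129154716-0038\\2")))
```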
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-t-view-executo
> r-logs-in-web-UI-on-Windows-tp26122.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For 
> additional commands, e-mail: user-h...@spark.apache.org
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org

Re: Driver not able to restart the job automatically after the application of Streaming with Kafka Direct went down

2016-02-05 Thread swetha kasireddy
Following is the error that I see when it retries.


org.apache.spark.SparkException: Failed to read checkpoint from directory
/share/checkpointDir

at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:342)

at
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:866)

at
com.walmart.platform.exp.reporting.streaming.ExpoStreamingJob$.main(ExpoStreamingJob.scala:35)

at
com.walmart.platform.exp.reporting.streaming.ExpoStreamingJob.main(ExpoStreamingJob.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)

at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)

Caused by: java.io.IOException: java.lang.ClassCastException: cannot assign
instance of scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.streaming.dstream.TransformedDStream.parents of type
scala.collection.Seq in instance of
org.apache.spark.streaming.dstream.TransformedDStream

at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)

at
org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:188)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)

at
org.apache.spark.streaming.Checkpoint$$anonfun$deserialize$2.apply(Checkpoint.scala:151)

at
org.apache.spark.streaming.Checkpoint$$anonfun$deserialize$2.apply(Checkpoint.scala:141)

at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)

at org.apache.spark.streaming.Checkpoint$.deserialize(Checkpoint.scala:154)

at
org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:329)

at
org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:325)

at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)

at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:325)

... 9 more

Caused by: java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.streaming.dstream.TransformedDStream.parents of type
scala.collection.Seq in instance of
org.apache.spark.streaming.dstream.TransformedDStream

at
java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2089)

at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)

at
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:497)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)

at 

Re: kafkaDirectStream usage error

2016-02-05 Thread Cody Koeninger
2 things:

- you're only attempting to read from a single TopicAndPartition.  Since
your topic has multiple partitions, this probably isn't what you want

- you're getting an offset out of range exception because the offset you're
asking for doesn't exist in kafka.

Use the other createDirectStream method (the one that takes a set of
topics, not a map of topicpartitions to offset), at least until you get an
understanding for how things work.
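
For reference, a rough sketch of the topic-set variant being suggested here (Spark 1.x Kafka API; the broker address is a placeholder and `ssc` is assumed to be an existing StreamingContext):

```
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Let Spark resolve the starting offsets for every partition of the topic
// instead of hand-building a TopicAndPartition -> offset map.
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092",  // placeholder broker list
  "auto.offset.reset"    -> "smallest")      // start from the earliest available offset
val topics = Set("request5")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
```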



On Thu, Feb 4, 2016 at 7:58 PM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> I am  using  below  directsream to consume  messages from kafka . Topic
> has 8 partitions.
>
>  val topicAndPart =  OffsetRange.create("request5",0,
> 1,10).topicAndPartition()
> val fromOffsets =
> Map[kafka.common.TopicAndPartition,Long](topicAndPart->0)
> val messageHandler = (mmd : MessageAndMetadata[String,String]) =>
> (mmd.key(),mmd.message())
> val k1 =
> KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,
> kafkaParams, fromOffsets,messageHandler)
>
> I am  getting  below  error . Any  idea  where  I am doing  wrong  .
> Please  help .
>
> 6/02/04 21:04:38 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 0.0
> (TID 2, datanode3.isdp.com): UnknownReason
> 16/02/04 21:04:38 INFO scheduler.TaskSetManager: Starting task 0.2 in
> stage 0.0 (TID 3, datanode3.isdp.com, RACK_LOCAL, 2273 bytes)
> 16/02/04 21:04:38 WARN spark.ThrowableSerializationWrapper: Task exception
> could not be deserialized
> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:278)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:167)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
> at
> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> 

Re: Kafka directsream receiving rate

2016-02-05 Thread Cody Koeninger
How are you counting the number of messages?

I'd go ahead and remove the settings for backpressure and
maxrateperpartition, just to eliminate that as a variable.
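
One simple way to count what each batch actually delivers, sketched against the `k` stream posted earlier in this thread (the println runs on the driver):

```
// Log the record count of every micro-batch; with the direct stream there is
// one RDD partition per Kafka partition, so the partition count is visible too.
k.foreachRDD { rdd =>
  println(s"batch records = ${rdd.count()}, partitions = ${rdd.partitions.length}")
}
```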

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> I am  using  one  directsream. Below  is  the  call  to directsream:-
>
> val topicSet = topics.split(",").toSet
> val kafkaParams = Map[String,String]("bootstrap.servers" -> "
> datanode4.isdp.com:9092")
> val k =
> KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,
> kafkaParams, topicSet)
>
> When  I replace   DirectStream call  to  createStream,  all  messages were
>  read  by  one  Dstream block.:-
> val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap
> ,StorageLevel.MEMORY_ONLY)
>
> I am  using   below  spark-submit to execute:
> ./spark-submit --master yarn-client --conf
> "spark.dynamicAllocation.enabled=true" --conf
> "spark.shuffle.service.enabled=true" --conf
> "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf
> "spark.sql.unsafe.enabled=false" --conf
> "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s"
> --conf "spark.shuffle.consolidateFiles=true"   --conf
> "spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g
> --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files
> /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
> --jars
> /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
> /root/Jars/sparkreceiver.jar
>
>
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Cody Koeninger 
> Date:05/02/2016 22:07 (GMT+05:30)
> To: Diwakar Dhanuskodi 
> Cc: user@spark.apache.org
> Subject: Re: Kafka directsream receiving rate
>
> If you're using the direct stream, you have 0 receivers.  Do you mean you
> have 1 executor?
>
> Can you post the relevant call to createDirectStream from your code, as
> well as any relevant spark configuration?
>
> On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
>
>> Adding more info
>>
>> Batch  interval  is  2000ms.
>> I expect all 100 messages  go thru one  dstream from  directsream but it
>> receives at rate of 10 messages at time. Am  I missing  some
>>  configurations here. Any help appreciated.
>>
>> Regards
>> Diwakar.
>>
>>
>> Sent from Samsung Mobile.
>>
>>
>>  Original message 
>> From: Diwakar Dhanuskodi 
>> Date:05/02/2016 07:33 (GMT+05:30)
>> To: user@spark.apache.org
>> Cc:
>> Subject: Kafka directsream receiving rate
>>
>> Hi,
>> Using spark 1.5.1.
>> I have a topic with 20 partitions.  When I publish 100 messages. Spark
>> direct stream is receiving 10 messages per  dstream. I have  only  one
>>  receiver . When I used createStream the  receiver  received  entire 100
>> messages  at once.
>>
>> Appreciate  any  help .
>>
>> Regards
>> Diwakar
>>
>>
>> Sent from Samsung Mobile.
>>
>
>


Please Add Our Meetup to the Spark Meetup List

2016-02-05 Thread Timothy Spann
Our meetup is

NJ Data Science - Apache Spark
http://www.meetup.com/nj-datascience
Princeton, NJ


Past Meetups:

Spark Streaming by Prasad Sripathi, airisDATA
August 13, 2015

ELK Stack and Spark
June 30, 2015

Spark Hands-on Intro workshop by Kristina Rogale Plazonic, airisDATA
June 18, 2015

Graph Analytics – Titan and Cassandra by Isaac Rieksts (Slides)
June 4, 2015

Machine Learning Fundamentals by SriSatish Ambati, H2O ML Platform (Slides)
May 14, 2015


—
airis.DATA
Timothy Spann, Senior Solutions Engineer
C: 609-250-5894



Re: spark metrics question

2016-02-05 Thread Takeshi Yamamuro
How about using `spark.jars` to send jars into a cluster?
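
A sketch of what that could look like when the conf is built programmatically (paths are placeholders; `--jars` on spark-submit is the equivalent of `spark.jars`). Whether the metrics system can see classes shipped this way depends on when it initialises on the executor, so treat this as a starting point rather than a guaranteed fix:

```
import org.apache.spark.{SparkConf, SparkContext}

// Ship the jar containing the custom metrics sink to the executors and point
// the metrics system at a custom properties file.
val conf = new SparkConf()
  .setAppName("custom-metrics-demo")
  .set("spark.jars", "/path/to/custom-metrics-sink.jar")     // placeholder path
  .set("spark.metrics.conf", "/path/to/metrics.properties")  // placeholder path

val sc = new SparkContext(conf)
```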

On Sat, Feb 6, 2016 at 12:00 AM, Matt K  wrote:

> Yes. And what I'm trying to figure out if there's a way to package the jar
> in such a way that I don't have to install it on every Executor node.
>
>
> On Wed, Feb 3, 2016 at 7:46 PM, Yiannis Gkoufas 
> wrote:
>
>> Hi Matt,
>>
>> does the custom class you want to package reports metrics of each
>> Executor?
>>
>> Thanks
>>
>> On 3 February 2016 at 15:56, Matt K  wrote:
>>
>>> Thanks for sharing Yiannis, looks very promising!
>>>
>>> Do you know if I can package a custom class with my application, or does
>>> it have to be pre-deployed on all Executor nodes?
>>>
>>> On Wed, Feb 3, 2016 at 10:36 AM, Yiannis Gkoufas 
>>> wrote:
>>>
 Hi Matt,

 there is some related work I recently did in IBM Research for
 visualizing the metrics produced.
 You can read about it here
 http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
 We recently opensourced it if you are interested to have a deeper look
 to it: https://github.com/ibm-research-ireland/sparkoscope

 Thanks,
 Yiannis

 On 3 February 2016 at 13:32, Matt K  wrote:

> Hi guys,
>
> I'm looking to create a custom sink based on Spark's Metrics System:
>
> https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
>
> If I want to collect metrics from the Driver, Master, and Executor
> nodes, should the jar with the custom class be installed on Driver, 
> Master,
> and Executor nodes?
>
> Also, on Executor nodes, does the MetricsSystem run inside the
> Executor's JVM?
>
> Thanks,
> -Matt
>


>>>
>>>
>>> --
>>> www.calcmachine.com - easy online calculator.
>>>
>>
>>
>
>
> --
> www.calcmachine.com - easy online calculator.
>



-- 
---
Takeshi Yamamuro


Re: Too many open files, why changing ulimit not effecting?

2016-02-05 Thread Nirav Patel
For centos there's also /etc/security/limits.d/90-nproc.conf  that may need
modifications.

Services that you expect to use new limits needs to be restarted. Simple
thing to do is to reboot the machine.
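
A quick way to confirm the new limit is actually visible inside each executor JVM, rather than only in an interactive shell, is to run `ulimit -n` from a task. A rough sketch, assuming an existing SparkContext `sc`:

```
import scala.sys.process._

// Run `ulimit -n` on the executors and report one (hostname, limit) pair per host.
// Using many small tasks makes it likely that every executor runs at least one.
val limits = sc.parallelize(1 to 100, 100)
  .map { _ =>
    val host  = java.net.InetAddress.getLocalHost.getHostName
    val limit = Seq("sh", "-c", "ulimit -n").!!.trim
    (host, limit)
  }
  .distinct()
  .collect()

limits.foreach { case (host, limit) => println(s"$host -> open file limit $limit") }
```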

On Fri, Feb 5, 2016 at 3:59 AM, Ted Yu  wrote:

> bq. and *"session required pam_limits.so"*.
>
> What was the second file you modified ?
>
> Did you make the change on all the nodes ?
>
> Please see the verification step in
> https://easyengine.io/tutorials/linux/increase-open-files-limit/
>
> On Fri, Feb 5, 2016 at 1:42 AM, Mohamed Nadjib MAMI 
> wrote:
>
>> Hello all,
>>
>> I'm getting the famous java.io.FileNotFoundException: ... (Too many
>> open files) exception. What seems to have helped other people out
>> hasn't worked for me. I tried to set the ulimit via the command line
>> ("ulimit -n"), then I tried to add the following lines to the
>> "/etc/security/limits.conf" file:
>>
>> * - nofile 100
>> root soft nofile 100
>> root hard nofile 100
>> hduser soft nofile 100
>> hduser hard nofile 100
>>
>> ...then I added the line "session required pam_limits.so" to the two
>> files "/etc/pam.d/common-session" and "session required
>> pam_limits.so". Then I logged out and back in. First I tried only the
>> first line (* - nofile 100), then added the 2nd and the 3rd
>> (root...), then added the last two lines (hduser...), no effect. Weirdly
>> enough, when I check with the command "ulimit -n" it returns the
>> correct value of 100.
>>
>> I then added "ulimit -n 100" to "spark-env.sh" on the master and
>> on each of my workers, no effect.
>>
>> What else could it be besides changing the ulimit setting? if it's only
>> that, what could cause Spark to ignore it?
>>
>> I'll appreciate any help in advance.
>>
>> --
>> PhD Student - EIS Group - Bonn University, Germany.
>> +49 1575 8482232
>>
>>
>



Spark process failing to receive data from the Kafka queue in yarn-client mode.

2016-02-05 Thread Rachana Srivastava
I am trying to run the following code using yarn-client mode, but I am getting the slow
ReadProcessor error mentioned below. The code works just fine in local
mode. Any pointer is really appreciated.

Line of code to receive data from the Kafka Queue:
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, kafkaTopicMap, StorageLevel.MEMORY_ONLY());

JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
  public String call(Tuple2<String, String> tuple2) {
    LOG.info(" Input json stream data " + tuple2._2);
    return tuple2._2();
  }
});


Error Details:
2016-02-05 11:44:00 WARN DFSClient:975 - Slow ReadProcessor read fields took 30011ms (threshold=3ms); ack: seqno: 1960 reply: 0 reply: 0 reply: 0 downstreamAckTimeNanos: 1227280, targets: [DatanodeInfoWithStorage[10.0.0.245:50010,DS-a55d9212-3771-4936-bbe7-02035e7de148,DISK], DatanodeInfoWithStorage[10.0.0.243:50010,DS-231b9915-c2e2-4392-b075-8a52ba1820ac,DISK], DatanodeInfoWithStorage[10.0.0.244:50010,DS-6b8b5814-7dd7-4315-847c-b73bd375af0e,DISK]]
2016-02-05 11:44:00 INFO BlockManager:59 - Removing RDD 1954
2016-02-05 11:44:00 INFO MapPartitionsRDD:59 - Removing RDD 1955 from persisten


Re: How to edit/delete a message posted in Apache Spark User List?

2016-02-05 Thread Luciano Resende
Please see

http://www.apache.org/foundation/public-archives.html

On Fri, Feb 5, 2016 at 9:35 AM, SRK  wrote:

> Hi,
>
> How do I edit/delete a message posted in Apache Spark User List?
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-edit-delete-a-message-posted-in-Apache-Spark-User-List-tp26160.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Help needed in deleting a message posted in Spark User List

2016-02-05 Thread Marcelo Vanzin
You don't... just send a new one.

On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy
 wrote:
> Hi,
>
> I want to edit/delete a message posted in Spark User List. How do I do that?
>
> Thanks!



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to edit/delete a message posted in Apache Spark User List?

2016-02-05 Thread SRK
Hi,

How do I edit/delete a message posted in Apache Spark User List?

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-edit-delete-a-message-posted-in-Apache-Spark-User-List-tp26160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Help needed in deleting a message posted in Spark User List

2016-02-05 Thread swetha kasireddy
Hi,

I want to edit/delete a message posted in Spark User List. How do I do that?

Thanks!


What is the best way to JOIN two 10TB csv files and three 100kb files on Spark?

2016-02-05 Thread Rex X
Dear all,

The new DataFrame of Spark is extremely fast. But our cluster has limited
RAM (~500GB).

What is the best way to do such a big table Join?

Any sample code is greatly welcome!


Best,
Rex


Re: Kafka directsream receiving rate

2016-02-05 Thread Diwakar Dhanuskodi
I am using one direct stream. Below is the call to the direct stream:

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> 
"datanode4.isdp.com:9092")
val k = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
kafkaParams, topicSet)

When  I replace   DirectStream call  to  createStream,  all  messages were  
read  by  one  Dstream block.:-
val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap 
,StorageLevel.MEMORY_ONLY)

I am  using   below  spark-submit to execute:
./spark-submit --master yarn-client --conf 
"spark.dynamicAllocation.enabled=true" --conf 
"spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" 
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf 
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" 
--conf "spark.shuffle.consolidateFiles=true"   --conf 
"spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g 
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files 
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
 --jars 
/root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
 /root/Jars/sparkreceiver.jar




Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
 Date:05/02/2016  22:07  (GMT+05:30) 
To: Diwakar Dhanuskodi  Cc: 
user@spark.apache.org Subject: Re: Kafka directsream receiving rate 

If you're using the direct stream, you have 0 receivers.  Do you mean you 
have 1 executor?

Can you post the relevant call to createDirectStream from your code, as well as 
any relevant spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi 
 wrote:
Adding more info

Batch  interval  is  2000ms.
I expect all 100 messages  go thru one  dstream from  directsream but it 
receives at rate of 10 messages at time. Am  I missing  some  configurations 
here. Any help appreciated. 

Regards 
Diwakar.


Sent from Samsung Mobile.


 Original message 
From: Diwakar Dhanuskodi 
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directsream receiving rate

Hi,
Using spark 1.5.1.
I have a topic with 20 partitions.  When I publish 100 messages. Spark direct 
stream is receiving 10 messages per  dstream. I have  only  one  receiver . 
When I used createStream the  receiver  received  entire 100 messages  at once. 
 

Appreciate  any  help .

Regards 
Diwakar


Sent from Samsung Mobile.



Re: Hadoop credentials missing in some tasks?

2016-02-05 Thread Peter Vandenabeele
On Fri, Feb 5, 2016 at 12:58 PM, Gerard Maas  wrote:

> Hi,
>
> We're facing a situation where simple queries to parquet files stored in
> Swift through a Hive Metastore sometimes fail with this exception:
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
> in stage 58.0 failed 4 times, most recent failure: Lost task 6.3 in stage
> 58.0 (TID 412, agent-1.mesos.private):
> org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Missing
> mandatory configuration option: fs.swift.service.##.auth.url
> at 
> org.apache.hadoop.fs.swift.http.RestClientBindings.copy(RestClientBindings.java:219)
> (...)
>
> Queries requiring a full table scan, like select(count(*)) would fail with
> the mentioned exception while smaller chunks of work like " select *
>  from... LIMIT 5" would succeed.
>

...

An update:

When using the Zeppelin Notebook on a Mesos cluster, as a _workaround_ I
can get the Notebook running
reliably when using this setting and starting with this paragraph:

* spark.mesos.coarse = true

|| import util.Random.nextInt
|| sc.parallelize((0 to 1000).toList, 20).toDF.write.parquet(s"swift://###/test/${util.Random.nextInt}")

This parquet write will touch all the executors (4 worker nodes in this
experiment).

So, it looks like _writing_ once, at the start of the Notebook will
distribute the swift authentication
data to the executors and after that, all queries just work (including the
count(*) queries that failed
before).

This is using a Zeppelin notebook with Spark 1.5.1 with Hadoop 2.4.

HTH,

Peter


Re: What is the best way to JOIN two 10TB csv files and three 100kb files on Spark?

2016-02-05 Thread Takeshi Yamamuro
Hi,

How about using broadcast joins?
largeDf.join(broadcast(smallDf), "joinKey")
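
A slightly fuller sketch of that approach for the question above: the two 10TB tables still need a shuffled join, but the three ~100kb tables can be broadcast so they never add another shuffle. `joinKey` and the DataFrame names are placeholders, and loading the csv files is left out:

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.broadcast

// big1, big2: the two 10TB tables; small1..small3: the three ~100kb tables.
def joinAll(big1: DataFrame, big2: DataFrame,
            small1: DataFrame, small2: DataFrame, small3: DataFrame): DataFrame =
  big1.join(big2, "joinKey")             // shuffled join of the two large tables
    .join(broadcast(small1), "joinKey")  // broadcast hints keep the small tables map-side
    .join(broadcast(small2), "joinKey")
    .join(broadcast(small3), "joinKey")
```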

On Sat, Feb 6, 2016 at 2:25 AM, Rex X  wrote:

> Dear all,
>
> The new DataFrame of spark is extremely fast. But out cluster have limited
> RAM (~500GB).
>
> What is the best way to do such a big table Join?
>
> Any sample code is greatly welcome!
>
>
> Best,
> Rex
>
>


-- 
---
Takeshi Yamamuro


Re: pyspark - spark history server

2016-02-05 Thread cs user
Hi Folks,

So the fix for me was to copy this file on the nodes built with Ambari:

/usr/hdp/2.3.4.0-3485/spark/lib/spark-assembly-1.5.2.2.3.4.0-3485-hadoop2.7.1.2.3.4.0-3485.jar

To this file on the client machine, external to the cluster:

/opt/spark/lib/spark-assembly-1.5.2-hadoop2.6.0.jar

I tried this after reading:

https://mail-archives.apache.org/mod_mbox/spark-user/201503.mbox/%3ccaaonq7v7cq4hqr2p9ez5ojucmyc+mo2ggh068cwh+qwt6sx...@mail.gmail.com%3E

So I assume this is a custom built jar which is not part of the official
distribution. Just wanted to post in case this helps another person.

Thanks!


On Fri, Feb 5, 2016 at 2:08 PM, cs user  wrote:

> Hi All,
>
> I'm having trouble getting a job to use the spark history server. We have
> a cluster configured with Ambari, if I run the job from one of the nodes
> within the Ambari configured cluster, everything works fine, the job
> appears in the spark history server.
>
> If I configure a client external to the cluster, running the same job, the
> history server is not used.
>
> When the job completes fine, I see these lines appear in the log:
>
>
> 16/02/05 11:57:22 INFO history.YarnHistoryService: Starting
> YarnHistoryService for application application_1453893909110_0108 attempt
> Some(appattempt_1453893909110_0108_01); state=1; endpoint=
> http://somehost:8188/ws/v1/timeline/; bonded to ATS=false;
> listening=false; batchSize=10; flush count=0; total number queued=0,
> processed=0; attempted entity posts=0 successful entity posts=0 failed
> entity posts=0; events dropped=0; app start event received=false; app end
> event received=false;
> 16/02/05 11:57:22 INFO history.YarnHistoryService: Spark events will be
> published to the Timeline service at http://somehost:8188/ws/v1/timeline/
>
>
> On the client which is external to the cluster, these lines do not appear
> in the logs. I have printed out spark context and attempted to match what
> is configured on the working job, with the failing job, all seems fine.
>
> These are the job settings:
>
> conf.set('spark.speculation','true')
> conf.set('spark.dynamicAllocation.enabled','false')
> conf.set('spark.shuffle.service.enabled','false')
> conf.set('spark.executor.instances', '4')
> conf.set('spark.akka.threads','4')
> conf.set('spark.dynamicAllocation.initialExecutors','4')
>
> conf.set('spark.history.provider','org.apache.spark.deploy.yarn.history.YarnHistoryProvider')
>
> conf.set('spark.yarn.services','org.apache.spark.deploy.yarn.history.YarnHistoryService')
> conf.set('spark.history.ui.port','18080')
> conf.set('spark.driver.extraJavaOptions','-Dhdp.version=2.3.4.0-3485')
> conf.set('spark.yarn.containerLauncherMaxThreads','25')
> conf.set('spark.yarn.driver.memoryOverhead','384')
> conf.set('spark.yarn.executor.memoryOverhead','384')
> conf.set('spark.yarn.historyServer.address','somehost:18080')
> conf.set('spark.yarn.max.executor.failures','3')
> conf.set('spark.yarn.preserve.staging.files','false')
> conf.set('spark.yarn.queue','default')
> conf.set('spark.yarn.scheduler.heartbeat.interval-ms','5000')
> conf.set('spark.yarn.submit.file.replication','3')
> conf.set('spark.yarn.am.extraJavaOptions','-Dhdp.version=2.3.4.0-3485')
> conf.set('spark.blockManager.port','9096')
> conf.set('spark.driver.port','9095')
> conf.set('spark.fileserver.port','9097')
>
> I am using the following tar.gz file to install spark on the node external
> to the cluster:
>
>
> http://www.apache.org/dyn/closer.lua/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz
>
> Will this version of spark have everything required to talk correctly to
> yarn and the spark history service?
>
> So it comes down to, the spark context settings appear to be exactly the
> same, there are no errors in the logs pointing to the job not being able to
> connect to anything, none of the ports are blocked, why is this not working
> when run external to the cluster?
>
> There is no kerberos security configured on the cluster.
>
> Thanks!
>
>
>
>
>
>
>
>


Failed to remove broadcast 2 with removeFromMaster = true in Graphx

2016-02-05 Thread Zhang, Jingyu
I am running a Pregel function with 37 nodes on EMR Hadoop. After an hour
the logs show the following. Can anyone please tell me what the problem is and
how I can solve it? Thanks
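
The timeout named in the log below is configurable. Raising it is only first aid (the underlying cause is usually an overloaded driver or executor), but as a sketch:

```
import org.apache.spark.SparkConf

// Give block-manager / broadcast-cleanup RPCs more headroom.
// spark.rpc.askTimeout falls back to spark.network.timeout when unset.
val conf = new SparkConf()
  .setAppName("pregel-job")              // placeholder app name
  .set("spark.rpc.askTimeout", "600s")
  .set("spark.network.timeout", "600s")
```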

16/02/05 14:02:46 WARN BlockManagerMaster: Failed to remove broadcast
2 with removeFromMaster = true - Cannot receive any reply in 120
seconds. This timeout is controlled by spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in
120 seconds. This timeout is controlled by spark.rpc.askTimeout
at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
at 
scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at 
scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635)
at 
scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at 
scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634)
at 
scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694)
at 
scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
at 
org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any
reply in 120 seconds
at 
org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242)
... 7 more
16/02/05 14:02:46 ERROR ContextCleaner: Error cleaning broadcast 2
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in
120 seconds. This timeout is controlled by spark.rpc.askTimeout
at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)

Re: Spark 1.6.0 HiveContext NPE

2016-02-05 Thread Ted Yu
Was there any other exception(s) in the client log ?

Just want to find the cause for this NPE.

Thanks

On Wed, Feb 3, 2016 at 8:33 AM, Shipper, Jay [USA] 
wrote:

> I’m upgrading an application from Spark 1.4.1 to Spark 1.6.0, and I’m
> getting a NullPointerException from HiveContext.  It’s happening while it
> tries to load some tables via JDBC from an external database (not Hive),
> using context.read().jdbc():
>
> —
> java.lang.NullPointerException
> at
> org.apache.spark.sql.hive.client.ClientWrapper.conf(ClientWrapper.scala:205)
> at
> org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:552)
> at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:551)
> at
> org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:538)
> at
> org.apache.spark.sql.hive.HiveContext$$anonfun$configure$1.apply(HiveContext.scala:537)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.sql.hive.HiveContext.configure(HiveContext.scala:537)
> at
> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:250)
> at
> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
> at
> org.apache.spark.sql.hive.HiveContext$$anon$2.<init>(HiveContext.scala:457)
> at
> org.apache.spark.sql.hive.HiveContext.catalog$lzycompute(HiveContext.scala:457)
> at org.apache.spark.sql.hive.HiveContext.catalog(HiveContext.scala:456)
> at
> org.apache.spark.sql.hive.HiveContext$$anon$3.<init>(HiveContext.scala:473)
> at
> org.apache.spark.sql.hive.HiveContext.analyzer$lzycompute(HiveContext.scala:473)
> at org.apache.spark.sql.hive.HiveContext.analyzer(HiveContext.scala:472)
> at
> org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
> at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
> at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
> at
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:442)
> at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:223)
> at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:146)
> —
>
> Even though the application is not using Hive, HiveContext is used instead
> of SQLContext, for the additional functionality it provides.  There’s no
> hive-site.xml for the application, but this did not cause an issue for
> Spark 1.4.1.
>
> Does anyone have an idea about what’s changed from 1.4.1 to 1.6.0 that
> could explain this NPE?  The only obvious change I’ve noticed for
> HiveContext is that the default warehouse location is different (1.4.1 -
> current directory, 1.6.0 - /user/hive/warehouse), but I verified that this
> NPE happens even when /user/hive/warehouse exists and is readable/writeable
> for the application.  In terms of changes to the application to work with
> Spark 1.6.0, the only one that might be relevant to this issue is the
> upgrade in the Hadoop dependencies to match what Spark 1.6.0 uses
> (2.6.0-cdh5.7.0-SNAPSHOT).
>
> Thanks,
> Jay
>


Re: Kafka directsream receiving rate

2016-02-05 Thread Diwakar Dhanuskodi
I am able to see the number of messages processed per event in the Spark Streaming
web UI. I am also counting the messages inside foreachRDD.
I removed the settings for backpressure but it is still the same.





Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
 Date:06/02/2016  00:33  (GMT+05:30) 
To: Diwakar Dhanuskodi  Cc: 
user@spark.apache.org Subject: Re: Kafka directsream receiving rate 

How are you counting the number of messages?

I'd go ahead and remove the settings for backpressure and maxrateperpartition, 
just to eliminate that as a variable.

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi 
 wrote:
I am  using  one  directsream. Below  is  the  call  to directsream:-

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> 
"datanode4.isdp.com:9092")
val k = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
kafkaParams, topicSet)

When  I replace   DirectStream call  to  createStream,  all  messages were  
read  by  one  Dstream block.:-
val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap 
,StorageLevel.MEMORY_ONLY)

I am  using   below  spark-submit to execute:
./spark-submit --master yarn-client --conf 
"spark.dynamicAllocation.enabled=true" --conf 
"spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" 
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf 
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" 
--conf "spark.shuffle.consolidateFiles=true"   --conf 
"spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g 
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files 
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
 --jars 
/root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
 /root/Jars/sparkreceiver.jar




Sent from Samsung Mobile.


 Original message 
From: Cody Koeninger 
Date:05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi 
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers.  Do you mean you have 
1 executor?

Can you post the relevant call to createDirectStream from your code, as well as 
any relevant spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi 
 wrote:
Adding more info

Batch  interval  is  2000ms.
I expect all 100 messages  go thru one  dstream from  directsream but it 
receives at rate of 10 messages at time. Am  I missing  some  configurations 
here. Any help appreciated. 

Regards 
Diwakar.


Sent from Samsung Mobile.


 Original message 
From: Diwakar Dhanuskodi 
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directsream receiving rate

Hi,
Using spark 1.5.1.
I have a topic with 20 partitions.  When I publish 100 messages. Spark direct 
stream is receiving 10 messages per  dstream. I have  only  one  receiver . 
When I used createStream the  receiver  received  entire 100 messages  at once. 
 

Appreciate  any  help .

Regards 
Diwakar


Sent from Samsung Mobile.




Re: Unit test with sqlContext

2016-02-05 Thread Steve Annessa
Thanks for all of the responses.

I do have an afterAll that stops the sc.
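
For completeness, a minimal sketch of that shared-context pattern with the teardown in afterAll (ScalaTest; the trait and names here are illustrative, not the code from the original spec):

```
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.scalatest.{BeforeAndAfterAll, Suite}

// One SparkContext/SQLContext per suite, stopped in afterAll so that suites
// running sequentially never see two contexts in the same JVM.
trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
  @transient var sc: SparkContext = _
  @transient var sqlContext: SQLContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    val conf = new SparkConf().setMaster("local[2]").setAppName("test")
    sc = new SparkContext(conf)
    sqlContext = new SQLContext(sc)
  }

  override def afterAll(): Unit = {
    try {
      if (sc != null) sc.stop()
      sc = null
      sqlContext = null
      System.clearProperty("spark.driver.port")
    } finally super.afterAll()
  }
}
```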

While looking over Holden's readme I noticed she mentioned "Make sure to
disable parallel execution." That was what I was missing; I added the
following to my build.sbt:

```
parallelExecution in Test := false
```

Now all of my tests are running.

I'm going to look into using the package she created.

Thanks again,

-- Steve


On Thu, Feb 4, 2016 at 8:50 PM, Rishi Mishra  wrote:

> Hi Steve,
> Have you cleaned up your SparkContext ( sc.stop())  , in a afterAll(). The
> error suggests you are creating more than one SparkContext.
>
>
> On Fri, Feb 5, 2016 at 10:04 AM, Holden Karau 
> wrote:
>
>> Thanks for recommending spark-testing-base :) Just wanted to add if
>> anyone has feature requests for Spark testing please get in touch (or add
>> an issue on the github) :)
>>
>>
>> On Thu, Feb 4, 2016 at 8:25 PM, Silvio Fiorito <
>> silvio.fior...@granturing.com> wrote:
>>
>>> Hi Steve,
>>>
>>> Have you looked at the spark-testing-base package by Holden? It’s really
>>> useful for unit testing Spark apps as it handles all the bootstrapping for
>>> you.
>>>
>>> https://github.com/holdenk/spark-testing-base
>>>
>>> DataFrame examples are here:
>>> https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/scala/com/holdenkarau/spark/testing/SampleDataFrameTest.scala
>>>
>>> Thanks,
>>> Silvio
>>>
>>> From: Steve Annessa 
>>> Date: Thursday, February 4, 2016 at 8:36 PM
>>> To: "user@spark.apache.org" 
>>> Subject: Unit test with sqlContext
>>>
>>> I'm trying to unit test a function that reads in a JSON file,
>>> manipulates the DF and then returns a Scala Map.
>>>
>>> The function has signature:
>>> def ingest(dataLocation: String, sc: SparkContext, sqlContext:
>>> SQLContext)
>>>
>>> I've created a bootstrap spec for spark jobs that instantiates the Spark
>>> Context and SQLContext like so:
>>>
>>> @transient var sc: SparkContext = _
>>> @transient var sqlContext: SQLContext = _
>>>
>>> override def beforeAll = {
>>>   System.clearProperty("spark.driver.port")
>>>   System.clearProperty("spark.hostPort")
>>>
>>>   val conf = new SparkConf()
>>> .setMaster(master)
>>> .setAppName(appName)
>>>
>>>   sc = new SparkContext(conf)
>>>   sqlContext = new SQLContext(sc)
>>> }
>>>
>>> When I do not include sqlContext, my tests run. Once I add the
>>> sqlContext I get the following errors:
>>>
>>> 16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being
>>> constructed (or threw an exception in its constructor).  This may indicate
>>> an error, since only one SparkContext may be running in this JVM (see
>>> SPARK-2243). The other SparkContext was created at:
>>> org.apache.spark.SparkContext.<init>(SparkContext.scala:81)
>>>
>>> 16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
>>> akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is
>>> not unique!
>>>
>>> and finally:
>>>
>>> [info] IngestSpec:
>>> [info] Exception encountered when attempting to run a suite with class
>>> name: com.company.package.IngestSpec *** ABORTED ***
>>> [info]   akka.actor.InvalidActorNameException: actor name
>>> [ExecutorEndpoint] is not unique!
>>>
>>>
>>> What do I need to do to get a sqlContext through my tests?
>>>
>>> Thanks,
>>>
>>> -- Steve
>>>
>>
>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>
>
>
> --
> Regards,
> Rishitesh Mishra,
> SnappyData . (http://www.snappydata.io/)
>
> https://in.linkedin.com/in/rishiteshmishra
>


Re: Kafka directsream receiving rate

2016-02-05 Thread Cody Koeninger
Have you tried just printing each message, to see which ones are being
processed?
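
A sketch of that check against the `k` stream posted earlier: log which Kafka offsets each batch actually covered, then print (for small test volumes only) the payloads themselves on the driver:

```
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Log the offset range per topic-partition for every micro-batch, then print
// up to 200 message payloads per batch on the driver.
k.transform { rdd =>
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} -> ${r.untilOffset}"))
  rdd
}.map(_._2).print(200)
```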

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> I am  able  to  see  no of  messages processed  per  event  in
>  sparkstreaming web UI . Also  I am  counting  the  messages inside
>  foreachRDD .
> Removed  the  settings for  backpressure but still  the  same .
>
>
>
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Cody Koeninger 
> Date:06/02/2016 00:33 (GMT+05:30)
> To: Diwakar Dhanuskodi 
> Cc: user@spark.apache.org
> Subject: Re: Kafka directsream receiving rate
>
> How are you counting the number of messages?
>
> I'd go ahead and remove the settings for backpressure and
> maxrateperpartition, just to eliminate that as a variable.
>
> On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
>
>> I am  using  one  directsream. Below  is  the  call  to directsream:-
>>
>> val topicSet = topics.split(",").toSet
>> val kafkaParams = Map[String,String]("bootstrap.servers" -> "
>> datanode4.isdp.com:9092")
>> val k =
>> KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,
>> kafkaParams, topicSet)
>>
>> When  I replace   DirectStream call  to  createStream,  all  messages
>> were  read  by  one  Dstream block.:-
>> val k = KafkaUtils.createStream(ssc, 
>> "datanode4.isdp.com:2181","resp",topicMap
>> ,StorageLevel.MEMORY_ONLY)
>>
>> I am  using   below  spark-submit to execute:
>> ./spark-submit --master yarn-client --conf
>> "spark.dynamicAllocation.enabled=true" --conf
>> "spark.shuffle.service.enabled=true" --conf
>> "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf
>> "spark.sql.unsafe.enabled=false" --conf
>> "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s"
>> --conf "spark.shuffle.consolidateFiles=true"   --conf
>> "spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g
>> --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files
>> /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
>> --jars
>> /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
>> /root/Jars/sparkreceiver.jar
>>
>>
>>
>>
>> Sent from Samsung Mobile.
>>
>>
>>  Original message 
>> From: Cody Koeninger 
>> Date:05/02/2016 22:07 (GMT+05:30)
>> To: Diwakar Dhanuskodi 
>> Cc: user@spark.apache.org
>> Subject: Re: Kafka directsream receiving rate
>>
>> If you're using the direct stream, you have 0 receivers.  Do you mean you
>> have 1 executor?
>>
>> Can you post the relevant call to createDirectStream from your code, as
>> well as any relevant spark configuration?
>>
>> On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi <
>> diwakar.dhanusk...@gmail.com> wrote:
>>
>>> Adding more info
>>>
>>> Batch  interval  is  2000ms.
>>> I expect all 100 messages  go thru one  dstream from  directsream but it
>>> receives at rate of 10 messages at time. Am  I missing  some
>>>  configurations here. Any help appreciated.
>>>
>>> Regards
>>> Diwakar.
>>>
>>>
>>> Sent from Samsung Mobile.
>>>
>>>
>>>  Original message 
>>> From: Diwakar Dhanuskodi 
>>> Date:05/02/2016 07:33 (GMT+05:30)
>>> To: user@spark.apache.org
>>> Cc:
>>> Subject: Kafka directsream receiving rate
>>>
>>> Hi,
>>> Using spark 1.5.1.
>>> I have a topic with 20 partitions.  When I publish 100 messages. Spark
>>> direct stream is receiving 10 messages per  dstream. I have  only  one
>>>  receiver . When I used createStream the  receiver  received  entire 100
>>> messages  at once.
>>>
>>> Appreciate  any  help .
>>>
>>> Regards
>>> Diwakar
>>>
>>>
>>> Sent from Samsung Mobile.
>>>
>>
>>
>


Shuffle memory woes

2016-02-05 Thread Corey Nolet
I just recently had a discovery that my jobs were taking several hours to
complete because of excess shuffle spills. What I found was that when I
hit the high point where I didn't have enough memory for the shuffles to
store all of their file consolidations at once, it could spill so many
times that it caused my job's runtime to increase by orders of magnitude
(and sometimes fail altogether).

I've played with all the tuning parameters I can find. To speed the
shuffles up, I tuned the akka threads to different values. I also tuned the
shuffle buffering a tad (both up and down).

I feel like I see a weak point here. The mappers are sharing memory space
with reducers, and the shuffles need enough memory to consolidate and pull;
otherwise they will need to spill and spill and spill. What I've noticed
about my jobs is that this is the difference between them taking 30 minutes
and 4 hours or more. Same job, just different memory tuning.

I've found that, as a result of the spilling, I'm better off not caching
any data in memory, lowering my storage fraction to 0, and hoping I was able
to give my shuffles enough memory that my data doesn't continuously spill.
Is this the way it's supposed to be? It makes it hard because it seems to
force the memory limits on my job; otherwise it could take orders of
magnitude longer to execute.
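
For reference, a minimal sketch (my own illustration, not the poster's actual
code) of the legacy Spark 1.x memory knobs being discussed here; the 0.6 / 0.0
values are assumptions chosen to favor shuffle buffers over caching:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("shuffle-tuning-sketch")
  // give the shuffle more heap before it starts spilling to disk
  .set("spark.shuffle.memoryFraction", "0.6")
  // effectively disable RDD caching, as described above
  .set("spark.storage.memoryFraction", "0.0")
  // compress spill files if spilling still happens
  .set("spark.shuffle.spill.compress", "true")
val sc = new SparkContext(conf)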


Re: Using jar bundled log4j.xml on worker nodes

2016-02-05 Thread Matthias Niehoff
Hm, that seems to be the problem we are facing. But with --files you can
only pass local files, not files on the classpath, so we would need a file
outside of our jar.
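
One untested idea (a sketch only, with a hypothetical helper object): re-point
log4j at the bundled classpath resource from inside the executor JVM, via a
lazily initialized object touched in the first task:

import org.apache.log4j.xml.DOMConfigurator

// Hypothetical helper, not part of this thread: runs once per executor JVM.
object ExecutorLogging {
  lazy val init: Unit = {
    // the log4j.xml bundled at the jar's classpath root
    val url = getClass.getResource("/log4j.xml")
    if (url != null) DOMConfigurator.configure(url)
  }
}

// used inside a transformation that runs on the executors:
// rdd.mapPartitions { it => ExecutorLogging.init; it }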

2016-02-04 18:20 GMT+01:00 Ted Yu :

> Have you taken a look at SPARK-11105 ?
>
> Cheers
>
> On Thu, Feb 4, 2016 at 9:06 AM, Matthias Niehoff <
> matthias.nieh...@codecentric.de> wrote:
>
>> Hello everybody,
>>
>> we’ve bundled our log4j.xml into our jar (in the classpath root).
>>
>> I’ve added the log4j.xml to the spark-defaults.conf with
>>
>> spark.{driver,executor}.extraJavaOptions=-Dlog4j.configuration=log4j.xml
>>
>> There is no log4j.properties or log4j.xml in one of the conf folders on
>> any machine.
>>
>> When I start the app the driver is using our log4j.xml, but all the
>> executors use the default log4j.properties („Using Spark’s default log4j
>> profile: org/apache/spark/log4j-defaults.properties“).
>>
>> What do I have to change to make spark use the log4j.xml from our jar
>> also on our executors?
>>
>> We are using Spark 1.5.2
>>
>> Thank you!
>>
>> --
>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>> 172.1702676
>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>> www.more4fi.de
>>
>> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
>> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen
>> Schütz
>>
>>
>
>


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz

This e-mail, including any attached files, contains confidential and/or
legally protected information. If you are not the intended recipient or have
received this e-mail in error, please inform the sender immediately and delete
this e-mail and any attached files. The unauthorized copying, use or opening
of attached files, as well as the unauthorized forwarding of this e-mail, is
not permitted


RE: pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala

2016-02-05 Thread Lohith Samaga M
Hi,
If you can also format the condition file as a csv file similar 
to the main file, then you can join the two dataframes and select only required 
columns.
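
For illustration, a rough sketch of that join approach in Spark 1.5 Scala,
assuming a hypothetical reshaped tag file "/TestDivya/Spark/CarTagsFlat.csv"
with explicit TagId,year,model columns and matching year types:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// hypothetical reshaped condition file: TagId,year,model
val dftags2 = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true").option("inferSchema", "true")
  .load("/TestDivya/Spark/CarTagsFlat.csv")

val tagged = dfcars.join(dftags2,
    dfcars("year") === dftags2("year") && dfcars("model") === dftags2("model"),
    "left_outer")
  .select(dfcars("year"), dfcars("make"), dfcars("model"),
    dfcars("comment"), dfcars("blank"), dftags2("TagId"))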

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga

From: Divya Gehlot [mailto:divya.htco...@gmail.com]
Sent: Friday, February 05, 2016 13.12
To: user @spark
Subject: pass one dataframe column value to another dataframe filter expression 
+ Spark 1.5 + scala

Hi,
I have two input datasets
First input dataset like as below :

year,make,model,comment,blank
"2012","Tesla","S","No comment",
1997,Ford,E350,"Go get one now they are going fast",
2015,Chevy,Volt

Second Input dataset :

TagId,condition
1997_cars,year = 1997 and model = 'E350'
2012_cars,year=2012 and model ='S'
2015_cars ,year=2015 and model = 'Volt'

Now my requirement is read first data set and based on the filtering condition 
in second dataset need to tag rows of first input dataset by introducing a new 
column TagId to first input data set
so the expected should look like :

year,make,model,comment,blank,TagId
"2012","Tesla","S","No comment",2012_cars
1997,Ford,E350,"Go get one now they are going fast",1997_cars
2015,Chevy,Volt, ,2015_cars

I tried like :

val sqlContext = new SQLContext(sc)
val carsSchema = StructType(Seq(
StructField("year", IntegerType, true),
StructField("make", StringType, true),
StructField("model", StringType, true),
StructField("comment", StringType, true),
StructField("blank", StringType, true)))

val carTagsSchema = StructType(Seq(
StructField("TagId", StringType, true),
StructField("condition", StringType, true)))


val dfcars = 
sqlContext.read.format("com.databricks.spark.csv").option("header", "true") 
.schema(carsSchema).load("/TestDivya/Spark/cars.csv")
val dftags = 
sqlContext.read.format("com.databricks.spark.csv").option("header", "true") 
.schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")

val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
val cdtnval = dftags.select("condition")
val df2=dfcars.filter(cdtnval)
:35: error: overloaded method value filter with alternatives:
  (conditionExpr: String)org.apache.spark.sql.DataFrame 
  (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.sql.DataFrame)
   val df2=dfcars.filter(cdtnval)

another way :

val col = dftags.col("TagId")
val finaldf = dfcars.withColumn("TagId", col)
org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 
missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project 
[year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];

finaldf.write.format("com.databricks.spark.csv").option("header", 
"true").save("/TestDivya/Spark/carswithtags.csv")


Would really appreciate if somebody give me pointers how can I pass the filter 
condition(second dataframe) to filter function of first dataframe.
Or another solution .
My apppologies for such a naive question as I am new to scala and Spark

Thanks
Information transmitted by this e-mail is proprietary to Mphasis, its 
associated companies and/ or its customers and is intended 
for use only by the individual or entity to which it is addressed, and may 
contain information that is privileged, confidential or 
exempt from disclosure under applicable law. If you are not the intended 
recipient or it appears that this mail has been forwarded 
to you without proper authority, you are notified that any use or dissemination 
of this information in any manner is strictly 
prohibited. In such cases, please notify us immediately at 
mailmas...@mphasis.com and delete this mail from your records.


Too many open files, why changing ulimit not effecting?

2016-02-05 Thread Mohamed Nadjib MAMI

Hello all,

I'm getting the famous java.io.FileNotFoundException: ... (Too many
open files) exception. What seemed to have helped people out hasn't
worked for me. I tried to set the ulimit via the command line "ulimit
-n", then I tried to add the following lines to the
"/etc/security/limits.conf" file:

* - nofile 100
root soft nofile 100
root hard nofile 100
hduser soft nofile 100
hduser hard nofile 100

...then I added the line "session required pam_limits.so" to the two
files "/etc/pam.d/common-session" and "session required
pam_limits.so". Then I logged out/logged in. First, I tried only the
first line (* - nofile 100), then added the 2nd and the 3rd
(root...), then added the last two lines (hduser...), no effect.
Weirdly enough, when I check with the command "ulimit -n" it returns
the correct value of 100.

I then added "ulimit -n 100" to "spark-env.sh" on the master and
on each of my workers, no effect.


What else could it be besides changing the ulimit setting? If it's only
that, what could cause Spark to ignore it?
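
In case it helps diagnose this, one check I could run (a Linux-only sketch,
not yet tried against this setup) is to read /proc/self/limits from inside a
task to see the limit each executor JVM actually gets:

import scala.io.Source

// Run one task per default partition and report the "open files" line per host.
val limits = sc.parallelize(1 to sc.defaultParallelism, sc.defaultParallelism)
  .mapPartitions { _ =>
    val openFiles = Source.fromFile("/proc/self/limits").getLines()
      .filter(_.toLowerCase.contains("open files")).mkString("; ")
    Iterator(java.net.InetAddress.getLocalHost.getHostName + " -> " + openFiles)
  }
  .collect()
limits.foreach(println)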


I'll appreciate any help in advance.

--
PhD Student - EIS Group - Bonn University, Germany.
+49 1575 8482232



Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-05 Thread Udo Fholl
It does not look like. Here is the output of "grep -A2 -i waiting
spark_tdump.log"

"RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"task-result-getter-1" daemon prio=5 tid=101 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at
com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
--
"qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon
prio=5 tid=193 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RecurringTimer - Kinesis Checkpointer - Worker
localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89
TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-0" daemon prio=5 tid=84 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"pool-28-thread-1" prio=5 tid=92 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon
prio=5 tid=185 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at
scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
--
"Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
--
"qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"SparkListenerBus" daemon prio=5 tid=18 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"block-manager-slave-async-thread-pool-6" daemon prio=5 tid=179
TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RMI Scheduler(0)" daemon prio=5 tid=151 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-1" daemon prio=5 tid=99 WAITING
  at java.lang.Object.wait(Native Method)
  

DenseMatrix update

2016-02-05 Thread Zapper22
There was an update method in Spark 1.3.1:

https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/mllib/linalg/DenseMatrix.html

But in Spark 1.6.0, there is no update method:

https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/mllib/linalg/DenseMatrix.html

My idea is to store a large set of elements in a matrix and perform operations
on it. How can I update values in a DenseMatrix?

Any example or suggestions are welcome!
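
One possibility (a hedged sketch, not an official replacement for the old API):
mllib's DenseMatrix still exposes its backing array via .values, stored
column-major when the matrix is not transposed, so an in-place update can be
written against that array directly:

import org.apache.spark.mllib.linalg.DenseMatrix

// 2x2 matrix, column-major storage: [[1.0, 3.0], [2.0, 4.0]]
val m = new DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 4.0))

// Assumes !m.isTransposed, i.e. plain column-major layout.
def update(m: DenseMatrix, i: Int, j: Int, value: Double): Unit =
  m.values(i + j * m.numRows) = value

update(m, 0, 1, 9.0)
println(m(0, 1))   // 9.0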



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/DenseMatrix-update-tp26157.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Hadoop credentials missing in some tasks?

2016-02-05 Thread Gerard Maas
Hi,

We're facing a situation where simple queries to parquet files stored in
Swift through a Hive Metastore sometimes fail with this exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 6
in stage 58.0 failed 4 times, most recent failure: Lost task 6.3 in stage
58.0 (TID 412, agent-1.mesos.private):
org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Missing
mandatory configuration option: fs.swift.service.##.auth.url
at 
org.apache.hadoop.fs.swift.http.RestClientBindings.copy(RestClientBindings.java:219)
(...)

Queries requiring a full table scan, like select(count(*)), would fail with
the mentioned exception, while smaller chunks of work like "select *
from ... LIMIT 5" would succeed.

The problem seems to relate to the number of tasks scheduled:

If we force a reduction of the number of tasks to 1, the job  succeeds:

dataframe.rdd.coalesce(1).count()

would return a correct result, while

dataframe.count() would fail with the exception mentioned above.

To me, it looks like credentials are lost somewhere in the serialization
path when the tasks are submitted to the cluster. I have not found an
explanation yet for why a job that requires only one task succeeds.

We are running on Apache Zeppelin for Swift and Spark Notebook for S3.
Both show an equivalent exception within their specific hadoop filesystem
implementation when the task fails:

Zeppelin + Swift:

org.apache.hadoop.fs.swift.exceptions.SwiftConfigurationException: Missing
mandatory configuration option: fs.swift.service.##.auth.url

Spark Notebook + S3:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key
must be specified as the username or password (respectively) of a s3n URL,
or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey
properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)

Valid credentials are being set programmatically through
sc.hadoopConfiguration
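
For reference, a minimal sketch of what that programmatic setup looks like,
with placeholder values; the Swift provider name "myprovider" is purely
illustrative and stands in for the fs.swift.service.##.auth.url pattern from
the exception:

// S3 case (Spark Notebook environment)
sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<access-key>")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<secret-key>")

// Swift case (Zeppelin environment), hypothetical provider name "myprovider"
sc.hadoopConfiguration.set("fs.swift.service.myprovider.auth.url",
  "https://identity.example.com/v2.0/tokens")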

Our system: Zeppelin or Spark Notebook with Spark 1.5.1 running on Docker,
Docker running on Mesos, Hadoop 2.4.0. One environment running on Softlayer
(Swift) and the other on Amazon EC2 (S3), of similar sizes.

Any ideas on how to address this issue or figure out what's going on??

Thanks,  Gerard.


Re: Too many open files, why changing ulimit not effecting?

2016-02-05 Thread Ted Yu
bq. and *"session required pam_limits.so"*.

What was the second file you modified ?

Did you make the change on all the nodes ?

Please see the verification step in
https://easyengine.io/tutorials/linux/increase-open-files-limit/

On Fri, Feb 5, 2016 at 1:42 AM, Mohamed Nadjib MAMI 
wrote:

> Hello all,
>
> I'm getting the famous *java.io.FileNotFoundException: ... (Too many open
> files) *exception. What seemed to have helped people out, it haven't for
> me. I tried to set the ulimit via the command line *"ulimit -n"*, then I
> tried to add the following lines to *"/etc/security/limits.conf"* file:
>
> ** - nofile 100*
> *root soft nofile 100*
> *root hard nofile 100*
> *hduser soft nofile 100*
> *hduser hard nofile 100*
>
> ...then I added this line *"session required pam_limits.so"* to the two
> files* "/etc/pam.d/common-session"* and *"session required pam_limits.so"*.
> The I logged-out/logged-in. First, I tried only the first line (** -
> nofile 100**)*, then added the 2nd and the 3rd (root...),  then added
> the last two lines (hduser...), no effect. Weirdly enough, when I check
> with the command *"ulimit -n"* it returns the correct value of 100.
>
> I then added *"ulimit -n 100"* to *"spark-env.sh"* in the master and
> in each of my workers, no effect.
>
> What else could it be besides changing the ulimit setting? if it's only
> that, what could cause Spark to ignore it?
>
> I'll appreciate any help in advance.
>
> --
> *PhD Student - EIS Group - Bonn University, Germany.*
> *+49 1575 8482232*
>
>


pyspark - spark history server

2016-02-05 Thread cs user
Hi All,

I'm having trouble getting a job to use the spark history server. We have a
cluster configured with Ambari, if I run the job from one of the nodes
within the Ambari configured cluster, everything works fine, the job
appears in the spark history server.

If I configure a client external to the cluster, running the same job, the
history server is not used.

When the job completes fine, I see these lines appear in the log:


16/02/05 11:57:22 INFO history.YarnHistoryService: Starting
YarnHistoryService for application application_1453893909110_0108 attempt
Some(appattempt_1453893909110_0108_01); state=1; endpoint=
http://somehost:8188/ws/v1/timeline/; bonded to ATS=false; listening=false;
batchSize=10; flush count=0; total number queued=0, processed=0; attempted
entity posts=0 successful entity posts=0 failed entity posts=0; events
dropped=0; app start event received=false; app end event received=false;
16/02/05 11:57:22 INFO history.YarnHistoryService: Spark events will be
published to the Timeline service at http://somehost:8188/ws/v1/timeline/


On the client which is external to the cluster, these lines do not appear
in the logs. I have printed out the Spark context and attempted to match what
is configured on the working job with the failing job; all seems fine.

These are the job settings:

conf.set('spark.speculation','true')
conf.set('spark.dynamicAllocation.enabled','false')
conf.set('spark.shuffle.service.enabled','false')
conf.set('spark.executor.instances', '4')
conf.set('spark.akka.threads','4')
conf.set('spark.dynamicAllocation.initialExecutors','4')
conf.set('spark.history.provider','org.apache.spark.deploy.yarn.history.YarnHistoryProvider')
conf.set('spark.yarn.services','org.apache.spark.deploy.yarn.history.YarnHistoryService')
conf.set('spark.history.ui.port','18080')
conf.set('spark.driver.extraJavaOptions','-Dhdp.version=2.3.4.0-3485')
conf.set('spark.yarn.containerLauncherMaxThreads','25')
conf.set('spark.yarn.driver.memoryOverhead','384')
conf.set('spark.yarn.executor.memoryOverhead','384')
conf.set('spark.yarn.historyServer.address','somehost:18080')
conf.set('spark.yarn.max.executor.failures','3')
conf.set('spark.yarn.preserve.staging.files','false')
conf.set('spark.yarn.queue','default')
conf.set('spark.yarn.scheduler.heartbeat.interval-ms','5000')
conf.set('spark.yarn.submit.file.replication','3')
conf.set('spark.yarn.am.extraJavaOptions','-Dhdp.version=2.3.4.0-3485')
conf.set('spark.blockManager.port','9096')
conf.set('spark.driver.port','9095')
conf.set('spark.fileserver.port','9097')

I am using the following tar.gz file to install spark on the node external
to the cluster:

http://www.apache.org/dyn/closer.lua/spark/spark-1.5.2/spark-1.5.2-bin-hadoop2.6.tgz

Will this version of spark have everything required to talk correctly to
yarn and the spark history service?

So it comes down to this: the Spark context settings appear to be exactly the
same, there are no errors in the logs pointing to the job not being able to
connect to anything, and none of the ports are blocked, so why is this not
working when run external to the cluster?

There is no kerberos security configured on the cluster.

Thanks!


Re: Please Add Our Meetup to the Spark Meetup List

2016-02-05 Thread Tushar R Kale
Hi Timothy,

It is the Apache Spark admin who adds groups to the WW Spark Meetup list.
This is the correct email id to which to address your request:
"user@spark.apache.org".

Hope this helps.

Thank you and best regards,

Tushar Kale
BIG DATA Evangelist
Strategy & Analytics - Big Data CoC
tushark...@in.ibm.com
Mobile: 91-961-975-3366
http://www.meetup.com/Mumbai-Spark-Meetup/
Twitter:@tushar_kale
Linkedin: /www.linkedin.com/in/tusharkale9
"Big Data is our next natural resource – promising to do for our era what 
steam, electricity and oil did for the Industrial Age.“ Ginni Rometty




From:   Timothy Spann 
To: 
Cc: "user@spark.apache.org" 
Date:   02/06/2016 12:53 AM
Subject:Please Add Our Meetup to the Spark Meetup List



Our meetup is 

NJ Data Science - Apache Spark
http://www.meetup.com/nj-datascience
Princeton, NJ

Past Meetups:
Spark Streaming by Prasad Sripathi, airisDATA
August 13, 2015
ELK Stack and Spark
June 30, 2015
Spark Hands-on Intro workshop by Kristina Rogale Plazonic, airisDATA
June 18, 2015
Graph Analytics – Titan and Cassandra by Isaac Rieksts  (Slides)
June 4, 2015
Machine Learning Fundamentals  by SriSatish Ambati, H2O ML Platform
May 14, 2015  Slides

— 
airis.DATA
Timothy Spann, Senior Solutions Engineer
C: 609-250-5894





Re: pass one dataframe column value to another dataframe filter expression + Spark 1.5 + scala

2016-02-05 Thread Ali Tajeldin
I think the tricky part here is that the join condition is encoded in the 
second data frame and not a direct value.

Assuming the second data frame (the tags) is small enough, you can collect it
(read it into memory) and then construct a "when" expression chain for each of
the possible tags; then all you need to do is select on data frame 1 (df1) (or
use "withColumn" to add the column) and provide the constructed "when" chain as
the new column value.
From a very high level, looking at your example data below, you would construct
the following expression from df2 (of course, in the real case, you would
construct the expression programmatically from the collected df2 data rather
than hardcoding it).

val e = when(expr("year = 1997 and model = 'E350'"), "1997_cars").
...
   when(expr("year = 2015 and model = 'Volt'"), "2015_cars").
   otherwise("unknown")

then you just need to add the new column to your input as:
df1.withColumn("tag", e)

I caveat this by saying that I have not tried the above (especially using
"expr" to evaluate partial SQL expressions), but it should work according to
the docs.
Sometimes, half the battle is just finding the right API: "when"/"otherwise"
are documented under the "Column" class and "withColumn"/"collect" are
documented under "DataFrame".

--
Ali

 
On Feb 5, 2016, at 1:56 AM, Lohith Samaga M  wrote:

> Hi,
> If you can also format the condition file as a csv file 
> similar to the main file, then you can join the two dataframes and select 
> only required columns.
>  
> Best regards / Mit freundlichen Grüßen / Sincères salutations
> M. Lohith Samaga
>  
> From: Divya Gehlot [mailto:divya.htco...@gmail.com] 
> Sent: Friday, February 05, 2016 13.12
> To: user @spark
> Subject: pass one dataframe column value to another dataframe filter 
> expression + Spark 1.5 + scala
>  
> Hi,
> I have two input datasets
> First input dataset like as below : 
>  
> year,make,model,comment,blank
> "2012","Tesla","S","No comment",
> 1997,Ford,E350,"Go get one now they are going fast",
> 2015,Chevy,Volt
>  
> Second Input dataset :
>  
> TagId,condition
> 1997_cars,year = 1997 and model = 'E350'
> 2012_cars,year=2012 and model ='S'
> 2015_cars ,year=2015 and model = 'Volt'
>  
> Now my requirement is read first data set and based on the filtering 
> condition in second dataset need to tag rows of first input dataset by 
> introducing a new column TagId to first input data set 
> so the expected should look like :
>  
> year,make,model,comment,blank,TagId
> "2012","Tesla","S","No comment",2012_cars
> 1997,Ford,E350,"Go get one now they are going fast",1997_cars
> 2015,Chevy,Volt, ,2015_cars
>  
> I tried like :
>  
> val sqlContext = new SQLContext(sc)
> val carsSchema = StructType(Seq(
> StructField("year", IntegerType, true),
> StructField("make", StringType, true),
> StructField("model", StringType, true),
> StructField("comment", StringType, true),
> StructField("blank", StringType, true)))
> 
> val carTagsSchema = StructType(Seq(
> StructField("TagId", StringType, true),
> StructField("condition", StringType, true)))
> 
> 
> val dfcars = 
> sqlContext.read.format("com.databricks.spark.csv").option("header", "true") 
> .schema(carsSchema).load("/TestDivya/Spark/cars.csv")
> val dftags = 
> sqlContext.read.format("com.databricks.spark.csv").option("header", "true") 
> .schema(carTagsSchema).load("/TestDivya/Spark/CarTags.csv")
> 
> val Amendeddf = dfcars.withColumn("TagId", dfcars("blank"))
> val cdtnval = dftags.select("condition")
> val df2=dfcars.filter(cdtnval)
> :35: error: overloaded method value filter with alternatives:
>   (conditionExpr: String)org.apache.spark.sql.DataFrame 
>   (condition: org.apache.spark.sql.Column)org.apache.spark.sql.DataFrame
>  cannot be applied to (org.apache.spark.sql.DataFrame)
>val df2=dfcars.filter(cdtnval)
>  
> another way :
>  
> val col = dftags.col("TagId")
> val finaldf = dfcars.withColumn("TagId", col)
> org.apache.spark.sql.AnalysisException: resolved attribute(s) TagId#5 
> missing from comment#3,blank#4,model#2,make#1,year#0 in operator !Project 
> [year#0,make#1,model#2,comment#3,blank#4,TagId#5 AS TagId#8];
> 
> finaldf.write.format("com.databricks.spark.csv").option("header", 
> "true").save("/TestDivya/Spark/carswithtags.csv")
>  
>  
> Would really appreciate if somebody give me pointers how can I pass the 
> filter condition(second dataframe) to filter function of first dataframe.
> Or another solution .
> My apppologies for such a naive question as I am new to scala and Spark 
>  
> Thanks  
> 

RE: different behavior while using createDataFrame and read.df in SparkR

2016-02-05 Thread Sun, Rui
I guess this is related to https://issues.apache.org/jira/browse/SPARK-11976

When calling createDataFrame on iris, the “.” character in column names is
replaced with “_”.
It seems that when you create a DataFrame from the CSV file, the “.” character
in column names is still there.

From: Devesh Raj Singh [mailto:raj.deves...@gmail.com]
Sent: Friday, February 5, 2016 2:44 PM
To: user@spark.apache.org
Cc: Sun, Rui
Subject: different behavior while using createDataFrame and read.df in SparkR


Hi,

I am using Spark 1.5.1

When I do this

df <- createDataFrame(sqlContext, iris)

#creating a new column for category "Setosa"

df$Species1<-ifelse((df)[[5]]=="setosa",1,0)

head(df)

output: new column created

  Sepal.Length Sepal.Width Petal.Length Petal.Width Species
1  5.1 3.5  1.4 0.2  setosa
2  4.9 3.0  1.4 0.2  setosa
3  4.7 3.2  1.3 0.2  setosa
4  4.6 3.1  1.5 0.2  setosa
5  5.0 3.6  1.4 0.2  setosa
6  5.4 3.9  1.7 0.4  setosa

but when I save the iris dataset as a CSV file and try to read it and convert
it to a SparkR DataFrame:

df <- read.df(sqlContext,"/Users/devesh/Github/deveshgit2/bdaml/data/iris/",
  source = "com.databricks.spark.csv",header = "true",inferSchema = 
"true")

now when I try to create a new column:

df$Species1<-ifelse((df)[[5]]=="setosa",1,0)
I get the below error:

16/02/05 12:11:01 ERROR RBackendHandler: col on 922 failed
Error in select(x, x$"*", alias(col, colName)) :
  error in evaluating the argument 'col' in selecting a method for function 
'select': Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Cannot resolve column name 
"Sepal.Length" among (Sepal.Length, Sepal.Width, Petal.Length, Petal.Width, 
Species);
at org.apache.spark.s
--
Warm regards,
Devesh.