Re: Spark yarn cluster

2020-07-11 Thread Diwakar Dhanuskodi
Thanks, Martín.

I was not clear in my question initially. Thanks for understanding and
explaining.

The idea, as you said, is to explore the possibility of using YARN for
cluster scheduling, with Spark being used without HDFS. Thanks again for
the clarification.

On Sat, Jul 11, 2020 at 1:27 PM Juan Martín Guillén <
juanmartinguil...@yahoo.com.ar> wrote:

> Hi Diwakar,
>
> A Yarn cluster not having Hadoop is kind of a fuzzy concept.
>
> Definitely you may want to have Hadoop and don't need to use MapReduce and
> use Spark instead. That is the main reason to use Spark in a Hadoop cluster
> anyway.
>
> On the other hand it is highly probable you may want to use HDFS although
> not strictly necessary.
>
> So answering your question you are using Hadoop by using Yarn because it
> is one of the 3 main components of it but that doesn't mean you need to use
> other components of the Hadoop cluster, namely MapReduce and HDFS.
>
> That being said, if you just need cluster scheduling and not using
> MapReduce nor HDFS it is possible you will be fine with the Spark
> Standalone cluster.
>
> Regards,
> Juan Martín.
>
> El sábado, 11 de julio de 2020 13:57:40 ART, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> escribió:
>
>
> Hi ,
>
> Is it possible to set up Spark on a YARN cluster that may not have
> Hadoop?
>
> Thanks.
>
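
For reference, here is a minimal, hedged sketch of the two options discussed
above: the same Spark application can be scheduled by YARN or by a standalone
master (usually chosen via spark-submit --master), and it can read from
non-HDFS storage. The master URL and the s3a path below are placeholders, not
endpoints from this thread.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: swap the master between YARN and a standalone cluster.
val spark = SparkSession.builder()
  .appName("yarn-without-hdfs-sketch")
  .master("yarn")                          // or "spark://standalone-master:7077"
  .getOrCreate()

// Any non-HDFS source works; this bucket path is hypothetical.
val df = spark.read.text("s3a://example-bucket/input/")
println(df.count())
spark.stop()
```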


Spark yarn cluster

2020-07-11 Thread Diwakar Dhanuskodi
Hi ,

Is it possible to set up Spark on a YARN cluster that may not have
Hadoop?

Thanks.


Foreachpartition in spark streaming

2017-03-20 Thread Diwakar Dhanuskodi
Just wanted to clarify!!!

Is foreachPartition in Spark an output operation?

Which one is better to use: mapPartitions or foreachPartition?

Regards
Diwakar
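
For reference, a minimal sketch of the distinction being asked about
(local-mode, made-up data): mapPartitions is a lazy transformation that returns
a new RDD, whereas foreachPartition is an action, so in a streaming job it
counts as an output operation on each RDD of the DStream.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setAppName("partition-ops-sketch").setMaster("local[2]"))
val rdd = sc.parallelize(1 to 100, 4)

// Transformation: lazily builds a new RDD; per-partition setup runs once per partition.
val doubled = rdd.mapPartitions(iter => iter.map(_ * 2))

// Action / output operation: push each partition's records somewhere (stdout here).
doubled.foreachPartition(iter => iter.foreach(println))

sc.stop()
```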


Re: Pls assist: Spark 2.0 build failure on Ubuntu 16.06

2016-09-03 Thread Diwakar Dhanuskodi
Please run with -X and post the logs here. We can get the exact error from them.

On Sat, Sep 3, 2016 at 7:24 PM, Marco Mistroni  wrote:

> hi all
>
> I am getting failures when building Spark 2.0 on Ubuntu 16.06.
> Here are the details of what I have installed on the Ubuntu host:
> - Java 8
> - Scala 2.11
> - git
>
> When I launch the command
>
> ./build/mvn -Pyarn -Phadoop-2.7 -DskipTests clean package
>
> everything compiles sort of fine, and at the end I get this exception:
>
> INFO] BUILD FAILURE
> [INFO] 
> 
> [INFO] Total time: 01:05 h
> [INFO] Finished at: 2016-09-03T13:25:27+00:00
> [INFO] Final Memory: 57M/208M
> [INFO] 
> 
> [ERROR] Failed to execute goal 
> net.alchim31.maven:scala-maven-plugin:3.2.2:testCompile
> (scala-test-compile-first) on project spark-streaming_2.11: Execution
> scala-test-compile-first of goal 
> net.alchim31.maven:scala-maven-plugin:3.2.2:testCompile
> failed. CompileFailed -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/
> PluginExecutionException
> [ERROR]
> [ERROR] After correcting the problems, you can resume the build with the
> command
> [ERROR]   mvn  -rf :spark-streaming_2.11
>
> could anyone help?
>
> kr
>


Hive connection issues in spark-shell

2016-09-03 Thread Diwakar Dhanuskodi
Hi,

I recently built Spark using Maven. Now when starting spark-shell, it
cannot connect to Hive and I am getting the error below.

I couldn't find the datanucleus jar in the built libraries, but the
datanucleus jar is available in the hive/lib folder.

java.lang.ClassNotFoundException:
org.datanucleus.api.jdo.JDOPersistenceManagerFactory
at
scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018)
at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016)
at java.security.AccessController.doPrivileged(Native Method)
at javax.jdo.JDOHelper.forName(JDOHelper.java:2015)
at
javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1162)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
at
org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
at
org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
at
org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at
org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at
org.apache.hadoop.hive.metastore.RawStoreProxy.<init>(RawStoreProxy.java:57)
at
org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:66)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:593)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:571)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:66)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
at
org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:199)
at
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.<init>(SessionHiveMetaStoreClient.java:74)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:86)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at
org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
at
org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:204)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:238)
at
org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:218)
at
org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:208)
at
org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:462)
at
org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:461)
at org.apache.spark.sql.UDFRegistration.<init>(UDFRegistration.scala:40)
at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:330)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:97)
at org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:101)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.spark.repl.Main$.createSQLContext(Main.scala:89)
at $line4.$read$$iw$$iw.<init>(<console>:12)
at $line4.$read$$iw.<init>(<console>:21)
at $line4.$read.<init>(<console>:23)
at $line4.$read$.<init>(<console>:27)
at $line4.$read$.<clinit>(<console>)
at $line4.$eval$.$print$lzycompute(<console>:7)
at $line4.$eval$.$print(<console>:6)
at $line4.$eval.$print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

Re: Spark build 1.6.2 error

2016-09-03 Thread Diwakar Dhanuskodi
Sorry my bad. In both the runs I included -Dscala-2.11

On Sat, Sep 3, 2016 at 12:39 PM, Nachiketa <nachiketa.shu...@gmail.com>
wrote:

> I think the difference was the -Dscala-2.11 on the command line.
>
> I have seen this show up when I miss that.
>
> Regards,
> Nachiketa
>
> On Sat 3 Sep, 2016, 12:14 PM Diwakar Dhanuskodi, <
> diwakar.dhanusk...@gmail.com> wrote:
>
>> Hi,
>>
>> Just re-ran it again without killing the zinc server process:
>>
>> ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive
>> -Pyarn -Dmaven.version=3.0.4 -Dscala-2.11 -X -rf :spark-sql_2.11
>>
>> The build is a success. Not sure how it worked by just re-running the
>> command again.
>>
>> On Sat, Sep 3, 2016 at 11:44 AM, Diwakar Dhanuskodi <
>> diwakar.dhanusk...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> java version 7
>>>
>>> mvn command
>>> ./make-distribution.sh --name custom-spark --tgz  -Phadoop-2.6 -Phive
>>> -Phive-thriftserver -Pyarn -Dmaven.version=3.0.4
>>>
>>>
>>> yes, I executed script to change scala version to 2.11
>>> killed  "com.typesafe zinc.Nailgun" process
>>>
>>> re-ran mvn with below command again
>>>
>>> ./make-distribution.sh --name custom-spark --tgz  -Phadoop-2.6 -Phive
>>> -Phive-thriftserver -Pyarn -Dmaven.version=3.0.4 -X -rf :spark-sql_2.11
>>>
>>> Getting same error
>>>
>>> [warn] /home/cloudera/Downloads/spark-1.6.2/sql/core/src/main/
>>> scala/org/apache/spark/sql/sources/interfaces.scala:911: method isDir
>>> in class FileStatus is deprecated: see corresponding Javadoc for more
>>> information.
>>> [warn] status.isDir,
>>> [warn]^
>>> [error] missing or invalid dependency detected while loading class file
>>> 'WebUI.class'.
>>> [error] Could not access term eclipse in package org,
>>> [error] because it (or its dependencies) are missing. Check your build
>>> definition for
>>> [error] missing or conflicting dependencies. (Re-run with
>>> `-Ylog-classpath` to see the problematic classpath.)
>>> [error] A full rebuild may help if 'WebUI.class' was compiled against an
>>> incompatible version of org.
>>> [error] missing or invalid dependency detected while loading class file
>>> 'WebUI.class'.
>>> [error] Could not access term jetty in value org.eclipse,
>>> [error] because it (or its dependencies) are missing. Check your build
>>> definition for
>>> [error] missing or conflicting dependencies. (Re-run with
>>> `-Ylog-classpath` to see the problematic classpath.)
>>> [error] A full rebuild may help if 'WebUI.class' was compiled against an
>>> incompatible version of org.eclipse.
>>> [warn] 17 warnings found
>>> [error] two errors found
>>> [debug] Compilation failed (CompilerInterface)
>>> [error] Compile failed at Sep 3, 2016 11:28:34 AM [21.611s]
>>> [INFO] 
>>> 
>>> [INFO] Reactor Summary:
>>> [INFO]
>>> [INFO] Spark Project Parent POM .. SUCCESS
>>> [5.583s]
>>> [INFO] Spark Project Test Tags ... SUCCESS
>>> [4.189s]
>>> [INFO] Spark Project Launcher  SUCCESS
>>> [12.226s]
>>> [INFO] Spark Project Networking .. SUCCESS
>>> [13.386s]
>>> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS
>>> [6.723s]
>>> [INFO] Spark Project Unsafe .. SUCCESS
>>> [21.231s]
>>> [INFO] Spark Project Core  SUCCESS
>>> [3:46.334s]
>>> [INFO] Spark Project Bagel ... SUCCESS
>>> [7.032s]
>>> [INFO] Spark Project GraphX .. SUCCESS
>>> [19.558s]
>>> [INFO] Spark Project Streaming ... SUCCESS
>>> [50.452s]
>>> [INFO] Spark Project Catalyst  SUCCESS
>>> [1:14.172s]
>>> [INFO] Spark Project SQL . FAILURE
>>> [23.222s]
>>> [INFO] Spark Project ML Library .. SKIPPED
>>> [INFO] Spark Project Tools ... SKIPPED
>>> [INFO] Spark Project Hive  SKIPPED
>>> [INFO] Spark Project Docker Integration 

Re: Spark build 1.6.2 error

2016-09-03 Thread Diwakar Dhanuskodi
Hi,

Just re-ran it again without killing the zinc server process:

./make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive -Pyarn
-Dmaven.version=3.0.4 -Dscala-2.11 -X -rf :spark-sql_2.11

The build is a success. Not sure how it worked by just re-running the
command again.

On Sat, Sep 3, 2016 at 11:44 AM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> Hi,
>
> java version 7
>
> mvn command
> ./make-distribution.sh --name custom-spark --tgz  -Phadoop-2.6 -Phive
> -Phive-thriftserver -Pyarn -Dmaven.version=3.0.4
>
>
> yes, I executed script to change scala version to 2.11
> killed  "com.typesafe zinc.Nailgun" process
>
> re-ran mvn with below command again
>
> ./make-distribution.sh --name custom-spark --tgz  -Phadoop-2.6 -Phive
> -Phive-thriftserver -Pyarn -Dmaven.version=3.0.4 -X -rf :spark-sql_2.11
>
> Getting same error
>
> [warn] /home/cloudera/Downloads/spark-1.6.2/sql/core/src/main/
> scala/org/apache/spark/sql/sources/interfaces.scala:911: method isDir in
> class FileStatus is deprecated: see corresponding Javadoc for more
> information.
> [warn] status.isDir,
> [warn]^
> [error] missing or invalid dependency detected while loading class file
> 'WebUI.class'.
> [error] Could not access term eclipse in package org,
> [error] because it (or its dependencies) are missing. Check your build
> definition for
> [error] missing or conflicting dependencies. (Re-run with
> `-Ylog-classpath` to see the problematic classpath.)
> [error] A full rebuild may help if 'WebUI.class' was compiled against an
> incompatible version of org.
> [error] missing or invalid dependency detected while loading class file
> 'WebUI.class'.
> [error] Could not access term jetty in value org.eclipse,
> [error] because it (or its dependencies) are missing. Check your build
> definition for
> [error] missing or conflicting dependencies. (Re-run with
> `-Ylog-classpath` to see the problematic classpath.)
> [error] A full rebuild may help if 'WebUI.class' was compiled against an
> incompatible version of org.eclipse.
> [warn] 17 warnings found
> [error] two errors found
> [debug] Compilation failed (CompilerInterface)
> [error] Compile failed at Sep 3, 2016 11:28:34 AM [21.611s]
> [INFO] 
> 
> [INFO] Reactor Summary:
> [INFO]
> [INFO] Spark Project Parent POM .. SUCCESS [5.583s]
> [INFO] Spark Project Test Tags ... SUCCESS [4.189s]
> [INFO] Spark Project Launcher  SUCCESS
> [12.226s]
> [INFO] Spark Project Networking .. SUCCESS
> [13.386s]
> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS [6.723s]
> [INFO] Spark Project Unsafe .. SUCCESS
> [21.231s]
> [INFO] Spark Project Core  SUCCESS
> [3:46.334s]
> [INFO] Spark Project Bagel ... SUCCESS
> [7.032s]
> [INFO] Spark Project GraphX .. SUCCESS
> [19.558s]
> [INFO] Spark Project Streaming ... SUCCESS
> [50.452s]
> [INFO] Spark Project Catalyst  SUCCESS
> [1:14.172s]
> [INFO] Spark Project SQL . FAILURE
> [23.222s]
> [INFO] Spark Project ML Library .. SKIPPED
> [INFO] Spark Project Tools ... SKIPPED
> [INFO] Spark Project Hive  SKIPPED
> [INFO] Spark Project Docker Integration Tests  SKIPPED
> [INFO] Spark Project REPL  SKIPPED
> [INFO] Spark Project YARN Shuffle Service  SKIPPED
> [INFO] Spark Project YARN  SKIPPED
> [INFO] Spark Project Assembly  SKIPPED
> [INFO] Spark Project External Twitter  SKIPPED
> [INFO] Spark Project External Flume Sink . SKIPPED
> [INFO] Spark Project External Flume .. SKIPPED
> [INFO] Spark Project External Flume Assembly . SKIPPED
> [INFO] Spark Project External MQTT ... SKIPPED
> [INFO] Spark Project External MQTT Assembly .. SKIPPED
> [INFO] Spark Project External ZeroMQ . SKIPPED
> [INFO] Spark Project External Kafka .. SKIPPED
> [INFO] Spark Project Examples  SKIPPED
> [INFO] Spark Project External Kafka Assembly . SKIPPED
> [INFO] 
> 
> [INFO] BUILD FAILURE
> [INFO] -

Re: Spark build 1.6.2 error

2016-09-03 Thread Diwakar Dhanuskodi
utor.java:225)
at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84)
at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59)
at
org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183)
at
org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320)
at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156)
at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537)
at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196)
at org.apache.maven.cli.MavenCli.main(MavenCli.java:141)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:290)
at
org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:230)
at
org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:409)
at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:352)
Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
scala-compile-first of goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
at
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:110)
at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:209)
... 19 more
Caused by: Compile failed via zinc server
at
sbt_inc.SbtIncrementalCompiler.zincCompile(SbtIncrementalCompiler.java:136)
at sbt_inc.SbtIncrementalCompiler.compile(SbtIncrementalCompiler.java:86)
at
scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:303)
at scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119)
at scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
at
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:101)
... 20 more
[ERROR]
[ERROR]
[ERROR] For more information about the errors and possible solutions,
please read the following articles:
[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the
command
[ERROR]   mvn  -rf :spark-sql_2.11


On Thu, Sep 1, 2016 at 8:23 AM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

> Which java version are you using ?
>
> On 31 August 2016 at 04:30, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
>
>> Hi,
>>
>> While building Spark 1.6.2 , getting below error in spark-sql. Much
>> appreciate for any help.
>>
>> ERROR] missing or invalid dependency detected while loading class file
>> 'WebUI.class'.
>> Could not access term eclipse in package org,
>> because it (or its dependencies) are missing. Check your build definition
>> for
>> missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to
>> see the problematic classpath.)
>> A full rebuild may help if 'WebUI.class' was compiled against an
>> incompatible version of org.
>> [ERROR] missing or invalid dependency detected while loading class file
>> 'WebUI.class'.
>> Could not access term jetty in value org.eclipse,
>> because it (or its dependencies) are missing. Check your build definition
>> for
>> missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to
>> see the problematic classpath.)
>> A full rebuild may help if 'WebUI.class' was compiled against an
>> incompatible version of org.eclipse.
>> [WARNING] 17 warnings found
>> [ERROR] two errors found
>> [INFO] 
>> 
>> [INFO] Reactor Summary:
>> [INFO]
>> [INFO] Spark Project Parent POM .. SUCCESS
>> [4.399s]
>> [INFO] Spark Project Test Tags ... SUCCESS
>> [3.443s]
>> [INFO] Spark Project Launcher  SUCCESS
>> [10.131s]
>> [INFO] Spark Project Networking .. SUCCESS
>> [11.849s]
>> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS
>> [6.641s]
>> [INFO] Spark Project Unsafe .. SUCCESS
>> [19.765s]

Spark build 1.6.2 error

2016-08-30 Thread Diwakar Dhanuskodi
Hi,

While building Spark 1.6.2 , getting below error in spark-sql. Much
appreciate for any help.

ERROR] missing or invalid dependency detected while loading class file
'WebUI.class'.
Could not access term eclipse in package org,
because it (or its dependencies) are missing. Check your build definition
for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see
the problematic classpath.)
A full rebuild may help if 'WebUI.class' was compiled against an
incompatible version of org.
[ERROR] missing or invalid dependency detected while loading class file
'WebUI.class'.
Could not access term jetty in value org.eclipse,
because it (or its dependencies) are missing. Check your build definition
for
missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see
the problematic classpath.)
A full rebuild may help if 'WebUI.class' was compiled against an
incompatible version of org.eclipse.
[WARNING] 17 warnings found
[ERROR] two errors found
[INFO]

[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM .. SUCCESS [4.399s]
[INFO] Spark Project Test Tags ... SUCCESS [3.443s]
[INFO] Spark Project Launcher  SUCCESS [10.131s]
[INFO] Spark Project Networking .. SUCCESS [11.849s]
[INFO] Spark Project Shuffle Streaming Service ... SUCCESS [6.641s]
[INFO] Spark Project Unsafe .. SUCCESS [19.765s]
[INFO] Spark Project Core  SUCCESS
[4:16.511s]
[INFO] Spark Project Bagel ... SUCCESS [13.401s]
[INFO] Spark Project GraphX .. SUCCESS
[1:08.824s]
[INFO] Spark Project Streaming ... SUCCESS
[2:18.844s]
[INFO] Spark Project Catalyst  SUCCESS
[2:43.695s]
[INFO] Spark Project SQL . FAILURE
[1:01.762s]
[INFO] Spark Project ML Library .. SKIPPED
[INFO] Spark Project Tools ... SKIPPED
[INFO] Spark Project Hive  SKIPPED
[INFO] Spark Project Docker Integration Tests  SKIPPED
[INFO] Spark Project REPL  SKIPPED
[INFO] Spark Project YARN Shuffle Service  SKIPPED
[INFO] Spark Project YARN  SKIPPED
[INFO] Spark Project Assembly  SKIPPED
[INFO] Spark Project External Twitter  SKIPPED
[INFO] Spark Project External Flume Sink . SKIPPED
[INFO] Spark Project External Flume .. SKIPPED
[INFO] Spark Project External Flume Assembly . SKIPPED
[INFO] Spark Project External MQTT ... SKIPPED
[INFO] Spark Project External MQTT Assembly .. SKIPPED
[INFO] Spark Project External ZeroMQ . SKIPPED
[INFO] Spark Project External Kafka .. SKIPPED
[INFO] Spark Project Examples  SKIPPED
[INFO] Spark Project External Kafka Assembly . SKIPPED
[INFO]

[INFO] BUILD FAILURE
[INFO]

[INFO] Total time: 12:40.525s
[INFO] Finished at: Wed Aug 31 01:56:50 IST 2016
[INFO] Final Memory: 71M/830M
[INFO]

[ERROR] Failed to execute goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
on project spark-sql_2.11: Execution scala-compile-first of goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
-> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions,
please read the following articles:
[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the
command
[ERROR]   mvn  -rf :spark-sql_2.11


Re: Best way to read XML data from RDD

2016-08-22 Thread Diwakar Dhanuskodi
Below is the source code for parsing an XML RDD that has single-line XML data
(xmlData holds the rows with the XML string and a type column, utils.getXSD is
our helper, and the sqlContext implicits are imported elsewhere).

import scala.xml.{Elem, Node, Text, XML}
import scala.collection.mutable.ArrayBuffer

// Flattened "path,value" pairs collected while walking the XML tree.
var dataArray = new ArrayBuffer[String]()

def processNode(node: Node, fp1: String): Unit = node match {
  case Elem(prefix, label, attribs, scope, Text(text)) =>
    dataArray.+=:("Cust.001.001.03-" + fp1 + "," + text)
  case _ =>
    for (n <- node.child) {
      val fp = fp1 + "/" + n.label
      processNode(n, fp)
    }
}

val dataDF = xmlData
  .map { x =>
    val p = XML.loadString(x.get(0).toString.mkString)
    val xsd = utils.getXSD(p)
    println("xsd -- " + xsd)
    val f = "/" + p.label
    val msgId = (p \\ "Fnd" \ "Mesg" \ "Paid" \
      "Record" \ "CustInit" \ "GroupFirst" \ "MesgId").text
    processNode(p, f)
    (msgId, dataArray, x.get(1).toString())
  }
  .flatMap { x =>
    val msgId = x._1
    x._2.toIterable.map { x1 =>
      (msgId, x1.split(",").apply(0), x1.split(",").apply(1), x._3)
    }
  }
  .toDF("key", "attribute", "value", "type")
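
Following the advice quoted below about initializing the parser once per
partition, here is a minimal sketch of that pattern (an assumption layered on
the code above: xmlData and the row layout are reused from it, and the
extracted field is just a placeholder).

```scala
import javax.xml.parsers.SAXParserFactory
import scala.xml.XML

// Build one SAX parser per partition instead of letting XML.loadString
// construct a new parser for every record.
val parsedLabels = xmlData.mapPartitions { iter =>
  val loader = XML.withSAXParser(SAXParserFactory.newInstance().newSAXParser())
  iter.map { row =>
    val doc = loader.loadString(row.get(0).toString)
    doc.label // placeholder: extract whatever fields are actually needed
  }
}
```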

On Mon, Aug 22, 2016 at 4:34 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote:

> Do you mind sharing your code and sample data? It should be okay with
> single-line XML if I remember this correctly.
>
> 2016-08-22 19:53 GMT+09:00 Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com>:
>
>> Hi Darin,
>>
>> Are you using this utility to parse single-line XML?
>>
>>
>> Sent from Samsung Mobile.
>>
>>
>>  Original message 
>> From: Darin McBeath <ddmcbe...@yahoo.com>
>> Date:21/08/2016 17:44 (GMT+05:30)
>> To: Hyukjin Kwon <gurwls...@gmail.com>, Jörn Franke <jornfra...@gmail.com>
>>
>> Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, Felix Cheung <
>> felixcheun...@hotmail.com>, user <user@spark.apache.org>
>> Subject: Re: Best way to read XML data from RDD
>>
>> Another option would be to look at spark-xml-utils.  We use this
>> extensively in the manipulation of our XML content.
>>
>> https://github.com/elsevierlabs-os/spark-xml-utils
>>
>>
>>
>> There are quite a few examples.  Depending on your preference (and what
>> you want to do), you could use xpath, xquery, or xslt to transform,
>> extract, or filter.
>>
>> Like mentioned below, you want to initialize the parser in a
>> mapPartitions call (one of the examples shows this).
>>
>> Hope this is helpful.
>>
>> Darin.
>>
>>
>>
>>
>>
>> 
>> From: Hyukjin Kwon <gurwls...@gmail.com>
>> To: Jörn Franke <jornfra...@gmail.com>
>> Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>; Felix Cheung <
>> felixcheun...@hotmail.com>; user <user@spark.apache.org>
>> Sent: Sunday, August 21, 2016 6:10 AM
>> Subject: Re: Best way to read XML data from RDD
>>
>>
>>
>> Hi Diwakar,
>>
>> Spark XML library can take RDD as source.
>>
>> ```
>> val df = new XmlReader()
>>   .withRowTag("book")
>>   .xmlRdd(sqlContext, rdd)
>> ```
>>
>> If performance is critical, I would also recommend to take care of
>> creation and destruction of the parser.
>>
>> If the parser is not serializble, then you can do the creation for each
>> partition within mapPartition just like
>>
>> https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a
>> 48fed9bb188140423/sql/core/src/main/scala/org/apache/
>> spark/sql/DataFrameReader.scala#L322-L325
>>
>>
>> I hope this is helpful.
>>
>>
>>
>>
>> 2016-08-20 15:10 GMT+09:00 Jörn Franke <jornfra...@gmail.com>:
>>
>> I fear the issue is that this will create and destroy a XML parser object
>> 2 mio times, which is very inefficient - it does not really look like a
>> parser performance issue. Can't you do something about the format choice?
>> Ask your suppl

Re: Best way to read XML data from RDD

2016-08-22 Thread Diwakar Dhanuskodi
Hi Darin,

Are you using this utility to parse single-line XML?


Sent from Samsung Mobile.

 Original message From: Darin McBeath 
<ddmcbe...@yahoo.com> Date:21/08/2016  17:44  (GMT+05:30) 
To: Hyukjin Kwon <gurwls...@gmail.com>, Jörn Franke 
<jornfra...@gmail.com> Cc: Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com>, Felix Cheung <felixcheun...@hotmail.com>, user 
<user@spark.apache.org> Subject: Re: Best way to read XML data from 
RDD 
Another option would be to look at spark-xml-utils.  We use this 
extensively in the manipulation of our XML content.

https://github.com/elsevierlabs-os/spark-xml-utils



There are quite a few examples.  Depending on your preference (and what you 
want to do), you could use xpath, xquery, or xslt to transform, extract, or 
filter.

Like mentioned below, you want to initialize the parser in a mapPartitions call 
(one of the examples shows this).

Hope this is helpful.

Darin.






From: Hyukjin Kwon <gurwls...@gmail.com>
To: Jörn Franke <jornfra...@gmail.com> 
Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>; Felix Cheung 
<felixcheun...@hotmail.com>; user <user@spark.apache.org>
Sent: Sunday, August 21, 2016 6:10 AM
Subject: Re: Best way to read XML data from RDD



Hi Diwakar,

Spark XML library can take RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend to take care of creation and 
destruction of the parser.

If the parser is not serializble, then you can do the creation for each 
partition within mapPartition just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325


I hope this is helpful.




2016-08-20 15:10 GMT+09:00 Jörn Franke <jornfra...@gmail.com>:

I fear the issue is that this will create and destroy a XML parser object 2 mio 
times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
>Otherwise you could just create one XML Parser object / node, but sharing this 
>among the parallel tasks on the same node is tricky.
>The other possibility could be simply more hardware ...
>
>On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> 
>wrote:
>
>
>Yes . It accepts a xml file as source but not RDD. The XML data embedded  
>inside json is streamed from kafka cluster.  So I could get it as RDD. 
>>Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
>>function  but  performance  wise I am not happy as it takes 4 minutes to 
>>parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 
>>
>>
>>
>>
>>Sent from Samsung Mobile.
>>
>>
>> Original message 
>>From: Felix Cheung <felixcheun...@hotmail.com> 
>>Date:20/08/2016  09:49  (GMT+05:30) 
>>To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> , user 
>><user@spark.apache.org> 
>>Cc: 
>>Subject: Re: Best way to read XML data from RDD 
>>
>>
>>Have you tried
>>
>>https://github.com/databricks/ spark-xml
>>?
>>
>>
>>
>>
>>
>>On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
>><diwakar.dhanusk...@gmail.com> wrote:
>>
>>
>>Hi,  
>>
>>
>>There is a RDD with json data. I could read json data using rdd.read.json . 
>>The json data has XML data in couple of key-value paris. 
>>
>>
>>Which is the best method to read and parse XML from rdd. Is there any 
>>specific xml libraries for spark. Could anyone help on this.
>>
>>
>>Thanks.


Re: Best way to read XML data from RDD

2016-08-22 Thread Diwakar Dhanuskodi
Hi Franke,
The source format cannot be changed as of now, as it is a pretty standard
format that has been working for years.
Yeah, creating one parser is something I can try out.

Sent from Samsung Mobile.

 Original message From: Jörn Franke 
<jornfra...@gmail.com> Date:20/08/2016  11:40  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: 
Felix Cheung <felixcheun...@hotmail.com>, user <user@spark.apache.org> 
Subject: Re: Best way to read XML data from RDD 
I fear the issue is that this will create and destroy a XML parser object 
2 mio times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
Otherwise you could just create one XML Parser object / node, but sharing this 
among the parallel tasks on the same node is tricky.
The other possibility could be simply more hardware ...

On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> 
wrote:

Yes . It accepts a xml file as source but not RDD. The XML data embedded  
inside json is streamed from kafka cluster.  So I could get it as RDD. 
Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
function  but  performance  wise I am not happy as it takes 4 minutes to parse 
XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 


Sent from Samsung Mobile.


 Original message 
From: Felix Cheung <felixcheun...@hotmail.com>
Date:20/08/2016 09:49 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, user 
<user@spark.apache.org>
Cc:
Subject: Re: Best way to read XML data from RDD

Have you tried

https://github.com/databricks/spark-xml
?




On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
<diwakar.dhanusk...@gmail.com> wrote:

Hi, 

There is a RDD with json data. I could read json data using rdd.read.json . The 
json data has XML data in couple of key-value paris. 

Which is the best method to read and parse XML from rdd. Is there any specific 
xml libraries for spark. Could anyone help on this.

Thanks. 

Re: Best way to read XML data from RDD

2016-08-22 Thread Diwakar Dhanuskodi

Hi Kwon,

I was trying out the Spark XML library. I keep on getting errors in inferring
the schema. It looks like it cannot infer the schema from single-line XML data.

Sent from Samsung Mobile.

 Original message 
From: Hyukjin Kwon <gurwls...@gmail.com>
Date:21/08/2016 15:40 (GMT+05:30)
To: Jörn Franke <jornfra...@gmail.com>
Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, Felix Cheung 
<felixcheun...@hotmail.com>, user <user@spark.apache.org>
Subject: Re: Best way to read XML data from RDD

Hi Diwakar,

Spark XML library can take RDD as source.

```
val df = new XmlReader()
  .withRowTag("book")
  .xmlRdd(sqlContext, rdd)
```

If performance is critical, I would also recommend to take care of creation and 
destruction of the parser.

If the parser is not serializble, then you can do the creation for each 
partition within mapPartition just like

https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325

I hope this is helpful.



2016-08-20 15:10 GMT+09:00 Jörn Franke <jornfra...@gmail.com>:
I fear the issue is that this will create and destroy a XML parser object 2 mio 
times, which is very inefficient - it does not really look like a parser 
performance issue. Can't you do something about the format choice? Ask your 
supplier to deliver another format (ideally avro or sth like this?)?
Otherwise you could just create one XML Parser object / node, but sharing this 
among the parallel tasks on the same node is tricky.
The other possibility could be simply more hardware ...

On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> 
wrote:

Yes . It accepts a xml file as source but not RDD. The XML data embedded  
inside json is streamed from kafka cluster.  So I could get it as RDD. 
Right  now  I am using  spark.xml  XML.loadstring method inside  RDD map 
function  but  performance  wise I am not happy as it takes 4 minutes to parse 
XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. 


Sent from Samsung Mobile.


 Original message 
From: Felix Cheung <felixcheun...@hotmail.com>
Date:20/08/2016 09:49 (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, user 
<user@spark.apache.org>
Cc:
Subject: Re: Best way to read XML data from RDD

Have you tried

https://github.com/databricks/spark-xml
?




On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
<diwakar.dhanusk...@gmail.com> wrote:

Hi, 

There is a RDD with json data. I could read json data using rdd.read.json . The 
json data has XML data in couple of key-value paris. 

Which is the best method to read and parse XML from rdd. Is there any specific 
xml libraries for spark. Could anyone help on this.

Thanks. 



Re: Best way to read XML data from RDD

2016-08-19 Thread Diwakar Dhanuskodi
Yes. It accepts an XML file as a source, but not an RDD. The XML data is embedded
inside JSON that is streamed from a Kafka cluster, so I can get it as an RDD.
Right now I am using the spark.xml XML.loadString method inside an RDD map
function, but performance-wise I am not happy, as it takes 4 minutes to parse
the XML from 2 million messages on 3 nodes with 100G and 4 CPUs each.


Sent from Samsung Mobile.

 Original message From: Felix Cheung 
<felixcheun...@hotmail.com> Date:20/08/2016  09:49  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, user 
<user@spark.apache.org> Cc:  Subject: Re: Best way to 
read XML data from RDD 
Have you tried

https://github.com/databricks/spark-xml
?




On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" 
<diwakar.dhanusk...@gmail.com> wrote:

Hi, 

There is a RDD with json data. I could read json data using rdd.read.json . The 
json data has XML data in couple of key-value paris. 

Which is the best method to read and parse XML from rdd. Is there any specific 
xml libraries for spark. Could anyone help on this.

Thanks. 

Best way to read XML data from RDD

2016-08-19 Thread Diwakar Dhanuskodi
Hi,

There is an RDD with JSON data. I could read the JSON data using rdd.read.json.
The JSON data has XML data in a couple of key-value pairs.

Which is the best method to read and parse the XML from the RDD? Are there any
specific XML libraries for Spark? Could anyone help with this?

Thanks.
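
For what the question describes (JSON records with the XML held in a field), a
minimal sketch under assumptions: the column name "payload" is hypothetical,
rdd is an RDD[String] of JSON, and sqlContext already exists; spark-xml's
XmlReader, mentioned in the replies, is the other option.

```scala
import scala.xml.XML

val jsonDF = sqlContext.read.json(rdd)                     // parse the JSON records
val xmlStrings = jsonDF.select("payload").rdd.map(_.getString(0))
val parsed = xmlStrings.map(XML.loadString)                // parse the embedded XML
```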


Spark streaming

2016-08-18 Thread Diwakar Dhanuskodi
Hi,

Is there a way to specify in createDirectStream to receive only the last 'n'
offsets of a specific topic and partition? I don't want to filter them out in
foreachRDD.


Sent from Samsung Mobile.
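
One hedged possibility, assuming the Kafka 0.8 direct API that ships with Spark
1.x (spark-streaming-kafka): createDirectStream has an overload that takes
explicit starting offsets, so the stream can begin at (latest offset - n) per
partition instead of filtering in foreachRDD. The topic name and offset values
below are placeholders (in practice they would be looked up from Kafka first),
and ssc / kafkaParams are assumed to exist as in the other snippets in these
threads.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical starting points: latest offset minus n for each partition.
val fromOffsets: Map[TopicAndPartition, Long] = Map(
  TopicAndPartition("mytopic", 0) -> 12345L,
  TopicAndPartition("mytopic", 1) -> 67890L)

val lastN = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
```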

createDirectStream parallelism

2016-08-18 Thread Diwakar Dhanuskodi
We are using the createDirectStream API to receive messages from a topic with
48 partitions. I am setting --num-executors 48 & --executor-cores 1 in
spark-submit.

All partitions are received in parallel, and the corresponding RDDs in
foreachRDD are executed in parallel. But when I join the transformed DataFrame
jsonDF (code below) with another DataFrame, I can see that the partitions are
not processed in parallel. There are more shuffle reads and writes, and the
number of executors running is less than the number of partitions. I mean, the
number of executors is not equal to the number of partitions while the join is
executing.

How can I make sure the join runs on all executors? Can anyone provide help?

kstream.foreachRDD { rdd =>

  val jsonDF = sqlContext.read.json(rdd).toDF
  ...
  val metaDF = ssc.sparkContext.textFile("file").toDF

  val join = jsonDF.join(metaDF)

  join.map(...).count

}
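
One hedged option for keeping the join map-side (a sketch, not the original
job: the lookup file name and the join column "id" are made up): load the small
lookup DataFrame once outside foreachRDD and broadcast it, so each Kafka
partition is joined without a shuffle repartitioning the streamed data.

```scala
import org.apache.spark.sql.functions.broadcast

val metaDF = sqlContext.read.json("meta.json")     // hypothetical small lookup data
metaDF.cache()

kstream.foreachRDD { rdd =>
  val jsonDF = sqlContext.read.json(rdd.map(_._2)) // parse the Kafka message values
  val joined = jsonDF.join(broadcast(metaDF), "id")
  println(joined.count())
}
```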


RE: [Spark 2.0] ClassNotFoundException is thrown when using Hive

2016-08-18 Thread Diwakar Dhanuskodi
Hi,

Can you cross-check by providing the same library path in --jars of spark-submit
and running it again?


Sent from Samsung Mobile.

 Original message From: "颜发才(Yan Facai)" 
 Date:18/08/2016  15:17  (GMT+05:30) 
To: "user.spark"  Cc:  
Subject: [Spark 2.0] ClassNotFoundException is thrown when using 
Hive 
Hi, all.

I copied hdfs-site.xml, core-site.xml and hive-site.xml to $SPARK_HOME/conf. 
And spark-submit is used to submit task to yarn, and run as **client** mode. 
However, ClassNotFoundException is thrown.

some details of logs are list below:
```
16/08/12 17:07:32 INFO hive.HiveUtils: Initializing HiveMetastoreConnection 
version 0.13.1 using 
file:/data0/facai/lib/hive-0.13.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw exception: 
java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: 
org/apache/hadoop/hive/ql/session/SessionState when creating Hive client using 
classpath: file:/data0/facai/lib/hive-0.13.1/lib, 
file:/data0/facai/lib/hadoop-2.4.1/share/hadoop
```

In fact, all the jars needed by hive is  in the directory:
```Bash
[hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ | grep hive
hive-ant-0.13.1.jar
hive-beeline-0.13.1.jar
hive-cli-0.13.1.jar
hive-common-0.13.1.jar
...
```

So, my question is:
why spark cannot find the jars needed? 

Any help will be appreciate, thanks.



Re: KafkaUtils.createStream not picking smallest offset

2016-08-13 Thread Diwakar Dhanuskodi

Not using checkpointing now. The source is producing 1.2 million messages to the
topic. We are using the ZooKeeper offsets for other downstreams too. That's
the reason for going with createStream, which stores offsets in ZooKeeper.


Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
<c...@koeninger.org> Date:12/08/2016  23:42  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, 
user@spark.apache.org Cc:  Subject: Re: 
KafkaUtils.createStream not picking smallest offset 
Are you checkpointing?


Beyond that, why are you using createStream instead of createDirectStream

On Fri, Aug 12, 2016 at 12:32 PM, Diwakar Dhanuskodi
<diwakar.dhanusk...@gmail.com> wrote:
> Okay.
> I could delete the consumer group in ZooKeeper and start again to reuse
> the same consumer group name. But this is not working, though. Somehow
> createStream is picking up the offset from somewhere other than
> /consumers/ in ZooKeeper.
>
>
> Sent from Samsung Mobile.
>
>
>
>
>
>
>
>
>  Original message 
> From: Cody Koeninger <c...@koeninger.org>
> Date:12/08/2016 18:02 (GMT+05:30)
> To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
> Cc:
> Subject: Re: KafkaUtils.createStream not picking smallest offset
>
> Auto offset reset only applies if there aren't offsets available otherwise.
>
> The old high level consumer stores offsets in zookeeper.
>
> If you want to make sure you're starting clean, use a new consumer group
> I'd.
>
> On Aug 12, 2016 3:35 AM, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com>
> wrote:
>>
>>
>> Hi,
>> We are  using  spark  1.6.1 and  kafka 0.9.
>>
>> KafkaUtils.createStream is  showing strange behaviour. Though
>> auto.offset.reset is  set  to  smallest .  Whenever we  need  to  restart
>> the  stream it  is  picking up  the  latest  offset which  is not  expected.
>> Do  we  need  to  set  any  other  properties ?.
>>
>> createDirectStream works fine  in  this  above  case.
>>
>>
>> Sent from Samsung Mobile.
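
A minimal sketch of the suggestion above (start with a brand-new consumer group
so there are no stored ZooKeeper offsets and auto.offset.reset=smallest actually
applies). The ZooKeeper quorum, topic name, and thread count are placeholders.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// The receiver-based createStream commits offsets to ZooKeeper per consumer
// group; auto.offset.reset only applies when that group has no stored offsets,
// so a fresh group id is the simple way to start from the smallest offset again.
val freshParams = Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id" -> ("xyz-" + System.currentTimeMillis()),
  "auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, freshParams, Map("test" -> 2), StorageLevel.MEMORY_AND_DISK_SER)
```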


KafkaUtils.createStream not picking smallest offset

2016-08-12 Thread Diwakar Dhanuskodi

Hi,
We are using Spark 1.6.1 and Kafka 0.9.

KafkaUtils.createStream is showing strange behaviour even though
auto.offset.reset is set to smallest. Whenever we need to restart the
stream, it picks up the latest offset, which is not expected.
Do we need to set any other properties?

createDirectStream works fine in this same case.


Sent from Samsung Mobile.

Re: Spark streaming not processing messages from partitioned topics

2016-08-11 Thread Diwakar Dhanuskodi
Figured it out. All I was doing wrong was testing it out in a pseudo-node VM
with 1 core. The tasks were waiting for CPU.
In the production cluster this works just fine.


On Thu, Aug 11, 2016 at 12:45 AM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> Checked executor logs and UI . There is no error message or something like
> that.  when there is any action , it is  waiting .
> There are data in partitions. I could use simple-consumer-shell and print
> all data in console.  Am I doing anything wrong in foreachRDD?.
> This just works fine  with single partitioned topic,
>
> On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> zookeeper.connect is irrelevant.
>>
>> Did you look at your executor logs?
>> Did you look at the UI for the (probably failed) stages?
>> Are you actually producing data into all of the kafka partitions?
>> If you use kafka-simple-consumer-shell.sh to read that partition, do
>> you get any data?
>>
>> On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
>> <diwakar.dhanusk...@gmail.com> wrote:
>> > Hi Cody,
>> >
>> > Just added zookeeper.connect to kafkaparams . It couldn't come out of
>> batch
>> > window. Other batches are queued. I could see foreach(println) of
>> dataFrame
>> > printing one of partition's data and not the other.
>> > Couldn't see any  errors from log.
>> >
>> > val brokers = "localhost:9092,localhost:9093"
>> > val sparkConf = new
>> > SparkConf().setAppName("KafkaWeather").setMaster("local[5]")
>> //spark://localhost:7077
>> > val sc = new SparkContext(sparkConf)
>> > val ssc = new StreamingContext(sc, Seconds(1))
>> > val kafkaParams = Map[String,
>> > String]("bootstrap.servers"->"localhost:9093,localhost:9092"
>> ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","
>> group.id"->"xyz")
>> > val topics = "test"
>> > val topicsSet = topics.split(",").toSet
>> > val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder,
>> > StringDecoder](ssc, kafkaParams, topicsSet)
>> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>> > import sqlContext.implicits._
>> > messages.foreachRDD(rdd => {
>> >   if (rdd.isEmpty()) {
>> > println("Failed to get data from Kafka. Please check that the Kafka
>> > producer is streaming data.")
>> > System.exit(-1)
>> >   }
>> >
>> >val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
>> >   dataframe.foreach(println)
>> >  println( "$$$", dataframe.count())
>> >   })
>> > Logs:
>> >
>> > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
>> > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
>> > library for your platform... using builtin-java classes where applicable
>> > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera
>> resolves to
>> > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
>> interface
>> > eth1)
>> > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
>> > another address
>> > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
>> > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to:
>> cloudera
>> > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
>> > disabled; ui acls disabled; users with view permissions: Set(cloudera);
>> > users with modify permissions: Set(cloudera)
>> > 16/08/10 18:16:25 INFO Utils: Successfully started service
>> 'sparkDriver' on
>> > port 45031.
>> > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
>> > 16/08/10 18:16:26 INFO Remoting: Starting remoting
>> > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on
>> addresses
>> > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
>> > 16/08/10 18:16:26 INFO Utils: Successfully started service
>> > 'sparkDriverActorSystem' on port 56638.
>> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
>> > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
>> > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
>> > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
&g

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
I checked the executor logs and UI. There is no error message or anything like
that; when there is any action, it just waits.
There is data in the partitions. I could use simple-consumer-shell and print
all the data to the console. Am I doing anything wrong in foreachRDD?
This works just fine with a single-partitioned topic.

On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org> wrote:

> zookeeper.connect is irrelevant.
>
> Did you look at your executor logs?
> Did you look at the UI for the (probably failed) stages?
> Are you actually producing data into all of the kafka partitions?
> If you use kafka-simple-consumer-shell.sh to read that partition, do
> you get any data?
>
> On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
> > Hi Cody,
> >
> > Just added zookeeper.connect to kafkaparams . It couldn't come out of
> batch
> > window. Other batches are queued. I could see foreach(println) of
> dataFrame
> > printing one of partition's data and not the other.
> > Couldn't see any  errors from log.
> >
> > val brokers = "localhost:9092,localhost:9093"
> > val sparkConf = new
> > SparkConf().setAppName("KafkaWeather").setMaster("
> local[5]")//spark://localhost:7077
> > val sc = new SparkContext(sparkConf)
> > val ssc = new StreamingContext(sc, Seconds(1))
> > val kafkaParams = Map[String,
> > String]("bootstrap.servers"->"localhost:9093,localhost:9092"
> ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181","
> group.id"->"xyz")
> > val topics = "test"
> > val topicsSet = topics.split(",").toSet
> > val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder,
> > StringDecoder](ssc, kafkaParams, topicsSet)
> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> > import sqlContext.implicits._
> > messages.foreachRDD(rdd => {
> >   if (rdd.isEmpty()) {
> > println("Failed to get data from Kafka. Please check that the Kafka
> > producer is streaming data.")
> > System.exit(-1)
> >   }
> >
> >val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()
> >   dataframe.foreach(println)
> >  println( "$$$", dataframe.count())
> >   })
> > Logs:
> >
> > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2
> > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop
> > library for your platform... using builtin-java classes where applicable
> > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera
> resolves to
> > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on
> interface
> > eth1)
> > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> > another address
> > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera
> > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera
> > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication
> > disabled; ui acls disabled; users with view permissions: Set(cloudera);
> > users with modify permissions: Set(cloudera)
> > 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver'
> on
> > port 45031.
> > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started
> > 16/08/10 18:16:26 INFO Remoting: Starting remoting
> > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses
> > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638]
> > 16/08/10 18:16:26 INFO Utils: Successfully started service
> > 'sparkDriverActorSystem' on port 56638.
> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker
> > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster
> > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at
> > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee
> > 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity
> 511.5
> > MB
> > 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator
> > 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on
> port
> > 4040.
> > 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at
> > http://192.168.126.131:4040
> > 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is
> > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd-
> 2b2a4e68-2952-41b0-a11b-f07860682749
> > 16/08/10 18:16:27 INFO HttpServer: Starting HTTP

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
s-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-streaming-kafka-assembly_2.10-1.6.2.jar
to class loader
16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0 offsets
0 -> 20
16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties
16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset is
overridden to smallest
16/08/10 18:16:33 INFO VerifiableProperties: Property group.id is
overridden to xyz
16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect is
overridden to localhost:2181
16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000 ms
16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
13366 bytes result sent to driver
16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
0) in 2423 ms on localhost (1/2)
16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000 ms
16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000 ms
16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000 ms
16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000 ms
16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000 ms
16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 147083320 ms
16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000 ms
16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000 ms
16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000 ms
16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000 ms
16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000 ms
16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000 ms
16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000 ms
16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000 ms
16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000 ms
16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 147083321 ms
16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000 ms
16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000 ms
16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000 ms
16/08/10 18:16:54 INFO JobSch

On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Those logs you're posting are from right after your failure, they don't
> include what actually went wrong when attempting to read json. Look at your
> logs more carefully.
> On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" <
> diwakar.dhanusk...@gmail.com> wrote:
>
>> Hi Siva,
>>
>> With below code, it is stuck up at
>> * sqlContext.read.json(rdd.map(_._2)).toDF()*
>> There are two partitions in  topic.
>> I am running spark 1.6.2
>>
>> val topics = "topic.name"
>> val brokers = "localhost:9092"
>> val topicsSet = topics.split(",").toSet
>> val sparkConf = new SparkConf().setAppName("KafkaW
>> eather").setMaster("local[5]")//spark://localhost:7077
>> val sc = new SparkContext(sparkConf)
>> val ssc = new StreamingContext(sc, Seconds(60))
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "
>> group.id" -> "xyz","auto.offset.reset"->"smallest")
>> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>> messages.foreachRDD(rdd => {
>>   if (rdd.isEmpty()) {
>> println("Failed to get data from Kafka. Please check that the Kafka
>> producer is streaming data.")
>> System.exit(-1)
>>   }
>>   val sqlContext = org.apache.spark.sql.SQLContex
>> t.getOrCreate(rdd.sparkContext)
>>   *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()*
>>   dataframe.foreach(println)
>>
>>   })
>>
>>
>> Below are logs,
>>
>> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at
>> todelete.scala:34) failed in 110.776 s
>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
>> stopped! Dropping event SparkListenerStageCompleted(or
>> g.apache.spark.scheduler.StageInfo@6d8ff688)
>> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already
>> stopped! Dropping event SparkListenerJobEnd(0,14708122
>> 71971,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because
>> SparkContext was shut down))
>> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint:
>> MapOutputTrackerMasterEndpoint stopped!
>> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared
>> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped
>> 16/08/10 12:27

Re: Spark streaming not processing messages from partitioned topics

2016-08-10 Thread Diwakar Dhanuskodi
e 0.0 (TID 0)
16/08/10 12:29:01 INFO Executor: Fetching
http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
with timestamp 1470812282187
16/08/10 12:29:01 INFO Utils: Fetching
http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar
to
/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp
16/08/10 12:29:01 INFO Executor: Adding
file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-streaming-kafka-assembly_2.10-1.6.2.jar
to class loader
16/08/10 12:29:01 INFO Executor: Fetching
http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with
timestamp 1470812282398
16/08/10 12:29:01 INFO Utils: Fetching
http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to
/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp
16/08/10 12:29:01 INFO Executor: Adding
file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-assembly-1.6.2-hadoop2.6.0.jar
to class loader
16/08/10 12:29:01 INFO Executor: Fetching
http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402
16/08/10 12:29:01 INFO Utils: Fetching
http://192.168.126.131:58957/jars/pain.jar to
/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp100401525133805542.tmp
16/08/10 12:29:02 INFO Executor: Adding
file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar
to class loader
16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name, partition 0
offsets 0 -> 8
16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties
16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset is
overridden to smallest
16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is
overridden to xyz
16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect is
overridden to
16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0).
13366 bytes result sent to driver
16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID
0) in 2380 ms on localhost (1/2)
16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 147081240 ms
16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 147081246 ms
16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 147081252 ms
16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 147081258 ms
16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 147081264 ms


On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi <
diwakar.dhanusk...@gmail.com> wrote:

> Hi Siva,
>
> Does topic  has partitions? which version of Spark you are using?
>
> On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com>
> wrote:
>
>> Hi,
>>
>> Here is a working example I did.
>>
>> HTH
>>
>> Regards,
>>
>> Sivakumaran S
>>
>> val topics = "test"
>> val brokers = "localhost:9092"
>> val topicsSet = topics.split(",").toSet
>> val sparkConf = new SparkConf().setAppName("KafkaW
>> eatherCalc").setMaster("local") //spark://localhost:7077
>> val sc = new SparkContext(sparkConf)
>> val ssc = new StreamingContext(sc, Seconds(60))
>> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>> messages.foreachRDD(rdd => {
>>   if (rdd.isEmpty()) {
>> println("Failed to get data from Kafka. Please check that the Kafka
>> producer is streaming data.")
>> System.exit(-1)
>>   }
>>   val sqlContext = org.apache.spark.sql.SQLContex
>> t.getOrCreate(rdd.sparkContext)
>>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>>   //Process your DF as required here on
>> }
>>
>>
>>
>> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <
>> diwakar.dhanusk...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am reading json messages from kafka . Topics has 2 partitions. When
>> running streaming job using spark-submit, I could see that * val
>> dataFrame = sqlContext.read.json(rdd.map(_._2)) *executes indefinitely.
>> Am I doing something wrong here. Below is code .This environment is
>> cloudera sandbox env. Same issue in hadoop production cluster mode except
>> that it is restricted thats why tried to reproduce issue in Cloudera
>> sandbox. Kafka 0.10 and  Spark 1.4.
>>
>> val kafkaParams = M

Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Diwakar Dhanuskodi
Hi Siva,

Does the topic have partitions? Which version of Spark are you using?

On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> wrote:

> Hi,
>
> Here is a working example I did.
>
> HTH
>
> Regards,
>
> Sivakumaran S
>
> val topics = "test"
> val brokers = "localhost:9092"
> val topicsSet = topics.split(",").toSet
> val sparkConf = new 
> SparkConf().setAppName("KafkaWeatherCalc").setMaster("local")
> //spark://localhost:7077
> val sc = new SparkContext(sparkConf)
> val ssc = new StreamingContext(sc, Seconds(60))
> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
> messages.foreachRDD(rdd => {
>   if (rdd.isEmpty()) {
> println("Failed to get data from Kafka. Please check that the Kafka
> producer is streaming data.")
> System.exit(-1)
>   }
>   val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd.
> sparkContext)
>   val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF()
>   //Process your DF as required here on
> }
>
>
>
> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
>
> Hi,
>
> I am reading json messages from kafka . Topics has 2 partitions. When
> running streaming job using spark-submit, I could see that * val
> dataFrame = sqlContext.read.json(rdd.map(_._2)) *executes indefinitely.
> Am I doing something wrong here. Below is code .This environment is
> cloudera sandbox env. Same issue in hadoop production cluster mode except
> that it is restricted thats why tried to reproduce issue in Cloudera
> sandbox. Kafka 0.10 and  Spark 1.4.
>
> val kafkaParams = Map[String,String]("bootstrap.
> servers"->"localhost:9093,localhost:9092", "group.id" ->
> "xyz","auto.offset.reset"->"smallest")
> val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> val ssc = new StreamingContext(conf, Seconds(1))
>
> val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>
> val topics = Set("gpp.minf")
> val kafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder,StringDecoder](ssc, kafkaParams, topics)
>
> kafkaStream.foreachRDD(
>   rdd => {
> if (rdd.count > 0){
>* val dataFrame = sqlContext.read.json(rdd.map(_._2)) *
>dataFrame.printSchema()
> //dataFrame.foreach(println)
> }
> }
>
>
>


Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Diwakar Dhanuskodi
It stops working at sqlContext.read.json(rdd.map(_._2)). Topics without
partitions work fine. Do I need to set any other configs?
val kafkaParams =
Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", "
group.id" -> "xyz","auto.offset.reset"->"smallest")
Spark version is 1.6.2

kafkaStream.foreachRDD(
  rdd => {
   rdd.foreach(println)
   val dataFrame = sqlContext.read.json(rdd.map(_._2))
   dataFrame.foreach(println)
}
)

On Wed, Aug 10, 2016 at 9:05 AM, Cody Koeninger <c...@koeninger.org> wrote:

> No, you don't need a conditional.  read.json on an empty rdd will
> return an empty dataframe.  Foreach on an empty dataframe or an empty
> rdd won't do anything (a task will still get run, but it won't do
> anything).
>
> Leave the conditional out.  Add one thing at a time to the working
> rdd.foreach example and see when it stops working, then take a closer
> look at the logs.
>
>
> On Tue, Aug 9, 2016 at 10:20 PM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
> > Hi Cody,
> >
> > Without conditional . It is going with fine. But any processing inside
> > conditional get on to waiting (or) something.
> > Facing this issue with partitioned topics. I would need conditional to
> skip
> > processing when batch is empty.
> > kafkaStream.foreachRDD(
> >   rdd => {
> >
> >val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >/*if (dataFrame.count() > 0) {
> >dataFrame.foreach(println)
> >}
> >else
> >{
> >  println("Empty DStream ")
> >}*/
> > })
> >
> > On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>
> >> Take out the conditional and the sqlcontext and just do
> >>
> >> rdd => {
> >>   rdd.foreach(println)
> >>
> >>
> >> as a base line to see if you're reading the data you expect
> >>
> >> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
> >> <diwakar.dhanusk...@gmail.com> wrote:
> >> > Hi,
> >> >
> >> > I am reading json messages from kafka . Topics has 2 partitions. When
> >> > running streaming job using spark-submit, I could see that  val
> >> > dataFrame =
> >> > sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
> >> > something wrong here. Below is code .This environment is cloudera
> >> > sandbox
> >> > env. Same issue in hadoop production cluster mode except that it is
> >> > restricted thats why tried to reproduce issue in Cloudera sandbox.
> Kafka
> >> > 0.10 and  Spark 1.4.
> >> >
> >> > val kafkaParams =
> >> > Map[String,String]("bootstrap.servers"->"localhost:9093,
> localhost:9092",
> >> > "group.id" -> "xyz","auto.offset.reset"->"smallest")
> >> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> >> > val ssc = new StreamingContext(conf, Seconds(1))
> >> >
> >> > val sqlContext = new org.apache.spark.sql.
> SQLContext(ssc.sparkContext)
> >> >
> >> > val topics = Set("gpp.minf")
> >> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
> >> > StringDecoder,StringDecoder](ssc, kafkaParams, topics)
> >> >
> >> > kafkaStream.foreachRDD(
> >> >   rdd => {
> >> > if (rdd.count > 0){
> >> > val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >> >dataFrame.printSchema()
> >> > //dataFrame.foreach(println)
> >> > }
> >> > }
> >
> >
>


Re: Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Diwakar Dhanuskodi
Hi Cody,

Without the conditional it runs fine, but any processing inside the
conditional appears to hang or wait indefinitely.
I am facing this issue only with partitioned topics. I would still need a
conditional to skip processing when a batch is empty.
kafkaStream.foreachRDD(
  rdd => {

   val dataFrame = sqlContext.read.json(rdd.map(_._2))
   /*if (dataFrame.count() > 0) {
   dataFrame.foreach(println)
   }
   else
   {
 println("Empty DStream ")
   }*/
})
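
For what it's worth, a cheaper guard against empty batches than dataFrame.count() > 0 is rdd.isEmpty(), which only has to inspect partitions until it finds a record. A minimal sketch (not from the original thread), reusing the kafkaStream and sqlContext already defined above:

kafkaStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Only build the DataFrame when the batch actually contains records.
    val dataFrame = sqlContext.read.json(rdd.map(_._2))
    dataFrame.foreach(println)
  } else {
    println("Empty DStream batch")
  }
}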

On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Take out the conditional and the sqlcontext and just do
>
> rdd => {
>   rdd.foreach(println)
>
>
> as a base line to see if you're reading the data you expect
>
> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
> > Hi,
> >
> > I am reading json messages from kafka . Topics has 2 partitions. When
> > running streaming job using spark-submit, I could see that  val
> dataFrame =
> > sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing
> > something wrong here. Below is code .This environment is cloudera sandbox
> > env. Same issue in hadoop production cluster mode except that it is
> > restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka
> > 0.10 and  Spark 1.4.
> >
> > val kafkaParams =
> > Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092",
> > "group.id" -> "xyz","auto.offset.reset"->"smallest")
> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
> > val ssc = new StreamingContext(conf, Seconds(1))
> >
> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
> >
> > val topics = Set("gpp.minf")
> > val kafkaStream = KafkaUtils.createDirectStream[String, String,
> > StringDecoder,StringDecoder](ssc, kafkaParams, topics)
> >
> > kafkaStream.foreachRDD(
> >   rdd => {
> > if (rdd.count > 0){
> > val dataFrame = sqlContext.read.json(rdd.map(_._2))
> >dataFrame.printSchema()
> > //dataFrame.foreach(println)
> > }
> > }
>


Spark streaming not processing messages from partitioned topics

2016-08-09 Thread Diwakar Dhanuskodi
Hi,

I am reading JSON messages from Kafka. The topic has 2 partitions. When
running the streaming job via spark-submit, I can see that *val dataFrame
= sqlContext.read.json(rdd.map(_._2))* executes indefinitely. Am I doing
something wrong here? The code is below. This environment is the Cloudera
sandbox; the same issue occurs in Hadoop production cluster mode, but access
there is restricted, which is why I tried to reproduce the issue in the
Cloudera sandbox. Kafka 0.10 and Spark 1.4.

val kafkaParams =
Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092",
"group.id" -> "xyz","auto.offset.reset"->"smallest")
val conf = new SparkConf().setMaster("local[3]").setAppName("topic")
val ssc = new StreamingContext(conf, Seconds(1))

val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)

val topics = Set("gpp.minf")
val kafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder,StringDecoder](ssc, kafkaParams, topics)

kafkaStream.foreachRDD(
  rdd => {
if (rdd.count > 0){
   * val dataFrame = sqlContext.read.json(rdd.map(_._2)) *
   dataFrame.printSchema()
//dataFrame.foreach(println)
}
}


Re: Spark streaming takes longer time to read json into dataframes

2016-07-19 Thread Diwakar Dhanuskodi
Okay, got your point about parallelism.

Yes, we use the DataFrame API to infer the schema and process the data. The JSON
schema has XML data as one of the key-value pairs; that XML needs to be processed
inside foreachRDD. The JSON schema doesn't change often.

Regards,
Diwakar


Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
<c...@koeninger.org> Date:19/07/2016  20:49  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: 
Martin Eden <martineden...@gmail.com>, user <user@spark.apache.org> 
Subject: Re: Spark streaming takes longer time to read json into 
dataframes 
Yes, if you need more parallelism, you need to either add more kafka
partitions or shuffle in spark.

Do you actually need the dataframe api, or are you just using it as a
way to infer the json schema?  Inferring the schema is going to
require reading through the RDD once before doing any other work.  You
may be better off defining your schema in advance.
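
As an illustration of defining the schema up front (an editor's sketch, not from the thread; the field names are hypothetical, and `messages` and `sqlContext` are assumed to be the direct stream and SQL context already in scope):

import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Hypothetical fields; replace with the real ones. Supplying the schema to the
// reader avoids the extra pass over the RDD that schema inference would need.
val jsonSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("xmlPayload", StringType)
))

messages.foreachRDD { rdd =>
  val df = sqlContext.read.schema(jsonSchema).json(rdd.map(_._2))
  // ... process df ...
}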

On Sun, Jul 17, 2016 at 9:33 PM, Diwakar Dhanuskodi
<diwakar.dhanusk...@gmail.com> wrote:
> Hi,
>
> Repartition would  create  shuffle  over  network  which  I should  avoid
> to  reduce processing time because the size of messages at most in a batch
> will  be  5G.
>  Partitioning topic and parallelize receiving in Direct Stream might do  the
> trick.
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Martin Eden <martineden...@gmail.com>
> Date:16/07/2016 14:01 (GMT+05:30)
> To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Spark streaming takes longer time to read json into dataframes
>
> Hi,
>
> I would just do a repartition on the initial direct DStream since otherwise
> each RDD in the stream has exactly as many partitions as you have partitions
> in the Kafka topic (in your case 1). Like that receiving is still done in
> only 1 thread but at least the processing further down is done in parallel.
>
> If you want to parallelize your receiving as well I would partition my Kafka
> topic and then the RDDs in the initial DStream will have as many partitions
> as you set in Kafka.
>
> Have you seen this?
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> M
>
> On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
>>
>>
>> -- Forwarded message --
>> From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
>> Date: Sat, Jul 16, 2016 at 9:30 AM
>> Subject: Re: Spark streaming takes longer time to read json into
>> dataframes
>> To: Jean Georges Perrin <j...@jgp.net>
>>
>>
>> Hello,
>>
>> I need it on memory.  Increased executor memory to 25G and executor cores
>> to 3. Got same result. There is always one task running under executor for
>> rdd.read.json() because rdd partition size is 1 . Doing hash partitioning
>> inside foreachRDD is a good approach?
>>
>> Regards,
>> Diwakar.
>>
>> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin <j...@jgp.net> wrote:
>>>
>>> Do you need it on disk or just push it to memory? Can you try to increase
>>> memory or # of cores (I know it sounds basic)
>>>
>>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi
>>> > <diwakar.dhanusk...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > I have 400K json messages pulled from Kafka into spark streaming using
>>> > DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
>>> > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
>>> > convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
>>> > dataframe.
>>> >
>>> > I am running in Yarn client mode with executor memory as 15G and
>>> > executor cores as 2.
>>> >
>>> > Caching rdd before converting into dataframe  doesn't change processing
>>> > time. Whether introducing hash partitions inside foreachRDD  will help? 
>>> > (or)
>>> > Will partitioning topic and have more than one DirectStream help?. How 
>>> > can I
>>> > approach this situation to reduce time in converting to dataframe..
>>> >
>>> > Regards,
>>> > Diwakar.
>>>
>>
>>
>


Re: Spark streaming takes longer time to read json into dataframes

2016-07-19 Thread Diwakar Dhanuskodi
Okay, understood your point about parallelism.

Yes, we use the DataFrame API to infer the schema and to process the data. The JSON
carries XML data as one of its key-value pairs, and that XML has to be processed
inside foreachRDD (see the sketch below). The JSON schema doesn't change often.
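
A minimal sketch of pulling the embedded XML out of the JSON inside foreachRDD (an editor's illustration; the column name xmlPayload and the surrounding variable names are assumptions, not from the thread):

import scala.xml.XML

messages.foreachRDD { rdd =>
  val df = sqlContext.read.json(rdd.map(_._2))
  // "xmlPayload" is a hypothetical column holding the embedded XML string.
  df.select("xmlPayload").rdd.map(_.getString(0)).foreach { xmlString =>
    val doc = XML.loadString(xmlString)
    // ... extract the required elements from doc ...
  }
}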

Regards,
Diwakar.




Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
<c...@koeninger.org> Date:19/07/2016  20:49  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: 
Martin Eden <martineden...@gmail.com>, user <user@spark.apache.org> 
Subject: Re: Spark streaming takes longer time to read json into 
dataframes 
Yes, if you need more parallelism, you need to either add more kafka
partitions or shuffle in spark.

Do you actually need the dataframe api, or are you just using it as a
way to infer the json schema?  Inferring the schema is going to
require reading through the RDD once before doing any other work.  You
may be better off defining your schema in advance.

On Sun, Jul 17, 2016 at 9:33 PM, Diwakar Dhanuskodi
<diwakar.dhanusk...@gmail.com> wrote:
> Hi,
>
> Repartition would  create  shuffle  over  network  which  I should  avoid
> to  reduce processing time because the size of messages at most in a batch
> will  be  5G.
>  Partitioning topic and parallelize receiving in Direct Stream might do  the
> trick.
>
>
> Sent from Samsung Mobile.
>
>
>  Original message 
> From: Martin Eden <martineden...@gmail.com>
> Date:16/07/2016 14:01 (GMT+05:30)
> To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
> Cc: user <user@spark.apache.org>
> Subject: Re: Spark streaming takes longer time to read json into dataframes
>
> Hi,
>
> I would just do a repartition on the initial direct DStream since otherwise
> each RDD in the stream has exactly as many partitions as you have partitions
> in the Kafka topic (in your case 1). Like that receiving is still done in
> only 1 thread but at least the processing further down is done in parallel.
>
> If you want to parallelize your receiving as well I would partition my Kafka
> topic and then the RDDs in the initial DStream will have as many partitions
> as you set in Kafka.
>
> Have you seen this?
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> M
>
> On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi
> <diwakar.dhanusk...@gmail.com> wrote:
>>
>>
>> -- Forwarded message --
>> From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
>> Date: Sat, Jul 16, 2016 at 9:30 AM
>> Subject: Re: Spark streaming takes longer time to read json into
>> dataframes
>> To: Jean Georges Perrin <j...@jgp.net>
>>
>>
>> Hello,
>>
>> I need it on memory.  Increased executor memory to 25G and executor cores
>> to 3. Got same result. There is always one task running under executor for
>> rdd.read.json() because rdd partition size is 1 . Doing hash partitioning
>> inside foreachRDD is a good approach?
>>
>> Regards,
>> Diwakar.
>>
>> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin <j...@jgp.net> wrote:
>>>
>>> Do you need it on disk or just push it to memory? Can you try to increase
>>> memory or # of cores (I know it sounds basic)
>>>
>>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi
>>> > <diwakar.dhanusk...@gmail.com> wrote:
>>> >
>>> > Hello,
>>> >
>>> > I have 400K json messages pulled from Kafka into spark streaming using
>>> > DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
>>> > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
>>> > convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
>>> > dataframe.
>>> >
>>> > I am running in Yarn client mode with executor memory as 15G and
>>> > executor cores as 2.
>>> >
>>> > Caching rdd before converting into dataframe  doesn't change processing
>>> > time. Whether introducing hash partitions inside foreachRDD  will help? 
>>> > (or)
>>> > Will partitioning topic and have more than one DirectStream help?. How 
>>> > can I
>>> > approach this situation to reduce time in converting to dataframe..
>>> >
>>> > Regards,
>>> > Diwakar.
>>>
>>
>>
>


Re: Spark streaming takes longer time to read json into dataframes

2016-07-17 Thread Diwakar Dhanuskodi
Hi,

Repartition would create a shuffle over the network, which I should avoid in order
to reduce processing time, because the messages in a batch can be up to 5G in size.
Partitioning the topic and parallelizing the receiving in the direct stream might do
the trick.


Sent from Samsung Mobile.

 Original message From: Martin Eden 
<martineden...@gmail.com> Date:16/07/2016  14:01  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: 
user <user@spark.apache.org> Subject: Re: Spark streaming takes 
longer time to read json into dataframes 
Hi,

I would just do a repartition on the initial direct DStream since otherwise 
each RDD in the stream has exactly as many partitions as you have partitions in 
the Kafka topic (in your case 1). Like that receiving is still done in only 1 
thread but at least the processing further down is done in parallel. 

If you want to parallelize your receiving as well I would partition my Kafka 
topic and then the RDDs in the initial DStream will have as many partitions as 
you set in Kafka.

Have you seen this? 
http://spark.apache.org/docs/latest/streaming-kafka-integration.html

M
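
For reference, a minimal sketch of the repartition approach described above (an editor's sketch; the parallelism value is illustrative, and kafkaParams, topicSet, ssc and sqlContext are assumed to be defined as elsewhere in this thread):

// Each RDD of the direct stream has one partition per Kafka partition, so with a
// single-partition topic the downstream work runs in one task unless we shuffle.
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)

kafkaStream
  .repartition(16)            // illustrative value; tune to the cluster
  .foreachRDD { rdd =>
    val df = sqlContext.read.json(rdd.map(_._2))
    // ... process df ...
  }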

On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:

-- Forwarded message --
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date: Sat, Jul 16, 2016 at 9:30 AM
Subject: Re: Spark streaming takes longer time to read json into dataframes
To: Jean Georges Perrin <j...@jgp.net>


Hello, 

I need it on memory.  Increased executor memory to 25G and executor cores to 3. 
Got same result. There is always one task running under executor for 
rdd.read.json() because rdd partition size is 1 . Doing hash partitioning 
inside foreachRDD is a good approach?

Regards, 
Diwakar. 

On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin <j...@jgp.net> wrote:
Do you need it on disk or just push it to memory? Can you try to increase 
memory or # of cores (I know it sounds basic)

> On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi 
> <diwakar.dhanusk...@gmail.com> wrote:
>
> Hello,
>
> I have 400K json messages pulled from Kafka into spark streaming using 
> DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is 
> single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to 
> convert  rdd into dataframe. It takes almost 2.3 minutes to convert into 
> dataframe.
>
> I am running in Yarn client mode with executor memory as 15G and executor 
> cores as 2.
>
> Caching rdd before converting into dataframe  doesn't change processing time. 
> Whether introducing hash partitions inside foreachRDD  will help? (or) Will 
> partitioning topic and have more than one DirectStream help?. How can I 
> approach this situation to reduce time in converting to dataframe..
>
> Regards,
> Diwakar.






Fwd: Spark streaming takes longer time to read json into dataframes

2016-07-15 Thread Diwakar Dhanuskodi
-- Forwarded message --
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date: Sat, Jul 16, 2016 at 9:30 AM
Subject: Re: Spark streaming takes longer time to read json into dataframes
To: Jean Georges Perrin <j...@jgp.net>


Hello,

I need it in memory. I increased executor memory to 25G and executor cores to 3
and got the same result. There is always only one task running on the executor for
the read.json() step because the RDD has a single partition. Is hash partitioning
inside foreachRDD a good approach?

Regards,
Diwakar.

On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin <j...@jgp.net> wrote:

> Do you need it on disk or just push it to memory? Can you try to increase
> memory or # of cores (I know it sounds basic)
>
> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi <
> diwakar.dhanusk...@gmail.com> wrote:
> >
> > Hello,
> >
> > I have 400K json messages pulled from Kafka into spark streaming using
> DirectStream approach. Size of 400K messages is around 5G.  Kafka topic is
> single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to
> convert  rdd into dataframe. It takes almost 2.3 minutes to convert into
> dataframe.
> >
> > I am running in Yarn client mode with executor memory as 15G and
> executor cores as 2.
> >
> > Caching rdd before converting into dataframe  doesn't change processing
> time. Whether introducing hash partitions inside foreachRDD  will help?
> (or) Will partitioning topic and have more than one DirectStream help?. How
> can I approach this situation to reduce time in converting to dataframe..
> >
> > Regards,
> > Diwakar.
>
>


Spark streaming takes longer time to read json into dataframes

2016-07-15 Thread Diwakar Dhanuskodi
Hello,

I have 400K JSON messages pulled from Kafka into Spark Streaming using the
direct stream approach. The 400K messages are around 5G in size. The Kafka topic
has a single partition. I am using sqlContext.read.json(rdd.map(_._2)) inside
foreachRDD to convert the RDD into a DataFrame; it takes almost 2.3 minutes to do
the conversion.

I am running in Yarn client mode with executor memory as 15G and executor
cores as 2.

Caching the RDD before converting it into a DataFrame doesn't change the processing
time. Would introducing hash partitioning inside foreachRDD help, or would
partitioning the topic and having more than one direct stream help? How can I
approach this situation to reduce the time spent converting to a DataFrame?

Regards,
Diwakar.


RE: Fwd: DF creation

2016-03-19 Thread Diwakar Dhanuskodi
Import sqlContext.implicits._ before calling toDF().
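
A minimal sketch of what that looks like (using the names from the question below; note that instances of the case class, not the class itself, must be parallelized):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

case class data(name: String, age: String)

// Parallelize instances of the case class, then convert without a StructType.
val samplerdd = sc.parallelize(Seq(data("alice", "30"), data("bob", "25")))
val sampledf  = samplerdd.toDF()
sampledf.printSchema()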


Sent from Samsung Mobile.

 Original message From: satyajit vegesna 
 Date:19/03/2016  06:00  (GMT+05:30) 
To: user@spark.apache.org, d...@spark.apache.org Cc:  
Subject: Fwd: DF creation 

Hi ,

I am trying to create separate val reference to object DATA (as shown below),  

case class data(name:String,age:String)

Creation of this object is done separately and the reference to the object is 
stored into val data.

i use val samplerdd = sc.parallelize(Seq(data)) , to create RDD.
org.apache.spark.rdd.RDD[data] = ParallelCollectionRDD[10] at parallelize at <console>:24

is there a way to create dataframe out of this, without using  createDataFrame, 
and by using toDF() which i was unable to convert.(would like to avoid 
providing the structtype).

Regards,
Satyajit.





Re: Spark Submit

2016-02-12 Thread Diwakar Dhanuskodi
Try passing each property as its own --conf, for example:
spark-submit --conf "spark.executor.memory=512m" --conf 
"spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.xml"

Sent from Samsung Mobile.

 Original message From: Ted Yu 
 Date:12/02/2016  21:24  (GMT+05:30) 
To: Ashish Soni  Cc: user 
 Subject: Re: Spark Submit 
Have you tried specifying multiple '--conf key=value' ?

Cheers

On Fri, Feb 12, 2016 at 7:44 AM, Ashish Soni  wrote:
Hi All , 

How do i pass multiple configuration parameter while spark submit

Please help i am trying as below 

spark-submit  --conf "spark.executor.memory=512m 
spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.xml"

Thanks,



RE: HADOOP_HOME are not set when try to run spark application in yarn cluster mode

2016-02-09 Thread Diwakar Dhanuskodi
It should work. Which version of Spark are you using? Try setting it
up in the program using SparkConf.set().
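
A sketch of the programmatic route (an editor's illustration; it assumes the --files list maps to the spark.yarn.dist.files property on YARN, and reuses the paths quoted later in this thread):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("com.Myclass")
  // Rough equivalent of --files on YARN: ship the Hadoop client configs with the app.
  .set("spark.yarn.dist.files",
    "/usr/lib/hadoop/etc/hadoop/core-site.xml," +
    "/usr/lib/hadoop/etc/hadoop/hdfs-site.xml," +
    "/usr/lib/hadoop/etc/hadoop/yarn-site.xml")
val sc = new SparkContext(conf)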


Sent from Samsung Mobile.

 Original message From: 
rachana.srivast...@thomsonreuters.com Date:10/02/2016  00:47  
(GMT+05:30) To: diwakar.dhanusk...@gmail.com, 
rachana.srivast...@markmonitor.com, user@spark.apache.org Cc:  
Subject: RE: HADOOP_HOME are not set when try to run spark 
application in yarn cluster mode 
Thanks so much Diwakar.
 
spark-submit --class "com.MyClass"  \
--files=/usr/lib/hadoop/etc/hadoop/core-site.xml,/usr/lib/hadoop/etc/hadoop/hdfs-site.xml,/usr/lib/hadoop/etc/hadoop/mapred-site.xml,/usr/lib/hadoop/etc/hadoop/ssl-client.xml,/usr/lib/hadoop/etc/hadoop/yarn-site.xml
 \
--num-executors 2 \
--master yarn-cluster \
 
I have added all the xml files in the spark-submit but still getting the same 
error.  I see all the Hadoop files logged.
 
16/02/09 11:07:00 INFO Client: Uploading resource 
file:/usr/lib/hadoop/etc/hadoop/core-site.xml -> 
hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/core-site.xml
16/02/09 11:07:00 INFO Client: Uploading resource 
file:/usr/lib/hadoop/etc/hadoop/hdfs-site.xml -> 
hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/hdfs-site.xml
16/02/09 11:07:00 INFO Client: Uploading resource 
file:/usr/lib/hadoop/etc/hadoop/mapred-site.xml -> 
hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/mapred-site.xml
16/02/09 11:07:00 INFO Client: Uploading resource 
file:/usr/lib/hadoop/etc/hadoop/ssl-client.xml -> 
hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/ssl-client.xml
16/02/09 11:07:00 INFO Client: Uploading resource 
file:/usr/lib/hadoop/etc/hadoop/yarn-site.xml -> 
hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/yarn-site.xml
 
From: Diwakar Dhanuskodi [mailto:diwakar.dhanusk...@gmail.com] 
Sent: Tuesday, February 09, 2016 10:00 AM
To: Rachana Srivastava; user@spark.apache.org
Subject: RE: HADOOP_HOME are not set when try to run spark application in yarn 
cluster mode
 
Pass  on  all  hadoop conf files  as  spark-submit parameters in --files
 
 
Sent from Samsung Mobile.
 

 Original message 
From: Rachana Srivastava <rachana.srivast...@markmonitor.com>
Date:09/02/2016 22:53 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: HADOOP_HOME are not set when try to run spark application in yarn 
cluster mode
 
I am trying to run an application in yarn cluster mode.
 
Spark-Submit with Yarn Cluster
Here are setting of the shell script:
spark-submit --class "com.Myclass"  \
--num-executors 2 \
--executor-cores 2 \
--master yarn \
--supervise \
--deploy-mode cluster \
../target/ \
 
My application is working fine in yarn-client and local mode.
 
Excerpt for error when we submit application from spark-submit in yarn cluster 
mode.
 
&&&&&&&&&&&&&&&&&&&&&& HADOOP HOME correct path logged but still getting the 
error
/usr/lib/hadoop
&&&&&&&&&&&&&&&&&&&&&& HADOOP_CONF_DIR
/usr/lib/hadoop/etc/hadoop
...
Diagnostics: Exception from container-launch.
Container id: container_1454984479786_0006_02_01
Exit code: 15
Stack trace: ExitCodeException exitCode=15:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
at org.apache.hadoop.util.Shell.run(Shell.java:460)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 
Further I am getting following error
ERROR DETAILS FROM YARN LOGS APPLICATIONID
INFO : org.apache.spark.deploy.yarn.ApplicationMaster - Registered signal 
handlers for [TERM, HUP, INT]
DEBUG: org.apache.hadoop.util.Shell - Failed to detect a valid hadoop home 
directory
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:307)
at org.apache.hadoop.util.Shell.<init>(Shell.java:332)
at org.apache.hadoop.util.StringUtils.<init>(StringUtils.java:79)
at org.apache.hadoop.yarn.conf.YarnConfiguration.<init>(YarnConfiguration.java:590)
at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:62)
at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:52)
at 
org.apache.spark.deploy.yarn.YarnSparkH

Re: AM creation in yarn client mode

2016-02-09 Thread Diwakar Dhanuskodi
Your  2nd assumption  is  correct .
There  is  yarn client  which  polls AM while  running  in  yarn client mode 

Sent from Samsung Mobile.

 Original message From: ayan guha 
 Date:10/02/2016  10:55  (GMT+05:30) 
To: praveen S  Cc: user 
 Subject: Re: AM creation in yarn client mode 

It depends on yarn-cluster and yarn-client mode. 

On Wed, Feb 10, 2016 at 3:42 PM, praveen S  wrote:
Hi,

I have 2 questions when running the spark jobs on yarn in client mode :

1) Where is the AM(application master) created :

A) is it created on the client where the job was submitted? i.e driver and AM 
on the same client? 
Or 
B) yarn decides where the the AM should be created?

2) Driver and AM run in different processes : is my assumption correct?

Regards, 
Praveen




-- 
Best Regards,
Ayan Guha


RE: HADOOP_HOME are not set when try to run spark application in yarn cluster mode

2016-02-09 Thread Diwakar Dhanuskodi
Pass all the Hadoop conf files to spark-submit via the --files parameter.


Sent from Samsung Mobile.

 Original message From: Rachana Srivastava 
 Date:09/02/2016  22:53  
(GMT+05:30) To: user@spark.apache.org Cc:  
Subject: HADOOP_HOME are not set when try to run spark application 
in yarn cluster mode 
I am trying to run an application in yarn cluster mode.
 
Spark-Submit with Yarn Cluster
Here are setting of the shell script:
spark-submit --class "com.Myclass"  \
--num-executors 2 \
--executor-cores 2 \
--master yarn \
--supervise \
--deploy-mode cluster \
../target/ \
 
My application is working fine in yarn-client and local mode.
 
Excerpt for error when we submit application from spark-submit in yarn cluster 
mode.
 
&& HADOOP HOME correct path logged but still getting the 
error
/usr/lib/hadoop
&& HADOOP_CONF_DIR
/usr/lib/hadoop/etc/hadoop
...
Diagnostics: Exception from container-launch.
Container id: container_1454984479786_0006_02_01
Exit code: 15
Stack trace: ExitCodeException exitCode=15:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
at org.apache.hadoop.util.Shell.run(Shell.java:460)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
at 
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 
Further I am getting following error
ERROR DETAILS FROM YARN LOGS APPLICATIONID
INFO : org.apache.spark.deploy.yarn.ApplicationMaster - Registered signal 
handlers for [TERM, HUP, INT]
DEBUG: org.apache.hadoop.util.Shell - Failed to detect a valid hadoop home 
directory
java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set.
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:307)
at org.apache.hadoop.util.Shell.<init>(Shell.java:332)
at org.apache.hadoop.util.StringUtils.<init>(StringUtils.java:79)
at org.apache.hadoop.yarn.conf.YarnConfiguration.<init>(YarnConfiguration.java:590)
at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:62)
at org.apache.spark.deploy.SparkHadoopUtil.<init>(SparkHadoopUtil.scala:52)
at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.<init>(YarnSparkHadoopUtil.scala:47)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.lang.Class.newInstance(Class.java:374)
at 
org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:386)
at 
org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:384)
at org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:384)
at org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:401)
at 
org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:623)
at 
org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
 
I tried modifying spark-env.sh like following and I see Hadoop_Home logged but 
still getting above error:
Modified added following entries to spark-env.sh
export HADOOP_HOME="/usr/lib/hadoop"
echo "&& HADOOP HOME "
echo "$HADOOP_HOME"
export HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
echo "&& HADOOP_CONF_DIR "
echo "$HADOOP_CONF_DIR"

Re: Kafka directsream receiving rate

2016-02-08 Thread Diwakar Dhanuskodi

Now, using the direct stream, I am able to process 2 million messages from a
20-partition topic within a batch interval of 2000 ms.

I finally figured out that the Kafka producer in a source system was sending the
topic name instead of a key in its KeyedMessage, so messages could end up spread
across all partitions.
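
For illustration only (an editor's sketch of the old Kafka 0.8 producer API, not the source system's actual code): a keyed message carries an explicit key that the partitioner uses to choose the partition. The key value here is hypothetical; the broker and topic names are the ones used elsewhere in this thread.

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "datanode4.isdp.com:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")

val producer = new Producer[String, String](new ProducerConfig(props))
val jsonPayload = """{"id":"1"}"""
// (topic, key, message): the key, not the topic name, drives partition assignment.
producer.send(new KeyedMessage[String, String]("xyz", "account-42", jsonPayload))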

But I still don't understand why the direct stream couldn't process all the
messages when the topic had a single partition. Let me look more closely and
update here. Let us close this mail thread.

Thanks, Cody, for the insights.

Regards 
Diwakar .


Sent from Samsung Mobile.

 Original message From: Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> Date:07/02/2016  01:39  (GMT+05:30) 
To: Cody Koeninger <c...@koeninger.org> Cc: 
user@spark.apache.org Subject: Re: Kafka directsream receiving rate 

Thanks  Cody for  trying to  understand the  issue .
Sorry if  I am  not  clear .
The scenario  is  to  process all messages at once  in  single  dstream block  
when  source  system  publishes messages .Source  system  will  publish x 
messages  / 10 minutes  once.

By events I meant that  total no of messages processed by each batch interval  
( in my case 2000ms)   by executor ( web UI shows each block  processing  as  
events)  

DirectStream is processing only  10 messages per batch. It  is  same if  100 or 
 1 million  messages published. 

xyz topic having 20 partitions. 
I am  using  kafka producer api to publish messages. 
Below  is  the  code  that  I am  using 
{
val topics = "xyz"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> 
"datanode4.isdp.com:9092")
val k = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
kafkaParams, topicSet)
k. foreachRDD { rdd =>
val dstreamToRDD =rdd.cache ()
println (current time & dtreamToRDD.partitions.length.)
val accTran  = dstream To RDD. filter { ...}
accTran.map {...}
}
ssc.start ()
ssc.awaitTermination
}


 }

I tried using DirectStream with map which  I had  issue with  
offsetRange . After  your  suggestion  offset  issue  is  resolved  when  I 
used  above  DirectStream code with topic only.

spark-submit setting that  I am  using  is  in  the  mail  chain  below .
Is  there  any bottlebeck I am  hitting to  process maximum messages at one 
batch interval using  directsream rdd?  .
If  this  is  not  clear . I would  take  this  offline  and  explain   
scenario briefly. 

Sent from Samsung Mobile.








 Original message 
From: Cody Koeninger <c...@koeninger.org>
Date:06/02/2016 22:32 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

I am not at all clear on what you are saying.

"Yes , I am  printing  each  messages .  It is  processing all  messages under 
each  dstream block."  If it is processing all messages, what is the problem 
you are having?

"The issue is  with  Directsream processing 10 message per event. "  What 
distinction are you making between a message and an event?

"I am  expecting  Directsream to  process  1 million messages"   Your first 
email said you were publishing 100 messages but only processing 10.  Why are 
you now trying to process 1 million messages without understanding what is 
going on?  Make sure you can process a limited number of messages correctly 
first.  The first code examples you posted to the list had some pretty serious 
errors (ie only trying to process 1 partition, trying to process offsets that 
didn't exist).  Make sure that is all fixed first.

To be clear, I use direct kakfa rdds to process batches with like 4gb of 
messages per partition, you shouldn't be hitting some kind of limit with 1 
million messages per batch.  You may of course hit executor resource issues 
depending on what you're trying to do with each message, but that doesn't sound 
like the case here.

If you want help, either clarify what you are saying, or post a minimal 
reproducible code example, with expected output vs actual output.






On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
Cody, 
Yes , I am  printing  each  messages . It is  processing all  messages under 
each  dstream block.

Source systems are   publishing  1 Million messages /4 secs which is less than 
batch interval. The issue is  with  Directsream processing 10 message per 
event. When partitions were  increased to  20 in topic, DirectStream picksup 
only 200 messages ( I guess 10 for  each partition ) at a time for  processing 
. I have  16 executors running for  streaming ( both  yarn client & cluster 
mode). 
I am  expecting  Directsream to  process  1 million messages which  published 
in topic < batch interval . 

Using  createStream , It could  batch 150K messages and pro

Re: Apache Spark data locality when integrating with Kafka

2016-02-07 Thread Diwakar Dhanuskodi
Fanoos, 
Where do you want the solution to be deployed? On premises or in the cloud?

Regards 
Diwakar .



Sent from Samsung Mobile.

 Original message From: "Yuval.Itzchakov" 
 Date:07/02/2016  19:38  (GMT+05:30) 
To: user@spark.apache.org Cc:  Subject: Re: 
Apache Spark data locality when integrating with Kafka 
I would definitely try to avoid hosting Kafka and Spark on the same 
servers. 

Kafka and Spark will be doing alot of IO between them, so you'll want to
maximize on those resources and not share them on the same server. You'll
want each Kafka broker to be on a dedicated server, as well as your spark
master and workers. If you're hosting them on Amazon EC2 instances, then
you'll want these to be on the same availability zone, so you can benefit
from low latency in that same zone. If you're on a dedicated servers,
perhaps you'll want to create a VPC between the two clusters so you can,
again, benefit from low IO latency and high throughput.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Apache Spark data locality when integrating with Kafka

2016-02-07 Thread Diwakar Dhanuskodi
We are using Spark in two ways:
1. YARN with Spark support, with Kafka running alongside the data nodes.
2. Spark master and workers running co-located with some of the Kafka brokers.
Data locality is important.

Regards
Diwakar 


Sent from Samsung Mobile.

 Original message From: أنس الليثي 
<dev.fano...@gmail.com> Date:08/02/2016  02:07  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: 
"Yuval.Itzchakov" <yuva...@gmail.com>, user <user@spark.apache.org> 
Subject: Re: Apache Spark data locality when integrating with Kafka 

Diwakar 

We have our own servers. We will not use any cloud service like Amazon's 

On 7 February 2016 at 18:24, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> 
wrote:
Fanoos, 
Where  you  want the solution to  be deployed ?. On premise or cloud?

Regards 
Diwakar .



Sent from Samsung Mobile.


 Original message 
From: "Yuval.Itzchakov" <yuva...@gmail.com>
Date:07/02/2016 19:38 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Re: Apache Spark data locality when integrating with Kafka

I would definitely try to avoid hosting Kafka and Spark on the same servers. 

Kafka and Spark will be doing alot of IO between them, so you'll want to
maximize on those resources and not share them on the same server. You'll
want each Kafka broker to be on a dedicated server, as well as your spark
master and workers. If you're hosting them on Amazon EC2 instances, then
you'll want these to be on the same availability zone, so you can benefit
from low latency in that same zone. If you're on a dedicated servers,
perhaps you'll want to create a VPC between the two clusters so you can,
again, benefit from low IO latency and high throughput.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-- 
Anas Rabei
Senior Software Developer
Mubasher.info
anas.ra...@mubasher.info

Re: Kafka directsream receiving rate

2016-02-06 Thread Diwakar Dhanuskodi
Thanks, Cody, for trying to understand the issue. Sorry if I was not clear.
The scenario is to process all messages at once, in a single DStream batch, when
the source system publishes them. The source system publishes x messages once
every 10 minutes.

By "events" I meant the total number of messages processed in each batch interval
(in my case 2000 ms) by the executor (the web UI shows each block being processed
as events).

The direct stream is processing only 10 messages per batch. It is the same whether
100 or 1 million messages are published.

The xyz topic has 20 partitions.
I am using the Kafka producer API to publish messages.
Below is the code that I am using:
val topics = "xyz"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
val k = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicSet)

k.foreachRDD { rdd =>
  val dstreamToRDD = rdd.cache()
  // log the current time and the number of partitions in this batch's RDD
  println(System.currentTimeMillis() + " " + dstreamToRDD.partitions.length)
  val accTran = dstreamToRDD.filter { ... }  // filter logic elided in the original mail
  accTran.map { ... }                        // map logic elided in the original mail
}
ssc.start()
ssc.awaitTermination()

I had tried using createDirectStream with an explicit offset map, which gave me an
issue with the offset ranges. After your suggestion, the offset issue was resolved
once I used the DirectStream code above with the topic set only.

The spark-submit settings I am using are in the mail chain below.
Is there any bottleneck I am hitting that prevents processing the maximum number of
messages in one batch interval with the direct stream RDD?
If this is not clear, I can take it offline and explain the scenario in more detail.
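
A minimal sketch (not from the original mail) of logging the offset ranges per batch, which makes it easy to verify how many messages each batch actually covers:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

k.foreachRDD { rdd =>
  // The cast must be done on the RDD produced directly by the direct stream.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} partition=${o.partition} from=${o.fromOffset} " +
      s"until=${o.untilOffset} count=${o.untilOffset - o.fromOffset}")
  }
  // ... rest of the batch processing ...
}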

Sent from Samsung Mobile.







 Original message From: Cody Koeninger 
<c...@koeninger.org> Date:06/02/2016  22:32  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: 
user@spark.apache.org Subject: Re: Kafka directsream receiving rate 

I am not at all clear on what you are saying.

"Yes , I am  printing  each  messages .  It is  processing all  messages under 
each  dstream block."  If it is processing all messages, what is the problem 
you are having?

"The issue is  with  Directsream processing 10 message per event. "  What 
distinction are you making between a message and an event?

"I am  expecting  Directsream to  process  1 million messages"   Your first 
email said you were publishing 100 messages but only processing 10.  Why are 
you now trying to process 1 million messages without understanding what is 
going on?  Make sure you can process a limited number of messages correctly 
first.  The first code examples you posted to the list had some pretty serious 
errors (ie only trying to process 1 partition, trying to process offsets that 
didn't exist).  Make sure that is all fixed first.

To be clear, I use direct kakfa rdds to process batches with like 4gb of 
messages per partition, you shouldn't be hitting some kind of limit with 1 
million messages per batch.  You may of course hit executor resource issues 
depending on what you're trying to do with each message, but that doesn't sound 
like the case here.

If you want help, either clarify what you are saying, or post a minimal 
reproducible code example, with expected output vs actual output.






On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
Cody, 
Yes , I am  printing  each  messages . It is  processing all  messages under 
each  dstream block.

Source systems are   publishing  1 Million messages /4 secs which is less than 
batch interval. The issue is  with  Directsream processing 10 message per 
event. When partitions were  increased to  20 in topic, DirectStream picksup 
only 200 messages ( I guess 10 for  each partition ) at a time for  processing 
. I have  16 executors running for  streaming ( both  yarn client & cluster 
mode). 
I am  expecting  Directsream to  process  1 million messages which  published 
in topic < batch interval . 

Using  createStream , It could  batch 150K messages and process . createStream 
is  better than  Directsream in  this  case . Again why only  150K.

Any  clarification is  much  appreciated  on directStream processing millions 
per batch .




Sent from Samsung Mobile.


 Original message ----
From: Cody Koeninger <c...@koeninger.org>
Date:06/02/2016 01:30 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

Have you tried just printing each message, to see which ones are being 
processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
I am  able  to  see  no of  messages processed  per  event  in  sparkstreaming 
web UI . Also  I am  counting  the  messages inside  foreachRDD .
Removed  the  settings for  backpressure but still  the  same .





Sent from Sams

RE: Apache Spark data locality when integrating with Kafka

2016-02-06 Thread Diwakar Dhanuskodi
Yes, to reduce network latency.


Sent from Samsung Mobile.

 Original message From: fanooos 
 Date:07/02/2016  09:24  (GMT+05:30) 
To: user@spark.apache.org Cc:  Subject: Apache 
Spark data locality when integrating with Kafka 
Dears

If I will use Kafka as a streaming source to some spark jobs, is it advised
to install spark to the same nodes of kafka cluster? 

What are the benefits and drawbacks of such a decision? 

regards 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka directsream receiving rate

2016-02-06 Thread Diwakar Dhanuskodi
Cody,
Yes, I am printing each message. It is processing all messages within each
DStream block.

The source systems are publishing 1 million messages every 4 seconds, which is less
than the batch interval. The issue is that the direct stream processes only 10
messages per batch. When the topic's partitions were increased to 20, the direct
stream picks up only 200 messages (I guess 10 for each partition) at a time for
processing. I have 16 executors running for streaming (in both yarn-client and
cluster mode).
I am expecting the direct stream to process the 1 million messages published to the
topic within a batch interval.

Using createStream, it could batch 150K messages and process them, so createStream
does better than the direct stream in this case. But again, why only 150K?

Any clarification on getting the direct stream to process millions of messages per
batch is much appreciated.




Sent from Samsung Mobile.

 Original message From: Cody Koeninger 
<c...@koeninger.org> Date:06/02/2016  01:30  (GMT+05:30) 
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: 
user@spark.apache.org Subject: Re: Kafka directsream receiving rate 

Have you tried just printing each message, to see which ones are being 
processed?

On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
I am  able  to  see  no of  messages processed  per  event  in  sparkstreaming 
web UI . Also  I am  counting  the  messages inside  foreachRDD .
Removed  the  settings for  backpressure but still  the  same .





Sent from Samsung Mobile.


 Original message 
From: Cody Koeninger <c...@koeninger.org>
Date:06/02/2016 00:33 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

How are you counting the number of messages?

I'd go ahead and remove the settings for backpressure and maxrateperpartition, 
just to eliminate that as a variable.

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
I am  using  one  directsream. Below  is  the  call  to directsream:-

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> 
"datanode4.isdp.com:9092")
val k = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
kafkaParams, topicSet)

When  I replace   DirectStream call  to  createStream,  all  messages were  
read  by  one  Dstream block.:-
val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap 
,StorageLevel.MEMORY_ONLY)

I am  using   below  spark-submit to execute:
./spark-submit --master yarn-client --conf 
"spark.dynamicAllocation.enabled=true" --conf 
"spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" 
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf 
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" 
--conf "spark.shuffle.consolidateFiles=true"   --conf 
"spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g 
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files 
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
 --jars 
/root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
 /root/Jars/sparkreceiver.jar




Sent from Samsung Mobile.


 Original message 
From: Cody Koeninger <c...@koeninger.org>
Date:05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers.  Do you mean you have 
1 executor?

Can you post the relevant call to createDirectStream from your code, as well as 
any relevant spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
Adding more info

Batch  interval  is  2000ms.
I expect all 100 messages  go thru one  dstream from  directsream but it 
receives at rate of 10 messages at time. Am  I missing  some  configurations 
here. Any help appreciated. 

Regards 
Diwakar.


Sent from Samsung Mobile.


 Original message 
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directsream receiving rate

Hi,
Using Spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark direct
stream receives only 10 messages per DStream batch. I have only one receiver.
When I used createStream, the receiver received the entire 100 messages at once.
 

Appreciate any help.

Regards 
Diwakar


Sent from Samsung Mobile.





Re: Kafka directsream receiving rate

2016-02-05 Thread Diwakar Dhanuskodi
I am using one directStream. Below is the call to directStream:-

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> 
"datanode4.isdp.com:9092")
val k = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
kafkaParams, topicSet)

When I replace the directStream call with createStream, all messages are
read by one DStream block:-
val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap 
,StorageLevel.MEMORY_ONLY)

I am using the spark-submit command below to execute:
./spark-submit --master yarn-client --conf 
"spark.dynamicAllocation.enabled=true" --conf 
"spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" 
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf 
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" 
--conf "spark.shuffle.consolidateFiles=true"   --conf 
"spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g 
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files 
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
 --jars 
/root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
 /root/Jars/sparkreceiver.jar




Sent from Samsung Mobile.

 Original message 
From: Cody Koeninger <c...@koeninger.org>
Date:05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers.  Do you mean you 
have 1 executor?

Can you post the relevant call to createDirectStream from your code, as well as 
any relevant spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
Adding more info

Batch interval is 2000 ms.
I expect all 100 messages to go through one DStream from the direct stream, but it
receives them at a rate of 10 messages at a time. Am I missing some configuration
here? Any help is appreciated.

Regards 
Diwakar.


Sent from Samsung Mobile.


 Original message 
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directsream receiving rate

Hi,
Using Spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark direct
stream receives only 10 messages per DStream batch. I have only one receiver.
When I used createStream, the receiver received the entire 100 messages at once.
 

Appreciate any help.

Regards 
Diwakar


Sent from Samsung Mobile.



Re: Kafka directsream receiving rate

2016-02-05 Thread Diwakar Dhanuskodi
I am able to see the number of messages processed per batch in the Spark Streaming
web UI. I am also counting the messages inside foreachRDD.
I removed the settings for backpressure, but the behavior is still the same.





Sent from Samsung Mobile.

 Original message 
From: Cody Koeninger <c...@koeninger.org>
Date:06/02/2016 00:33 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

How are you counting the number of messages?

I'd go ahead and remove the settings for backpressure and maxrateperpartition, 
just to eliminate that as a variable.

On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
I am using one directStream. Below is the call to directStream:-

val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> 
"datanode4.isdp.com:9092")
val k = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
kafkaParams, topicSet)

When I replace the directStream call with createStream, all messages are
read by one DStream block:-
val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap 
,StorageLevel.MEMORY_ONLY)

I am using the spark-submit command below to execute:
./spark-submit --master yarn-client --conf 
"spark.dynamicAllocation.enabled=true" --conf 
"spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" 
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf 
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" 
--conf "spark.shuffle.consolidateFiles=true"   --conf 
"spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g 
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files 
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
 --jars 
/root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar
 /root/Jars/sparkreceiver.jar




Sent from Samsung Mobile.


 Original message 
From: Cody Koeninger <c...@koeninger.org>
Date:05/02/2016 22:07 (GMT+05:30)
To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Cc: user@spark.apache.org
Subject: Re: Kafka directsream receiving rate

If you're using the direct stream, you have 0 receivers.  Do you mean you have 
1 executor?

Can you post the relevant call to createDirectStream from your code, as well as 
any relevant spark configuration?

On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi 
<diwakar.dhanusk...@gmail.com> wrote:
Adding more info

Batch interval is 2000 ms.
I expect all 100 messages to go through one DStream from the direct stream, but it
receives them at a rate of 10 messages at a time. Am I missing some configuration
here? Any help is appreciated.

Regards 
Diwakar.


Sent from Samsung Mobile.


 Original message 
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directsream receiving rate

Hi,
Using Spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark direct
stream receives only 10 messages per DStream batch. I have only one receiver.
When I used createStream, the receiver received the entire 100 messages at once.
 

Appreciate any help.

Regards 
Diwakar


Sent from Samsung Mobile.




RE: Kafka directsream receiving rate

2016-02-04 Thread Diwakar Dhanuskodi
Adding more info

Batch interval is 2000 ms.
I expect all 100 messages to go through one DStream from the direct stream, but it
receives them at a rate of 10 messages at a time. Am I missing some configuration
here? Any help is appreciated.

Regards 
Diwakar.


Sent from Samsung Mobile.

 Original message 
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date:05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directsream receiving rate

Hi,
Using Spark 1.5.1.
I have a topic with 20 partitions. When I publish 100 messages, the Spark direct
stream receives only 10 messages per DStream batch. I have only one receiver.
When I used createStream, the receiver received the entire 100 messages at once.
 

Appreciate any help.

Regards 
Diwakar


Sent from Samsung Mobile.

kafkaDirectStream usage error

2016-02-04 Thread Diwakar Dhanuskodi
:04:41 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on 
datanode3.isdp.com:42736 in memory (size: 2.4 KB, free: 530.3 MB)
16/02/04 21:04:41 INFO spark.ContextCleaner: Cleaned accumulator 1
16/02/04 21:04:41 INFO ui.SparkUI: Stopped Spark web UI at 
http://10.132.117.208:4040
16/02/04 21:04:41 INFO scheduler.DAGScheduler: Stopping DAGScheduler
16/02/04 21:04:41 INFO scheduler.DAGScheduler: ShuffleMapStage 2 (foreachRDD at 
SparkReceiver.scala:74) failed in 1.512 s
16/02/04 21:04:41 INFO cluster.YarnClientSchedulerBackend: Shutting down all 
executors
16/02/04 21:04:41 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor 
thread
16/02/04 21:04:41 INFO cluster.YarnClientSchedulerBackend: Asking each executor 
to shut down
16/02/04 21:04:41 INFO cluster.YarnClientSchedulerBackend: Stopped
16/02/04 21:04:41 WARN spark.ExecutorAllocationManager: No stages are running, 
but numRunningTasks != 0
16/02/04 21:04:41 INFO spark.MapOutputTrackerMasterEndpoint: 
MapOutputTrackerMasterEndpoint stopped!
16/02/04 21:04:41 INFO storage.MemoryStore: MemoryStore cleared
16/02/04 21:04:41 INFO storage.BlockManager: BlockManager stopped
16/02/04 21:04:41 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/02/04 21:04:41 INFO spark.SparkContext: Successfully stopped SparkContext
16/02/04 21:04:41 INFO 
scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: 
OutputCommitCoordinator stopped!
16/02/04 21:04:41 INFO util.ShutdownHookManager: Shutdown hook called
16/02/04 21:04:41 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-2014834b-fb03-4418-99e0-f35171acc67e
16/02/04 21:04:41 INFO util.ShutdownHookManager: Deleting directory 
/tmp/spark-9afdd351-d7a6-44cc-a47f-cce3170293e9
16/02/04 21:04:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Shutting down remote daemon.
16/02/04 21:04:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote 
daemon shut down; proceeding with flushing remote transports.
[root@Datanode4 bin]#


//    val k  = KafkaUtils.createStream(ssc, 
"datanode4.isdp.com:2181","DIME",topicMap ,StorageLevel.MEMORY_ONLY)
//Changes for DirectStream - START
    val brokers = "datanode4.isdp.com:9092"+","+"datanode5.isdp.com:9093"
    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String,String]("bootstrap.servers" -> 
"datanode4.isdp.com:9092")
    //when executing DirectStream , comment out createStream statement
    val k = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, 
kafkaParams, topicSet)
    val topicAndPart =  OffsetRange.create("request5",0, 
1,10).topicAndPartition()
    val fromOffsets = Map[kafka.common.TopicAndPartition,Long](topicAndPart->0)
    val messageHandler = (mmd : MessageAndMetadata[String,String]) => 
(mmd.key(),mmd.message())
    val k1 = 
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc,
 kafkaParams, fromOffsets,messageHandler)
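
For reference, a self-contained sketch of the explicit-offset form used above, with
the imports it needs (broker, topic and offsets are taken from the snippet; anything
else is an assumption). Note that as written the fromOffsets map covers only
partition 0 of "request5", so the stream built from it reads nothing from the
topic's other partitions:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

def explicitOffsetStream(ssc: StreamingContext) = {
  val kafkaParams = Map[String, String]("bootstrap.servers" -> "datanode4.isdp.com:9092")

  // Start partition 0 of "request5" at offset 0; a real job would build one entry
  // per partition, typically from offsets saved by an earlier run.
  val fromOffsets = Map[TopicAndPartition, Long](TopicAndPartition("request5", 0) -> 0L)

  // Turn each Kafka record into a (key, value) pair, as in the snippet above.
  val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}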

./spark-submit --master yarn-client  --conf 
"spark.dynamicAllocation.enabled=true" --conf 
"spark.shuffle.service.enabled=true"  --conf "spark.sql.tungsten.enabled=false" 
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf 
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" 
--conf "spark.shuffle.consolidateFiles=true"  --conf 
"spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g 
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files 
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
 --jars 
/root/Downloads/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Desktop/Jars/sparkreceiver.jar
 /root/Desktop/Jars/sparkreceiver.jar > log.txt

./spark-submit --master yarn --deploy-mode client  --num-executors 10 
--executor-cores 1  --conf "spark.streaming.kafka.maxRatePerPartition=1000" 
--conf "spark.streaming.blockInterval=50ms" --conf 
"spark.ui.showConsoleProgress=false" --conf "spark.sql.tungsten.enabled=false" 
--conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf 
"spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" 
--conf "spark.shuffle.consolidateFiles=true"   --driver-memory 2g 
--executor-memory 1g --class com.tcs.dime.spark.SparkReceiver   --files 
/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml
 --jars 
hdfs://Namenode:8020/user/root/kafka-assembly.jar,hdfs://Namenode:8020/user/root/sparkreceiver.jar
 /root/Desktop/Jars/sparkreceiver.jar  &> log.txt