Re: Spark yarn cluster
Thanks Martín. I was not clear in my question initially; thanks for understanding and for the explanation. The idea, as you said, is to explore the possibility of using YARN for cluster scheduling with Spark being used without HDFS. Thanks again for the clarification. On Sat, Jul 11, 2020 at 1:27 PM Juan Martín Guillén < juanmartinguil...@yahoo.com.ar> wrote: > Hi Diwakar, > > A YARN cluster not having Hadoop is kind of a fuzzy concept. > > You may well want to have Hadoop and not use MapReduce, and > use Spark instead. That is the main reason to use Spark in a Hadoop cluster > anyway. > > On the other hand, it is highly probable you will want to use HDFS, although it is > not strictly necessary. > > So, answering your question: you are using Hadoop by using YARN, because it > is one of the 3 main components of it, but that doesn't mean you need to use the > other components of the Hadoop cluster, namely MapReduce and HDFS. > > That being said, if you just need cluster scheduling and are not using > MapReduce nor HDFS, it is possible you will be fine with the Spark > Standalone cluster. > > Regards, > Juan Martín. > > On Saturday, July 11, 2020 at 13:57:40 ART, Diwakar Dhanuskodi < > diwakar.dhanusk...@gmail.com> wrote: > > > Hi, > > Would it be possible to set up Spark within a YARN cluster which may not have > Hadoop? > > Thanks. >
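To make the standalone suggestion concrete, here is a minimal sketch of the two master settings. The host name and application names are placeholders; the YARN case assumes HADOOP_CONF_DIR/YARN_CONF_DIR point at the cluster configuration, and only one of the two sessions would be built in a real job.

```scala
import org.apache.spark.sql.SparkSession

// Scheduling on YARN: only the ResourceManager/NodeManagers are needed for scheduling itself;
// whether the data lives on HDFS or somewhere else (S3, NFS, local paths) is a separate choice.
val onYarn = SparkSession.builder()
  .appName("yarn-scheduled-app")
  .master("yarn")
  .getOrCreate()

// Spark Standalone: no YARN and no HDFS at all, just Spark's own master and workers.
// val standalone = SparkSession.builder()
//   .appName("standalone-app")
//   .master("spark://master-host:7077")
//   .getOrCreate()
```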
Spark yarn cluster
Hi, Would it be possible to set up Spark within a YARN cluster which may not have Hadoop? Thanks.
foreachPartition in Spark Streaming
Just wanted to clarify: is foreachPartition in Spark an output operation? Which one is better to use, mapPartitions or foreachPartition? Regards, Diwakar
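For reference, a minimal, self-contained sketch of the distinction (the tiny dataset and the println "sink" are just placeholders): mapPartitions is a lazy transformation that returns a new RDD, while foreachPartition is an action — an output operation in Spark Streaming terms — that returns Unit and triggers execution.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("partition-demo").setMaster("local[2]"))
val rdd = sc.parallelize(Seq("a", "bb", "ccc"), numSlices = 2)

// mapPartitions is a transformation: lazy, returns a new RDD, nothing runs yet.
val lengths = rdd.mapPartitions { iter =>
  // per-partition setup (e.g. an expensive parser or DB connection) belongs here,
  // created once per partition rather than once per record
  iter.map(_.length)
}
println(lengths.collect().mkString(","))   // collect() is the action that triggers the work

// foreachPartition is an action (an "output operation" in Spark Streaming terms):
// it returns Unit, runs immediately, and is the usual place to write to an external sink.
rdd.foreachPartition { iter =>
  iter.foreach(record => println(s"writing $record"))   // println stands in for a real sink
}
```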
Re: Pls assist: Spark 2.0 build failure on Ubuntu 16.06
Please run with -X and post logs here. We can get exact error from it. On Sat, Sep 3, 2016 at 7:24 PM, Marco Mistroniwrote: > hi all > > i am getting failures when building spark 2.0 on Ubuntu 16.06 > Here's details of what i have installed on the ubuntu host > - java 8 > - scala 2.11 > - git > > When i launch the command > > ./build/mvn -Pyarn -Phadoop-2.7 -DskipTests clean package > > everything compiles sort of fine and at the end i get this exception > > INFO] BUILD FAILURE > [INFO] > > [INFO] Total time: 01:05 h > [INFO] Finished at: 2016-09-03T13:25:27+00:00 > [INFO] Final Memory: 57M/208M > [INFO] > > [ERROR] Failed to execute goal > net.alchim31.maven:scala-maven-plugin:3.2.2:testCompile > (scala-test-compile-first) on project spark-streaming_2.11: Execution > scala-test-compile-first of goal > net.alchim31.maven:scala-maven-plugin:3.2.2:testCompile > failed. CompileFailed -> [Help 1] > [ERROR] > [ERROR] To see the full stack trace of the errors, re-run Maven with the > -e switch. > [ERROR] Re-run Maven using the -X switch to enable full debug logging. > [ERROR] > [ERROR] For more information about the errors and possible solutions, > please read the following articles: > [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/ > PluginExecutionException > [ERROR] > [ERROR] After correcting the problems, you can resume the build with the > command > [ERROR] mvn -rf :spark-streaming_2.11 > > could anyone help? > > kr >
Hive connection issues in spark-shell
Hi, I recently built spark using maven. Now when starting spark-shell, it couldn't connect hive and getting below error I couldn't find datanucleus jar in built library. But datanucleus jar is available in hive/lib folders. java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at javax.jdo.JDOHelper$18.run(JDOHelper.java:2018) at javax.jdo.JDOHelper$18.run(JDOHelper.java:2016) at java.security.AccessController.doPrivileged(Native Method) at javax.jdo.JDOHelper.forName(JDOHelper.java:2015) at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1162) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394) at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291) at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at org.apache.hadoop.hive.metastore.RawStoreProxy.(RawStoreProxy.java:57) at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:66) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:593) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:571) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.(RetryingHMSHandler.java:66) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72) at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:199) at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.(SessionHiveMetaStoreClient.java:74) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:86) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503) at 
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:204) at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:238) at org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:218) at org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:208) at org.apache.spark.sql.hive.HiveContext.functionRegistry$lzycompute(HiveContext.scala:462) at org.apache.spark.sql.hive.HiveContext.functionRegistry(HiveContext.scala:461) at org.apache.spark.sql.UDFRegistration.(UDFRegistration.scala:40) at org.apache.spark.sql.SQLContext.(SQLContext.scala:330) at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:97) at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:101) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.spark.repl.Main$.createSQLContext(Main.scala:89) at $line4.$read$$iw$$iw.(:12) at $line4.$read$$iw.(:21) at $line4.$read.(:23) at $line4.$read$.(:27) at $line4.$read$.() at $line4.$eval$.$print$lzycompute(:7) at $line4.$eval$.$print(:6) at $line4.$eval.$print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
Re: Spark build 1.6.2 error
Sorry my bad. In both the runs I included -Dscala-2.11 On Sat, Sep 3, 2016 at 12:39 PM, Nachiketa <nachiketa.shu...@gmail.com> wrote: > I think the difference was the -Dscala2.11 to the command line. > > I have seen this show up when I miss that. > > Regards, > Nachiketa > > On Sat 3 Sep, 2016, 12:14 PM Diwakar Dhanuskodi, < > diwakar.dhanusk...@gmail.com> wrote: > >> Hi, >> >> Just re-ran again without killing zinc server process >> >> /make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive >> -Pyarn -Dmaven.version=3.0.4 -Dscala-2.11 -X -rf :spark-sql_2.11 >> >> Build is success. Not sure how it worked with just re-running command >> again. >> >> On Sat, Sep 3, 2016 at 11:44 AM, Diwakar Dhanuskodi < >> diwakar.dhanusk...@gmail.com> wrote: >> >>> Hi, >>> >>> java version 7 >>> >>> mvn command >>> ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive >>> -Phive-thriftserver -Pyarn -Dmaven.version=3.0.4 >>> >>> >>> yes, I executed script to change scala version to 2.11 >>> killed "com.typesafe zinc.Nailgun" process >>> >>> re-ran mvn with below command again >>> >>> ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive >>> -Phive-thriftserver -Pyarn -Dmaven.version=3.0.4 -X -rf :spark-sql_2.11 >>> >>> Getting same error >>> >>> [warn] /home/cloudera/Downloads/spark-1.6.2/sql/core/src/main/ >>> scala/org/apache/spark/sql/sources/interfaces.scala:911: method isDir >>> in class FileStatus is deprecated: see corresponding Javadoc for more >>> information. >>> [warn] status.isDir, >>> [warn]^ >>> [error] missing or invalid dependency detected while loading class file >>> 'WebUI.class'. >>> [error] Could not access term eclipse in package org, >>> [error] because it (or its dependencies) are missing. Check your build >>> definition for >>> [error] missing or conflicting dependencies. (Re-run with >>> `-Ylog-classpath` to see the problematic classpath.) >>> [error] A full rebuild may help if 'WebUI.class' was compiled against an >>> incompatible version of org. >>> [error] missing or invalid dependency detected while loading class file >>> 'WebUI.class'. >>> [error] Could not access term jetty in value org.eclipse, >>> [error] because it (or its dependencies) are missing. Check your build >>> definition for >>> [error] missing or conflicting dependencies. (Re-run with >>> `-Ylog-classpath` to see the problematic classpath.) >>> [error] A full rebuild may help if 'WebUI.class' was compiled against an >>> incompatible version of org.eclipse. >>> [warn] 17 warnings found >>> [error] two errors found >>> [debug] Compilation failed (CompilerInterface) >>> [error] Compile failed at Sep 3, 2016 11:28:34 AM [21.611s] >>> [INFO] >>> >>> [INFO] Reactor Summary: >>> [INFO] >>> [INFO] Spark Project Parent POM .. SUCCESS >>> [5.583s] >>> [INFO] Spark Project Test Tags ... SUCCESS >>> [4.189s] >>> [INFO] Spark Project Launcher SUCCESS >>> [12.226s] >>> [INFO] Spark Project Networking .. SUCCESS >>> [13.386s] >>> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS >>> [6.723s] >>> [INFO] Spark Project Unsafe .. SUCCESS >>> [21.231s] >>> [INFO] Spark Project Core SUCCESS >>> [3:46.334s] >>> [INFO] Spark Project Bagel ... SUCCESS >>> [7.032s] >>> [INFO] Spark Project GraphX .. SUCCESS >>> [19.558s] >>> [INFO] Spark Project Streaming ... SUCCESS >>> [50.452s] >>> [INFO] Spark Project Catalyst SUCCESS >>> [1:14.172s] >>> [INFO] Spark Project SQL . FAILURE >>> [23.222s] >>> [INFO] Spark Project ML Library .. SKIPPED >>> [INFO] Spark Project Tools ... 
SKIPPED >>> [INFO] Spark Project Hive SKIPPED >>> [INFO] Spark Project Docker Integration
Re: Spark build 1.6.2 error
Hi, Just re-ran again without killing zinc server process /make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive -Pyarn -Dmaven.version=3.0.4 -Dscala-2.11 -X -rf :spark-sql_2.11 Build is success. Not sure how it worked with just re-running command again. On Sat, Sep 3, 2016 at 11:44 AM, Diwakar Dhanuskodi < diwakar.dhanusk...@gmail.com> wrote: > Hi, > > java version 7 > > mvn command > ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive > -Phive-thriftserver -Pyarn -Dmaven.version=3.0.4 > > > yes, I executed script to change scala version to 2.11 > killed "com.typesafe zinc.Nailgun" process > > re-ran mvn with below command again > > ./make-distribution.sh --name custom-spark --tgz -Phadoop-2.6 -Phive > -Phive-thriftserver -Pyarn -Dmaven.version=3.0.4 -X -rf :spark-sql_2.11 > > Getting same error > > [warn] /home/cloudera/Downloads/spark-1.6.2/sql/core/src/main/ > scala/org/apache/spark/sql/sources/interfaces.scala:911: method isDir in > class FileStatus is deprecated: see corresponding Javadoc for more > information. > [warn] status.isDir, > [warn]^ > [error] missing or invalid dependency detected while loading class file > 'WebUI.class'. > [error] Could not access term eclipse in package org, > [error] because it (or its dependencies) are missing. Check your build > definition for > [error] missing or conflicting dependencies. (Re-run with > `-Ylog-classpath` to see the problematic classpath.) > [error] A full rebuild may help if 'WebUI.class' was compiled against an > incompatible version of org. > [error] missing or invalid dependency detected while loading class file > 'WebUI.class'. > [error] Could not access term jetty in value org.eclipse, > [error] because it (or its dependencies) are missing. Check your build > definition for > [error] missing or conflicting dependencies. (Re-run with > `-Ylog-classpath` to see the problematic classpath.) > [error] A full rebuild may help if 'WebUI.class' was compiled against an > incompatible version of org.eclipse. > [warn] 17 warnings found > [error] two errors found > [debug] Compilation failed (CompilerInterface) > [error] Compile failed at Sep 3, 2016 11:28:34 AM [21.611s] > [INFO] > > [INFO] Reactor Summary: > [INFO] > [INFO] Spark Project Parent POM .. SUCCESS [5.583s] > [INFO] Spark Project Test Tags ... SUCCESS [4.189s] > [INFO] Spark Project Launcher SUCCESS > [12.226s] > [INFO] Spark Project Networking .. SUCCESS > [13.386s] > [INFO] Spark Project Shuffle Streaming Service ... SUCCESS [6.723s] > [INFO] Spark Project Unsafe .. SUCCESS > [21.231s] > [INFO] Spark Project Core SUCCESS > [3:46.334s] > [INFO] Spark Project Bagel ... SUCCESS > [7.032s] > [INFO] Spark Project GraphX .. SUCCESS > [19.558s] > [INFO] Spark Project Streaming ... SUCCESS > [50.452s] > [INFO] Spark Project Catalyst SUCCESS > [1:14.172s] > [INFO] Spark Project SQL . FAILURE > [23.222s] > [INFO] Spark Project ML Library .. SKIPPED > [INFO] Spark Project Tools ... SKIPPED > [INFO] Spark Project Hive SKIPPED > [INFO] Spark Project Docker Integration Tests SKIPPED > [INFO] Spark Project REPL SKIPPED > [INFO] Spark Project YARN Shuffle Service SKIPPED > [INFO] Spark Project YARN SKIPPED > [INFO] Spark Project Assembly SKIPPED > [INFO] Spark Project External Twitter SKIPPED > [INFO] Spark Project External Flume Sink . SKIPPED > [INFO] Spark Project External Flume .. SKIPPED > [INFO] Spark Project External Flume Assembly . SKIPPED > [INFO] Spark Project External MQTT ... SKIPPED > [INFO] Spark Project External MQTT Assembly .. 
SKIPPED > [INFO] Spark Project External ZeroMQ . SKIPPED > [INFO] Spark Project External Kafka .. SKIPPED > [INFO] Spark Project Examples SKIPPED > [INFO] Spark Project External Kafka Assembly . SKIPPED > [INFO] > > [INFO] BUILD FAILURE > [INFO] -
Re: Spark build 1.6.2 error
utor.java:225) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:84) at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:59) at org.apache.maven.lifecycle.internal.LifecycleStarter.singleThreadedBuild(LifecycleStarter.java:183) at org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:161) at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:320) at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:156) at org.apache.maven.cli.MavenCli.execute(MavenCli.java:537) at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:196) at org.apache.maven.cli.MavenCli.main(MavenCli.java:141) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:290) at org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:230) at org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:409) at org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:352) Caused by: org.apache.maven.plugin.PluginExecutionException: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:110) at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:209) ... 19 more Caused by: Compile failed via zinc server at sbt_inc.SbtIncrementalCompiler.zincCompile(SbtIncrementalCompiler.java:136) at sbt_inc.SbtIncrementalCompiler.compile(SbtIncrementalCompiler.java:86) at scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:303) at scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119) at scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99) at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482) at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:101) ... 20 more [ERROR] [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException [ERROR] [ERROR] After correcting the problems, you can resume the build with the command [ERROR] mvn -rf :spark-sql_2.11 On Thu, Sep 1, 2016 at 8:23 AM, Divya Gehlot <divya.htco...@gmail.com> wrote: > Which java version are you using ? > > On 31 August 2016 at 04:30, Diwakar Dhanuskodi < > diwakar.dhanusk...@gmail.com> wrote: > >> Hi, >> >> While building Spark 1.6.2 , getting below error in spark-sql. Much >> appreciate for any help. >> >> ERROR] missing or invalid dependency detected while loading class file >> 'WebUI.class'. >> Could not access term eclipse in package org, >> because it (or its dependencies) are missing. Check your build definition >> for >> missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to >> see the problematic classpath.) 
>> A full rebuild may help if 'WebUI.class' was compiled against an >> incompatible version of org. >> [ERROR] missing or invalid dependency detected while loading class file >> 'WebUI.class'. >> Could not access term jetty in value org.eclipse, >> because it (or its dependencies) are missing. Check your build definition >> for >> missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to >> see the problematic classpath.) >> A full rebuild may help if 'WebUI.class' was compiled against an >> incompatible version of org.eclipse. >> [WARNING] 17 warnings found >> [ERROR] two errors found >> [INFO] >> >> [INFO] Reactor Summary: >> [INFO] >> [INFO] Spark Project Parent POM .. SUCCESS >> [4.399s] >> [INFO] Spark Project Test Tags ... SUCCESS >> [3.443s] >> [INFO] Spark Project Launcher SUCCESS >> [10.131s] >> [INFO] Spark Project Networking .. SUCCESS >> [11.849s] >> [INFO] Spark Project Shuffle Streaming Service ... SUCCESS >> [6.641s] >> [INFO] Spark Project Unsafe .. SUCCESS >> [19.765s]
Spark build 1.6.2 error
Hi, While building Spark 1.6.2 , getting below error in spark-sql. Much appreciate for any help. ERROR] missing or invalid dependency detected while loading class file 'WebUI.class'. Could not access term eclipse in package org, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.) A full rebuild may help if 'WebUI.class' was compiled against an incompatible version of org. [ERROR] missing or invalid dependency detected while loading class file 'WebUI.class'. Could not access term jetty in value org.eclipse, because it (or its dependencies) are missing. Check your build definition for missing or conflicting dependencies. (Re-run with `-Ylog-classpath` to see the problematic classpath.) A full rebuild may help if 'WebUI.class' was compiled against an incompatible version of org.eclipse. [WARNING] 17 warnings found [ERROR] two errors found [INFO] [INFO] Reactor Summary: [INFO] [INFO] Spark Project Parent POM .. SUCCESS [4.399s] [INFO] Spark Project Test Tags ... SUCCESS [3.443s] [INFO] Spark Project Launcher SUCCESS [10.131s] [INFO] Spark Project Networking .. SUCCESS [11.849s] [INFO] Spark Project Shuffle Streaming Service ... SUCCESS [6.641s] [INFO] Spark Project Unsafe .. SUCCESS [19.765s] [INFO] Spark Project Core SUCCESS [4:16.511s] [INFO] Spark Project Bagel ... SUCCESS [13.401s] [INFO] Spark Project GraphX .. SUCCESS [1:08.824s] [INFO] Spark Project Streaming ... SUCCESS [2:18.844s] [INFO] Spark Project Catalyst SUCCESS [2:43.695s] [INFO] Spark Project SQL . FAILURE [1:01.762s] [INFO] Spark Project ML Library .. SKIPPED [INFO] Spark Project Tools ... SKIPPED [INFO] Spark Project Hive SKIPPED [INFO] Spark Project Docker Integration Tests SKIPPED [INFO] Spark Project REPL SKIPPED [INFO] Spark Project YARN Shuffle Service SKIPPED [INFO] Spark Project YARN SKIPPED [INFO] Spark Project Assembly SKIPPED [INFO] Spark Project External Twitter SKIPPED [INFO] Spark Project External Flume Sink . SKIPPED [INFO] Spark Project External Flume .. SKIPPED [INFO] Spark Project External Flume Assembly . SKIPPED [INFO] Spark Project External MQTT ... SKIPPED [INFO] Spark Project External MQTT Assembly .. SKIPPED [INFO] Spark Project External ZeroMQ . SKIPPED [INFO] Spark Project External Kafka .. SKIPPED [INFO] Spark Project Examples SKIPPED [INFO] Spark Project External Kafka Assembly . SKIPPED [INFO] [INFO] BUILD FAILURE [INFO] [INFO] Total time: 12:40.525s [INFO] Finished at: Wed Aug 31 01:56:50 IST 2016 [INFO] Final Memory: 71M/830M [INFO] [ERROR] Failed to execute goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on project spark-sql_2.11: Execution scala-compile-first of goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed -> [Help 1] [ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch. [ERROR] Re-run Maven using the -X switch to enable full debug logging. [ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles: [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/PluginExecutionException [ERROR] [ERROR] After correcting the problems, you can resume the build with the command [ERROR] mvn -rf :spark-sql_2.11
Re: Best way to read XML data from RDD
Below is the source code for parsing an XML RDD which has single-line XML data. import scala.xml.XML import scala.xml.Elem import scala.collection.mutable.ArrayBuffer import scala.xml.Text import scala.xml.Node var dataArray = new ArrayBuffer[String]() def processNode(node: Node, fp1: String): Unit = node match { case Elem(prefix, label, attribs, scope, Text(text)) => dataArray.+=:("Cust.001.001.03-" + fp1 + "," + text) case _ => for (n <- node.child) { val fp = fp1 + "/" + n.label processNode(n, fp) } } val dataDF = xmlData .map { x => val p = XML.loadString(x.get(0).toString.mkString) val xsd = utils.getXSD(p) println("xsd -- ", xsd) val f = "/" + p.label val msgId = (p \\ "Fnd" \ "Mesg" \ "Paid" \ "Record" \ "CustInit" \ "GroupFirst" \ "MesgId").text processNode(p, f) (msgId, dataArray, x.get(1).toString()) } .flatMap { x => val msgId = x._1 val y = x._2.toIterable.map { x1 => (msgId, x1.split(",").apply(0), x1.split(",").apply(1), x._3) } y }.toDF("key","attribute","value","type") On Mon, Aug 22, 2016 at 4:34 PM, Hyukjin Kwon <gurwls...@gmail.com> wrote: > Do you mind share your codes and sample data? It should be okay with > single XML if I remember this correctly. > > 2016-08-22 19:53 GMT+09:00 Diwakar Dhanuskodi < > diwakar.dhanusk...@gmail.com>: > >> Hi Darin, >> >> Are you using this utility to parse single line XML? >> >> >> Sent from Samsung Mobile. >> >> >> Original message >> From: Darin McBeath <ddmcbe...@yahoo.com> >> Date:21/08/2016 17:44 (GMT+05:30) >> To: Hyukjin Kwon <gurwls...@gmail.com>, Jörn Franke <jornfra...@gmail.com> >> >> Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, Felix Cheung < >> felixcheun...@hotmail.com>, user <user@spark.apache.org> >> Subject: Re: Best way to read XML data from RDD >> >> Another option would be to look at spark-xml-utils. We use this >> extensively in the manipulation of our XML content. >> >> https://github.com/elsevierlabs-os/spark-xml-utils >> >> >> >> There are quite a few examples. Depending on your preference (and what >> you want to do), you could use xpath, xquery, or xslt to transform, >> extract, or filter. >> >> Like mentioned below, you want to initialize the parser in a >> mapPartitions call (one of the examples shows this). >> >> Hope this is helpful. >> >> Darin. >> >> >> >> >> >> >> From: Hyukjin Kwon <gurwls...@gmail.com> >> To: Jörn Franke <jornfra...@gmail.com> >> Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>; Felix Cheung < >> felixcheun...@hotmail.com>; user <user@spark.apache.org> >> Sent: Sunday, August 21, 2016 6:10 AM >> Subject: Re: Best way to read XML data from RDD >> >> >> >> Hi Diwakar, >> >> Spark XML library can take RDD as source. >> >> ``` >> val df = new XmlReader() >> .withRowTag("book") >> .xmlRdd(sqlContext, rdd) >> ``` >> >> If performance is critical, I would also recommend to take care of >> creation and destruction of the parser. >> >> If the parser is not serializble, then you can do the creation for each >> partition within mapPartition just like >> >> https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a >> 48fed9bb188140423/sql/core/src/main/scala/org/apache/ >> spark/sql/DataFrameReader.scala#L322-L325 >> >> >> I hope this is helpful. >> >> >> >> >> 2016-08-20 15:10 GMT+09:00 Jörn Franke <jornfra...@gmail.com>: >> >> I fear the issue is that this will create and destroy a XML parser object >> 2 mio times, which is very inefficient - it does not really look like a >> parser performance issue. Can't you do something about the format choice? >> Ask your suppl
Re: Best way to read XML data from RDD
Hi Darin, Are you using this utility to parse single line XML? Sent from Samsung Mobile. Original message From: Darin McBeath <ddmcbe...@yahoo.com> Date:21/08/2016 17:44 (GMT+05:30) To: Hyukjin Kwon <gurwls...@gmail.com>, Jörn Franke <jornfra...@gmail.com> Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, Felix Cheung <felixcheun...@hotmail.com>, user <user@spark.apache.org> Subject: Re: Best way to read XML data from RDD Another option would be to look at spark-xml-utils. We use this extensively in the manipulation of our XML content. https://github.com/elsevierlabs-os/spark-xml-utils There are quite a few examples. Depending on your preference (and what you want to do), you could use xpath, xquery, or xslt to transform, extract, or filter. Like mentioned below, you want to initialize the parser in a mapPartitions call (one of the examples shows this). Hope this is helpful. Darin. From: Hyukjin Kwon <gurwls...@gmail.com> To: Jörn Franke <jornfra...@gmail.com> Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>; Felix Cheung <felixcheun...@hotmail.com>; user <user@spark.apache.org> Sent: Sunday, August 21, 2016 6:10 AM Subject: Re: Best way to read XML data from RDD Hi Diwakar, Spark XML library can take RDD as source. ``` val df = new XmlReader() .withRowTag("book") .xmlRdd(sqlContext, rdd) ``` If performance is critical, I would also recommend to take care of creation and destruction of the parser. If the parser is not serializble, then you can do the creation for each partition within mapPartition just like https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325 I hope this is helpful. 2016-08-20 15:10 GMT+09:00 Jörn Franke <jornfra...@gmail.com>: I fear the issue is that this will create and destroy a XML parser object 2 mio times, which is very inefficient - it does not really look like a parser performance issue. Can't you do something about the format choice? Ask your supplier to deliver another format (ideally avro or sth like this?)? >Otherwise you could just create one XML Parser object / node, but sharing this >among the parallel tasks on the same node is tricky. >The other possibility could be simply more hardware ... > >On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> >wrote: > > >Yes . It accepts a xml file as source but not RDD. The XML data embedded >inside json is streamed from kafka cluster. So I could get it as RDD. >>Right now I am using spark.xml XML.loadstring method inside RDD map >>function but performance wise I am not happy as it takes 4 minutes to >>parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. >> >> >> >> >>Sent from Samsung Mobile. >> >> >> Original message >>From: Felix Cheung <felixcheun...@hotmail.com> >>Date:20/08/2016 09:49 (GMT+05:30) >>To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> , user >><user@spark.apache.org> >>Cc: >>Subject: Re: Best way to read XML data from RDD >> >> >>Have you tried >> >>https://github.com/databricks/ spark-xml >>? >> >> >> >> >> >>On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" >><diwakar.dhanusk...@gmail.com> wrote: >> >> >>Hi, >> >> >>There is a RDD with json data. I could read json data using rdd.read.json . >>The json data has XML data in couple of key-value paris. >> >> >>Which is the best method to read and parse XML from rdd. Is there any >>specific xml libraries for spark. Could anyone help on this. >> >> >>Thanks.
Re: Best way to read XML data from RDD
Hi Franke, The source format cannot be changed as of now, and it is a pretty standard format that has been in use for years. Yeah, creating one parser per node is something I can try out. Sent from Samsung Mobile. Original message From: Jörn Franke <jornfra...@gmail.com> Date:20/08/2016 11:40 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: Felix Cheung <felixcheun...@hotmail.com>, user <user@spark.apache.org> Subject: Re: Best way to read XML data from RDD I fear the issue is that this will create and destroy a XML parser object 2 mio times, which is very inefficient - it does not really look like a parser performance issue. Can't you do something about the format choice? Ask your supplier to deliver another format (ideally avro or sth like this?)? Otherwise you could just create one XML Parser object / node, but sharing this among the parallel tasks on the same node is tricky. The other possibility could be simply more hardware ... On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: Yes . It accepts a xml file as source but not RDD. The XML data embedded inside json is streamed from kafka cluster. So I could get it as RDD. Right now I am using spark.xml XML.loadstring method inside RDD map function but performance wise I am not happy as it takes 4 minutes to parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. Sent from Samsung Mobile. Original message From: Felix Cheung <felixcheun...@hotmail.com> Date:20/08/2016 09:49 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, user <user@spark.apache.org> Cc: Subject: Re: Best way to read XML data from RDD Have you tried https://github.com/databricks/spark-xml ? On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com> wrote: Hi, There is a RDD with json data. I could read json data using rdd.read.json . The json data has XML data in couple of key-value paris. Which is the best method to read and parse XML from rdd. Is there any specific xml libraries for spark. Could anyone help on this. Thanks.
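A minimal sketch of the one-parser-per-partition idea suggested above, assuming the raw single-line XML messages are available as an RDD[String]; scala.xml is only a stand-in for whatever parser is really in use, and "MsgId" is a hypothetical element name.

```scala
import org.apache.spark.rdd.RDD
import scala.xml.XML

// xmlRdd stands in for the RDD[String] of single-line XML messages coming out of Kafka.
def parsePerPartition(xmlRdd: RDD[String]): RDD[(String, String)] =
  xmlRdd.mapPartitions { records =>
    // Per-partition setup runs once here; a heavyweight parser or factory would be
    // created at this point instead of once per record inside map().
    records.map { xmlString =>
      val root = XML.loadString(xmlString)          // scala.xml used as a simple stand-in parser
      (root.label, (root \\ "MsgId").text)          // "MsgId" is a hypothetical element name
    }
  }
```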
Re: Best way to read XML data from RDD
Hi Kwon, Was trying out spark XML library . I keep on getting errors in inferring schema. Looks like it cannot infer single line XML data. Sent from Samsung Mobile. Original message From: Hyukjin Kwon <gurwls...@gmail.com> Date:21/08/2016 15:40 (GMT+05:30) To: Jörn Franke <jornfra...@gmail.com> Cc: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, Felix Cheung <felixcheun...@hotmail.com>, user <user@spark.apache.org> Subject: Re: Best way to read XML data from RDD Hi Diwakar, Spark XML library can take RDD as source. ``` val df = new XmlReader() .withRowTag("book") .xmlRdd(sqlContext, rdd) ``` If performance is critical, I would also recommend to take care of creation and destruction of the parser. If the parser is not serializble, then you can do the creation for each partition within mapPartition just like https://github.com/apache/spark/blob/ac84fb64dd85257da06f93a48fed9bb188140423/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L322-L325 I hope this is helpful. 2016-08-20 15:10 GMT+09:00 Jörn Franke <jornfra...@gmail.com>: I fear the issue is that this will create and destroy a XML parser object 2 mio times, which is very inefficient - it does not really look like a parser performance issue. Can't you do something about the format choice? Ask your supplier to deliver another format (ideally avro or sth like this?)? Otherwise you could just create one XML Parser object / node, but sharing this among the parallel tasks on the same node is tricky. The other possibility could be simply more hardware ... On 20 Aug 2016, at 06:41, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: Yes . It accepts a xml file as source but not RDD. The XML data embedded inside json is streamed from kafka cluster. So I could get it as RDD. Right now I am using spark.xml XML.loadstring method inside RDD map function but performance wise I am not happy as it takes 4 minutes to parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. Sent from Samsung Mobile. Original message From: Felix Cheung <felixcheun...@hotmail.com> Date:20/08/2016 09:49 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, user <user@spark.apache.org> Cc: Subject: Re: Best way to read XML data from RDD Have you tried https://github.com/databricks/spark-xml ? On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com> wrote: Hi, There is a RDD with json data. I could read json data using rdd.read.json . The json data has XML data in couple of key-value paris. Which is the best method to read and parse XML from rdd. Is there any specific xml libraries for spark. Could anyone help on this. Thanks.
Re: Best way to read XML data from RDD
Yes . It accepts a xml file as source but not RDD. The XML data embedded inside json is streamed from kafka cluster. So I could get it as RDD. Right now I am using spark.xml XML.loadstring method inside RDD map function but performance wise I am not happy as it takes 4 minutes to parse XML from 2 million messages in a 3 nodes 100G 4 cpu each environment. Sent from Samsung Mobile. Original message From: Felix Cheung <felixcheun...@hotmail.com> Date:20/08/2016 09:49 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, user <user@spark.apache.org> Cc: Subject: Re: Best way to read XML data from RDD Have you tried https://github.com/databricks/spark-xml ? On Fri, Aug 19, 2016 at 1:07 PM -0700, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com> wrote: Hi, There is a RDD with json data. I could read json data using rdd.read.json . The json data has XML data in couple of key-value paris. Which is the best method to read and parse XML from rdd. Is there any specific xml libraries for spark. Could anyone help on this. Thanks.
Best way to read XML data from RDD
Hi, There is an RDD with JSON data. I could read the JSON data using rdd.read.json. The JSON data has XML data in a couple of key-value pairs. Which is the best method to read and parse the XML from the RDD? Are there any specific XML libraries for Spark? Could anyone help on this? Thanks.
Spark streaming
Hi, Is there a way to specify in createDirectStream to receive only the last 'n' offsets of a specific topic and partition? I don't want to filter them out in foreachRDD. Sent from Samsung Mobile.
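There is no built-in "last n" option, but the createDirectStream overload that takes explicit fromOffsets lets each partition start wherever you like. A hedged sketch, assuming ssc and kafkaParams already exist and that the latest offset per partition has been looked up separately (e.g. via the Kafka offset request API); the topic name and offset values are placeholders.

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val n = 1000L                    // how many trailing offsets to read
val latestOffset = 500000L       // hypothetical value obtained from Kafka beforehand

// Start partition 0 of "my-topic" n offsets before the current head.
val fromOffsets = Map(TopicAndPartition("my-topic", 0) -> math.max(0L, latestOffset - n))

// This overload also needs a message handler that turns each record into the stream's element type.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)
```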
createDirectStream parallelism
We are using the createDirectStream API to receive messages from a 48-partition topic. I am setting --num-executors 48 and --executor-cores 1 in spark-submit. All partitions were received in parallel, and the corresponding RDDs in foreachRDD were executed in parallel. But when I join the transformed DataFrame jsonDF (code below) with another DataFrame, I could see that the join is not executed in parallel for each partition. There were more shuffle reads and writes, and the number of executors executing was less than the number of partitions. I mean, the number of executors was not equal to the number of partitions when the join was executing. How can I make sure the join executes on all executors? Can anyone provide help. kstream.foreachRDD { rdd => val jsonDF = sqlContext.read.json(rdd).toDF . ... val metaDF = ssc.sparkContext.textFile("file").toDF val join = jsonDF.join(metaDF) join.map ().count }
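One way to keep the join at the same parallelism as the Kafka partitions is to broadcast the small metadata side so no shuffle is needed. A rough sketch under the assumption that metaDF is small enough to fit in memory; kstream and sqlContext are taken from the snippet above, and the file path and the missing join condition are placeholders.

```scala
import org.apache.spark.sql.functions.broadcast

kstream.foreachRDD { rdd =>
  val jsonDF = sqlContext.read.json(rdd.map(_._2))

  // Broadcasting the small side turns the join into a map-side operation,
  // so it runs with jsonDF's parallelism (one task per Kafka partition)
  // instead of shuffling both sides into a smaller number of tasks.
  val metaDF = sqlContext.read.text("file")          // hypothetical small lookup data
  val joined = jsonDF.join(broadcast(metaDF))        // add the real join condition here

  println(joined.count())
}
```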
RE: [Spark 2.0] ClassNotFoundException is thrown when using Hive
Hi, Can you cross-check by providing the same library path in --jars of spark-submit and running it again? Sent from Samsung Mobile. Original message From: "颜发才(Yan Facai)" Date: 18/08/2016 15:17 (GMT+05:30) To: "user.spark" Cc: Subject: [Spark 2.0] ClassNotFoundException is thrown when using Hive Hi, all. I copied hdfs-site.xml, core-site.xml and hive-site.xml to $SPARK_HOME/conf. spark-submit is used to submit the task to YARN, and it runs in **client** mode. However, ClassNotFoundException is thrown. Some details of the logs are listed below: ``` 16/08/12 17:07:32 INFO hive.HiveUtils: Initializing HiveMetastoreConnection version 0.13.1 using file:/data0/facai/lib/hive-0.13.1/lib:file:/data0/facai/lib/hadoop-2.4.1/share/hadoop 16/08/12 17:07:32 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError: org/apache/hadoop/hive/ql/session/SessionState when creating Hive client using classpath: file:/data0/facai/lib/hive-0.13.1/lib, file:/data0/facai/lib/hadoop-2.4.1/share/hadoop ``` In fact, all the jars needed by Hive are in the directory: ```Bash [hadoop@h107713699 spark_test]$ ls /data0/facai/lib/hive-0.13.1/lib/ | grep hive hive-ant-0.13.1.jar hive-beeline-0.13.1.jar hive-cli-0.13.1.jar hive-common-0.13.1.jar ... ``` So, my question is: why can Spark not find the jars it needs? Any help will be appreciated, thanks.
Re: KafkaUtils.createStream not picking smallest offset
Not using checkpointing now. The source is producing 1.2 million messages to the topic. We are using ZooKeeper offsets for other downstreams too; that's the reason for going with createStream, which stores offsets in ZooKeeper. Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:12/08/2016 23:42 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>, user@spark.apache.org Cc: Subject: Re: KafkaUtils.createStream not picking smallest offset Are you checkpointing? Beyond that, why are you using createStream instead of createDirectStream On Fri, Aug 12, 2016 at 12:32 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: > Okay . > I could delete the consumer group in zookeeper and start again to re > use same consumer group name. But this is not working though . Somehow > createstream is picking the offset from some where other than > /consumers/ from zookeeper > > > Sent from Samsung Mobile. > > > > > > > > > Original message > From: Cody Koeninger <c...@koeninger.org> > Date:12/08/2016 18:02 (GMT+05:30) > To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> > Cc: > Subject: Re: KafkaUtils.createStream not picking smallest offset > > Auto offset reset only applies if there aren't offsets available otherwise. > > The old high level consumer stores offsets in zookeeper. > > If you want to make sure you're starting clean, use a new consumer group > id. > > On Aug 12, 2016 3:35 AM, "Diwakar Dhanuskodi" <diwakar.dhanusk...@gmail.com> > wrote: >> >> >> Hi, >> We are using spark 1.6.1 and kafka 0.9. >> >> KafkaUtils.createStream is showing strange behaviour. Though >> auto.offset.reset is set to smallest . Whenever we need to restart >> the stream it is picking up the latest offset which is not expected. >> Do we need to set any other properties ?. >> >> createDirectStream works fine in this above case. >> >> >> Sent from Samsung Mobile.
KafkaUtils.createStream not picking smallest offset
Hi, We are using Spark 1.6.1 and Kafka 0.9. KafkaUtils.createStream is showing strange behaviour even though auto.offset.reset is set to smallest: whenever we need to restart the stream, it picks up the latest offset, which is not expected. Do we need to set any other properties? createDirectStream works fine in this same case. Sent from Samsung Mobile.
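As Cody points out in the reply above, the old high-level consumer keeps committed offsets in ZooKeeper per group.id, and auto.offset.reset only applies when the group has no stored offsets. A hedged sketch of starting clean with a brand-new group id; the host names, topic name and the existing ssc are placeholders/assumptions.

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

// A previously unused group.id has no offsets under /consumers in ZooKeeper,
// so auto.offset.reset=smallest actually takes effect and the stream starts
// from the beginning of the topic.
val kafkaParams = Map(
  "zookeeper.connect" -> "localhost:2181",
  "group.id"          -> "my-app-fresh-group",
  "auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)
```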
Re: Spark streaming not processing messages from partitioned topics
Figured it out. All I am doing wrong is testing it out in pseudo node vm with 1 core. The tasks were hanging out for cpu. In production cluster this works just fine. On Thu, Aug 11, 2016 at 12:45 AM, Diwakar Dhanuskodi < diwakar.dhanusk...@gmail.com> wrote: > Checked executor logs and UI . There is no error message or something like > that. when there is any action , it is waiting . > There are data in partitions. I could use simple-consumer-shell and print > all data in console. Am I doing anything wrong in foreachRDD?. > This just works fine with single partitioned topic, > > On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org> > wrote: > >> zookeeper.connect is irrelevant. >> >> Did you look at your executor logs? >> Did you look at the UI for the (probably failed) stages? >> Are you actually producing data into all of the kafka partitions? >> If you use kafka-simple-consumer-shell.sh to read that partition, do >> you get any data? >> >> On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi >> <diwakar.dhanusk...@gmail.com> wrote: >> > Hi Cody, >> > >> > Just added zookeeper.connect to kafkaparams . It couldn't come out of >> batch >> > window. Other batches are queued. I could see foreach(println) of >> dataFrame >> > printing one of partition's data and not the other. >> > Couldn't see any errors from log. >> > >> > val brokers = "localhost:9092,localhost:9093" >> > val sparkConf = new >> > SparkConf().setAppName("KafkaWeather").setMaster("local[5]") >> //spark://localhost:7077 >> > val sc = new SparkContext(sparkConf) >> > val ssc = new StreamingContext(sc, Seconds(1)) >> > val kafkaParams = Map[String, >> > String]("bootstrap.servers"->"localhost:9093,localhost:9092" >> ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181"," >> group.id"->"xyz") >> > val topics = "test" >> > val topicsSet = topics.split(",").toSet >> > val messages = KafkaUtils.createDirectStream[String, String, >> StringDecoder, >> > StringDecoder](ssc, kafkaParams, topicsSet) >> > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) >> > import sqlContext.implicits._ >> > messages.foreachRDD(rdd => { >> > if (rdd.isEmpty()) { >> > println("Failed to get data from Kafka. Please check that the Kafka >> > producer is streaming data.") >> > System.exit(-1) >> > } >> > >> >val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF() >> > dataframe.foreach(println) >> > println( "$$$", dataframe.count()) >> > }) >> > Logs: >> > >> > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2 >> > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop >> > library for your platform... using builtin-java classes where applicable >> > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera >> resolves to >> > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on >> interface >> > eth1) >> > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to >> > another address >> > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera >> > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: >> cloudera >> > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication >> > disabled; ui acls disabled; users with view permissions: Set(cloudera); >> > users with modify permissions: Set(cloudera) >> > 16/08/10 18:16:25 INFO Utils: Successfully started service >> 'sparkDriver' on >> > port 45031. 
>> > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started >> > 16/08/10 18:16:26 INFO Remoting: Starting remoting >> > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on >> addresses >> > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638] >> > 16/08/10 18:16:26 INFO Utils: Successfully started service >> > 'sparkDriverActorSystem' on port 56638. >> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker >> > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster >> > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at >> > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee &g
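For reference, the starvation described in the message above is the classic single-core pitfall when testing streaming jobs locally: with local[1] there is a single task slot, so batches simply queue up behind whatever task is already occupying it. A minimal sketch of the local-mode fix; the core count and app name are arbitrary.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Give the local test run more than one core; "local[*]" uses all available cores,
// "local[4]" pins it to four. With "local[1]" streaming tasks have nowhere to run
// concurrently, so batches queue indefinitely.
val conf = new SparkConf().setAppName("streaming-local-test").setMaster("local[4]")
val ssc = new StreamingContext(conf, Seconds(1))
```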
Re: Spark streaming not processing messages from partitioned topics
Checked executor logs and UI . There is no error message or something like that. when there is any action , it is waiting . There are data in partitions. I could use simple-consumer-shell and print all data in console. Am I doing anything wrong in foreachRDD?. This just works fine with single partitioned topic, On Wed, Aug 10, 2016 at 8:41 PM, Cody Koeninger <c...@koeninger.org> wrote: > zookeeper.connect is irrelevant. > > Did you look at your executor logs? > Did you look at the UI for the (probably failed) stages? > Are you actually producing data into all of the kafka partitions? > If you use kafka-simple-consumer-shell.sh to read that partition, do > you get any data? > > On Wed, Aug 10, 2016 at 9:40 AM, Diwakar Dhanuskodi > <diwakar.dhanusk...@gmail.com> wrote: > > Hi Cody, > > > > Just added zookeeper.connect to kafkaparams . It couldn't come out of > batch > > window. Other batches are queued. I could see foreach(println) of > dataFrame > > printing one of partition's data and not the other. > > Couldn't see any errors from log. > > > > val brokers = "localhost:9092,localhost:9093" > > val sparkConf = new > > SparkConf().setAppName("KafkaWeather").setMaster(" > local[5]")//spark://localhost:7077 > > val sc = new SparkContext(sparkConf) > > val ssc = new StreamingContext(sc, Seconds(1)) > > val kafkaParams = Map[String, > > String]("bootstrap.servers"->"localhost:9093,localhost:9092" > ,"auto.offset.reset"->"smallest","zookeeper.connect"->"localhost:2181"," > group.id"->"xyz") > > val topics = "test" > > val topicsSet = topics.split(",").toSet > > val messages = KafkaUtils.createDirectStream[String, String, > StringDecoder, > > StringDecoder](ssc, kafkaParams, topicsSet) > > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) > > import sqlContext.implicits._ > > messages.foreachRDD(rdd => { > > if (rdd.isEmpty()) { > > println("Failed to get data from Kafka. Please check that the Kafka > > producer is streaming data.") > > System.exit(-1) > > } > > > >val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF() > > dataframe.foreach(println) > > println( "$$$", dataframe.count()) > > }) > > Logs: > > > > 16/08/10 18:16:24 INFO SparkContext: Running Spark version 1.6.2 > > 16/08/10 18:16:24 WARN NativeCodeLoader: Unable to load native-hadoop > > library for your platform... using builtin-java classes where applicable > > 16/08/10 18:16:24 WARN Utils: Your hostname, quickstart.cloudera > resolves to > > a loopback address: 127.0.0.1; using 192.168.126.131 instead (on > interface > > eth1) > > 16/08/10 18:16:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to > > another address > > 16/08/10 18:16:25 INFO SecurityManager: Changing view acls to: cloudera > > 16/08/10 18:16:25 INFO SecurityManager: Changing modify acls to: cloudera > > 16/08/10 18:16:25 INFO SecurityManager: SecurityManager: authentication > > disabled; ui acls disabled; users with view permissions: Set(cloudera); > > users with modify permissions: Set(cloudera) > > 16/08/10 18:16:25 INFO Utils: Successfully started service 'sparkDriver' > on > > port 45031. > > 16/08/10 18:16:26 INFO Slf4jLogger: Slf4jLogger started > > 16/08/10 18:16:26 INFO Remoting: Starting remoting > > 16/08/10 18:16:26 INFO Remoting: Remoting started; listening on addresses > > :[akka.tcp://sparkDriverActorSystem@192.168.126.131:56638] > > 16/08/10 18:16:26 INFO Utils: Successfully started service > > 'sparkDriverActorSystem' on port 56638. 
> > 16/08/10 18:16:26 INFO SparkEnv: Registering MapOutputTracker > > 16/08/10 18:16:27 INFO SparkEnv: Registering BlockManagerMaster > > 16/08/10 18:16:27 INFO DiskBlockManager: Created local directory at > > /tmp/blockmgr-0f110a7e-1edb-4140-9243-5579a7bc95ee > > 16/08/10 18:16:27 INFO MemoryStore: MemoryStore started with capacity > 511.5 > > MB > > 16/08/10 18:16:27 INFO SparkEnv: Registering OutputCommitCoordinator > > 16/08/10 18:16:27 INFO Utils: Successfully started service 'SparkUI' on > port > > 4040. > > 16/08/10 18:16:27 INFO SparkUI: Started SparkUI at > > http://192.168.126.131:4040 > > 16/08/10 18:16:27 INFO HttpFileServer: HTTP File server directory is > > /tmp/spark-b60f692d-f5ea-44c1-aa21-ae132813828c/httpd- > 2b2a4e68-2952-41b0-a11b-f07860682749 > > 16/08/10 18:16:27 INFO HttpServer: Starting HTTP
Re: Spark streaming not processing messages from partitioned topics
s-e3b08984-7bc4-428c-b214-776fa2bf45d6/spark-streaming-kafka-assembly_2.10-1.6.2.jar to class loader 16/08/10 18:16:33 INFO KafkaRDD: Computing topic test, partition 0 offsets 0 -> 20 16/08/10 18:16:33 INFO VerifiableProperties: Verifying properties 16/08/10 18:16:33 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest 16/08/10 18:16:33 INFO VerifiableProperties: Property group.id is overridden to xyz 16/08/10 18:16:33 INFO VerifiableProperties: Property zookeeper.connect is overridden to localhost:2181 16/08/10 18:16:34 INFO JobScheduler: Added jobs for time 1470833194000 ms 16/08/10 18:16:34 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 13366 bytes result sent to driver 16/08/10 18:16:34 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 0) in 2423 ms on localhost (1/2) 16/08/10 18:16:35 INFO JobScheduler: Added jobs for time 1470833195000 ms 16/08/10 18:16:36 INFO JobScheduler: Added jobs for time 1470833196000 ms 16/08/10 18:16:37 INFO JobScheduler: Added jobs for time 1470833197000 ms 16/08/10 18:16:38 INFO JobScheduler: Added jobs for time 1470833198000 ms 16/08/10 18:16:39 INFO JobScheduler: Added jobs for time 1470833199000 ms 16/08/10 18:16:40 INFO JobScheduler: Added jobs for time 147083320 ms 16/08/10 18:16:41 INFO JobScheduler: Added jobs for time 1470833201000 ms 16/08/10 18:16:42 INFO JobScheduler: Added jobs for time 1470833202000 ms 16/08/10 18:16:43 INFO JobScheduler: Added jobs for time 1470833203000 ms 16/08/10 18:16:44 INFO JobScheduler: Added jobs for time 1470833204000 ms 16/08/10 18:16:45 INFO JobScheduler: Added jobs for time 1470833205000 ms 16/08/10 18:16:46 INFO JobScheduler: Added jobs for time 1470833206000 ms 16/08/10 18:16:47 INFO JobScheduler: Added jobs for time 1470833207000 ms 16/08/10 18:16:48 INFO JobScheduler: Added jobs for time 1470833208000 ms 16/08/10 18:16:49 INFO JobScheduler: Added jobs for time 1470833209000 ms 16/08/10 18:16:50 INFO JobScheduler: Added jobs for time 147083321 ms 16/08/10 18:16:51 INFO JobScheduler: Added jobs for time 1470833211000 ms 16/08/10 18:16:52 INFO JobScheduler: Added jobs for time 1470833212000 ms 16/08/10 18:16:53 INFO JobScheduler: Added jobs for time 1470833213000 ms 16/08/10 18:16:54 INFO JobSch On Wed, Aug 10, 2016 at 5:42 PM, Cody Koeninger <c...@koeninger.org> wrote: > Those logs you're posting are from right after your failure, they don't > include what actually went wrong when attempting to read json. Look at your > logs more carefully. > On Aug 10, 2016 2:07 AM, "Diwakar Dhanuskodi" < > diwakar.dhanusk...@gmail.com> wrote: > >> Hi Siva, >> >> With below code, it is stuck up at >> * sqlContext.read.json(rdd.map(_._2)).toDF()* >> There are two partitions in topic. >> I am running spark 1.6.2 >> >> val topics = "topic.name" >> val brokers = "localhost:9092" >> val topicsSet = topics.split(",").toSet >> val sparkConf = new SparkConf().setAppName("KafkaW >> eather").setMaster("local[5]")//spark://localhost:7077 >> val sc = new SparkContext(sparkConf) >> val ssc = new StreamingContext(sc, Seconds(60)) >> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, " >> group.id" -> "xyz","auto.offset.reset"->"smallest") >> val messages = KafkaUtils.createDirectStream[String, String, >> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) >> messages.foreachRDD(rdd => { >> if (rdd.isEmpty()) { >> println("Failed to get data from Kafka. 
Please check that the Kafka >> producer is streaming data.") >> System.exit(-1) >> } >> val sqlContext = org.apache.spark.sql.SQLContex >> t.getOrCreate(rdd.sparkContext) >> *val dataframe = sqlContext.read.json(rdd.map(_._2)).toDF()* >> dataframe.foreach(println) >> >> }) >> >> >> Below are logs, >> >> 16/08/10 12:27:51 INFO DAGScheduler: ResultStage 0 (json at >> todelete.scala:34) failed in 110.776 s >> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already >> stopped! Dropping event SparkListenerStageCompleted(or >> g.apache.spark.scheduler.StageInfo@6d8ff688) >> 16/08/10 12:27:51 ERROR LiveListenerBus: SparkListenerBus has already >> stopped! Dropping event SparkListenerJobEnd(0,14708122 >> 71971,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because >> SparkContext was shut down)) >> 16/08/10 12:27:51 INFO MapOutputTrackerMasterEndpoint: >> MapOutputTrackerMasterEndpoint stopped! >> 16/08/10 12:27:51 INFO MemoryStore: MemoryStore cleared >> 16/08/10 12:27:51 INFO BlockManager: BlockManager stopped >> 16/08/10 12:27
Re: Spark streaming not processing messages from partitioned topics
e 0.0 (TID 0) 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar with timestamp 1470812282187 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/spark-streaming-kafka-assembly_2.10-1.6.2.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp3730815508296553254.tmp 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-streaming-kafka-assembly_2.10-1.6.2.jar to class loader 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar with timestamp 1470812282398 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/spark-assembly-1.6.2-hadoop2.6.0.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp411333926628523179.tmp 16/08/10 12:29:01 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/spark-assembly-1.6.2-hadoop2.6.0.jar to class loader 16/08/10 12:29:01 INFO Executor: Fetching http://192.168.126.131:58957/jars/pain.jar with timestamp 1470812282402 16/08/10 12:29:01 INFO Utils: Fetching http://192.168.126.131:58957/jars/pain.jar to /tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/fetchFileTemp100401525133805542.tmp 16/08/10 12:29:02 INFO Executor: Adding file:/tmp/spark-861074da-9bfb-475c-a21b-fc68e4f05d54/userFiles-56864e74-3ee4-4559-aa89-9dfde5d62a37/pain.jar to class loader 16/08/10 12:29:02 INFO KafkaRDD: Computing topic topic.name, partition 0 offsets 0 -> 8 16/08/10 12:29:02 INFO VerifiableProperties: Verifying properties 16/08/10 12:29:02 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest 16/08/10 12:29:02 INFO VerifiableProperties: Property group.id is overridden to xyz 16/08/10 12:29:02 INFO VerifiableProperties: Property zookeeper.connect is overridden to 16/08/10 12:29:03 INFO Executor: Finished task 1.0 in stage 0.0 (TID 0). 13366 bytes result sent to driver 16/08/10 12:29:03 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 0) in 2380 ms on localhost (1/2) 16/08/10 12:30:00 INFO JobScheduler: Added jobs for time 147081240 ms 16/08/10 12:31:00 INFO JobScheduler: Added jobs for time 147081246 ms 16/08/10 12:32:00 INFO JobScheduler: Added jobs for time 147081252 ms 16/08/10 12:33:00 INFO JobScheduler: Added jobs for time 147081258 ms 16/08/10 12:34:00 INFO JobScheduler: Added jobs for time 147081264 ms On Wed, Aug 10, 2016 at 10:26 AM, Diwakar Dhanuskodi < diwakar.dhanusk...@gmail.com> wrote: > Hi Siva, > > Does topic has partitions? which version of Spark you are using? > > On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> > wrote: > >> Hi, >> >> Here is a working example I did. 
>> >> HTH >> >> Regards, >> >> Sivakumaran S >> >> val topics = "test" >> val brokers = "localhost:9092" >> val topicsSet = topics.split(",").toSet >> val sparkConf = new SparkConf().setAppName("KafkaW >> eatherCalc").setMaster("local") //spark://localhost:7077 >> val sc = new SparkContext(sparkConf) >> val ssc = new StreamingContext(sc, Seconds(60)) >> val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) >> val messages = KafkaUtils.createDirectStream[String, String, >> StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) >> messages.foreachRDD(rdd => { >> if (rdd.isEmpty()) { >> println("Failed to get data from Kafka. Please check that the Kafka >> producer is streaming data.") >> System.exit(-1) >> } >> val sqlContext = org.apache.spark.sql.SQLContex >> t.getOrCreate(rdd.sparkContext) >> val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF() >> //Process your DF as required here on >> } >> >> >> >> On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi < >> diwakar.dhanusk...@gmail.com> wrote: >> >> Hi, >> >> I am reading json messages from kafka . Topics has 2 partitions. When >> running streaming job using spark-submit, I could see that * val >> dataFrame = sqlContext.read.json(rdd.map(_._2)) *executes indefinitely. >> Am I doing something wrong here. Below is code .This environment is >> cloudera sandbox env. Same issue in hadoop production cluster mode except >> that it is restricted thats why tried to reproduce issue in Cloudera >> sandbox. Kafka 0.10 and Spark 1.4. >> >> val kafkaParams = M
Re: Spark streaming not processing messages from partitioned topics
Hi Siva, Does topic has partitions? which version of Spark you are using? On Wed, Aug 10, 2016 at 2:38 AM, Sivakumaran S <siva.kuma...@me.com> wrote: > Hi, > > Here is a working example I did. > > HTH > > Regards, > > Sivakumaran S > > val topics = "test" > val brokers = "localhost:9092" > val topicsSet = topics.split(",").toSet > val sparkConf = new > SparkConf().setAppName("KafkaWeatherCalc").setMaster("local") > //spark://localhost:7077 > val sc = new SparkContext(sparkConf) > val ssc = new StreamingContext(sc, Seconds(60)) > val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) > val messages = KafkaUtils.createDirectStream[String, String, > StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) > messages.foreachRDD(rdd => { > if (rdd.isEmpty()) { > println("Failed to get data from Kafka. Please check that the Kafka > producer is streaming data.") > System.exit(-1) > } > val sqlContext = org.apache.spark.sql.SQLContext.getOrCreate(rdd. > sparkContext) > val weatherDF = sqlContext.read.json(rdd.map(_._2)).toDF() > //Process your DF as required here on > } > > > > On 09-Aug-2016, at 9:47 PM, Diwakar Dhanuskodi < > diwakar.dhanusk...@gmail.com> wrote: > > Hi, > > I am reading json messages from kafka . Topics has 2 partitions. When > running streaming job using spark-submit, I could see that * val > dataFrame = sqlContext.read.json(rdd.map(_._2)) *executes indefinitely. > Am I doing something wrong here. Below is code .This environment is > cloudera sandbox env. Same issue in hadoop production cluster mode except > that it is restricted thats why tried to reproduce issue in Cloudera > sandbox. Kafka 0.10 and Spark 1.4. > > val kafkaParams = Map[String,String]("bootstrap. > servers"->"localhost:9093,localhost:9092", "group.id" -> > "xyz","auto.offset.reset"->"smallest") > val conf = new SparkConf().setMaster("local[3]").setAppName("topic") > val ssc = new StreamingContext(conf, Seconds(1)) > > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) > > val topics = Set("gpp.minf") > val kafkaStream = KafkaUtils.createDirectStream[String, String, > StringDecoder,StringDecoder](ssc, kafkaParams, topics) > > kafkaStream.foreachRDD( > rdd => { > if (rdd.count > 0){ >* val dataFrame = sqlContext.read.json(rdd.map(_._2)) * >dataFrame.printSchema() > //dataFrame.foreach(println) > } > } > > >
Re: Spark streaming not processing messages from partitioned topics
It stops working at sqlContext.read.json(rdd.map(_._2)) . Topics without partitions is working fine. Do I need to set any other configs val kafkaParams = Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", " group.id" -> "xyz","auto.offset.reset"->"smallest") Spark version is 1.6.2 kafkaStream.foreachRDD( rdd => { rdd.foreach(println) val dataFrame = sqlContext.read.json(rdd.map(_._2)) dataFrame.foreach(println) } ) On Wed, Aug 10, 2016 at 9:05 AM, Cody Koeninger <c...@koeninger.org> wrote: > No, you don't need a conditional. read.json on an empty rdd will > return an empty dataframe. Foreach on an empty dataframe or an empty > rdd won't do anything (a task will still get run, but it won't do > anything). > > Leave the conditional out. Add one thing at a time to the working > rdd.foreach example and see when it stops working, then take a closer > look at the logs. > > > On Tue, Aug 9, 2016 at 10:20 PM, Diwakar Dhanuskodi > <diwakar.dhanusk...@gmail.com> wrote: > > Hi Cody, > > > > Without conditional . It is going with fine. But any processing inside > > conditional get on to waiting (or) something. > > Facing this issue with partitioned topics. I would need conditional to > skip > > processing when batch is empty. > > kafkaStream.foreachRDD( > > rdd => { > > > >val dataFrame = sqlContext.read.json(rdd.map(_._2)) > >/*if (dataFrame.count() > 0) { > >dataFrame.foreach(println) > >} > >else > >{ > > println("Empty DStream ") > >}*/ > > }) > > > > On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <c...@koeninger.org> > wrote: > >> > >> Take out the conditional and the sqlcontext and just do > >> > >> rdd => { > >> rdd.foreach(println) > >> > >> > >> as a base line to see if you're reading the data you expect > >> > >> On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi > >> <diwakar.dhanusk...@gmail.com> wrote: > >> > Hi, > >> > > >> > I am reading json messages from kafka . Topics has 2 partitions. When > >> > running streaming job using spark-submit, I could see that val > >> > dataFrame = > >> > sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing > >> > something wrong here. Below is code .This environment is cloudera > >> > sandbox > >> > env. Same issue in hadoop production cluster mode except that it is > >> > restricted thats why tried to reproduce issue in Cloudera sandbox. > Kafka > >> > 0.10 and Spark 1.4. > >> > > >> > val kafkaParams = > >> > Map[String,String]("bootstrap.servers"->"localhost:9093, > localhost:9092", > >> > "group.id" -> "xyz","auto.offset.reset"->"smallest") > >> > val conf = new SparkConf().setMaster("local[3]").setAppName("topic") > >> > val ssc = new StreamingContext(conf, Seconds(1)) > >> > > >> > val sqlContext = new org.apache.spark.sql. > SQLContext(ssc.sparkContext) > >> > > >> > val topics = Set("gpp.minf") > >> > val kafkaStream = KafkaUtils.createDirectStream[String, String, > >> > StringDecoder,StringDecoder](ssc, kafkaParams, topics) > >> > > >> > kafkaStream.foreachRDD( > >> > rdd => { > >> > if (rdd.count > 0){ > >> > val dataFrame = sqlContext.read.json(rdd.map(_._2)) > >> >dataFrame.printSchema() > >> > //dataFrame.foreach(println) > >> > } > >> > } > > > > >
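For reference, a minimal sketch of the loop with the conditional removed, along the lines Cody suggests above (assuming the same ssc, sqlContext and kafkaStream as in the code earlier in this thread; read.json on an empty RDD simply yields an empty DataFrame):

kafkaStream.foreachRDD { rdd =>
  // No emptiness check needed: json() on an empty RDD returns an empty DataFrame,
  // and printSchema/foreach on it are effectively no-ops.
  val dataFrame = sqlContext.read.json(rdd.map(_._2))
  dataFrame.printSchema()
  dataFrame.foreach(println)
}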
Re: Spark streaming not processing messages from partitioned topics
Hi Cody, Without conditional . It is going with fine. But any processing inside conditional get on to waiting (or) something. Facing this issue with partitioned topics. I would need conditional to skip processing when batch is empty. kafkaStream.foreachRDD( rdd => { val dataFrame = sqlContext.read.json(rdd.map(_._2)) /*if (dataFrame.count() > 0) { dataFrame.foreach(println) } else { println("Empty DStream ") }*/ }) On Wed, Aug 10, 2016 at 2:35 AM, Cody Koeninger <c...@koeninger.org> wrote: > Take out the conditional and the sqlcontext and just do > > rdd => { > rdd.foreach(println) > > > as a base line to see if you're reading the data you expect > > On Tue, Aug 9, 2016 at 3:47 PM, Diwakar Dhanuskodi > <diwakar.dhanusk...@gmail.com> wrote: > > Hi, > > > > I am reading json messages from kafka . Topics has 2 partitions. When > > running streaming job using spark-submit, I could see that val > dataFrame = > > sqlContext.read.json(rdd.map(_._2)) executes indefinitely. Am I doing > > something wrong here. Below is code .This environment is cloudera sandbox > > env. Same issue in hadoop production cluster mode except that it is > > restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka > > 0.10 and Spark 1.4. > > > > val kafkaParams = > > Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", > > "group.id" -> "xyz","auto.offset.reset"->"smallest") > > val conf = new SparkConf().setMaster("local[3]").setAppName("topic") > > val ssc = new StreamingContext(conf, Seconds(1)) > > > > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) > > > > val topics = Set("gpp.minf") > > val kafkaStream = KafkaUtils.createDirectStream[String, String, > > StringDecoder,StringDecoder](ssc, kafkaParams, topics) > > > > kafkaStream.foreachRDD( > > rdd => { > > if (rdd.count > 0){ > > val dataFrame = sqlContext.read.json(rdd.map(_._2)) > >dataFrame.printSchema() > > //dataFrame.foreach(println) > > } > > } >
Spark streaming not processing messages from partitioned topics
Hi, I am reading json messages from kafka . Topics has 2 partitions. When running streaming job using spark-submit, I could see that * val dataFrame = sqlContext.read.json(rdd.map(_._2)) *executes indefinitely. Am I doing something wrong here. Below is code .This environment is cloudera sandbox env. Same issue in hadoop production cluster mode except that it is restricted thats why tried to reproduce issue in Cloudera sandbox. Kafka 0.10 and Spark 1.4. val kafkaParams = Map[String,String]("bootstrap.servers"->"localhost:9093,localhost:9092", "group.id" -> "xyz","auto.offset.reset"->"smallest") val conf = new SparkConf().setMaster("local[3]").setAppName("topic") val ssc = new StreamingContext(conf, Seconds(1)) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) val topics = Set("gpp.minf") val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder,StringDecoder](ssc, kafkaParams, topics) kafkaStream.foreachRDD( rdd => { if (rdd.count > 0){ * val dataFrame = sqlContext.read.json(rdd.map(_._2)) * dataFrame.printSchema() //dataFrame.foreach(println) } }
Re: Spark streaming takes longer time to read json into dataframes
Okay got it regarding parallelism that you are saying . Yes , We use dataframe to infer schema and process data. The json schema has xml data as one of key value pair. Xml data needs to be processed in foreachRDD. Json schema doesn't change often. Regards, Diwakar Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:19/07/2016 20:49 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: Martin Eden <martineden...@gmail.com>, user <user@spark.apache.org> Subject: Re: Spark streaming takes longer time to read json into dataframes Yes, if you need more parallelism, you need to either add more kafka partitions or shuffle in spark. Do you actually need the dataframe api, or are you just using it as a way to infer the json schema? Inferring the schema is going to require reading through the RDD once before doing any other work. You may be better off defining your schema in advance. On Sun, Jul 17, 2016 at 9:33 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: > Hi, > > Repartition would create shuffle over network which I should avoid > to reduce processing time because the size of messages at most in a batch > will be 5G. > Partitioning topic and parallelize receiving in Direct Stream might do the > trick. > > > Sent from Samsung Mobile. > > > Original message > From: Martin Eden <martineden...@gmail.com> > Date:16/07/2016 14:01 (GMT+05:30) > To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> > Cc: user <user@spark.apache.org> > Subject: Re: Spark streaming takes longer time to read json into dataframes > > Hi, > > I would just do a repartition on the initial direct DStream since otherwise > each RDD in the stream has exactly as many partitions as you have partitions > in the Kafka topic (in your case 1). Like that receiving is still done in > only 1 thread but at least the processing further down is done in parallel. > > If you want to parallelize your receiving as well I would partition my Kafka > topic and then the RDDs in the initial DStream will have as many partitions > as you set in Kafka. > > Have you seen this? > http://spark.apache.org/docs/latest/streaming-kafka-integration.html > > M > > On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi > <diwakar.dhanusk...@gmail.com> wrote: >> >> >> -- Forwarded message -- >> From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> >> Date: Sat, Jul 16, 2016 at 9:30 AM >> Subject: Re: Spark streaming takes longer time to read json into >> dataframes >> To: Jean Georges Perrin <j...@jgp.net> >> >> >> Hello, >> >> I need it on memory. Increased executor memory to 25G and executor cores >> to 3. Got same result. There is always one task running under executor for >> rdd.read.json() because rdd partition size is 1 . Doing hash partitioning >> inside foreachRDD is a good approach? >> >> Regards, >> Diwakar. >> >> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin <j...@jgp.net> wrote: >>> >>> Do you need it on disk or just push it to memory? Can you try to increase >>> memory or # of cores (I know it sounds basic) >>> >>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi >>> > <diwakar.dhanusk...@gmail.com> wrote: >>> > >>> > Hello, >>> > >>> > I have 400K json messages pulled from Kafka into spark streaming using >>> > DirectStream approach. Size of 400K messages is around 5G. Kafka topic is >>> > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to >>> > convert rdd into dataframe. It takes almost 2.3 minutes to convert into >>> > dataframe. 
>>> > >>> > I am running in Yarn client mode with executor memory as 15G and >>> > executor cores as 2. >>> > >>> > Caching rdd before converting into dataframe doesn't change processing >>> > time. Whether introducing hash partitions inside foreachRDD will help? >>> > (or) >>> > Will partitioning topic and have more than one DirectStream help?. How >>> > can I >>> > approach this situation to reduce time in converting to dataframe.. >>> > >>> > Regards, >>> > Diwakar. >>> >> >> >
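A rough sketch of what "defining your schema in advance" could look like here (assuming a direct stream of (key, message) string pairs named kafkaStream and an existing sqlContext; the field names below are hypothetical, since the actual JSON layout is not shown in this thread; the point is that passing an explicit schema to the reader avoids the extra pass over the data that schema inference needs):

import org.apache.spark.sql.types.{StructField, StructType, StringType}

// Hypothetical schema: an id plus the embedded XML payload mentioned above.
val jsonSchema = StructType(Seq(
  StructField("id", StringType, nullable = true),
  StructField("xmlPayload", StringType, nullable = true)
))

kafkaStream.foreachRDD { rdd =>
  // read.schema(...).json(...) skips inference and parses straight into the given schema.
  val df = sqlContext.read.schema(jsonSchema).json(rdd.map(_._2))
  // process df / the XML column here
}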
Re: Spark streaming takes longer time to read json into dataframes
Okay got that bout parallelism that you are saying . Yes , We use dataframe to infer schema and process data. The json schema has xml data as one of key value pair. Xml data needs to be processed in foreachRDD. Json schema doesn't change often. Regards, Diwakar. Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:19/07/2016 20:49 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: Martin Eden <martineden...@gmail.com>, user <user@spark.apache.org> Subject: Re: Spark streaming takes longer time to read json into dataframes Yes, if you need more parallelism, you need to either add more kafka partitions or shuffle in spark. Do you actually need the dataframe api, or are you just using it as a way to infer the json schema? Inferring the schema is going to require reading through the RDD once before doing any other work. You may be better off defining your schema in advance. On Sun, Jul 17, 2016 at 9:33 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: > Hi, > > Repartition would create shuffle over network which I should avoid > to reduce processing time because the size of messages at most in a batch > will be 5G. > Partitioning topic and parallelize receiving in Direct Stream might do the > trick. > > > Sent from Samsung Mobile. > > > Original message > From: Martin Eden <martineden...@gmail.com> > Date:16/07/2016 14:01 (GMT+05:30) > To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> > Cc: user <user@spark.apache.org> > Subject: Re: Spark streaming takes longer time to read json into dataframes > > Hi, > > I would just do a repartition on the initial direct DStream since otherwise > each RDD in the stream has exactly as many partitions as you have partitions > in the Kafka topic (in your case 1). Like that receiving is still done in > only 1 thread but at least the processing further down is done in parallel. > > If you want to parallelize your receiving as well I would partition my Kafka > topic and then the RDDs in the initial DStream will have as many partitions > as you set in Kafka. > > Have you seen this? > http://spark.apache.org/docs/latest/streaming-kafka-integration.html > > M > > On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi > <diwakar.dhanusk...@gmail.com> wrote: >> >> >> -- Forwarded message -- >> From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> >> Date: Sat, Jul 16, 2016 at 9:30 AM >> Subject: Re: Spark streaming takes longer time to read json into >> dataframes >> To: Jean Georges Perrin <j...@jgp.net> >> >> >> Hello, >> >> I need it on memory. Increased executor memory to 25G and executor cores >> to 3. Got same result. There is always one task running under executor for >> rdd.read.json() because rdd partition size is 1 . Doing hash partitioning >> inside foreachRDD is a good approach? >> >> Regards, >> Diwakar. >> >> On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin <j...@jgp.net> wrote: >>> >>> Do you need it on disk or just push it to memory? Can you try to increase >>> memory or # of cores (I know it sounds basic) >>> >>> > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi >>> > <diwakar.dhanusk...@gmail.com> wrote: >>> > >>> > Hello, >>> > >>> > I have 400K json messages pulled from Kafka into spark streaming using >>> > DirectStream approach. Size of 400K messages is around 5G. Kafka topic is >>> > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to >>> > convert rdd into dataframe. It takes almost 2.3 minutes to convert into >>> > dataframe. 
>>> > >>> > I am running in Yarn client mode with executor memory as 15G and >>> > executor cores as 2. >>> > >>> > Caching rdd before converting into dataframe doesn't change processing >>> > time. Whether introducing hash partitions inside foreachRDD will help? >>> > (or) >>> > Will partitioning topic and have more than one DirectStream help?. How >>> > can I >>> > approach this situation to reduce time in converting to dataframe.. >>> > >>> > Regards, >>> > Diwakar. >>> >> >> >
Re: Spark streaming takes longer time to read json into dataframes
Hi, Repartition would create shuffle over network which I should avoid to reduce processing time because the size of messages at most in a batch will be 5G. Partitioning topic and parallelize receiving in Direct Stream might do the trick. Sent from Samsung Mobile. Original message From: Martin Eden <martineden...@gmail.com> Date:16/07/2016 14:01 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user <user@spark.apache.org> Subject: Re: Spark streaming takes longer time to read json into dataframes Hi, I would just do a repartition on the initial direct DStream since otherwise each RDD in the stream has exactly as many partitions as you have partitions in the Kafka topic (in your case 1). Like that receiving is still done in only 1 thread but at least the processing further down is done in parallel. If you want to parallelize your receiving as well I would partition my Kafka topic and then the RDDs in the initial DStream will have as many partitions as you set in Kafka. Have you seen this? http://spark.apache.org/docs/latest/streaming-kafka-integration.html M On Sat, Jul 16, 2016 at 5:26 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: -- Forwarded message -- From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Date: Sat, Jul 16, 2016 at 9:30 AM Subject: Re: Spark streaming takes longer time to read json into dataframes To: Jean Georges Perrin <j...@jgp.net> Hello, I need it on memory. Increased executor memory to 25G and executor cores to 3. Got same result. There is always one task running under executor for rdd.read.json() because rdd partition size is 1 . Doing hash partitioning inside foreachRDD is a good approach? Regards, Diwakar. On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin <j...@jgp.net> wrote: Do you need it on disk or just push it to memory? Can you try to increase memory or # of cores (I know it sounds basic) > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi > <diwakar.dhanusk...@gmail.com> wrote: > > Hello, > > I have 400K json messages pulled from Kafka into spark streaming using > DirectStream approach. Size of 400K messages is around 5G. Kafka topic is > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to > convert rdd into dataframe. It takes almost 2.3 minutes to convert into > dataframe. > > I am running in Yarn client mode with executor memory as 15G and executor > cores as 2. > > Caching rdd before converting into dataframe doesn't change processing time. > Whether introducing hash partitions inside foreachRDD will help? (or) Will > partitioning topic and have more than one DirectStream help?. How can I > approach this situation to reduce time in converting to dataframe.. > > Regards, > Diwakar.
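For completeness, a small sketch of the repartition approach Martin describes above (assuming a direct stream named kafkaStream, an existing sqlContext, and 16 partitions chosen purely for illustration; the repartition does shuffle each batch once, but the JSON parsing afterwards then runs in parallel instead of on the single Kafka partition):

// Receiving still happens from the single Kafka partition,
// but everything downstream of the repartition is parallelised.
val repartitioned = kafkaStream.repartition(16)
repartitioned.foreachRDD { rdd =>
  val df = sqlContext.read.json(rdd.map(_._2))
  // further processing on df
}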
Fwd: Spark streaming takes longer time to read json into dataframes
-- Forwarded message -- From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Date: Sat, Jul 16, 2016 at 9:30 AM Subject: Re: Spark streaming takes longer time to read json into dataframes To: Jean Georges Perrin <j...@jgp.net> Hello, I need it on memory. Increased executor memory to 25G and executor cores to 3. Got same result. There is always one task running under executor for rdd.read.json() because rdd partition size is 1 . Doing hash partitioning inside foreachRDD is a good approach? Regards, Diwakar. On Sat, Jul 16, 2016 at 9:20 AM, Jean Georges Perrin <j...@jgp.net> wrote: > Do you need it on disk or just push it to memory? Can you try to increase > memory or # of cores (I know it sounds basic) > > > On Jul 15, 2016, at 11:43 PM, Diwakar Dhanuskodi < > diwakar.dhanusk...@gmail.com> wrote: > > > > Hello, > > > > I have 400K json messages pulled from Kafka into spark streaming using > DirectStream approach. Size of 400K messages is around 5G. Kafka topic is > single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to > convert rdd into dataframe. It takes almost 2.3 minutes to convert into > dataframe. > > > > I am running in Yarn client mode with executor memory as 15G and > executor cores as 2. > > > > Caching rdd before converting into dataframe doesn't change processing > time. Whether introducing hash partitions inside foreachRDD will help? > (or) Will partitioning topic and have more than one DirectStream help?. How > can I approach this situation to reduce time in converting to dataframe.. > > > > Regards, > > Diwakar. > >
Spark streaming takes longer time to read json into dataframes
Hello, I have 400K json messages pulled from Kafka into spark streaming using DirectStream approach. Size of 400K messages is around 5G. Kafka topic is single partitioned. I am using rdd.read.json(_._2) inside foreachRDD to convert rdd into dataframe. It takes almost 2.3 minutes to convert into dataframe. I am running in Yarn client mode with executor memory as 15G and executor cores as 2. Caching rdd before converting into dataframe doesn't change processing time. Whether introducing hash partitions inside foreachRDD will help? (or) Will partitioning topic and have more than one DirectStream help?. How can I approach this situation to reduce time in converting to dataframe.. Regards, Diwakar.
RE: Fwd: DF creation
Import sqlContext.implicits._ before calling toDF(). Sent from Samsung Mobile. Original message From: satyajit vegesna Date:19/03/2016 06:00 (GMT+05:30) To: user@spark.apache.org, d...@spark.apache.org Cc: Subject: Fwd: DF creation Hi , I am trying to create a separate val reference to object DATA (as shown below), case class data(name:String,age:String) Creation of this object is done separately and the reference to the object is stored into val data. I use val samplerdd = sc.parallelize(Seq(data)) to create the RDD: org.apache.spark.rdd.RDD[data] = ParallelCollectionRDD[10] at parallelize at :24 Is there a way to create a dataframe out of this without using createDataFrame, just by using toDF()? I was unable to convert it that way (and would like to avoid providing the StructType). Regards, Satyajit.
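A minimal sketch of that, assuming a Spark 1.x setup with an existing SparkContext sc and hypothetical sample values (defining the case class at the top level, outside the method that calls toDF(), typically avoids TypeTag issues with the implicit conversion):

import org.apache.spark.sql.SQLContext

case class data(name: String, age: String)

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // brings the RDD-to-DataFrame conversions into scope

val samplerdd = sc.parallelize(Seq(data("john", "30")))
val df = samplerdd.toDF()       // no createDataFrame and no explicit StructType needed
df.printSchema()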
Re: Spark Submit
Try spark-submit --conf "spark.executor.memory=512m" --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.xml" Sent from Samsung Mobile. Original message From: Ted Yu Date:12/02/2016 21:24 (GMT+05:30) To: Ashish Soni Cc: user Subject: Re: Spark Submit Have you tried specifying multiple '--conf key=value' ? Cheers On Fri, Feb 12, 2016 at 7:44 AM, Ashish Soni wrote: Hi All , How do I pass multiple configuration parameters with spark-submit? Please help, I am trying as below spark-submit --conf "spark.executor.memory=512m spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.xml" Thanks,
RE: HADOOP_HOME are not set when try to run spark application in yarn cluster mode
It should work which version of spark are you using ?.Try setting it up in program using sparkConf set . Sent from Samsung Mobile. Original message From: rachana.srivast...@thomsonreuters.com Date:10/02/2016 00:47 (GMT+05:30) To: diwakar.dhanusk...@gmail.com, rachana.srivast...@markmonitor.com, user@spark.apache.org Cc: Subject: RE: HADOOP_HOME are not set when try to run spark application in yarn cluster mode Thanks so much Diwakar. spark-submit --class "com.MyClass" \ --files=/usr/lib/hadoop/etc/hadoop/core-site.xml,/usr/lib/hadoop/etc/hadoop/hdfs-site.xml,/usr/lib/hadoop/etc/hadoop/mapred-site.xml,/usr/lib/hadoop/etc/hadoop/ssl-client.xml,/usr/lib/hadoop/etc/hadoop/yarn-site.xml \ --num-executors 2 \ --master yarn-cluster \ I have added all the xml files in the spark-submit but still getting the same error. I see all the Hadoop files logged. 16/02/09 11:07:00 INFO Client: Uploading resource file:/usr/lib/hadoop/etc/hadoop/core-site.xml -> hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/core-site.xml 16/02/09 11:07:00 INFO Client: Uploading resource file:/usr/lib/hadoop/etc/hadoop/hdfs-site.xml -> hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/hdfs-site.xml 16/02/09 11:07:00 INFO Client: Uploading resource file:/usr/lib/hadoop/etc/hadoop/mapred-site.xml -> hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/mapred-site.xml 16/02/09 11:07:00 INFO Client: Uploading resource file:/usr/lib/hadoop/etc/hadoop/ssl-client.xml -> hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/ssl-client.xml 16/02/09 11:07:00 INFO Client: Uploading resource file:/usr/lib/hadoop/etc/hadoop/yarn-site.xml -> hdfs://quickstart.cloudera:8020/user/cloudera/.sparkStaging/application_1455041341343_0002/yarn-site.xml From: Diwakar Dhanuskodi [mailto:diwakar.dhanusk...@gmail.com] Sent: Tuesday, February 09, 2016 10:00 AM To: Rachana Srivastava; user@spark.apache.org Subject: RE: HADOOP_HOME are not set when try to run spark application in yarn cluster mode Pass on all hadoop conf files as spark-submit parameters in --files Sent from Samsung Mobile. Original message From: Rachana Srivastava <rachana.srivast...@markmonitor.com> Date:09/02/2016 22:53 (GMT+05:30) To: user@spark.apache.org Cc: Subject: HADOOP_HOME are not set when try to run spark application in yarn cluster mode I am trying to run an application in yarn cluster mode. Spark-Submit with Yarn Cluster Here are setting of the shell script: spark-submit --class "com.Myclass" \ --num-executors 2 \ --executor-cores 2 \ --master yarn \ --supervise \ --deploy-mode cluster \ ../target/ \ My application is working fine in yarn-client and local mode. Excerpt for error when we submit application from spark-submit in yarn cluster mode. &&&&&&&&&&&&&&&&&&&&&& HADOOP HOME correct path logged but still getting the error /usr/lib/hadoop &&&&&&&&&&&&&&&&&&&&&& HADOOP_CONF_DIR /usr/lib/hadoop/etc/hadoop ... Diagnostics: Exception from container-launch. 
Container id: container_1454984479786_0006_02_01 Exit code: 15 Stack trace: ExitCodeException exitCode=15: at org.apache.hadoop.util.Shell.runCommand(Shell.java:543) at org.apache.hadoop.util.Shell.run(Shell.java:460) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:262) Further I am getting following error ERROR DETAILS FROM YARN LOGS APPLICATIONID INFO : org.apache.spark.deploy.yarn.ApplicationMaster - Registered signal handlers for [TERM, HUP, INT] DEBUG: org.apache.hadoop.util.Shell - Failed to detect a valid hadoop home directory java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set. at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:307) at org.apache.hadoop.util.Shell.(Shell.java:332) at org.apache.hadoop.util.StringUtils.(StringUtils.java:79) at org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:590) at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:62) at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:52) at org.apache.spark.deploy.yarn.YarnSparkH
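If "setting it up in the program using sparkConf set" is the route taken, one possible sketch (a guess at a workaround, not a confirmed fix for this particular container-launch failure) is to push the Hadoop locations to the application master and executors through Spark's environment-variable conf keys, reusing the paths already shown in the log output above:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("com.Myclass")
  // Propagate the Hadoop locations to the YARN application master and the executors.
  .set("spark.yarn.appMasterEnv.HADOOP_HOME", "/usr/lib/hadoop")
  .set("spark.yarn.appMasterEnv.HADOOP_CONF_DIR", "/usr/lib/hadoop/etc/hadoop")
  .set("spark.executorEnv.HADOOP_HOME", "/usr/lib/hadoop")
  .set("spark.executorEnv.HADOOP_CONF_DIR", "/usr/lib/hadoop/etc/hadoop")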
Re: AM creation in yarn client mode
Your 2nd assumption is correct. There is a YARN client which polls the AM while running in yarn-client mode. Sent from Samsung Mobile. Original message From: ayan guha Date:10/02/2016 10:55 (GMT+05:30) To: praveen S Cc: user Subject: Re: AM creation in yarn client mode It depends on yarn-cluster vs yarn-client mode. On Wed, Feb 10, 2016 at 3:42 PM, praveen S wrote: Hi, I have 2 questions when running Spark jobs on YARN in client mode : 1) Where is the AM (application master) created : A) is it created on the client where the job was submitted, i.e. driver and AM on the same client? Or B) does YARN decide where the AM should be created? 2) Driver and AM run in different processes : is my assumption correct? Regards, Praveen -- Best Regards, Ayan Guha
RE: HADOOP_HOME are not set when try to run spark application in yarn cluster mode
Pass on all hadoop conf files as spark-submit parameters in --files Sent from Samsung Mobile. Original message From: Rachana SrivastavaDate:09/02/2016 22:53 (GMT+05:30) To: user@spark.apache.org Cc: Subject: HADOOP_HOME are not set when try to run spark application in yarn cluster mode I am trying to run an application in yarn cluster mode. Spark-Submit with Yarn Cluster Here are setting of the shell script: spark-submit --class "com.Myclass" \ --num-executors 2 \ --executor-cores 2 \ --master yarn \ --supervise \ --deploy-mode cluster \ ../target/ \ My application is working fine in yarn-client and local mode. Excerpt for error when we submit application from spark-submit in yarn cluster mode. && HADOOP HOME correct path logged but still getting the error /usr/lib/hadoop && HADOOP_CONF_DIR /usr/lib/hadoop/etc/hadoop ... Diagnostics: Exception from container-launch. Container id: container_1454984479786_0006_02_01 Exit code: 15 Stack trace: ExitCodeException exitCode=15: at org.apache.hadoop.util.Shell.runCommand(Shell.java:543) at org.apache.hadoop.util.Shell.run(Shell.java:460) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82) at java.util.concurrent.FutureTask.run(FutureTask.java:262) Further I am getting following error ERROR DETAILS FROM YARN LOGS APPLICATIONID INFO : org.apache.spark.deploy.yarn.ApplicationMaster - Registered signal handlers for [TERM, HUP, INT] DEBUG: org.apache.hadoop.util.Shell - Failed to detect a valid hadoop home directory java.io.IOException: HADOOP_HOME or hadoop.home.dir are not set. 
at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:307) at org.apache.hadoop.util.Shell.(Shell.java:332) at org.apache.hadoop.util.StringUtils.(StringUtils.java:79) at org.apache.hadoop.yarn.conf.YarnConfiguration.(YarnConfiguration.java:590) at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.newConfiguration(YarnSparkHadoopUtil.scala:62) at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:52) at org.apache.spark.deploy.yarn.YarnSparkHadoopUtil.(YarnSparkHadoopUtil.scala:47) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:374) at org.apache.spark.deploy.SparkHadoopUtil$.liftedTree1$1(SparkHadoopUtil.scala:386) at org.apache.spark.deploy.SparkHadoopUtil$.yarn$lzycompute(SparkHadoopUtil.scala:384) at org.apache.spark.deploy.SparkHadoopUtil$.yarn(SparkHadoopUtil.scala:384) at org.apache.spark.deploy.SparkHadoopUtil$.get(SparkHadoopUtil.scala:401) at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:623) at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) I tried modifying spark-env.sh like following and I see Hadoop_Home logged but still getting above error: Modified added following entries to spark-env.sh export HADOOP_HOME="/usr/lib/hadoop" echo "&& HADOOP HOME " echo "$HADOOP_HOME" export HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop" echo "&& HADOOP_CONF_DIR " echo "$HADOOP_CONF_DIR"
Re: Kafka directstream receiving rate
Now, using DirectStream I am able to process 2 Million messages from 20 partition topic in a batch interval of 2000ms. Finally figured out that Kafka producer from a source system is sending same topic name instead of key in keyedmessage . It could put messages into all partitions . But I still don't understand why DirectStream couldn't process entire messages when there is single partitioned topic . Let me look closely and update here .Let us close this mail thread. Thanks Cody for bringing insights. Regards Diwakar . Sent from Samsung Mobile. Original message From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Date:07/02/2016 01:39 (GMT+05:30) To: Cody Koeninger <c...@koeninger.org> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate Thanks Cody for trying to understand the issue . Sorry if I am not clear . The scenario is to process all messages at once in single dstream block when source system publishes messages .Source system will publish x messages / 10 minutes once. By events I meant that total no of messages processed by each batch interval ( in my case 2000ms) by executor ( web UI shows each block processing as events) DirectStream is processing only 10 messages per batch. It is same if 100 or 1 million messages published. xyz topic having 20 partitions. I am using kafka producer api to publish messages. Below is the code that I am using { val topics = "xyz" val topicSet = topics.split(",").toSet val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092") val k = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicSet) k. foreachRDD { rdd => val dstreamToRDD =rdd.cache () println (current time & dtreamToRDD.partitions.length.) val accTran = dstream To RDD. filter { ...} accTran.map {...} } ssc.start () ssc.awaitTermination } } I tried using DirectStream with map which I had issue with offsetRange . After your suggestion offset issue is resolved when I used above DirectStream code with topic only. spark-submit setting that I am using is in the mail chain below . Is there any bottlebeck I am hitting to process maximum messages at one batch interval using directsream rdd? . If this is not clear . I would take this offline and explain scenario briefly. Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:06/02/2016 22:32 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate I am not at all clear on what you are saying. "Yes , I am printing each messages . It is processing all messages under each dstream block." If it is processing all messages, what is the problem you are having? "The issue is with Directsream processing 10 message per event. " What distinction are you making between a message and an event? "I am expecting Directsream to process 1 million messages" Your first email said you were publishing 100 messages but only processing 10. Why are you now trying to process 1 million messages without understanding what is going on? Make sure you can process a limited number of messages correctly first. The first code examples you posted to the list had some pretty serious errors (ie only trying to process 1 partition, trying to process offsets that didn't exist). Make sure that is all fixed first. To be clear, I use direct kakfa rdds to process batches with like 4gb of messages per partition, you shouldn't be hitting some kind of limit with 1 million messages per batch. 
You may of course hit executor resource issues depending on what you're trying to do with each message, but that doesn't sound like the case here. If you want help, either clarify what you are saying, or post a minimal reproducible code example, with expected output vs actual output. On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: Cody, Yes , I am printing each messages . It is processing all messages under each dstream block. Source systems are publishing 1 Million messages /4 secs which is less than batch interval. The issue is with Directsream processing 10 message per event. When partitions were increased to 20 in topic, DirectStream picksup only 200 messages ( I guess 10 for each partition ) at a time for processing . I have 16 executors running for streaming ( both yarn client & cluster mode). I am expecting Directsream to process 1 million messages which published in topic < batch interval . Using createStream , It could batch 150K messages and pro
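For anyone hitting the same thing, a small sketch of the old Scala producer API showing where the key goes (assuming Kafka 0.8-style kafka.producer classes, a broker on localhost:9092, and hypothetical recordKey / jsonPayload values; the middle argument of KeyedMessage is the partitioning key, not the topic name):

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "localhost:9092")
props.put("serializer.class", "kafka.serializer.StringEncoder")
val producer = new Producer[String, String](new ProducerConfig(props))

// KeyedMessage(topic, key, message): the middle argument is used to pick the partition,
// so passing the topic name there keys every record identically.
producer.send(new KeyedMessage[String, String]("xyz", recordKey, jsonPayload))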
Re: Apache Spark data locality when integrating with Kafka
Fanoos, Where you want the solution to be deployed ?. On premise or cloud? Regards Diwakar . Sent from Samsung Mobile. Original message From: "Yuval.Itzchakov"Date:07/02/2016 19:38 (GMT+05:30) To: user@spark.apache.org Cc: Subject: Re: Apache Spark data locality when integrating with Kafka I would definitely try to avoid hosting Kafka and Spark on the same servers. Kafka and Spark will be doing alot of IO between them, so you'll want to maximize on those resources and not share them on the same server. You'll want each Kafka broker to be on a dedicated server, as well as your spark master and workers. If you're hosting them on Amazon EC2 instances, then you'll want these to be on the same availability zone, so you can benefit from low latency in that same zone. If you're on a dedicated servers, perhaps you'll want to create a VPC between the two clusters so you can, again, benefit from low IO latency and high throughput. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Apache Spark data locality when integrating with Kafka
We are using spark in two ways 1. Yarn with spark support. Kafka running along with data nodes 2. Spark master and workers running with some of Kafka brokers. Data locality is important. Regards Diwakar Sent from Samsung Mobile. Original message From: أنس الليثي <dev.fano...@gmail.com> Date:08/02/2016 02:07 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: "Yuval.Itzchakov" <yuva...@gmail.com>, user <user@spark.apache.org> Subject: Re: Apache Spark data locality when integrating with Kafka Diwakar We have our own servers. We will not use any cloud service like Amazon's On 7 February 2016 at 18:24, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: Fanoos, Where you want the solution to be deployed ?. On premise or cloud? Regards Diwakar . Sent from Samsung Mobile. Original message From: "Yuval.Itzchakov" <yuva...@gmail.com> Date:07/02/2016 19:38 (GMT+05:30) To: user@spark.apache.org Cc: Subject: Re: Apache Spark data locality when integrating with Kafka I would definitely try to avoid hosting Kafka and Spark on the same servers. Kafka and Spark will be doing alot of IO between them, so you'll want to maximize on those resources and not share them on the same server. You'll want each Kafka broker to be on a dedicated server, as well as your spark master and workers. If you're hosting them on Amazon EC2 instances, then you'll want these to be on the same availability zone, so you can benefit from low latency in that same zone. If you're on a dedicated servers, perhaps you'll want to create a VPC between the two clusters so you can, again, benefit from low IO latency and high throughput. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165p26170.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Anas Rabei Senior Software Developer Mubasher.info anas.ra...@mubasher.info
Re: Kafka directstream receiving rate
Thanks Cody for trying to understand the issue . Sorry if I am not clear . The scenario is to process all messages at once in single dstream block when source system publishes messages .Source system will publish x messages / 10 minutes once. By events I meant that total no of messages processed by each batch interval ( in my case 2000ms) by executor ( web UI shows each block processing as events) DirectStream is processing only 10 messages per batch. It is same if 100 or 1 million messages published. xyz topic having 20 partitions. I am using kafka producer api to publish messages. Below is the code that I am using { val topics = "xyz" val topicSet = topics.split(",").toSet val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092") val k = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicSet) k. foreachRDD { rdd => val dstreamToRDD =rdd.cache () println (current time & dtreamToRDD.partitions.length.) val accTran = dstream To RDD. filter { ...} accTran.map {...} } ssc.start () ssc.awaitTermination } } I tried using DirectStream with map which I had issue with offsetRange . After your suggestion offset issue is resolved when I used above DirectStream code with topic only. spark-submit setting that I am using is in the mail chain below . Is there any bottlebeck I am hitting to process maximum messages at one batch interval using directsream rdd? . If this is not clear . I would take this offline and explain scenario briefly. Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:06/02/2016 22:32 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate I am not at all clear on what you are saying. "Yes , I am printing each messages . It is processing all messages under each dstream block." If it is processing all messages, what is the problem you are having? "The issue is with Directsream processing 10 message per event. " What distinction are you making between a message and an event? "I am expecting Directsream to process 1 million messages" Your first email said you were publishing 100 messages but only processing 10. Why are you now trying to process 1 million messages without understanding what is going on? Make sure you can process a limited number of messages correctly first. The first code examples you posted to the list had some pretty serious errors (ie only trying to process 1 partition, trying to process offsets that didn't exist). Make sure that is all fixed first. To be clear, I use direct kakfa rdds to process batches with like 4gb of messages per partition, you shouldn't be hitting some kind of limit with 1 million messages per batch. You may of course hit executor resource issues depending on what you're trying to do with each message, but that doesn't sound like the case here. If you want help, either clarify what you are saying, or post a minimal reproducible code example, with expected output vs actual output. On Sat, Feb 6, 2016 at 6:16 AM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: Cody, Yes , I am printing each messages . It is processing all messages under each dstream block. Source systems are publishing 1 Million messages /4 secs which is less than batch interval. The issue is with Directsream processing 10 message per event. 
When partitions were increased to 20 in topic, DirectStream picksup only 200 messages ( I guess 10 for each partition ) at a time for processing . I have 16 executors running for streaming ( both yarn client & cluster mode). I am expecting Directsream to process 1 million messages which published in topic < batch interval . Using createStream , It could batch 150K messages and process . createStream is better than Directsream in this case . Again why only 150K. Any clarification is much appreciated on directStream processing millions per batch . Sent from Samsung Mobile. Original message ---- From: Cody Koeninger <c...@koeninger.org> Date:06/02/2016 01:30 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate Have you tried just printing each message, to see which ones are being processed? On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: I am able to see no of messages processed per event in sparkstreaming web UI . Also I am counting the messages inside foreachRDD . Removed the settings for backpressure but still the same . Sent from Sams
RE: Apache Spark data locality when integrating with Kafka
Yes . To reduce network latency . Sent from Samsung Mobile. Original message From: fanooosDate:07/02/2016 09:24 (GMT+05:30) To: user@spark.apache.org Cc: Subject: Apache Spark data locality when integrating with Kafka Dears If I will use Kafka as a streaming source to some spark jobs, is it advised to install spark to the same nodes of kafka cluster? What are the benefits and drawbacks of such a decision? regards -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Apache-Spark-data-locality-when-integrating-with-Kafka-tp26165.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Kafka directstream receiving rate
Cody, Yes , I am printing each messages . It is processing all messages under each dstream block. Source systems are publishing 1 Million messages /4 secs which is less than batch interval. The issue is with Directsream processing 10 message per event. When partitions were increased to 20 in topic, DirectStream picksup only 200 messages ( I guess 10 for each partition ) at a time for processing . I have 16 executors running for streaming ( both yarn client & cluster mode). I am expecting Directsream to process 1 million messages which published in topic < batch interval . Using createStream , It could batch 150K messages and process . createStream is better than Directsream in this case . Again why only 150K. Any clarification is much appreciated on directStream processing millions per batch . Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:06/02/2016 01:30 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate Have you tried just printing each message, to see which ones are being processed? On Fri, Feb 5, 2016 at 1:41 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: I am able to see no of messages processed per event in sparkstreaming web UI . Also I am counting the messages inside foreachRDD . Removed the settings for backpressure but still the same . Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:06/02/2016 00:33 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate How are you counting the number of messages? I'd go ahead and remove the settings for backpressure and maxrateperpartition, just to eliminate that as a variable. On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: I am using one directsream. Below is the call to directsream:- val topicSet = topics.split(",").toSet val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092") val k = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicSet) When I replace DirectStream call to createStream, all messages were read by one Dstream block.:- val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap ,StorageLevel.MEMORY_ONLY) I am using below spark-submit to execute: ./spark-submit --master yarn-client --conf "spark.dynamicAllocation.enabled=true" --conf "spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" --conf "spark.shuffle.consolidateFiles=true" --conf "spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar /root/Jars/sparkreceiver.jar Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:05/02/2016 22:07 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate If you're using the direct stream, you have 0 receivers. 
Do you mean you have 1 executor? Can you post the relevant call to createDirectStream from your code, as well as any relevant spark configuration? On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: Adding more info Batch interval is 2000ms. I expect all 100 messages go thru one dstream from directsream but it receives at rate of 10 messages at time. Am I missing some configurations here. Any help appreciated. Regards Diwakar. Sent from Samsung Mobile. Original message From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Date:05/02/2016 07:33 (GMT+05:30) To: user@spark.apache.org Cc: Subject: Kafka directsream receiving rate Hi, Using spark 1.5.1. I have a topic with 20 partitions. When I publish 100 messages. Spark direct stream is receiving 10 messages per dstream. I have only one receiver . When I used createStream the receiver received entire 100 messages at once. Appreciate any help . Regards Diwakar Sent from Samsung Mobile.
Re: Kafka directstream receiving rate
I am using one directsream. Below is the call to directsream:- val topicSet = topics.split(",").toSet val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092") val k = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicSet) When I replace DirectStream call to createStream, all messages were read by one Dstream block.:- val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap ,StorageLevel.MEMORY_ONLY) I am using below spark-submit to execute: ./spark-submit --master yarn-client --conf "spark.dynamicAllocation.enabled=true" --conf "spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" --conf "spark.shuffle.consolidateFiles=true" --conf "spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar /root/Jars/sparkreceiver.jar Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:05/02/2016 22:07 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate If you're using the direct stream, you have 0 receivers. Do you mean you have 1 executor? Can you post the relevant call to createDirectStream from your code, as well as any relevant spark configuration? On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: Adding more info Batch interval is 2000ms. I expect all 100 messages go thru one dstream from directsream but it receives at rate of 10 messages at time. Am I missing some configurations here. Any help appreciated. Regards Diwakar. Sent from Samsung Mobile. Original message From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Date:05/02/2016 07:33 (GMT+05:30) To: user@spark.apache.org Cc: Subject: Kafka directsream receiving rate Hi, Using spark 1.5.1. I have a topic with 20 partitions. When I publish 100 messages. Spark direct stream is receiving 10 messages per dstream. I have only one receiver . When I used createStream the receiver received entire 100 messages at once. Appreciate any help . Regards Diwakar Sent from Samsung Mobile.
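As a rough aside on the numbers in the submit command above (assuming those settings were actually in effect for the run being discussed): spark.streaming.kafka.maxRatePerPartition=100 with a 2000 ms batch interval caps each Kafka partition at roughly 100 msgs/s x 2 s = 200 messages per batch, i.e. about 200 x 20 = 4000 messages per batch for a 20-partition topic, regardless of how many messages are waiting in the topic. That is why removing maxRatePerPartition and backpressure while debugging, as Cody suggests, takes those caps out of the picture.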
Re: Kafka directstream receiving rate
I am able to see no of messages processed per event in sparkstreaming web UI . Also I am counting the messages inside foreachRDD . Removed the settings for backpressure but still the same . Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:06/02/2016 00:33 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate How are you counting the number of messages? I'd go ahead and remove the settings for backpressure and maxrateperpartition, just to eliminate that as a variable. On Fri, Feb 5, 2016 at 12:22 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: I am using one directsream. Below is the call to directsream:- val topicSet = topics.split(",").toSet val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092") val k = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicSet) When I replace DirectStream call to createStream, all messages were read by one Dstream block.:- val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181","resp",topicMap ,StorageLevel.MEMORY_ONLY) I am using below spark-submit to execute: ./spark-submit --master yarn-client --conf "spark.dynamicAllocation.enabled=true" --conf "spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" --conf "spark.shuffle.consolidateFiles=true" --conf "spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml --jars /root/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Jars/sparkreceiver.jar /root/Jars/sparkreceiver.jar Sent from Samsung Mobile. Original message From: Cody Koeninger <c...@koeninger.org> Date:05/02/2016 22:07 (GMT+05:30) To: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Cc: user@spark.apache.org Subject: Re: Kafka directsream receiving rate If you're using the direct stream, you have 0 receivers. Do you mean you have 1 executor? Can you post the relevant call to createDirectStream from your code, as well as any relevant spark configuration? On Thu, Feb 4, 2016 at 8:13 PM, Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> wrote: Adding more info Batch interval is 2000ms. I expect all 100 messages go thru one dstream from directsream but it receives at rate of 10 messages at time. Am I missing some configurations here. Any help appreciated. Regards Diwakar. Sent from Samsung Mobile. Original message From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com> Date:05/02/2016 07:33 (GMT+05:30) To: user@spark.apache.org Cc: Subject: Kafka directsream receiving rate Hi, Using spark 1.5.1. I have a topic with 20 partitions. When I publish 100 messages. Spark direct stream is receiving 10 messages per dstream. I have only one receiver . When I used createStream the receiver received entire 100 messages at once. Appreciate any help . Regards Diwakar Sent from Samsung Mobile.
RE: Kafka directstream receiving rate
Adding more info. Batch interval is 2000 ms. I expect all 100 messages to go through one DStream from the direct stream, but it receives them at a rate of 10 messages at a time. Am I missing some configuration here? Any help appreciated.

Regards,
Diwakar.

Sent from Samsung Mobile.

Original message
From: Diwakar Dhanuskodi <diwakar.dhanusk...@gmail.com>
Date: 05/02/2016 07:33 (GMT+05:30)
To: user@spark.apache.org
Cc:
Subject: Kafka directstream receiving rate

Hi,
Using Spark 1.5.1. I have a topic with 20 partitions. When I publish 100 messages, the Spark direct stream receives 10 messages per DStream. I have only one receiver. When I used createStream, the receiver received the entire 100 messages at once. Appreciate any help.

Regards,
Diwakar

Sent from Samsung Mobile.
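A rough sanity check on the numbers above (my own arithmetic, not from the thread): spark.streaming.kafka.maxRatePerPartition is a per-second, per-partition cap, so with 20 partitions and a 2-second batch it allows far more than 100 messages per batch and cannot by itself explain batches of only 10 messages; any throttling would more plausibly come from the backpressure rate estimator, which is why removing it is suggested later in the thread.

// Upper bound on messages per batch implied by maxRatePerPartition.
val partitions = 20              // Kafka topic partitions, from the thread
val maxRatePerPartition = 100    // messages per second per partition (spark.streaming.kafka.maxRatePerPartition)
val batchIntervalSeconds = 2     // 2000 ms batch interval

val maxMessagesPerBatch = partitions * maxRatePerPartition * batchIntervalSeconds
println(maxMessagesPerBatch)     // 4000 -- well above the 100 published messages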
kafkaDirectStream usage error
:04:41 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on datanode3.isdp.com:42736 in memory (size: 2.4 KB, free: 530.3 MB)
16/02/04 21:04:41 INFO spark.ContextCleaner: Cleaned accumulator 1
16/02/04 21:04:41 INFO ui.SparkUI: Stopped Spark web UI at http://10.132.117.208:4040
16/02/04 21:04:41 INFO scheduler.DAGScheduler: Stopping DAGScheduler
16/02/04 21:04:41 INFO scheduler.DAGScheduler: ShuffleMapStage 2 (foreachRDD at SparkReceiver.scala:74) failed in 1.512 s
16/02/04 21:04:41 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
16/02/04 21:04:41 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
16/02/04 21:04:41 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
16/02/04 21:04:41 INFO cluster.YarnClientSchedulerBackend: Stopped
16/02/04 21:04:41 WARN spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
16/02/04 21:04:41 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/02/04 21:04:41 INFO storage.MemoryStore: MemoryStore cleared
16/02/04 21:04:41 INFO storage.BlockManager: BlockManager stopped
16/02/04 21:04:41 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/02/04 21:04:41 INFO spark.SparkContext: Successfully stopped SparkContext
16/02/04 21:04:41 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/02/04 21:04:41 INFO util.ShutdownHookManager: Shutdown hook called
16/02/04 21:04:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-2014834b-fb03-4418-99e0-f35171acc67e
16/02/04 21:04:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-9afdd351-d7a6-44cc-a47f-cce3170293e9
16/02/04 21:04:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/02/04 21:04:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
[root@Datanode4 bin]#

// val k = KafkaUtils.createStream(ssc, "datanode4.isdp.com:2181", "DIME", topicMap, StorageLevel.MEMORY_ONLY)

// Changes for DirectStream - START
val brokers = "datanode4.isdp.com:9092" + "," + "datanode5.isdp.com:9093"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092")
// when executing DirectStream, comment out the createStream statement
val k = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topicSet)

val topicAndPart = OffsetRange.create("request5", 0, 1, 10).topicAndPartition()
val fromOffsets = Map[kafka.common.TopicAndPartition,Long](topicAndPart -> 0)
val messageHandler = (mmd: MessageAndMetadata[String,String]) => (mmd.key(), mmd.message())
val k1 = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)](ssc, kafkaParams, fromOffsets, messageHandler)

./spark-submit --master yarn-client --conf "spark.dynamicAllocation.enabled=true" --conf "spark.shuffle.service.enabled=true" --conf "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" --conf "spark.shuffle.consolidateFiles=true" --conf "spark.streaming.kafka.maxRatePerPartition=100" --driver-memory 2g --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml --jars /root/Downloads/dime/jars/spark-streaming-kafka-assembly_2.10-1.5.1.jar,/root/Desktop/Jars/sparkreceiver.jar /root/Desktop/Jars/sparkreceiver.jar > log.txt

./spark-submit --master yarn --deploy-mode client --num-executors 10 --executor-cores 1 --conf "spark.streaming.kafka.maxRatePerPartition=1000" --conf "spark.streaming.blockInterval=50ms" --conf "spark.ui.showConsoleProgress=false" --conf "spark.sql.tungsten.enabled=false" --conf "spark.sql.codegen=false" --conf "spark.sql.unsafe.enabled=false" --conf "spark.streaming.backpressure.enabled=true" --conf "spark.locality.wait=1s" --conf "spark.shuffle.consolidateFiles=true" --driver-memory 2g --executor-memory 1g --class com.tcs.dime.spark.SparkReceiver --files /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml,/etc/hadoop/conf/mapred-site.xml,/etc/hadoop/conf/yarn-site.xml,/etc/hive/conf/hive-site.xml --jars hdfs://Namenode:8020/user/root/kafka-assembly.jar,hdfs://Namenode:8020/user/root/sparkreceiver.jar /root/Desktop/Jars/sparkreceiver.jar &> log.txt
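For reference, a self-contained sketch of the explicit-offset variant of createDirectStream used in the snippet above, with the starting offsets and decoder types spelled out. The topic "request5", partition 0 and start offset 0 are illustrative values following that snippet; ssc and the broker address are assumed from the earlier code:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Broker list as in the snippet above (assumed).
val kafkaParams = Map[String,String]("bootstrap.servers" -> "datanode4.isdp.com:9092")

// Start consuming topic "request5", partition 0, at offset 0 (illustrative values).
val fromOffsets = Map(TopicAndPartition("request5", 0) -> 0L)

// Turn every consumed record into a (key, value) pair.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message())

// The fifth type parameter is the element type produced by messageHandler.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)

Unlike the topic-set overload, this form starts exactly at the supplied offsets, so it is the one to reach for when a job must resume from offsets stored elsewhere.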