You are passing a new SparkConf to the StreamingContext, which will cause
the creation of a new SparkContext:

*StreamingContext(conf: **SparkConf*
<https://spark.apache.org/docs/1.4.1/api/scala/org/apache/spark/SparkConf.html>
*, batchDuration: **Duration*
<https://spark.apache.org/docs/1.4.1/api/scala/org/apache/spark/streaming/Duration.html>
*)*

Create a StreamingContext by providing the configuration necessary for a
new SparkContext.

Is there a reason you can not use the existing SparkContext created by
Zeppelin?  Then you can just do something like:

val ssc = new StreamingContext(sc, Milliseconds(SparkStreamingBatchInterval
))

ssc.checkpoint(SparkCheckpointDir)

...

Where "sc" is the Zeppelin provided SparkContext.

-Todd



On Tue, Sep 8, 2015 at 8:11 AM, Sajeevan Achuthan <
achuthan.sajee...@gmail.com> wrote:

> Hi
>   The problem is the Spark is allowing to create two contexts, See the log
> below. Could you please let me know , how to fix this problem?
>
> WARN [2015-09-08 13:09:01,191] ({pool-2-thread-5}
> Logging.scala[logWarning]:92) - Multiple running SparkContexts detected in
> the same JVM!
> org.apache.spark.SparkException: Only one SparkContext may be running in
> this JVM (see SPARK-2243). To ignore this error, set
> spark.driver.allowMultipleContexts = true. The currently running
> SparkContext was created at:
> org.apache.spark.SparkContext.<init>(SparkContext.scala:81)
>
> org.apache.zeppelin.spark.SparkInterpreter.createSparkContext(SparkInterpreter.java:301)
>
> org.apache.zeppelin.spark.SparkInterpreter.getSparkContext(SparkInterpreter.java:146)
> org.apache.zeppelin.spark.SparkInterpreter.open(SparkInterpreter.java:423)
>
> org.apache.zeppelin.interpreter.ClassloaderInterpreter.open(ClassloaderInterpreter.java:73)
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.open(LazyOpenInterpreter.java:68)
>
> org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:92)
>
> org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:277)
> org.apache.zeppelin.scheduler.Job.run(Job.java:170)
> org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> at
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2083)
> at
> org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:2065)
> at scala.Option.foreach(Option.scala:236)
> at
> org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:2065)
> at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:2151)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:2023)
> at
> org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:834)
> at
> org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:80)
> at
> $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45)
> at
> $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
> at
> $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
> at
> $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
> at
> $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:56)
> at
> $line58.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58)
> at $line58.$
> /Saj
>
> On 8 September 2015 at 12:25, Todd Nist <tsind...@gmail.com> wrote:
>
>> I do not see that your importing the following:
>>
>>  import org.apache.spark.sql._
>>
>> Which I believe is where you will find the DataFrame.toDF function.
>>
>> HTH.
>>
>> -Todd
>>
>> On Mon, Sep 7, 2015 at 5:49 PM, Sajeevan Achuthan <
>> achuthan.sajee...@gmail.com> wrote:
>>
>>> Hi Moon,
>>>    Thanks for the reply, I tried that option too. Unfortunately, I tried
>>> that option too and I got the error
>>> data: org.apache.spark.streaming.dstream.DStream[CELL_KPIS] =
>>> org.apache.spark.streaming.dstream.MappedDStream@5f3ea8bb <console>:49:
>>> error: value toDF is not a member of org.apache.spark.rdd.RDD[CELL_KPIS]
>>> accessLogs.toDF.registerTempTable("RAS") ^
>>> Any idea?
>>>
>>> On 7 September 2015 at 17:30, moon soo Lee <m...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I think you will need to convert RDD to data frame using .toDF(), like
>>>> accessLogs.toDF.registerTempTable("RAS")
>>>>
>>>> Thanks,
>>>> moon
>>>>
>>>> On Mon, Sep 7, 2015 at 3:34 AM Sajeevan Achuthan <
>>>> achuthan.sajee...@gmail.com> wrote:
>>>>
>>>>> Zeppelin, an excellent tool. I am trying to implement a streaming
>>>>> application. I get an error while deploying my application. See my code
>>>>> below
>>>>>
>>>>>
>>>>> import org.apache.spark.SparkContext
>>>>> import org.apache.spark.SparkContext._
>>>>> import org.apache.spark.SparkConf
>>>>> import org.apache.spark.streaming.StreamingContext
>>>>> import org.apache.spark.streaming.Seconds
>>>>> import org.apache.spark.sql.SQLContext
>>>>>   val sparkConf = new
>>>>> SparkConf().setAppName("PEPA").setMaster("local[*]").set("spark.driver.allowMultipleContexts",
>>>>> "true")
>>>>>
>>>>>         import org.apache.spark.streaming.kafka._
>>>>>         val ssc = new StreamingContext(sparkConf, Seconds(2))
>>>>>
>>>>>         ssc.checkpoint("checkpoint")
>>>>>         val topicMap = Map("incoming"->1)
>>>>>
>>>>>         val record = KafkaUtils.createStream(ssc, "localhost", "1",
>>>>> topicMap).map(_._2)
>>>>>          record.print()
>>>>>         case class
>>>>> CELL_KPIS(ECELL_Name:String,CGI:String,Number_of_Times_Interf:Double,TAOF:Double,PHL:Double,NPCCHL:Double,LRSRP:Double,NC:Double)
>>>>>         val data =
>>>>> record.map(s=>s.split(",")).filter(s=>s(0)!="\"ECELL_Name\"").map(
>>>>>             s=>CELL_KPIS(s(0), s(1), s(2).toDouble, s(3).toDouble,
>>>>> s(5).toDouble,s(6).toDouble, s(7).toDouble, s(8).toDouble)
>>>>>         )
>>>>>         data.foreachRDD {accessLogs =>
>>>>>         import sqlContext.implicits._
>>>>>        accessLogs.registerTempTable("RAS")
>>>>>
>>>>>         }
>>>>>         ssc.start()
>>>>>    ssc.awaitTermination()
>>>>>
>>>>> And I get error
>>>>> import org.apache.spark.SparkContext import
>>>>> org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import
>>>>> org.apache.spark.streaming.StreamingContext import
>>>>> org.apache.spark.streaming.Seconds import org.apache.spark.sql.SQLContext
>>>>> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@2e5779a
>>>>> import org.apache.spark.streaming.kafka._ ssc:
>>>>> org.apache.spark.streaming.StreamingContext =
>>>>> org.apache.spark.streaming.StreamingContext@48621ee1 topicMap:
>>>>> scala.collection.immutable.Map[String,Int] = Map(incoming -> 1) record:
>>>>> org.apache.spark.streaming.dstream.DStream[String] =
>>>>> org.apache.spark.streaming.dstream.MappedDStream@6290e75e defined
>>>>> class CELL_KPIS data: 
>>>>> org.apache.spark.streaming.dstream.DStream[CELL_KPIS]
>>>>> = org.apache.spark.streaming.dstream.MappedDStream@4bda38c3
>>>>>
>>>>> <console>:55: error: value registerTempTable is not a member of
>>>>> org.apache.spark.rdd.RDD[CELL_KPIS] accessLogs.registerTempTable("RAS")
>>>>>
>>>>> *My configuration for Zeppelin*
>>>>>
>>>>>
>>>>> export MASTER=spark://localhost:7077
>>>>> export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_05
>>>>> export ZEPPELIN_PORT=9090
>>>>> export ZEPPELIN_SPARK_CONCURRENTSQL=false
>>>>> export ZEPPELIN_SPARK_USEHIVECONTEXT=false
>>>>> #'export MASTER=local[*]
>>>>> export SPARK_HOME=/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4
>>>>>
>>>>> *Interpreter configuration for spark *
>>>>>
>>>>> "2AW247KM7": { "id": "2AW247KM7", "name": "spark", "group": "spark",
>>>>> "properties": { "spark.cores.max": "", "spark.yarn.jar": "", "master":
>>>>> "local[*]", "zeppelin.spark.maxResult": "1000", "zeppelin.dep.localrepo":
>>>>> "local-repo", "spark.app.name": "APP3", "spark.executor.memory":
>>>>> "5G", "zeppelin.spark.useHiveContext": "false",
>>>>> "spark.driver.allowMultipleContexts": "true", "args": "", "spark.home":
>>>>> "/home/anauser/spark-1.3/spark-1.3.0-bin-cdh4",
>>>>> "zeppelin.spark.concurrentSQL": "true", "zeppelin.pyspark.python": 
>>>>> "python"
>>>>> }, "interpreterGroup": [ { "class":
>>>>> "org.apache.zeppelin.spark.SparkInterpreter", "name": "spark" }, { 
>>>>> "class":
>>>>> "org.apache.zeppelin.spark.PySparkInterpreter", "name": "pyspark" }, {
>>>>> "class": "org.apache.zeppelin.spark.SparkSqlInterpreter", "name": "sql" },
>>>>> { "class": "org.apache.zeppelin.spark.DepInterpreter", "name": "dep" } ],
>>>>> "option": { "remote": true } }
>>>>> Is there any problem in my code or setup ?
>>>>> Any help very much appreciated.
>>>>>
>>>>
>>>
>>
>

Reply via email to