You should be able to replace that second line with

val sc = ssc.sparkContext

On Tue, Feb 10, 2015 at 10:04 AM, Jon Gregg <jonrgr...@gmail.com> wrote:

> They're separate in my code, how can I combine them?  Here's what I have:
>
>       val sparkConf = new SparkConf()
>       val ssc =  new StreamingContext(sparkConf, Seconds(bucketSecs))
>
>       val sc = new SparkContext()
>
> On Tue, Feb 10, 2015 at 1:02 PM, Sandy Ryza <sandy.r...@cloudera.com>
> wrote:
>
>> Is the SparkContext you're using the same one that the StreamingContext
>> wraps?  If not, I don't think using two is supported.
>>
>> -Sandy
>>
>> On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg <jonrgr...@gmail.com> wrote:
>>
>>> I'm still getting an error.  Here's my code, which works successfully
>>> when tested using spark-shell:
>>>
>>>       val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
>>>       val badIpSet = badIPs.toSet
>>>       val badIPsBC = sc.broadcast(badIpSet)
>>>
>>>
>>> The job looks OK from my end:
>>>
>>> 15/02/07 18:59:58 INFO Client: Application report from ASM:
>>>
>>>          application identifier: application_1423081782629_3861
>>>
>>>          appId: 3861
>>>
>>> *         clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*
>>>
>>>          appDiagnostics:
>>>
>>>          appMasterHost: phd40010008.na.com
>>>
>>>          appQueue: root.default
>>>
>>>          appMasterRpcPort: 0
>>>
>>>          appStartTime: 1423353581140
>>>
>>> *         yarnAppState: RUNNING*
>>>
>>>          distributedFinalState: UNDEFINED
>>>
>>>
>>> But the streaming process never actually begins.  The full log is below,
>>> scroll to the end for the repeated warning "WARN YarnClusterScheduler:
>>> Initial job has not accepted any resources; check your cluster UI to ensure
>>> that workers are registered and have sufficient memory".
>>>
>>> I'll note that I have a different Spark Streaming app called "dqd"
>>> working successfully for a different job that uses only a StreamingContext
>>> and not an additional SparkContext.  But this app (called "sbStreamingTv")
>>> uses both a SparkContext and a StreamingContext for grabbing a lookup file
>>> in HDFS for IP filtering. * The references to line #198 from the log
>>> below refers to the "val badIPs =
>>> sc.textFile("/user/sb/badfullIPs.csv").collect" line shown above, and it
>>> looks like Spark doesn't get beyond that point in the code.*
>>>
>>> Also, this job ("sbStreamingTv") does work successfully using
>>> yarn-client, even with both a SparkContext and StreamingContext.  It looks
>>> to me that in yarn-cluster mode it's grabbing resources for the
>>> StreamingContext but not for the SparkContext.
>>>
>>> Any ideas?
>>>
>>> Jon
>>>
>>>
>>> 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
>>> 1177.8 MB.
>>> 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129
>>> with id = ConnectionManagerId(phd40010008.na.com,30129)
>>> 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register
>>> BlockManager
>>> 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
>>> phd40010008.na.com:30129 with 1177.8 MB RAM
>>> 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
>>> 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
>>> 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
>>> http://10.229.16.108:35183
>>> 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
>>> /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_000001/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
>>> 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
>>> 15/02/10 12:06:16 INFO JettyUtils: Adding filter:
>>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>>> 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
>>> http://phd40010008.na.com:25869
>>> 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
>>> /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
>>> 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
>>> 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook
>>> for context org.apache.spark.SparkContext@7f38095d
>>> 15/02/10 12:06:17 INFO ApplicationMaster: Registering the
>>> ApplicationMaster
>>> 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
>>> 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
>>> containers, each with 2432 memory
>>> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
>>> Any, priority: 1, capability: <memory:2432, vCores:1>
>>> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
>>> Any, priority: 1, capability: <memory:2432, vCores:1>
>>> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
>>> Any, priority: 1, capability: <memory:2432, vCores:1>
>>> 15/02/10 12:06:20 INFO YarnClusterScheduler:
>>> YarnClusterScheduler.postStartHook done
>>> 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
>>> will be overridden by the value set by the cluster manager (via
>>> SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
>>> 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
>>> 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication
>>> disabled; ui acls disabled; users with view permissions: Set(jg)
>>> 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started
>>> 15/02/10 12:06:20 INFO Remoting: Starting remoting
>>> 15/02/10 12:06:20 INFO Remoting: Remoting started; listening on
>>> addresses :[akka.tcp://sp...@phd40010008.na.com:43340]
>>> 15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses:
>>> [akka.tcp://sp...@phd40010008.na.com:43340]
>>> 15/02/10 12:06:20 INFO SparkEnv: Registering MapOutputTracker
>>> 15/02/10 12:06:20 INFO SparkEnv: Registering BlockManagerMaster
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/1/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-f6e1
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/10/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-583d
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/11/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-0b66
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-bc8f
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/2/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-17e4
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/3/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-c01e
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/4/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-915c
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/5/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-38ff
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/6/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-c92f
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/7/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-b67a
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/8/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-46fb
>>> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
>>> /hdata/9/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-9d11
>>> 15/02/10 12:06:20 INFO MemoryStore: MemoryStore started with capacity
>>> 1177.8 MB.
>>> 15/02/10 12:06:20 INFO ConnectionManager: Bound socket to port 55944
>>> with id = ConnectionManagerId(phd40010008.na.com,55944)
>>> 15/02/10 12:06:20 INFO BlockManagerMaster: Trying to register
>>> BlockManager
>>> 15/02/10 12:06:20 INFO BlockManagerInfo: Registering block manager
>>> phd40010008.na.com:55944 with 1177.8 MB RAM
>>> 15/02/10 12:06:20 INFO BlockManagerMaster: Registered BlockManager
>>> 15/02/10 12:06:20 INFO HttpFileServer: HTTP File server directory is
>>> /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_000001/tmp/spark-b3daba9d-f743-4738-b6c2-f56e56813edd
>>> 15/02/10 12:06:20 INFO HttpServer: Starting HTTP Server
>>> 15/02/10 12:06:20 INFO JettyUtils: Adding filter:
>>> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
>>> 15/02/10 12:06:20 INFO SparkUI: Started SparkUI at
>>> http://phd40010008.na.com:10612
>>> 15/02/10 12:06:20 INFO EventLoggingListener: Logging events to
>>> /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587980782
>>> 15/02/10 12:06:20 INFO YarnClusterScheduler: Created YarnClusterScheduler
>>> 15/02/10 12:06:20 INFO YarnClusterScheduler:
>>> YarnClusterScheduler.postStartHook done
>>> 15/02/10 12:06:21 INFO MemoryStore: ensureFreeSpace(253715) called with
>>> curMem=0, maxMem=1235012812
>>> 15/02/10 12:06:21 INFO MemoryStore: Block broadcast_0 stored as values
>>> to memory (estimated size 247.8 KB, free 1177.6 MB)
>>> 15/02/10 12:06:21 INFO FileInputFormat: Total input paths to process : 1
>>> 15/02/10 12:06:21 INFO SparkContext: Starting job: collect at
>>> sbStreamingTv.scala:198
>>> 15/02/10 12:06:21 INFO DAGScheduler: Got job 0 (collect at
>>> sbStreamingTv.scala:198) with 2 output partitions (allowLocal=false)
>>> 15/02/10 12:06:21 INFO DAGScheduler: Final stage: Stage 0(*collect at
>>> sbStreamingTv.scala:198*)
>>> 15/02/10 12:06:21 INFO DAGScheduler: Parents of final stage: List()
>>> 15/02/10 12:06:21 INFO DAGScheduler: Missing parents: List()
>>> 15/02/10 12:06:21 INFO DAGScheduler: Submitting Stage 0 (*MappedRDD[1]
>>> at textFile at sbStreamingTv.scala:198*), which has no missing parents
>>> 15/02/10 12:06:21 INFO DAGScheduler: Submitting 2 missing tasks from
>>> Stage 0 (*MappedRDD[1] at textFile at sbStreamingTv.scala:198*)
>>> 15/02/10 12:06:21 INFO YarnClusterScheduler: Adding task set 0.0 with 2
>>> tasks
>>> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
>>> phd40010024.na.com:8041
>>> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
>>> phd40010002.na.com:8041
>>> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
>>> phd40010022.na.com:8041
>>> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010002.na.com to
>>> /sdc/c4h5
>>> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010022.na.com to
>>> /sdc/c4h5
>>> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010024.na.com to
>>> /sdc/c4h1
>>> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
>>> container_1423081782629_7370_01_000003 for on host phd40010002.na.com
>>> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching
>>> ExecutorRunnable. driverUrl: akka.tcp://
>>> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler,
>>>  executorHostname: phd40010002.na.com
>>> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
>>> container_1423081782629_7370_01_000004 for on host phd40010022.na.com
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
>>> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching
>>> ExecutorRunnable. driverUrl: akka.tcp://
>>> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler,
>>>  executorHostname: phd40010022.na.com
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
>>> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
>>> container_1423081782629_7370_01_000002 for on host phd40010024.na.com
>>> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching
>>> ExecutorRunnable. driverUrl: akka.tcp://
>>> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler,
>>>  executorHostname: phd40010024.na.com
>>> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
>>> yarn.client.max-nodemanagers-proxies : 500
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
>>> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
>>> yarn.client.max-nodemanagers-proxies : 500
>>> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
>>> yarn.client.max-nodemanagers-proxies : 500
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up
>>> ContainerLaunchContext
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up
>>> ContainerLaunchContext
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up
>>> ContainerLaunchContext
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
>>> 15/02/10 12:06:21 INFO ApplicationMaster: All executors have launched.
>>> 15/02/10 12:06:21 INFO ApplicationMaster: Started progress reporter
>>> thread - sleep time : 5000
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
>>> Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
>>> -1 file:
>>> "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar"
>>> } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
>>> __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
>>> file:
>>> "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar"
>>> } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
>>> Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
>>> -1 file:
>>> "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar"
>>> } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
>>> __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
>>> file:
>>> "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar"
>>> } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
>>> Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
>>> -1 file:
>>> "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar"
>>> } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
>>> __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
>>> file:
>>> "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar"
>>> } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with
>>> commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill
>>> %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
>>>  -Dlog4j.configuration=log4j-spark-container.properties,
>>> org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
>>> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 1,
>>> phd40010002.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with
>>> commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill
>>> %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
>>>  -Dlog4j.configuration=log4j-spark-container.properties,
>>> org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
>>> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 3,
>>> phd40010024.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
>>> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with
>>> commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill
>>> %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
>>>  -Dlog4j.configuration=log4j-spark-container.properties,
>>> org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
>>> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 2,
>>> phd40010022.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
>>> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
>>> phd40010022.na.com:8041
>>> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
>>> phd40010024.na.com:8041
>>> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
>>> phd40010002.na.com:8041
>>> 15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered
>>> executor: Actor[akka.tcp://
>>> sparkexecu...@phd40010022.na.com:29369/user/Executor#43651774] with ID 2
>>> 15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered
>>> executor: Actor[akka.tcp://
>>> sparkexecu...@phd40010024.na.com:12969/user/Executor#1711844295] with
>>> ID 3
>>> 15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
>>> phd40010022.na.com:14119 with 1178.1 MB RAM
>>> 15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
>>> phd40010024.na.com:53284 with 1178.1 MB RAM
>>> 15/02/10 12:06:29 INFO CoarseGrainedSchedulerBackend: Registered
>>> executor: Actor[akka.tcp://
>>> sparkexecu...@phd40010002.na.com:35547/user/Executor#-1690254909] with
>>> ID 1
>>> 15/02/10 12:06:29 INFO BlockManagerInfo: Registering block manager
>>> phd40010002.na.com:62754 with 1178.1 MB RAM
>>> 15/02/10 12:06:36 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:06:51 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:07:06 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:07:21 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:07:36 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:07:51 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:08:06 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:08:21 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:08:36 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:08:51 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:09:06 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:09:21 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:09:36 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:09:51 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:10:06 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:10:21 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:10:36 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>> 15/02/10 12:10:51 WARN YarnClusterScheduler: Initial job has not
>>> accepted any resources; check your cluster UI to ensure that workers are
>>> registered and have sufficient memory
>>>
>>> On Fri, Feb 6, 2015 at 3:24 PM, Sandy Ryza <sandy.r...@cloudera.com>
>>> wrote:
>>>
>>>> You can call collect() to pull in the contents of an RDD into the
>>>> driver:
>>>>
>>>>   val badIPsLines = badIPs.collect()
>>>>
>>>> On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg <jonrgr...@gmail.com> wrote:
>>>>
>>>>> OK I tried that, but how do I convert an RDD to a Set that I can then
>>>>> broadcast and cache?
>>>>>
>>>>>       val badIPs = sc.textFile("hdfs:///user/jon/"+ "badfullIPs.csv")
>>>>>       val badIPsLines = badIPs.getLines
>>>>>       val badIpSet = badIPsLines.toSet
>>>>>       val badIPsBC = sc.broadcast(badIpSet)
>>>>>
>>>>> produces the error "value getLines is not a member of
>>>>> org.apache.spark.rdd.RDD[String]".
>>>>>
>>>>> Leaving it as an RDD and then constantly joining I think will be too
>>>>> slow for a streaming job.
>>>>>
>>>>> On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza <sandy.r...@cloudera.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jon,
>>>>>>
>>>>>> You'll need to put the file on HDFS (or whatever distributed
>>>>>> filesystem you're running on) and load it from there.
>>>>>>
>>>>>> -Sandy
>>>>>>
>>>>>> On Thu, Feb 5, 2015 at 3:18 PM, YaoPau <jonrgr...@gmail.com> wrote:
>>>>>>
>>>>>>> I have a file "badFullIPs.csv" of bad IP addresses used for
>>>>>>> filtering.  In
>>>>>>> yarn-client mode, I simply read it off the edge node, transform it,
>>>>>>> and then
>>>>>>> broadcast it:
>>>>>>>
>>>>>>>       val badIPs = fromFile(edgeDir + "badfullIPs.csv")
>>>>>>>       val badIPsLines = badIPs.getLines
>>>>>>>       val badIpSet = badIPsLines.toSet
>>>>>>>       val badIPsBC = sc.broadcast(badIpSet)
>>>>>>>       badIPs.close
>>>>>>>
>>>>>>> How can I accomplish this in yarn-cluster mode?
>>>>>>>
>>>>>>> Jon
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> View this message in context:
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>> Nabble.com.
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to