Is the SparkContext you're using the same one that the StreamingContext
wraps?  If not, I don't think using two is supported.

-Sandy

On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg <jonrgr...@gmail.com> wrote:

> I'm still getting an error.  Here's my code, which works successfully when
> tested using spark-shell:
>
>       val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
>       val badIpSet = badIPs.toSet
>       val badIPsBC = sc.broadcast(badIpSet)
>
>
> The job looks OK from my end:
>
> 15/02/07 18:59:58 INFO Client: Application report from ASM:
>
>          application identifier: application_1423081782629_3861
>
>          appId: 3861
>
> *         clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service:  }*
>
>          appDiagnostics:
>
>          appMasterHost: phd40010008.na.com
>
>          appQueue: root.default
>
>          appMasterRpcPort: 0
>
>          appStartTime: 1423353581140
>
> *         yarnAppState: RUNNING*
>
>          distributedFinalState: UNDEFINED
>
>
> But the streaming process never actually begins.  The full log is below,
> scroll to the end for the repeated warning "WARN YarnClusterScheduler:
> Initial job has not accepted any resources; check your cluster UI to ensure
> that workers are registered and have sufficient memory".
>
> I'll note that I have a different Spark Streaming app called "dqd" working
> successfully for a different job that uses only a StreamingContext and not
> an additional SparkContext.  But this app (called "sbStreamingTv") uses
> both a SparkContext and a StreamingContext for grabbing a lookup file in
> HDFS for IP filtering. * The references to line #198 from the log below
> refers to the "val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect"
> line shown above, and it looks like Spark doesn't get beyond that point in
> the code.*
>
> Also, this job ("sbStreamingTv") does work successfully using yarn-client,
> even with both a SparkContext and StreamingContext.  It looks to me that in
> yarn-cluster mode it's grabbing resources for the StreamingContext but not
> for the SparkContext.
>
> Any ideas?
>
> Jon
>
>
> 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity
> 1177.8 MB.
> 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with
> id = ConnectionManagerId(phd40010008.na.com,30129)
> 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager
> 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager
> phd40010008.na.com:30129 with 1177.8 MB RAM
> 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
> 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
> 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at
> http://10.229.16.108:35183
> 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is
> /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_000001/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
> 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
> 15/02/10 12:06:16 INFO JettyUtils: Adding filter:
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at
> http://phd40010008.na.com:25869
> 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to
> /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
> 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
> 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for
> context org.apache.spark.SparkContext@7f38095d
> 15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster
> 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
> 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor
> containers, each with 2432 memory
> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
> Any, priority: 1, capability: <memory:2432, vCores:1>
> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
> Any, priority: 1, capability: <memory:2432, vCores:1>
> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host:
> Any, priority: 1, capability: <memory:2432, vCores:1>
> 15/02/10 12:06:20 INFO YarnClusterScheduler:
> YarnClusterScheduler.postStartHook done
> 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir
> will be overridden by the value set by the cluster manager (via
> SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
> 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
> 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(jg)
> 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started
> 15/02/10 12:06:20 INFO Remoting: Starting remoting
> 15/02/10 12:06:20 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sp...@phd40010008.na.com:43340]
> 15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses:
> [akka.tcp://sp...@phd40010008.na.com:43340]
> 15/02/10 12:06:20 INFO SparkEnv: Registering MapOutputTracker
> 15/02/10 12:06:20 INFO SparkEnv: Registering BlockManagerMaster
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/1/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-f6e1
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/10/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-583d
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/11/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-0b66
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-bc8f
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/2/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-17e4
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/3/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-c01e
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/4/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-915c
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/5/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-38ff
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/6/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-c92f
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/7/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-b67a
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/8/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-46fb
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at
> /hdata/9/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-9d11
> 15/02/10 12:06:20 INFO MemoryStore: MemoryStore started with capacity
> 1177.8 MB.
> 15/02/10 12:06:20 INFO ConnectionManager: Bound socket to port 55944 with
> id = ConnectionManagerId(phd40010008.na.com,55944)
> 15/02/10 12:06:20 INFO BlockManagerMaster: Trying to register BlockManager
> 15/02/10 12:06:20 INFO BlockManagerInfo: Registering block manager
> phd40010008.na.com:55944 with 1177.8 MB RAM
> 15/02/10 12:06:20 INFO BlockManagerMaster: Registered BlockManager
> 15/02/10 12:06:20 INFO HttpFileServer: HTTP File server directory is
> /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_000001/tmp/spark-b3daba9d-f743-4738-b6c2-f56e56813edd
> 15/02/10 12:06:20 INFO HttpServer: Starting HTTP Server
> 15/02/10 12:06:20 INFO JettyUtils: Adding filter:
> org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> 15/02/10 12:06:20 INFO SparkUI: Started SparkUI at
> http://phd40010008.na.com:10612
> 15/02/10 12:06:20 INFO EventLoggingListener: Logging events to
> /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587980782
> 15/02/10 12:06:20 INFO YarnClusterScheduler: Created YarnClusterScheduler
> 15/02/10 12:06:20 INFO YarnClusterScheduler:
> YarnClusterScheduler.postStartHook done
> 15/02/10 12:06:21 INFO MemoryStore: ensureFreeSpace(253715) called with
> curMem=0, maxMem=1235012812
> 15/02/10 12:06:21 INFO MemoryStore: Block broadcast_0 stored as values to
> memory (estimated size 247.8 KB, free 1177.6 MB)
> 15/02/10 12:06:21 INFO FileInputFormat: Total input paths to process : 1
> 15/02/10 12:06:21 INFO SparkContext: Starting job: collect at
> sbStreamingTv.scala:198
> 15/02/10 12:06:21 INFO DAGScheduler: Got job 0 (collect at
> sbStreamingTv.scala:198) with 2 output partitions (allowLocal=false)
> 15/02/10 12:06:21 INFO DAGScheduler: Final stage: Stage 0(*collect at
> sbStreamingTv.scala:198*)
> 15/02/10 12:06:21 INFO DAGScheduler: Parents of final stage: List()
> 15/02/10 12:06:21 INFO DAGScheduler: Missing parents: List()
> 15/02/10 12:06:21 INFO DAGScheduler: Submitting Stage 0 (*MappedRDD[1] at
> textFile at sbStreamingTv.scala:198*), which has no missing parents
> 15/02/10 12:06:21 INFO DAGScheduler: Submitting 2 missing tasks from Stage
> 0 (*MappedRDD[1] at textFile at sbStreamingTv.scala:198*)
> 15/02/10 12:06:21 INFO YarnClusterScheduler: Adding task set 0.0 with 2
> tasks
> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
> phd40010024.na.com:8041
> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
> phd40010002.na.com:8041
> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for :
> phd40010022.na.com:8041
> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010002.na.com to
> /sdc/c4h5
> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010022.na.com to
> /sdc/c4h5
> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010024.na.com to
> /sdc/c4h1
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
> container_1423081782629_7370_01_000003 for on host phd40010002.na.com
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable.
> driverUrl: akka.tcp://
> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler,
>  executorHostname: phd40010002.na.com
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
> container_1423081782629_7370_01_000004 for on host phd40010022.na.com
> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable.
> driverUrl: akka.tcp://
> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler,
>  executorHostname: phd40010022.na.com
> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container
> container_1423081782629_7370_01_000002 for on host phd40010024.na.com
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable.
> driverUrl: akka.tcp://
> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler,
>  executorHostname: phd40010024.na.com
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
> yarn.client.max-nodemanagers-proxies : 500
> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
> yarn.client.max-nodemanagers-proxies : 500
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy:
> yarn.client.max-nodemanagers-proxies : 500
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
> 15/02/10 12:06:21 INFO ApplicationMaster: All executors have launched.
> 15/02/10 12:06:21 INFO ApplicationMaster: Started progress reporter thread
> - sleep time : 5000
> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
> Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
> -1 file:
> "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar"
> } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
> __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
> file:
> "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar"
> } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
> Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
> -1 file:
> "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar"
> } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
> __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
> file:
> "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar"
> } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources
> Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port:
> -1 file:
> "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar"
> } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE,
> __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1
> file:
> "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar"
> } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with
> commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill
> %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
>  -Dlog4j.configuration=log4j-spark-container.properties,
> org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 1,
> phd40010002.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with
> commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill
> %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
>  -Dlog4j.configuration=log4j-spark-container.properties,
> org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 3,
> phd40010024.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with
> commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill
> %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp,
>  -Dlog4j.configuration=log4j-spark-container.properties,
> org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://
> sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 2,
> phd40010022.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
> phd40010022.na.com:8041
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
> phd40010024.na.com:8041
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy :
> phd40010002.na.com:8041
> 15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor:
> Actor[akka.tcp://
> sparkexecu...@phd40010022.na.com:29369/user/Executor#43651774] with ID 2
> 15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor:
> Actor[akka.tcp://
> sparkexecu...@phd40010024.na.com:12969/user/Executor#1711844295] with ID 3
> 15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
> phd40010022.na.com:14119 with 1178.1 MB RAM
> 15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager
> phd40010024.na.com:53284 with 1178.1 MB RAM
> 15/02/10 12:06:29 INFO CoarseGrainedSchedulerBackend: Registered executor:
> Actor[akka.tcp://
> sparkexecu...@phd40010002.na.com:35547/user/Executor#-1690254909] with ID
> 1
> 15/02/10 12:06:29 INFO BlockManagerInfo: Registering block manager
> phd40010002.na.com:62754 with 1178.1 MB RAM
> 15/02/10 12:06:36 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:06:51 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:07:06 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:07:21 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:07:36 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:07:51 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:08:06 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:08:21 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:08:36 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:08:51 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:09:06 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:09:21 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:09:36 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:09:51 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:10:06 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:10:21 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:10:36 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
> 15/02/10 12:10:51 WARN YarnClusterScheduler: Initial job has not accepted
> any resources; check your cluster UI to ensure that workers are registered
> and have sufficient memory
>
> On Fri, Feb 6, 2015 at 3:24 PM, Sandy Ryza <sandy.r...@cloudera.com>
> wrote:
>
>> You can call collect() to pull in the contents of an RDD into the driver:
>>
>>   val badIPsLines = badIPs.collect()
>>
>> On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg <jonrgr...@gmail.com> wrote:
>>
>>> OK I tried that, but how do I convert an RDD to a Set that I can then
>>> broadcast and cache?
>>>
>>>       val badIPs = sc.textFile("hdfs:///user/jon/"+ "badfullIPs.csv")
>>>       val badIPsLines = badIPs.getLines
>>>       val badIpSet = badIPsLines.toSet
>>>       val badIPsBC = sc.broadcast(badIpSet)
>>>
>>> produces the error "value getLines is not a member of
>>> org.apache.spark.rdd.RDD[String]".
>>>
>>> Leaving it as an RDD and then constantly joining I think will be too
>>> slow for a streaming job.
>>>
>>> On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza <sandy.r...@cloudera.com>
>>> wrote:
>>>
>>>> Hi Jon,
>>>>
>>>> You'll need to put the file on HDFS (or whatever distributed filesystem
>>>> you're running on) and load it from there.
>>>>
>>>> -Sandy
>>>>
>>>> On Thu, Feb 5, 2015 at 3:18 PM, YaoPau <jonrgr...@gmail.com> wrote:
>>>>
>>>>> I have a file "badFullIPs.csv" of bad IP addresses used for
>>>>> filtering.  In
>>>>> yarn-client mode, I simply read it off the edge node, transform it,
>>>>> and then
>>>>> broadcast it:
>>>>>
>>>>>       val badIPs = fromFile(edgeDir + "badfullIPs.csv")
>>>>>       val badIPsLines = badIPs.getLines
>>>>>       val badIpSet = badIPsLines.toSet
>>>>>       val badIPsBC = sc.broadcast(badIpSet)
>>>>>       badIPs.close
>>>>>
>>>>> How can I accomplish this in yarn-cluster mode?
>>>>>
>>>>> Jon
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to