Is the SparkContext you're using the same one that the StreamingContext wraps? If not, I don't think running two SparkContexts in the same application is supported.
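
Roughly what I have in mind is below. Treat it as a minimal sketch rather than a drop-in fix: the object name, the 10-second batch interval, and the socket source are placeholders I made up for illustration, and I'm assuming each incoming record is a bare IP string. The HDFS path is the one from your message.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SharedContextSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder app name and batch interval (assumptions, not from the original job).
    val conf = new SparkConf().setAppName("sbStreamingTv")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Reuse the SparkContext the StreamingContext already wraps
    // instead of constructing a second one.
    val sc = ssc.sparkContext

    // Load the lookup file from HDFS, pull it into the driver, and broadcast it.
    val badIpSet = sc.textFile("/user/sb/badfullIPs.csv").collect().toSet
    val badIPsBC = sc.broadcast(badIpSet)

    // Hypothetical input source: one IP address per line of text.
    val lines = ssc.socketTextStream("localhost", 9999)
    val clean = lines.filter(ip => !badIPsBC.value.contains(ip))
    clean.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

If you would rather create the SparkContext first, passing it in with `new StreamingContext(sc, Seconds(10))` should also leave you with a single context.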

-Sandy

On Tue, Feb 10, 2015 at 9:58 AM, Jon Gregg <jonrgr...@gmail.com> wrote:

> I'm still getting an error. Here's my code, which works successfully when
> tested using spark-shell:
>
> val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect
> val badIpSet = badIPs.toSet
> val badIPsBC = sc.broadcast(badIpSet)
>
> The job looks OK from my end:
>
> 15/02/07 18:59:58 INFO Client: Application report from ASM:
>   application identifier: application_1423081782629_3861
>   appId: 3861
>   * clientToAMToken: Token { kind: YARN_CLIENT_TOKEN, service: }*
>   appDiagnostics:
>   appMasterHost: phd40010008.na.com
>   appQueue: root.default
>   appMasterRpcPort: 0
>   appStartTime: 1423353581140
>   * yarnAppState: RUNNING*
>   distributedFinalState: UNDEFINED
>
> But the streaming process never actually begins. The full log is below;
> scroll to the end for the repeated warning "WARN YarnClusterScheduler:
> Initial job has not accepted any resources; check your cluster UI to ensure
> that workers are registered and have sufficient memory".
>
> I'll note that I have a different Spark Streaming app called "dqd" working
> successfully for a different job that uses only a StreamingContext and not
> an additional SparkContext. But this app (called "sbStreamingTv") uses
> both a SparkContext and a StreamingContext for grabbing a lookup file in
> HDFS for IP filtering. *The references to line #198 in the log below
> refer to the "val badIPs = sc.textFile("/user/sb/badfullIPs.csv").collect"
> line shown above, and it looks like Spark doesn't get beyond that point in
> the code.*
>
> Also, this job ("sbStreamingTv") does work successfully using yarn-client,
> even with both a SparkContext and StreamingContext. It looks to me as though
> in yarn-cluster mode it's grabbing resources for the StreamingContext but
> not for the SparkContext.
>
> Any ideas?
>
> Jon
>
> 15/02/10 12:06:16 INFO MemoryStore: MemoryStore started with capacity 1177.8 MB.
> 15/02/10 12:06:16 INFO ConnectionManager: Bound socket to port 30129 with id = ConnectionManagerId(phd40010008.na.com,30129)
> 15/02/10 12:06:16 INFO BlockManagerMaster: Trying to register BlockManager
> 15/02/10 12:06:16 INFO BlockManagerInfo: Registering block manager phd40010008.na.com:30129 with 1177.8 MB RAM
> 15/02/10 12:06:16 INFO BlockManagerMaster: Registered BlockManager
> 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
> 15/02/10 12:06:16 INFO HttpBroadcast: Broadcast server started at http://10.229.16.108:35183
> 15/02/10 12:06:16 INFO HttpFileServer: HTTP File server directory is /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_000001/tmp/spark-b73a964b-4d91-4af3-8246-48da420c1cec
> 15/02/10 12:06:16 INFO HttpServer: Starting HTTP Server
> 15/02/10 12:06:16 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> 15/02/10 12:06:16 INFO SparkUI: Started SparkUI at http://phd40010008.na.com:25869
> 15/02/10 12:06:17 INFO EventLoggingListener: Logging events to /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587976801
> 15/02/10 12:06:17 INFO YarnClusterScheduler: Created YarnClusterScheduler
> 15/02/10 12:06:17 INFO ApplicationMaster$$anon$1: Adding shutdown hook for context org.apache.spark.SparkContext@7f38095d
> 15/02/10 12:06:17 INFO ApplicationMaster: Registering the ApplicationMaster
> 15/02/10 12:06:17 INFO ApplicationMaster: Allocating 3 executors.
> 15/02/10 12:06:17 INFO YarnAllocationHandler: Will Allocate 3 executor containers, each with 2432 memory
> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: <memory:2432, vCores:1>
> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: <memory:2432, vCores:1>
> 15/02/10 12:06:17 INFO YarnAllocationHandler: Container request (host: Any, priority: 1, capability: <memory:2432, vCores:1>
> 15/02/10 12:06:20 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done
> 15/02/10 12:06:20 WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
> 15/02/10 12:06:20 INFO SecurityManager: Changing view acls to: jg
> 15/02/10 12:06:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(jg)
> 15/02/10 12:06:20 INFO Slf4jLogger: Slf4jLogger started
> 15/02/10 12:06:20 INFO Remoting: Starting remoting
> 15/02/10 12:06:20 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@phd40010008.na.com:43340]
> 15/02/10 12:06:20 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@phd40010008.na.com:43340]
> 15/02/10 12:06:20 INFO SparkEnv: Registering MapOutputTracker
> 15/02/10 12:06:20 INFO SparkEnv: Registering BlockManagerMaster
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/1/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-f6e1
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/10/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-583d
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/11/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-0b66
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-bc8f
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/2/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-17e4
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/3/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-c01e
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/4/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-915c
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/5/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-38ff
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/6/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-c92f
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/7/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-b67a
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/8/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-46fb
> 15/02/10 12:06:20 INFO DiskBlockManager: Created local directory at /hdata/9/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/spark-local-20150210120620-9d11
> 15/02/10 12:06:20 INFO MemoryStore: MemoryStore started with capacity 1177.8 MB.
> 15/02/10 12:06:20 INFO ConnectionManager: Bound socket to port 55944 with id = ConnectionManagerId(phd40010008.na.com,55944)
> 15/02/10 12:06:20 INFO BlockManagerMaster: Trying to register BlockManager
> 15/02/10 12:06:20 INFO BlockManagerInfo: Registering block manager phd40010008.na.com:55944 with 1177.8 MB RAM
> 15/02/10 12:06:20 INFO BlockManagerMaster: Registered BlockManager
> 15/02/10 12:06:20 INFO HttpFileServer: HTTP File server directory is /hdata/12/yarn/nm/usercache/jg/appcache/application_1423081782629_7370/container_1423081782629_7370_01_000001/tmp/spark-b3daba9d-f743-4738-b6c2-f56e56813edd
> 15/02/10 12:06:20 INFO HttpServer: Starting HTTP Server
> 15/02/10 12:06:20 INFO JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
> 15/02/10 12:06:20 INFO SparkUI: Started SparkUI at http://phd40010008.na.com:10612
> 15/02/10 12:06:20 INFO EventLoggingListener: Logging events to /user/spark/applicationHistory/com.na.scalaspark.sbstreamingtv-1423587980782
> 15/02/10 12:06:20 INFO YarnClusterScheduler: Created YarnClusterScheduler
> 15/02/10 12:06:20 INFO YarnClusterScheduler: YarnClusterScheduler.postStartHook done
> 15/02/10 12:06:21 INFO MemoryStore: ensureFreeSpace(253715) called with curMem=0, maxMem=1235012812
> 15/02/10 12:06:21 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 247.8 KB, free 1177.6 MB)
> 15/02/10 12:06:21 INFO FileInputFormat: Total input paths to process : 1
> 15/02/10 12:06:21 INFO SparkContext: Starting job: collect at sbStreamingTv.scala:198
> 15/02/10 12:06:21 INFO DAGScheduler: Got job 0 (collect at sbStreamingTv.scala:198) with 2 output partitions (allowLocal=false)
> 15/02/10 12:06:21 INFO DAGScheduler: Final stage: Stage 0(*collect at sbStreamingTv.scala:198*)
> 15/02/10 12:06:21 INFO DAGScheduler: Parents of final stage: List()
> 15/02/10 12:06:21 INFO DAGScheduler: Missing parents: List()
> 15/02/10 12:06:21 INFO DAGScheduler: Submitting Stage 0 (*MappedRDD[1] at textFile at sbStreamingTv.scala:198*), which has no missing parents
> 15/02/10 12:06:21 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (*MappedRDD[1] at textFile at sbStreamingTv.scala:198*)
> 15/02/10 12:06:21 INFO YarnClusterScheduler: Adding task set 0.0 with 2 tasks
> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for : phd40010024.na.com:8041
> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for : phd40010002.na.com:8041
> 15/02/10 12:06:21 INFO AMRMClientImpl: Received new token for : phd40010022.na.com:8041
> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010002.na.com to /sdc/c4h5
> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010022.na.com to /sdc/c4h5
> 15/02/10 12:06:21 INFO RackResolver: Resolved phd40010024.na.com to /sdc/c4h1
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container container_1423081782629_7370_01_000003 for on host phd40010002.na.com
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, executorHostname: phd40010002.na.com
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container container_1423081782629_7370_01_000004 for on host phd40010022.na.com
> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, executorHostname: phd40010022.na.com
> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching container container_1423081782629_7370_01_000002 for on host phd40010024.na.com
> 15/02/10 12:06:21 INFO YarnAllocationHandler: Launching ExecutorRunnable. driverUrl: akka.tcp://sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, executorHostname: phd40010024.na.com
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500
> 15/02/10 12:06:21 INFO ExecutorRunnable: Starting Executor Container
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up ContainerLaunchContext
> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
> 15/02/10 12:06:21 INFO ExecutorRunnable: Preparing Local resources
> 15/02/10 12:06:21 INFO ApplicationMaster: All executors have launched.
> 15/02/10 12:06:21 INFO ApplicationMaster: Started progress reporter thread - sleep time : 5000
> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1 file: "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar" } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE, __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1 file: "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar" } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1 file: "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar" } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE, __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1 file: "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar" } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Prepared Local resources Map(__spark__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1 file: "/user/jg/.sparkStaging/application_1423081782629_7370/spark-assembly-1.0.0-cdh5.1.3-hadoop2.3.0-cdh5.1.3.jar" } size: 93542713 timestamp: 1423587960750 type: FILE visibility: PRIVATE, __app__.jar -> resource { scheme: "hdfs" host: "nameservice1" port: -1 file: "/user/jg/.sparkStaging/application_1423081782629_7370/sbStreamingTv-0.0.1-SNAPSHOT-jar-with-dependencies.jar" } size: 95950353 timestamp: 1423587960370 type: FILE visibility: PRIVATE)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 1, phd40010002.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 3, phd40010024.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
> 15/02/10 12:06:21 INFO ExecutorRunnable: Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms2048m -Xmx2048m , -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://sp...@phd40010008.na.com:58240/user/CoarseGrainedScheduler, 2, phd40010022.na.com, 1, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy : phd40010022.na.com:8041
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy : phd40010024.na.com:8041
> 15/02/10 12:06:21 INFO ContainerManagementProtocolProxy: Opening proxy : phd40010002.na.com:8041
> 15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@phd40010022.na.com:29369/user/Executor#43651774] with ID 2
> 15/02/10 12:06:26 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@phd40010024.na.com:12969/user/Executor#1711844295] with ID 3
> 15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager phd40010022.na.com:14119 with 1178.1 MB RAM
> 15/02/10 12:06:26 INFO BlockManagerInfo: Registering block manager phd40010024.na.com:53284 with 1178.1 MB RAM
> 15/02/10 12:06:29 INFO CoarseGrainedSchedulerBackend: Registered executor: Actor[akka.tcp://sparkexecu...@phd40010002.na.com:35547/user/Executor#-1690254909] with ID 1
> 15/02/10 12:06:29 INFO BlockManagerInfo: Registering block manager phd40010002.na.com:62754 with 1178.1 MB RAM
> 15/02/10 12:06:36 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:06:51 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:07:06 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:07:21 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:07:36 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:07:51 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:08:06 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:08:21 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:08:36 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:08:51 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:09:06 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:09:21 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:09:36 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:09:51 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:10:06 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:10:21 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:10:36 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
> 15/02/10 12:10:51 WARN YarnClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
>
> On Fri, Feb 6, 2015 at 3:24 PM, Sandy Ryza <sandy.r...@cloudera.com> wrote:
>
>> You can call collect() to pull in the contents of an RDD into the driver:
>>
>> val badIPsLines = badIPs.collect()
>>
>> On Fri, Feb 6, 2015 at 12:19 PM, Jon Gregg <jonrgr...@gmail.com> wrote:
>>
>>> OK I tried that, but how do I convert an RDD to a Set that I can then
>>> broadcast and cache?
>>>
>>> val badIPs = sc.textFile("hdfs:///user/jon/"+ "badfullIPs.csv")
>>> val badIPsLines = badIPs.getLines
>>> val badIpSet = badIPsLines.toSet
>>> val badIPsBC = sc.broadcast(badIpSet)
>>>
>>> produces the error "value getLines is not a member of
>>> org.apache.spark.rdd.RDD[String]".
>>>
>>> Leaving it as an RDD and then constantly joining I think will be too
>>> slow for a streaming job.
>>>
>>> On Thu, Feb 5, 2015 at 8:06 PM, Sandy Ryza <sandy.r...@cloudera.com>
>>> wrote:
>>>
>>>> Hi Jon,
>>>>
>>>> You'll need to put the file on HDFS (or whatever distributed filesystem
>>>> you're running on) and load it from there.
>>>>
>>>> -Sandy
>>>>
>>>> On Thu, Feb 5, 2015 at 3:18 PM, YaoPau <jonrgr...@gmail.com> wrote:
>>>>
>>>>> I have a file "badFullIPs.csv" of bad IP addresses used for
>>>>> filtering. In yarn-client mode, I simply read it off the edge node,
>>>>> transform it, and then broadcast it:
>>>>>
>>>>> val badIPs = fromFile(edgeDir + "badfullIPs.csv")
>>>>> val badIPsLines = badIPs.getLines
>>>>> val badIpSet = badIPsLines.toSet
>>>>> val badIPsBC = sc.broadcast(badIpSet)
>>>>> badIPs.close
>>>>>
>>>>> How can I accomplish this in yarn-cluster mode?
>>>>>
>>>>> Jon
>>>>>
>>>>> --
>>>>> View this message in context:
>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-broadcast-a-variable-read-from-a-file-in-yarn-cluster-mode-tp21524.html
>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>> Nabble.com.
>>>>>
>>>>> ---------------------------------------------------------------------
>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>> For additional commands, e-mail: user-h...@spark.apache.org