Hi Jack,

1. Several previous instances of the "key not valid?" error have been attributed to memory issues, either memory allocated per executor or per task, depending on the context. You can google it to find some examples.

2. I think your case is similar, even though it's happening due to broadcast. The specific line I'm suspicious of is "14/07/02 18:20:09 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0", which appears after the driver commanded a shutdown. It happens only for TorrentBroadcast, because HttpBroadcast does not store intermediate chunks of a broadcast in memory.

3. You might want to allocate more memory to the Spark executors to take advantage of in-memory processing. ~300MB of caching space per machine is likely to be too small for most jobs.

4. Another common cause of disconnection is the spark.akka.frameSize parameter. You can try playing with it: even if you don't have enough memory to crank it way up, try moving it up and down within reason.

5. There is one more curious line in your trace: "14/07/02 18:20:06 INFO BlockManager: Removing broadcast 0". Nothing should have been there to remove in the first place.

6. Finally, we found in our benchmarks that using TorrentBroadcast on smaller clusters (<10 machines) with small broadcasts (<10MB) has no benefit over HttpBroadcast, and is often worse. I'd suggest sticking to HttpBroadcast unless you have a gigantic broadcast (>=1GB) or very many nodes (many tens or hundreds).
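To make points 3, 4, and 6 concrete, here is a rough sketch of the knobs I mean, using the Spark 1.0-era property names. The "2g" and "20" values are just placeholders to tune for your cluster, not recommendations; the ~295MB MemoryStore capacity in your logs suggests you're running with the default executor heap.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("BroadcastTest")
  // Point 3: give executors more heap than the default 512m,
  // which yields only ~300MB of MemoryStore caching space.
  .set("spark.executor.memory", "2g")
  // Point 4: Akka frame size in MB (default 10); try values up and down.
  .set("spark.akka.frameSize", "20")
  // Point 6: stay on HTTP broadcast for small clusters and small broadcasts.
  .set("spark.broadcast.factory",
       "org.apache.spark.broadcast.HttpBroadcastFactory")

val sc = new SparkContext(conf)
```

The same properties can also be passed on the command line via `spark-submit --conf key=value` without touching the code.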
Hope it helps,
Mosharaf

--
Mosharaf Chowdhury
http://www.mosharaf.com/


On Thu, Jul 3, 2014 at 7:48 AM, jackxucs <jackx...@gmail.com> wrote:

> Hello,
>
> I am running the BroadcastTest example in a standalone cluster using spark-submit. I have 8 host machines and made Host1 the master. Host2 to Host8 act as 7 workers to connect to the master. The connection was fine as I could see all 7 hosts on the master web UI. The BroadcastTest example with Http broadcast also works fine, I think, as there was no error message and all workers "EXITED" at the end. But when I changed the third argument from "Http" to "Torrent" to use Torrent broadcast, all workers got a "KILLED" status once they reached sc.stop().
>
> Below is the stderr on one of the workers when running Torrent broadcast (I masked the IP addresses):
>
> ==========================================================================================
> 14/07/02 18:20:03 INFO SecurityManager: Changing view acls to: root
> 14/07/02 18:20:03 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
> 14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
> 14/07/02 18:20:04 INFO Remoting: Starting remoting
> 14/07/02 18:20:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37771]
> 14/07/02 18:20:04 INFO Remoting: Remoting now listens on addresses: [akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37771]
> 14/07/02 18:20:04 INFO SecurityManager: Changing view acls to: root
> 14/07/02 18:20:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
> 14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
> 14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
> 14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
> 14/07/02 18:20:04 INFO Remoting: Starting remoting
> 14/07/02 18:20:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:53661]
> 14/07/02 18:20:04 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:53661]
> 14/07/02 18:20:04 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/CoarseGrainedScheduler
> 14/07/02 18:20:04 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
> 14/07/02 18:20:04 INFO Remoting: Remoting shut down
> 14/07/02 18:20:04 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
> 14/07/02 18:20:04 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
> 14/07/02 18:20:04 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
> 14/07/02 18:20:04 INFO SecurityManager: Changing view acls to: root
> 14/07/02 18:20:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
> 14/07/02 18:20:04 INFO Slf4jLogger: Slf4jLogger started
> 14/07/02 18:20:04 INFO Remoting: Starting remoting
> 14/07/02 18:20:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@dyn-xxx-xx-xx-xx:57883]
> 14/07/02 18:20:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@dyn-xxx-xx-xx-xx:57883]
> 14/07/02 18:20:05 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/MapOutputTracker
> 14/07/02 18:20:05 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@dyn-xxx-xx-xx-xx:42436/user/BlockManagerMaster
> 14/07/02 18:20:05 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140702182005-30bd
> 14/07/02 18:20:05 INFO ConnectionManager: Bound socket to port 60368 with id = ConnectionManagerId(dyn-xxx-xx-xx-xx,60368)
> 14/07/02 18:20:05 INFO MemoryStore: MemoryStore started with capacity 294.6 MB
> 14/07/02 18:20:05 INFO BlockManagerMaster: Trying to register BlockManager
> 14/07/02 18:20:05 INFO BlockManagerMaster: Registered BlockManager
> 14/07/02 18:20:05 INFO HttpFileServer: HTTP File server directory is /tmp/spark-35f65442-e0e8-4122-9359-ca8232ca97a6
> 14/07/02 18:20:05 INFO HttpServer: Starting HTTP Server
> 14/07/02 18:20:06 INFO CoarseGrainedExecutorBackend: Got assigned task 9
> 14/07/02 18:20:06 INFO Executor: Running task ID 9
> 14/07/02 18:20:06 INFO Executor: Fetching http://xxx.xx.xx.xx:54292/jars/broadcast-test_2.10-1.0.jar with timestamp 1404339601903
> 14/07/02 18:20:06 INFO Utils: Fetching http://xxx.xx.xx.xx:54292/jars/broadcast-test_2.10-1.0.jar to /tmp/fetchFileTemp5382215579021312284.tmp
> 14/07/02 18:20:06 INFO BlockManager: Removing broadcast 0
> 14/07/02 18:20:07 INFO Executor: Adding file:/home/lrl/Desktop/spark-master/work/app-20140702182002-0006/3/./broadcast-test_2.10-1.0.jar to class loader
> 14/07/02 18:20:07 INFO TorrentBroadcast: Started reading broadcast variable 1
> 14/07/02 18:20:07 INFO SendingConnection: Initiating connection to [dyn-xxx-xx-xx-xx:60179]
> 14/07/02 18:20:07 INFO SendingConnection: Connected to [dyn-xxx-xx-xx-xx:60179], 1 messages pending
> 14/07/02 18:20:07 INFO ConnectionManager: Accepted connection from [DCTB-Host1/xxx.xx.xx.xx]
> 14/07/02 18:20:07 INFO BlockManager: Found block broadcast_1_meta remotely
> 14/07/02 18:20:07 INFO SendingConnection: Initiating connection to [dyn-xxx-xx-xx-xx:55273]
> 14/07/02 18:20:07 INFO SendingConnection: Connected to [dyn-xxx-xx-xx-xx:55273], 1 messages pending
> 14/07/02 18:20:07 INFO ConnectionManager: Accepted connection from [dyn-xxx-xx-xx-xxx]
> 14/07/02 18:20:07 INFO BlockManager: Found block broadcast_1_piece0 remotely
> 14/07/02 18:20:07 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
> 14/07/02 18:20:07 INFO MemoryStore: ensureFreeSpace(4000168) called with curMem=0, maxMem=308910489
> 14/07/02 18:20:07 INFO MemoryStore: Block broadcast_1_piece0 stored as values in memory (estimated size 3.8 MB, free 290.8 MB)
> 14/07/02 18:20:07 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
> 14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000120) called with curMem=4000168, maxMem=308910489
> 14/07/02 18:20:08 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.8 MB, free 287.0 MB)
> 14/07/02 18:20:08 INFO TorrentBroadcast: Reading broadcast variable 1 took 0.909187542 s
> 14/07/02 18:20:08 INFO Executor: Serialized size of result for 9 is 599
> 14/07/02 18:20:08 INFO Executor: Sending result for 9 directly to driver
> 14/07/02 18:20:08 INFO Executor: Finished task ID 9
> 14/07/02 18:20:08 INFO CoarseGrainedExecutorBackend: Got assigned task 13
> 14/07/02 18:20:08 INFO Executor: Running task ID 13
> 14/07/02 18:20:08 INFO TorrentBroadcast: Started reading broadcast variable 2
> 14/07/02 18:20:08 INFO BlockManager: Found block broadcast_2_meta remotely
> 14/07/02 18:20:08 INFO BlockManager: Found block broadcast_2_piece0 remotely
> 14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000168) called with curMem=8000288, maxMem=308910489
> 14/07/02 18:20:08 INFO MemoryStore: Block broadcast_2_piece0 stored as values in memory (estimated size 3.8 MB, free 283.2 MB)
> 14/07/02 18:20:08 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
> 14/07/02 18:20:08 INFO MemoryStore: ensureFreeSpace(4000120) called with curMem=12000456, maxMem=308910489
> 14/07/02 18:20:08 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.8 MB, free 279.3 MB)
> 14/07/02 18:20:08 INFO TorrentBroadcast: Reading broadcast variable 2 took 0.269456367 s
> 14/07/02 18:20:08 INFO Executor: Serialized size of result for 13 is 599
> 14/07/02 18:20:08 INFO Executor: Sending result for 13 directly to driver
> 14/07/02 18:20:08 INFO Executor: Finished task ID 13
> 14/07/02 18:20:09 INFO BlockManager: Removing broadcast 2
> 14/07/02 18:20:09 INFO BlockManager: Removing block broadcast_2
> 14/07/02 18:20:09 INFO MemoryStore: Block broadcast_2 of size 4000120 dropped from memory (free 296910033)
> 14/07/02 18:20:09 INFO BlockManager: Removing block broadcast_2_piece0
> 14/07/02 18:20:09 INFO MemoryStore: Block broadcast_2_piece0 of size 4000168 dropped from memory (free 300910201)
> 14/07/02 18:20:09 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
> 14/07/02 18:20:09 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
> 14/07/02 18:20:09 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0
> 14/07/02 18:20:09 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
> 14/07/02 18:20:09 INFO Remoting: Remoting shut down
> 14/07/02 18:20:10 INFO ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@1973a69
> 14/07/02 18:20:10 INFO ConnectionManager: Removing SendingConnection to ConnectionManagerId(dyn-xxx-xx-xx-xx,60179)
> 14/07/02 18:20:10 INFO ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(DCTB-Host1,60179)
> 14/07/02 18:20:10 ERROR ConnectionManager: Corresponding SendingConnectionManagerId not found
> 14/07/02 18:20:10 INFO ConnectionManager: key already cancelled ? sun.nio.ch.SelectionKeyImpl@1973a69
> java.nio.channels.CancelledKeyException
> at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:363)
> at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:116)
>
> ==========================================================================================
>
> Also, here is the output when running Http broadcast, as a comparison (IP addresses masked):
>
> ==========================================================================================
> 14/07/02 18:02:04 INFO SecurityManager: Changing view acls to: root
> 14/07/02 18:02:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
> 14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
> 14/07/02 18:02:05 INFO Remoting: Starting remoting
> 14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37190]
> 14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://driverPropsFetcher@dyn-xxx-xx-xx-xx:37190]
> 14/07/02 18:02:05 INFO SecurityManager: Changing view acls to: root
> 14/07/02 18:02:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
> 14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
> 14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
> 14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
> 14/07/02 18:02:05 INFO Remoting: Starting remoting
> 14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:44376]
> 14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkExecutor@dyn-xxx-xx-xx-xx:44376]
> 14/07/02 18:02:05 INFO CoarseGrainedExecutorBackend: Connecting to driver: akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/CoarseGrainedScheduler
> 14/07/02 18:02:05 INFO WorkerWatcher: Connecting to worker akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
> 14/07/02 18:02:05 INFO Remoting: Remoting shut down
> 14/07/02 18:02:05 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
> 14/07/02 18:02:05 INFO WorkerWatcher: Successfully connected to akka.tcp://sparkWorker@dyn-xxx-xx-xx-xx:32818/user/Worker
> 14/07/02 18:02:05 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
> 14/07/02 18:02:05 INFO SecurityManager: Changing view acls to: root
> 14/07/02 18:02:05 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
> 14/07/02 18:02:05 INFO Slf4jLogger: Slf4jLogger started
> 14/07/02 18:02:05 INFO Remoting: Starting remoting
> 14/07/02 18:02:05 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@dyn-xxx-xx-xx-xx:39965]
> 14/07/02 18:02:05 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@dyn-xxx-xx-xx-xx:39965]
> 14/07/02 18:02:05 INFO SparkEnv: Connecting to MapOutputTracker: akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/MapOutputTracker
> 14/07/02 18:02:05 INFO SparkEnv: Connecting to BlockManagerMaster: akka.tcp://spark@dyn-xxx-xx-xx-xx:51300/user/BlockManagerMaster
> 14/07/02 18:02:05 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140702180205-113e
> 14/07/02 18:02:05 INFO ConnectionManager: Bound socket to port 48270 with id = ConnectionManagerId(dyn-xxx-xx-xx-xx,48270)
> 14/07/02 18:02:05 INFO MemoryStore: MemoryStore started with capacity 294.6 MB
> 14/07/02 18:02:05 INFO BlockManagerMaster: Trying to register BlockManager
> 14/07/02 18:02:05 INFO BlockManagerMaster: Registered BlockManager
> 14/07/02 18:02:05 INFO HttpFileServer: HTTP File server directory is /tmp/spark-5c87e636-faaa-489a-b5a6-c9100cfe4dc5
> 14/07/02 18:02:05 INFO HttpServer: Starting HTTP Server
> 14/07/02 18:02:06 INFO CoarseGrainedExecutorBackend: Got assigned task 11
> 14/07/02 18:02:06 INFO Executor: Running task ID 11
> 14/07/02 18:02:06 INFO Executor: Fetching http://xxx.xx.xx.xx:43505/jars/broadcast-test_2.10-1.0.jar with timestamp 1404338522319
> 14/07/02 18:02:06 INFO Utils: Fetching http://xxx.xx.xx.xx:43505/jars/broadcast-test_2.10-1.0.jar to /tmp/fetchFileTemp4777840901789178395.tmp
> 14/07/02 18:02:06 INFO Executor: Adding file:/home/lrl/Desktop/spark-master/work/app-20140702180202-0001/3/./broadcast-test_2.10-1.0.jar to class loader
> 14/07/02 18:02:06 INFO HttpBroadcast: Started reading broadcast variable 1
> 14/07/02 18:02:06 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
> 14/07/02 18:02:06 INFO MemoryStore: ensureFreeSpace(4000120) called with curMem=0, maxMem=308910489
> 14/07/02 18:02:06 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.8 MB, free 290.8 MB)
> 14/07/02 18:02:06 INFO HttpBroadcast: Reading broadcast variable 1 took 0.121600945 s
> 14/07/02 18:02:06 INFO Executor: Serialized size of result for 11 is 599
> 14/07/02 18:02:06 INFO Executor: Sending result for 11 directly to driver
> 14/07/02 18:02:06 INFO Executor: Finished task ID 11
> 14/07/02 18:02:07 INFO CoarseGrainedExecutorBackend: Got assigned task 18
> 14/07/02 18:02:07 INFO Executor: Running task ID 18
> 14/07/02 18:02:07 INFO HttpBroadcast: Started reading broadcast variable 2
> 14/07/02 18:02:07 INFO MemoryStore: ensureFreeSpace(4000120) called with curMem=4000120, maxMem=308910489
> 14/07/02 18:02:07 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.8 MB, free 287.0 MB)
> 14/07/02 18:02:07 INFO HttpBroadcast: Reading broadcast variable 2 took 0.234879208 s
> 14/07/02 18:02:07 INFO Executor: Serialized size of result for 18 is 599
> 14/07/02 18:02:07 INFO Executor: Sending result for 18 directly to driver
> 14/07/02 18:02:07 INFO Executor: Finished task ID 18
> 14/07/02 18:02:07 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
> 14/07/02 18:02:07 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
> 14/07/02 18:02:07 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
> 14/07/02 18:02:07 INFO Remoting: Remoting shut down
>
> ==========================================================================================
>
> It seems to me that the error does not happen until broadcasting has already finished. But I would still like to understand why such an error message occurs at sc.stop(). If it is an issue with sc.stop(), then why does it happen only with Torrent broadcast and not Http broadcast? Any insights on this will be very much appreciated! Thanks in advance!
>
> Best,
> Jack
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Running-the-BroadcastTest-scala-with-TorrentBroadcastFactory-in-a-standalone-cluster-tp8736.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.