Hi Spark Users,

I am testing my application on Spark 1.5 and kinesis-asl-1.5. The streaming
application starts, but I see a ton of stages scheduled by the
ReceiverTracker (submitJob at ReceiverTracker.scala:557).

In the driver logs I see this sequence repeat:
15/10/09 00:10:54 INFO ReceiverTracker: Starting 100 receivers
15/10/09 00:10:54 INFO ReceiverTracker: ReceiverTracker started

15/10/09 00:10:54 INFO ReceiverTracker: Receiver 0 started
15/10/09 00:10:54 DEBUG ClosureCleaner: +++ Cleaning closure <function1>
(org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9)
+++
15/10/09 00:10:54 DEBUG ClosureCleaner:  + declared fields: 3
15/10/09 00:10:54 DEBUG ClosureCleaner:      public static final long
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.serialVersionUID
15/10/09 00:10:54 DEBUG ClosureCleaner:      private final scala.Option
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.checkpointDirOption$1
15/10/09 00:10:54 DEBUG ClosureCleaner:      private final
org.apache.spark.util.SerializableConfiguration
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.serializableHadoopConf$1
15/10/09 00:10:54 DEBUG ClosureCleaner:  + declared methods: 2
15/10/09 00:10:54 DEBUG ClosureCleaner:      public final java.lang.Object
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(java.lang.Object)
15/10/09 00:10:54 DEBUG ClosureCleaner:      public final void
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(scala.collection.Iterator)
15/10/09 00:10:54 DEBUG ClosureCleaner:  + inner classes: 0
15/10/09 00:10:54 DEBUG ClosureCleaner:  + outer classes: 0
15/10/09 00:10:54 DEBUG ClosureCleaner:  + outer objects: 0
15/10/09 00:10:54 DEBUG ClosureCleaner:  + populating accessed fields
because this is the starting closure
15/10/09 00:10:54 DEBUG ClosureCleaner:  + fields accessed by starting
closure: 0
15/10/09 00:10:54 DEBUG ClosureCleaner:  + there are no enclosing objects!
15/10/09 00:10:54 DEBUG ClosureCleaner:  +++ closure <function1>
(org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9)
is now cleaned +++

...
(and so on for 100 receivers)

And then I start seeing ...
15/10/09 00:11:02 INFO ReceiverTracker: Restarting Receiver 36
.. and so on for the other receivers

After which I see the "Receiver started" logs:
15/10/09 00:11:02 INFO ReceiverTracker: Receiver 20 started
..
and then the "Restarting Receiver" logs appear again.

After a while the driver hangs: no new logs appear, although the app seems
to be running. The streaming console still shows scheduled stages and jobs.

There are no ERROR logs in the driver. However, I do see the following
exceptions in the DEBUG logs:

15/10/09 00:10:37 DEBUG
AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received
message AssociationError [akka.tcp://sparkDriver@<xxx>:39053] <-
[akka.tcp://driverPropsFetcher@<xxx>:57886]: Error [Shut down address:
akka.tcp://driverPropsFetcher@<xxx>:57886] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<xxx>:57886
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
] from Actor[akka://sparkDriver/deadLetters]

In one of the executor logs I see the following messages:

application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:37 WARN receiver.ReceiverSupervisorImpl: Skip stopping receiver
because it has not yet stared
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Stopping BlockGenerator
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Waiting for block pushing thread to
terminate
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Pushing out the last 0 blocks
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Stopped block pushing thread
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:37 INFO receiver.BlockGenerator: Stopped BlockGenerator
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:37 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be
stopped
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:37 INFO receiver.ReceiverSupervisorImpl: Stopped receiver without
error
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:38 INFO receiver.BlockGenerator: Started BlockGenerator
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:38 INFO receiver.BlockGenerator: Started block pushing thread
application_1444344955519_0001/container_1444344955519_0001_01_000005/stderr:15/10/09
00:45:38 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with
message: Registered unsuccessfully because Driver refused to start receiver
46:


There is no data in the Kinesis stream that the app is reading from. The
stream has 100 shards, and the app starts 100 receivers.
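For context, the receivers are created roughly like this (a simplified
sketch, not the actual application code; the app name, stream name,
endpoint, region, intervals and the final output action are placeholders):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("kinesis-test")          // placeholder app name
val ssc = new StreamingContext(conf, Seconds(10))              // placeholder batch interval

// One receiver per shard (100 shards -> 100 receivers), unioned into one DStream
val numReceivers = 100
val streams = (0 until numReceivers).map { _ =>
  KinesisUtils.createStream(
    ssc, "kinesis-test", "my-stream",                          // placeholder names
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",    // placeholder endpoint/region
    InitialPositionInStream.LATEST, Seconds(10),               // placeholder checkpoint interval
    StorageLevel.MEMORY_AND_DISK_2)
}
val unioned = ssc.union(streams)

unioned.count().print()                                        // placeholder output action

ssc.start()
ssc.awaitTermination()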

Has anyone else seen this behavior? Any ideas on how to debug the problem,
find the root cause, and fix it would be very helpful.

Thanks,

Bharath
