Hi everyone - I am building a small prototype in Spark 1.6.0 (Cloudera) to
read data from Kinesis and write it to HDFS in Parquet format. The write
seems very slow and, if I understand Spark's diagnostics correctly, it
always runs on the same executor, one partition after another, serially.
So I stripped the program down to this:


val kinesisStreams = (0 until numShards).map { i =>
  KinesisUtils.createStream(streamingContext, sparkApplicationName,
    kinesisStreamName, kinesisUrl, awsRegion,
    InitialPositionInStream.LATEST,
    new Duration(streamingInterval.millis),
    StorageLevel.MEMORY_AND_DISK_SER,
    awsCredentials.accessKey, awsCredentials.secretKey)
}

val allKinesisStreams = streamingContext.union(kinesisStreams)

allKinesisStreams.foreachRDD { rdd =>
  info("total for this batch is " + rdd.count())
}

The Kinesis stream has 20 shards (overprovisioned for this small test). I
confirmed with a small boto program that data is periodically written to
all 20 shards. I can see that Spark has created 20 executors, one for each
Kinesis shard. It has also created one more executor, tied to a particular
worker node, and that node seems to do all of the RDD counting. The
streaming interval is 1 minute, during which several shards receive data.
In each one-minute interval, for this particular run, the driver prints a
count of between 20 and 30. I expected the count operation to be
parallelized across the cluster. I think I must just be misunderstanding
something fundamental! Can anyone point out where I'm going wrong?
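
For what it's worth, the next thing I was going to try is logging the
partition layout of each batch RDD from inside the same foreachRDD, to see
whether the unioned stream really does produce multiple partitions per
batch. This is just a sketch of that extra logging (I haven't run it yet):

allKinesisStreams.foreachRDD { rdd =>
  // How many partitions does the unioned batch RDD contain,
  // and what does Spark report as the default parallelism?
  info("partitions in this batch: " + rdd.partitions.length)
  info("default parallelism: " + rdd.sparkContext.defaultParallelism)
  info("total for this batch is " + rdd.count())
}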

Yours in confusion,
Graham
