Hi everyone - I am building a small prototype in Spark 1.6.0 (Cloudera) to read information from Kinesis and write it to HDFS in Parquet format. The write seems very slow and, if I understand Spark's diagnostics correctly, it always seems to run on the same executor, one partition after the other, serially. So I stripped the program down to this:

    // One receiver-backed DStream per Kinesis shard
    val kinesisStreams = (0 until numShards).map { i =>
      KinesisUtils.createStream(
        streamingContext,
        sparkApplicationName,
        kinesisStreamName,
        kinesisUrl,
        awsRegion,
        InitialPositionInStream.LATEST,
        new Duration(streamingInterval.millis),
        StorageLevel.MEMORY_AND_DISK_SER,
        awsCredentials.accessKey,
        awsCredentials.secretKey)
    }

    // Merge the per-shard streams into a single DStream
    val allKinesisStreams = streamingContext.union(kinesisStreams)

    // Log the number of records received in each batch
    allKinesisStreams.foreachRDD { rdd =>
      info("total for this batch is " + rdd.count())
    }

The Kinesis stream has 20 shards (overprovisioned for this small test). I confirmed using a small boto program that data is periodically written to all 20 of the shards. I can see that Spark has created 20 executors, one for each Kinesis shard. It also creates one other executor, tied to a particular worker node, and that node seems to do the RDD counting.

The streaming interval is 1 minute, during which time several shards have received data. For each one-minute interval in this particular example, the driver prints a count of between 20 and 30.

I expected to see the count operation parallelized across the cluster. I think I must just be misunderstanding something fundamental! Can anyone point out where I'm going wrong?

Yours in confusion,
Graham