Hi - thanks for the responses. You are right that I started by copying the word-counting example. I assumed that this would spread the load evenly across the cluster, with each worker receiving a portion of the stream data (one shard's worth) and then keeping that data local until something invoked a shuffle. I did check that the data isn't skewed in Kinesis: it seems to be fairly randomly distributed across the shards in the stream.

In reducing the code to this example, I hoped to show that a simple calculation would be parallelized. But it doesn't seem to be! As far as I can see, every RDD count runs on the same executor. It's such a small example that the simplest explanation is that I have misunderstood something :-(

I didn't mention that I am running with "--master yarn", though as far as I know, nothing has been changed from the YARN defaults that Cloudera provides.
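One piece of core arithmetic may be worth checking here. This is a sketch under assumptions and was not confirmed anywhere in this thread. In receiver-based Spark Streaming, each receiver runs as a long-lived task that holds one executor core. Batch jobs such as the per-batch count() can therefore only use the cores left over after the receivers are placed. Spark documents the default for spark.executor.cores on YARN as 1. If the Cloudera setup kept that default, 21 executors and 20 receivers would leave exactly one core for everything else. The `CoreAccounting`, `Cluster` and `freeCores` names below are hypothetical.

```scala
object CoreAccounting {
  // Hypothetical model of an application's executor layout.
  final case class Cluster(executors: Int, coresPerExecutor: Int)

  // Each receiver-based input DStream runs one long-lived receiver task that
  // holds one executor core, so batch jobs (such as the per-batch count) can
  // only use the cores left over. Clamped so it never goes negative.
  def freeCores(cluster: Cluster, receivers: Int): Int =
    math.max(0, cluster.executors * cluster.coresPerExecutor - receivers)

  def main(args: Array[String]): Unit = {
    // 20 receivers (one per shard) plus one extra executor, as described in
    // the original message; the cores-per-executor values are assumptions.
    val receivers = 20
    for (cores <- Seq(1, 4)) {
      val free = freeCores(Cluster(executors = 21, coresPerExecutor = cores), receivers)
      println(s"$cores core(s) per executor -> $free core(s) free for batch tasks")
    }
  }
}
```

Suppose the one-core case matches the real layout. Then every batch task would queue on the single executor with a free core, whatever the partitioning. Giving executors more cores (for example with spark-submit's --executor-cores) would be the first thing to try, though that is untested here.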
Graham

On Fri, Jan 27, 2017 at 4:48 AM, Takeshi Yamamuro <linguin....@gmail.com> wrote:
> Probably, he referred to the word-counting example in kinesis here:
> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L114
>
> On Fri, Jan 27, 2017 at 6:41 PM, ayan guha <guha.a...@gmail.com> wrote:
>> Maybe a naive question: why are you creating 1 DStream per shard? Shouldn't
>> there be one DStream corresponding to the Kinesis stream?
>>
>> On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro <linguin....@gmail.com> wrote:
>>> Hi,
>>>
>>> Just a guess though, Kinesis shards sometimes have skewed data.
>>> So, before you compute something from Kinesis RDDs, you'd be better off
>>> repartitioning them for better parallelism.
>>>
>>> // maropu
>>>
>>> On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote:
>>>> Hi everyone - I am building a small prototype in Spark 1.6.0 (Cloudera)
>>>> to read information from Kinesis and write it to HDFS in Parquet format.
>>>> The write seems very slow, and if I understood Spark's diagnostics
>>>> correctly, it always seemed to run from the same executor, one partition
>>>> after the other, serially. So I stripped the program down to this:
>>>>
>>>>   // One receiver-based input DStream per shard
>>>>   val kinesisStreams = (0 until numShards).map { _ =>
>>>>     KinesisUtils.createStream(streamingContext, sparkApplicationName,
>>>>       kinesisStreamName, kinesisUrl, awsRegion,
>>>>       InitialPositionInStream.LATEST,
>>>>       new Duration(streamingInterval.millis),   // checkpoint interval
>>>>       StorageLevel.MEMORY_AND_DISK_SER,
>>>>       awsCredentials.accessKey, awsCredentials.secretKey)
>>>>   }
>>>>
>>>>   val allKinesisStreams = streamingContext.union(kinesisStreams)
>>>>
>>>>   allKinesisStreams.foreachRDD { rdd =>
>>>>     info("total for this batch is " + rdd.count())
>>>>   }
>>>>
>>>> The Kinesis stream has 20 shards (overprovisioned for this small test).
>>>> I confirmed using a small boto program that data is periodically written to
>>>> all 20 of the shards. I can see that Spark has created 20 executors, one
>>>> for each Kinesis shard. It also creates one other executor, tied to a
>>>> particular worker node, and that node seems to do the RDD counting. The
>>>> streaming interval is 1 minute, during which time several shards have
>>>> received data. Each minute interval, for this particular example, the
>>>> driver prints out between 20 and 30 for the count value. I expected to see
>>>> the count operation parallelized across the cluster. I think I must just be
>>>> misunderstanding something fundamental! Can anyone point out where I'm
>>>> going wrong?
>>>>
>>>> Yours in confusion,
>>>> Graham
>>>
>>> --
>>> Takeshi Yamamuro
>>
>> --
>> Best Regards,
>> Ayan Guha
>
> --
> Takeshi Yamamuro
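The sketch below applies Takeshi's repartition suggestion to the batch step. It repartitions the unioned stream and reports, for each partition, which host processed it and how many records it held. That report shows directly whether the work spreads out. It assumes Spark 1.6 with the spark-streaming-kinesis-asl artifact on the classpath. It takes `allKinesisStreams` from the original snippet as a `DStream[Array[Byte]]`, since `KinesisUtils.createStream` without a message handler yields byte arrays. `RepartitionSketch`, `perPartitionHosts` and the `numPartitions` parameter are hypothetical names, not Spark APIs. Repartitioning can only help if other executors have free cores to run the resulting tasks.

```scala
import java.net.InetAddress

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

object RepartitionSketch {
  // Hypothetical diagnostic helper: for each partition, returns its index, the
  // host of the executor that processed it, and the number of records in it.
  def perPartitionHosts[T](rdd: RDD[T]): Array[(Int, String, Int)] =
    rdd.mapPartitionsWithIndex { (idx, records) =>
      val host = InetAddress.getLocalHost.getHostName
      Iterator((idx, host, records.size))
    }.collect()

  // `numPartitions` is a hypothetical tuning knob, e.g. the number of executor
  // cores left free once the receivers are running.
  def countWithRepartition(allKinesisStreams: DStream[Array[Byte]], numPartitions: Int): Unit =
    allKinesisStreams.repartition(numPartitions).foreachRDD { rdd =>
      val rows = perPartitionHosts(rdd) // runs as one job per batch
      rows.foreach { case (idx, host, n) => println(s"partition $idx on $host: $n records") }
      println("total for this batch is " + rows.map(_._3.toLong).sum)
    }
}
```

If every line still reports the same host after repartitioning, the bottleneck is likely where tasks can be scheduled rather than how the data is partitioned.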