Hi - thanks for the responses. You are right that I started by copying the
word-counting example. I assumed that this would help spread the load
evenly across the cluster, with each worker receiving a portion of the
stream data - corresponding to one shard's worth - and then keeping the
data local until something invoked a shuffle. I did check that the data
isn't skewed in Kinesis - it seems to be pretty well randomly distributed
across the shards in the stream. In reducing the code to this example, I
hoped to show that a simple calculation would be parallelized. But it
doesn't seem to be! Every RDD count is run on the same executor, as far as
I can see. It's such a small example that the simplest explanation to me is
that I misunderstood something :-( I didn't mention that I am running with
"--master yarn", though as far as I know, nothing has changed from the yarn
defaults Cloudera provides.

Graham

On Fri, Jan 27, 2017 at 4:48 AM, Takeshi Yamamuro <linguin....@gmail.com>
wrote:

> He was probably referring to the Kinesis word-counting example here:
> https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L114
>
>
> On Fri, Jan 27, 2017 at 6:41 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Maybe a naive question: why are you creating one DStream per shard?
>> Shouldn't there be a single DStream for the whole Kinesis stream?
>>
>> On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro <linguin....@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Just a guess, but Kinesis shards sometimes hold skewed data.
>>> So before you compute anything from the Kinesis RDDs, you would do better
>>> to repartition them for better parallelism.
>>>
>>> // maropu
>>>
>>> On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote:
>>>
>>>> Hi everyone - I am building a small prototype in Spark 1.6.0 (Cloudera)
>>>> to read information from Kinesis and write it to HDFS in Parquet format.
>>>> The write seems very slow and, if I understood Spark's diagnostics
>>>> correctly, always seems to run on the same executor, one partition after
>>>> the other, serially. So I stripped the program down to this:
>>>>
>>>>
>>>> // One receiver-backed DStream per shard
>>>> val kinesisStreams = (0 until numShards).map { i =>
>>>>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>>>>     kinesisStreamName, kinesisUrl, awsRegion, InitialPositionInStream.LATEST,
>>>>     new Duration(streamingInterval.millis), StorageLevel.MEMORY_AND_DISK_SER,
>>>>     awsCredentials.accessKey, awsCredentials.secretKey)
>>>> }
>>>>
>>>> // Merge the per-shard streams into a single DStream
>>>> val allKinesisStreams = streamingContext.union(kinesisStreams)
>>>>
>>>> // Count the records in each batch and log the total from the driver
>>>> allKinesisStreams.foreachRDD { rdd =>
>>>>   info("total for this batch is " + rdd.count())
>>>> }
>>>>
>>>> The Kinesis stream has 20 shards (overprovisioned for this small test).
>>>> I confirmed using a small boto program that data is periodically written to
>>>> all 20 of the shards. I can see that Spark has created 20 executors, one
>>>> for each Kinesis shard. It also creates one other executor, tied to a
>>>> particular worker node, and that node seems to do the RDD counting. The
>>>> streaming interval is 1 minute, during which time several shards have
>>>> received data. Each minute interval, for this particular example, the
>>>> driver prints out between 20 and 30 for the count value. I expected to see
>>>> the count operation parallelized across the cluster. I think I must just be
>>>> misunderstanding something fundamental! Can anyone point out where I'm
>>>> going wrong?
>>>>
>>>> Yours in confusion,
>>>> Graham
>>>>
>>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>
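
The repartitioning suggested earlier in the thread could be sketched roughly as below. This is only an illustration under stated assumptions: the object name `RepartitionSketch`, the helper `countInParallel`, and its `numPartitions` parameter are hypothetical and do not come from the thread. It takes a DStream such as the unioned `allKinesisStreams` from the original code. It also logs how many partitions each batch has, which is the number of tasks the count runs as. The thread does not establish whether repartitioning helps when a batch holds only 20 to 30 records.

```scala
import org.apache.spark.streaming.dstream.DStream

object RepartitionSketch {
  // Hypothetical helper (not from the thread): spread each batch over
  // numPartitions partitions before counting it.
  def countInParallel(stream: DStream[Array[Byte]], numPartitions: Int): Unit = {
    // repartition shuffles every batch RDD into numPartitions partitions.
    val spread = stream.repartition(numPartitions)

    spread.foreachRDD { rdd =>
      // partitions.length is the number of tasks the count below is split into.
      val parts = rdd.partitions.length
      println(s"partitions=$parts, total for this batch is ${rdd.count()}")
    }
  }
}
```

A call such as `RepartitionSketch.countInParallel(allKinesisStreams, 20)` would replace the `foreachRDD` block in the original program. The value 20 is an arbitrary choice for illustration.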
