I implemented the Elasticsearch State code myself.  As far as I can tell,
it is logically similar to the other Trident-Elasticsearch packages out
there.

I doubt that the implementation is the source of my problem, though.  Even
if I change the updateState function in the BaseStateUpdater to simply log
the number of tuples it's called with, I still see the same behavior.  The
function is called only early in the topology's life.  Afterwards, I
continue to see data flowing out of the spout and through the cluster, but
the updateState function is never called again.  It's pretty odd.
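For concreteness, the stripped-down diagnostic updater I described looks
roughly like this (the class name and log message are mine; package names
assume Storm 0.9.x):

```java
import java.util.List;

import storm.trident.operation.TridentCollector;
import storm.trident.state.BaseStateUpdater;
import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;

// Diagnostic updater: no Elasticsearch calls at all, it only logs the size
// of each batch it receives.  If this stops logging, the problem is
// upstream of the State implementation.
public class LoggingStateUpdater extends BaseStateUpdater<State> {
    @Override
    public void updateState(State state, List<TridentTuple> tuples,
                            TridentCollector collector) {
        System.out.println("updateState called with " + tuples.size()
                + " tuples");
    }
}
```

Even with this no-op updater in place of the real Elasticsearch one, the
calls stop after the initial batches.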

My Elasticsearch cluster is pretty beefy: 20 servers with 8 CPUs/64 GB of
RAM apiece.  Prior to Kafka/Storm, we did some pretty heavy-duty
Elasticsearch loads on a smaller cluster and were able to achieve load
rates of 300k records/second, so I'm fairly familiar with that side of the
tuning.  That said, the files were all loaded directly onto the cluster
then and were read in parallel directly from the Elasticsearch disks, so
that will likely be difficult to compete with.

My Kafka/Storm cluster, on the other hand, is only 5 servers with 4 CPUs/8
GB of RAM apiece.  Right now I'm just doing preliminary testing with a
static input: roughly 500,000 records are ingested into Kafka from files on
the file system.  Records are pushed at a rate of approximately 40,000
records per second (due to bandwidth limitations).

My topology runs with 4 workers, and the initial processing runs with a
parallelismHint(8).  I then do a shuffle followed by a partitionPersist
with a parallelismHint of 16.  (My understanding is that the initial
parallelism should match the number of Kafka partitions in my topic, and I
would like to hit Elasticsearch with additional parallelism.)  I've added
some logging, and here is what I'm seeing on loading 500,000 records:

- There are indeed 16 total ElasticsearchStateUpdater instances executing.
Using counters on them, I've verified that the total number of tuples
counted on an initial load of 500,000 records is indeed 500,000.  So that's
a good sign in this case.
- After the initial load, I let the cluster sit idle for a while while I
monitor the logs.  Tuples continue to roll into the cluster at an irregular
rate, even though no more data has been ingested into Kafka.  It seems like
the Trident spout continues to emit data even though it has read and
"persisted" every tuple in the log.  After a few minutes, it looks like my
cluster has processed over 1 million records when only 500k were sent
initially.  The total update count remains static at 500k.
- If I keep the cluster running and kick off another load into Kafka, data
does begin to flow into the cluster again, but the state update counts do
not increase.  No additional data appears to be "persisted".  This is in
line with the behavior I've seen previously, where, on a much bigger load,
the StateUpdaters mysteriously stop working.
- Nodes begin to crash intermittently.  I see errors like a
FileNotFoundException saying '/supervisor/stormdist/..../stormconf.ser'
does not exist.  Other nodes report intermittent errors like "Remote
address is not reachable.  We will close this client".  The cluster does
not appear to fully recover.  (This does not always happen.)
- Additionally, subsequent loads show records trickling or flowing in at a
very inconsistent and halting rate.

- On a final test run with a restarted cluster and another 500,000-record
load, updateState stops firing after approximately 115,880 records have
been loaded.
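For reference, the topology wiring described above looks roughly like the
following sketch.  The ZooKeeper host, topic name, field names, and the
function/factory classes are placeholders for my actual code; package names
assume Storm 0.9.x with the storm-kafka module:

```java
import backtype.storm.tuple.Fields;
import storm.kafka.BrokerHosts;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;

// Sketch of the wiring: 8-way parallel preprocessing (matching the 8 Kafka
// partitions in the topic), then a shuffle into a 16-way partitionPersist
// to Elasticsearch.
TridentTopology topology = new TridentTopology();

BrokerHosts zk = new ZkHosts("zkhost:2181");              // placeholder
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "my-topic");
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

topology.newStream("kafka-spout", spout)
        // minimal marshalling/preprocessing (placeholder function)
        .each(new Fields("bytes"), new PreprocessFunction(),
              new Fields("doc"))
        .parallelismHint(8)
        .shuffle()
        .partitionPersist(new ElasticsearchStateFactory(),
                          new Fields("doc"),
                          new ElasticsearchStateUpdater())
        .parallelismHint(16);
```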

I'm sure that something is either configured wrong or there is an
incompatibility somewhere, but I'm at a loss for where it might be.
Hopefully this wasn't TL;DR.  Any input would be greatly appreciated!

On Wed, Nov 19, 2014 at 2:54 PM, P. Taylor Goetz <[email protected]> wrote:

> What are you using for your partitionPersist to ES? Is it something you
> implemented yourself, or an open source library?
>
> With Kafka —> Storm —> Elasticsearch, ES is likely going to be your
> bottleneck, since indexing is comparatively expensive. So you will likely
> have to spend a fair amount of effort tuning ES and Storm/Trident.
>
> I have accomplished this with solid throughput and reliability, but it
> took a lot of work to get ES tuned. Chances are your ES cluster will have
> to be larger than your storm cluster.
>
> Any additional information you could add about your environment and use
> case would help.
>
> -Taylor
>
> On Nov 19, 2014, at 2:40 PM, Elliott Bradshaw <[email protected]>
> wrote:
>
> I feel like there have to be people out there doing State updates with a
> Trident-Kafka topology, has anyone successfully accomplished this with
> solid throughput and reliability?
>
> On Tue, Nov 18, 2014 at 2:30 PM, Elliott Bradshaw <[email protected]>
> wrote:
>
>> My apologies if I wasn't clear.
>>
>> PartitionPersist is a Trident stream operation that persists a batch of
>> Trident tuples to a stateful destination, in this case, Elasticsearch.
>> UpdateState is a function in the BaseStateUpdater class that should be
>> called when a batch of tuples arrives.
>>
>> On Tue, Nov 18, 2014 at 1:26 PM, Itai Frenkel <[email protected]> wrote:
>>
>>>  Could you please elaborate what is the relation between "updateState"
>>> and "partitionPersist"? Are those two consecutive topology bolts ?
>>>
>>>
>>>  ------------------------------
>>> *From:* Elliott Bradshaw <[email protected]>
>>> *Sent:* Tuesday, November 18, 2014 5:25 PM
>>> *To:* [email protected]
>>> *Subject:* Fwd: Issues with State updates in
>>> Kafka-Trident-Elasticsearch topology
>>>
>>>
>>>   Hi All,
>>>
>>> I'm currently attempting to get a topology running that streams data
>>> into Elasticsearch.  Tuples go through some minimal marshalling and
>>> preprocessing before being sent to partitionPersist, where they are
>>> transformed into JSON and indexed in Elasticsearch.
>>>
>>>  The topology appears to work properly in local mode, but when deployed
>>> to my 4-node cluster, state updates do not seem to fire correctly
>>> (sometimes they don't fire at all).  Tuple counter filters show data
>>> flowing through the topology at a healthy rate (approx. 80,000
>>> records/second); however, the updateState function only rarely appears
>>> to be called.  After a brief period of time, no further calls to
>>> updateState are seen.
>>>
>>>  As a test, I wrote a filter that queues up tuples and batch sends them
>>> to Elasticsearch once a certain threshold is reached.  This works perfectly
>>> fine and is capable of managing the processing load.
>>>
>>> I've seen discussion of this behavior before, but have not managed to
>>> find an explanation or solution.  Has anybody else had similar issues or
>>> have a solution?
>>>
>>>
>>
>
>
