Also, I'm using the following versions:

Kafka_2.9.2 - 0.8.1.1 (I'm using this as a compile-time dependency,
although I'm just noticing that we're running kafka_2.10 on the server.
Hopefully that isn't the problem...)
Storm - 0.9.2-incubating
Zookeeper - 3.4.5
Storm-Kafka - 0.9.2-incubating

On Thu, Nov 20, 2014 at 2:54 PM, Elliott Bradshaw <[email protected]>
wrote:

> I implemented the Elasticsearch State code myself.  As far as I can tell,
> it is logically similar to the other Trident-Elasticsearch packages out
> there.
>
> I doubt that the implementation is the source of my problem, though.
> Even if I set the updateState function in the BaseStateUpdater to simply
> log the number of tuples it's called with, I still see the same behavior.
> The function is called only early in the topology.  Afterwards, I continue
> seeing data flowing out of the spout and through the cluster, but the
> updateState function is not called again.  It's pretty odd.
>
> My Elasticsearch cluster is pretty beefy: 20 servers with 8 CPUs/64 GB of
> RAM apiece.  Prior to Kafka/Storm, we did some pretty heavy-duty
> Elasticsearch loads on a smaller cluster, and were able to achieve load
> rates of 300k rec/second, so I'm fairly familiar with that side of the
> tuning.  That said, the files were all loaded directly onto the cluster
> then, and were read in parallel directly from the Elasticsearch disks.
> That will likely be difficult to compete with.
>
> My Kafka/Storm cluster, on the other hand, is only 5 servers with 4 CPUs/8
> GB of RAM apiece.  Right now I'm just doing preliminary testing with a static
> input: roughly 500,000 records are ingested from files on the file system
> into Kafka.  Records are pushed at a rate of approximately 40,000 records
> per second (due to bandwidth limitations).
>
> My topology runs with 4 workers, and the initial processing runs with a
> parallelismHint(8).  I then do a shuffle followed by a partitionPersist
> with a parallelismHint of 16.  (My understanding is that my initial
> parallelism should match the number of Kafka partitions in my topic.  I
> would like to hit Elasticsearch with additional parallelism.)  I've added
> some logging, and here is what I'm seeing on loading 500,000 records:
>
> - There are indeed 16 total ElasticsearchStateUpdater instances executing.
> Using counters on these, I've validated that the total number of tuples
> counted on an initial load of 500,000 records is indeed 500,000.  So that's
> a good sign in this case.
> - After the initial load, I let the cluster sit idle for a while as I
> monitor the logs.  Tuples continue to roll into the cluster at an irregular
> rate, even though no more data has been ingested into Kafka.  It seems like
> the Trident spout continues to emit data even though it has read and
> "persisted" every tuple in the log.  After a few minutes, it looks like my
> cluster has processed over 1 million records when only 500k had been sent
> initially.  Total updates remain static at 500k.
> - If I keep the cluster running and kick off another load into Kafka, data
> does begin to flow into the cluster again, but the state updates are not
> incremented.  No additional data appears to be "persisted". This is in line
> with the behavior I've seen previously, where when doing a much bigger
> load, the StateUpdaters mysteriously stop working.
> - Nodes begin to crash intermittently.  I see errors like the
> FileNotFoundException '/supervisor/stormdist/..../stormconf.ser' does not
> exist.  Other nodes report intermittent errors like "Remote address is not
> reachable.  We will close this client".  The cluster does not appear to
> fully recover.  (This does not always happen).
> - Additionally, subsequent loads show records trickling or flowing in at a
> very inconsistent and halting rate.
>
> - On a final test run with a restarted cluster and another 500,000 record
> load, the updateState function stops firing after loading approximately
> 115,880 records.
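
(The per-updater counting described in the first bullet above can be sketched as below. This is a self-contained illustration with hypothetical names; in the real topology, each counter would live inside one ElasticsearchStateUpdater instance rather than in a shared array.)

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the counter check: 16 updater instances each count the tuples
// they see, and the per-updater counts should sum to the number of records
// loaded (500,000 in the test described above).
public class UpdaterCounters {
    static final int UPDATERS = 16;
    static final AtomicLong[] counters = new AtomicLong[UPDATERS];
    static {
        for (int i = 0; i < UPDATERS; i++) counters[i] = new AtomicLong();
    }

    // Each updater would call this from updateState with its batch size.
    public static void recordBatch(int updaterId, int batchSize) {
        counters[updaterId].addAndGet(batchSize);
    }

    public static long total() {
        long sum = 0;
        for (AtomicLong c : counters) sum += c.get();
        return sum;
    }

    public static void main(String[] args) {
        // Simulate 500,000 tuples spread across the 16 updaters.
        for (int i = 0; i < 500_000; i++) {
            recordBatch(i % UPDATERS, 1);
        }
        System.out.println(total()); // prints 500000
    }
}
```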
>
> I'm sure that something is either configured wrong or there is an
> incompatibility somewhere, but I'm at a loss for where it might be.
> Hopefully this wasn't TL;DR.  Any input would be greatly appreciated!
>
> On Wed, Nov 19, 2014 at 2:54 PM, P. Taylor Goetz <[email protected]>
> wrote:
>
>> What are you using for your partitionPersist to ES? Is it something you
>> implemented yourself, or an open source library?
>>
>> With Kafka -> Storm -> Elasticsearch, ES is likely going to be your
>> bottleneck, since indexing is comparatively expensive. So you will likely
>> have to spend a fair amount of effort tuning ES and Storm/Trident.
>>
>> I have accomplished this with solid throughput and reliability, but it
>> took a lot of work to get ES tuned. Chances are your ES cluster will have
>> to be larger than your storm cluster.
>>
>> Any additional information you could add about your environment and use
>> case would help.
>>
>> -Taylor
>>
>> On Nov 19, 2014, at 2:40 PM, Elliott Bradshaw <[email protected]>
>> wrote:
>>
>> I feel like there have to be people out there doing State updates with a
>> Trident-Kafka topology.  Has anyone successfully accomplished this with
>> solid throughput and reliability?
>>
>> On Tue, Nov 18, 2014 at 2:30 PM, Elliott Bradshaw <[email protected]>
>> wrote:
>>
>>> My apologies if I wasn't clear.
>>>
>>> PartitionPersist is a Trident stream operation that persists a batch of
>>> Trident tuples to a stateful destination, in this case, Elasticsearch.
>>> updateState is a method of the BaseStateUpdater class that should be
>>> called whenever a batch of tuples arrives.
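
(For context, that relationship can be sketched with simplified stand-ins for the Trident interfaces. These are illustrative only, not the real storm.trident classes: partitionPersist hands each batch to a StateUpdater, whose updateState method writes it to a backing State.)

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Trident contract.  In real Trident,
// partitionPersist delivers each batch of tuples to a StateUpdater, which
// persists them via a State implementation (here, Elasticsearch).
public class StateUpdaterSketch {

    public interface State {
        void save(List<String> docs);   // stands in for a bulk index into ES
    }

    public interface StateUpdater<S extends State> {
        void updateState(S state, List<String> batch);
    }

    public static class LoggingUpdater implements StateUpdater<State> {
        public long totalSeen = 0;

        @Override
        public void updateState(State state, List<String> batch) {
            totalSeen += batch.size();  // count tuples, as in the logging test
            state.save(batch);          // then persist the batch
        }
    }

    public static void main(String[] args) {
        List<String> persisted = new ArrayList<>();
        State es = persisted::addAll;   // fake "Elasticsearch" state
        LoggingUpdater updater = new LoggingUpdater();

        // Simulate partitionPersist delivering two batches.
        updater.updateState(es, List.of("doc1", "doc2"));
        updater.updateState(es, List.of("doc3"));

        System.out.println(updater.totalSeen + " " + persisted.size()); // prints "3 3"
    }
}
```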
>>>
>>> On Tue, Nov 18, 2014 at 1:26 PM, Itai Frenkel <[email protected]> wrote:
>>>
>>>> Could you please elaborate on the relation between "updateState"
>>>> and "partitionPersist"? Are those two consecutive topology bolts?
>>>>
>>>>
>>>>  ------------------------------
>>>> *From:* Elliott Bradshaw <[email protected]>
>>>> *Sent:* Tuesday, November 18, 2014 5:25 PM
>>>> *To:* [email protected]
>>>> *Subject:* Fwd: Issues with State updates in
>>>> Kafka-Trident-Elasticsearch topology
>>>>
>>>>
>>>> Hi All,
>>>>
>>>> I'm currently attempting to get a topology running that loads data into
>>>> Elasticsearch.  Tuples go through some minimal marshalling and
>>>> preprocessing before being sent to partitionPersist, where they are
>>>> transformed into JSON and indexed in Elasticsearch.
>>>>
>>>> The cluster appears to work properly in local mode, but when deployed
>>>> to my 4-node cluster, state updates do not seem to fire correctly
>>>> (sometimes they don't fire at all).  Tuple counter filters show data
>>>> flowing through the topology at a healthy rate (approx. 80,000
>>>> rec/second); however, the updateState function only rarely appears to be
>>>> called.  After a brief period of time, no further calls to updateState are seen.
>>>>
>>>>  As a test, I wrote a filter that queues up tuples and batch sends them
>>>> to Elasticsearch once a certain threshold is reached.  This works perfectly
>>>> fine and is capable of managing the processing load.
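
(That workaround can be sketched as below. This is a self-contained, hypothetical illustration: the bulkSender callback stands in for an Elasticsearch bulk request, and the isKeep method mirrors the shape of a Trident filter without depending on the Storm API.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the workaround filter: buffer each tuple and bulk-send once a
// threshold is reached, bypassing Trident state entirely.
public class BatchingFilter {
    private final int threshold;
    private final List<String> buffer = new ArrayList<>();
    private final Consumer<List<String>> bulkSender; // stands in for an ES bulk request

    public BatchingFilter(int threshold, Consumer<List<String>> bulkSender) {
        this.threshold = threshold;
        this.bulkSender = bulkSender;
    }

    // Called once per tuple, like Trident's Filter.isKeep; always keeps the tuple.
    public boolean isKeep(String tuple) {
        buffer.add(tuple);
        if (buffer.size() >= threshold) {
            bulkSender.accept(new ArrayList<>(buffer)); // flush a copy of the batch
            buffer.clear();
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> flushSizes = new ArrayList<>();
        BatchingFilter filter = new BatchingFilter(100, batch -> flushSizes.add(batch.size()));
        for (int i = 0; i < 250; i++) {
            filter.isKeep("record-" + i);
        }
        // 250 tuples with a threshold of 100 -> two flushes of 100; 50 remain buffered.
        System.out.println(flushSizes); // prints [100, 100]
    }
}
```

A real implementation would also need a time-based flush, so a partly filled buffer doesn't sit unsent indefinitely after the input stream goes quiet.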
>>>>
>>>> I've seen discussion of this behavior before, but have not managed to
>>>> find an explanation or solution.  Has anybody else had similar issues or
>>>> found a solution?
>>>>
>>>>
>>>
>>
>>
>
