Bump.  Any more thoughts on this?  I went ahead and fixed the Kafka
dependency.  Didn't help the issue.

On Thu, Nov 20, 2014 at 3:11 PM, Elliott Bradshaw <[email protected]>
wrote:

> Also, I'm using the following versions:
>
> Kafka_2.9.2 - 0.8.1.1 (I'm using this as a compile time dependency.
> Although I'm just noticing that we're running kafka_2.10 on the server.
> Hopefully that isn't the problem......)
> Storm - 0.9.2-incubating
> Zookeeper - 3.4.5
> Storm-Kafka - 0.9.2-incubating
>
> On Thu, Nov 20, 2014 at 2:54 PM, Elliott Bradshaw <[email protected]>
> wrote:
>
>> I implemented the Elasticsearch State code myself.  As far as I can tell,
>> it is logically similar to the other Trident-Elasticsearch packages out
>> there.
>>
>> I'm doubt that the implementation is the source of my problem though.
>> Even if I set the updateState function in the BaseStateUpdater to simply
>> log the number of tuples it's called with, I still see the same behavior.
>> The function is called only early in the topology.  Afterwards, I continue
>> seeing data flowing out of the spout and through the cluster, but the
>> updateState function is not called again.  It's pretty odd.
>>
>> My Elasticsearch cluster is pretty beefy, 20 servers with 8cpu/64 gb of
>> ram a piece.  Prior to Kafka/Storm, we did some pretty heavy duty
>> Elasticsearch loads on a smaller cluster, and were able to achieve load
>> rates of 300k rec/second, so I'm fairly familiar with that side of the
>> tuning.  That said, the files were all loaded directly onto the cluster
>> then, and were read in parallel directly from the Elasticsearch disks.
>> That will likely be difficult to compete with.
>>
>> My Kafka/Storm cluster on the other hand is only 5 servers with 4 CPU/8GB
>> ram a piece.  Right now I'm just doing preliminary testing with a static
>> input, roughly 500,000 records are ingested from files on the file system
>> into Kafka.  Records are pushed at a rate of approximately 40,000 records
>> per second (due to bandwidth limitations).
>>
>> My topology runs with 4 workers, and the initial processing runs with a
>> parallelismHint(8).  I then do a shuffle followed by a partitionPersist
>> with a parallelismHint of 16.  (My understanding is that my initial
>> parallelism should match the number of Kafka partitions in my topic.  I
>> would like to hit Elasticsearch with additional parallelism.).  I've added
>> some logging and here is what I'm seeing on loading 500000 records:
>>
>> - There are indeed 16 total ElasticsearchStateUpdater classes executing.
>> Using counters on these, I've validated that the total number of tuples
>> counted on an initial load of 500,000 records is indeed 500,000.  So that's
>> a good sign in this case.
>> - After the initial load, I let the cluster sit still for a while while I
>> monitor the logs.  Tuples continue to roll into the cluster at an irregular
>> rate, even though no more data has been ingested into Kafka.  It seems like
>> the Trident spout continues to emit data even though it has read and
>> "persisted" every tuple in the log.  After a few minutes, it looks like my
>> cluster has processed over 1 million records when only 500k had been sent
>> initially.  Total updates remain static at 500k.
>> - If I keep the cluster running and kick off another load into Kafka,
>> data does begin to flow into the cluster again but the state updates are
>> not incremented.  No additional data appears to be "persisted". This is in
>> line with the behavior I've seen previously, where when doing a much bigger
>> load, the StateUpdaters mysteriously stop working.
>> - Nodes begin to crash intermittently.  I see errors like the
>> FileNotFoudnException '/supervisor/stormdist/..../stormconf.ser' does not
>> exist.  Other nodes report intermittent errors like "Remote address is not
>> reachable.  We will close this client".  The cluster does not appear to
>> fully recover.  (This does not always happen).
>> - Additionally, additional loads show records trickling or flowing in at
>> a very inconsistent and halting rate.
>>
>> - On a final test run with a restarted cluster and another 500,000 record
>> load, the updateState stops firing after loading approximately 115880
>> records.
>>
>> I'm sure that something is either configured wrong or there is an
>> incompatibility somewhere, but I'm at a loss for where it might be.
>> Hopefully this wasn't TL;DR.  Any input would be greatly appreciated!
>>
>> On Wed, Nov 19, 2014 at 2:54 PM, P. Taylor Goetz <[email protected]>
>> wrote:
>>
>>> What are you using for your partitionPersist to ES? Is it something you
>>> implemented yourself, or an open source library?
>>>
>>> With Kafka —> Storm —> Elastic Search, ES is likely going to be your
>>> bottleneck, since indexing is comparatively expensive. So you will likely
>>> have to spend a fair amount of effort tuning ES and Storm/Trident.
>>>
>>> I have accomplished this with solid throughput and reliability, but it
>>> took a lot of work to get ES tuned. Chances are your ES cluster will have
>>> to be larger than your storm cluster.
>>>
>>> Any additional information you could add about your environment and use
>>> case would help.
>>>
>>> -Taylor
>>>
>>> On Nov 19, 2014, at 2:40 PM, Elliott Bradshaw <[email protected]>
>>> wrote:
>>>
>>> I feel like there have to be people out there doing State updates with a
>>> Trident-Kafka topology, has anyone successfully accomplished this with
>>> solid throughput and reliability?
>>>
>>> On Tue, Nov 18, 2014 at 2:30 PM, Elliott Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> My apologies if I wasn't clear.
>>>>
>>>> PartitionPersist is a Trident stream operation that persists a batch of
>>>> Trident tuples to a stateful destination, in this case, Elasticsearch.
>>>> UpdateState is a function in the BaseStateUpdater class that should be
>>>> called when a batch of tuples arrives.
>>>>
>>>> On Tue, Nov 18, 2014 at 1:26 PM, Itai Frenkel <[email protected]> wrote:
>>>>
>>>>>  Could you please elaborate what is the relation between
>>>>> "updateState" and "partitionPersist"? Are those two consecutive topology
>>>>> bolts ?
>>>>>
>>>>>
>>>>>  ------------------------------
>>>>> *From:* Elliott Bradshaw <[email protected]>
>>>>> *Sent:* Tuesday, November 18, 2014 5:25 PM
>>>>> *To:* [email protected]
>>>>> *Subject:* Fwd: Issues with State updates in
>>>>> Kafka-Trident-Elasticsearch topology
>>>>>
>>>>>
>>>>>   Hi All,
>>>>>
>>>>> I'm currently attempting to get a topology running for data into
>>>>> Elasticsearch.  Tuples go through some minimal marshalling and
>>>>> preprocessing before being sent to partitionPersist, where they are
>>>>> transformed into JSON and indexed in Elasticsearch.
>>>>>
>>>>>  The cluster appears to work properly in local mode, but when deployed
>>>>> to my 4 node cluster, state updates do not seem to fire correctly
>>>>> (sometimes they don't fire at all).  Tuple counter filters show data
>>>>> flowing through the topology at a healthy rate (approx 80,000 rec/second),
>>>>> however, the updateState function only rarely appears to be called.  After
>>>>> a brief period of time, no further calls to updateState are seen.
>>>>>
>>>>>  As a test, I wrote a filter that queues up tuples and batch sends
>>>>> them to Elasticsearch once a certain threshold is reached.  This works
>>>>> perfectly fine and is capable of managing the processing load.
>>>>>
>>>>> I've seen discussion of this behavior before, but have not managed to
>>>>> find an explanation or solution.  Has anybody else had similar issues or
>>>>> have a solution?
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>

Reply via email to