I implemented the Elasticsearch State code myself. As far as I can tell, it is logically similar to the other Trident-Elasticsearch packages out there.
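For reference, the diagnostic counting updater I describe below can be modeled in isolation. This is a hand-rolled sketch in plain Java with no Storm dependency (in the real topology this logic lives in a class extending Trident's BaseStateUpdater and receives a List<TridentTuple> per batch; the class and method names here are just stand-ins):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-in for the diagnostic StateUpdater described below.
// In the real topology this extends storm.trident.state.BaseStateUpdater
// and is handed a List<TridentTuple> per batch; here tuples are Strings.
public class CountingUpdater {
    private final AtomicLong total = new AtomicLong();

    // Called once per Trident batch in the real topology.
    public void updateState(List<String> tuples) {
        long seen = total.addAndGet(tuples.size());
        System.out.println("updateState called with " + tuples.size()
                + " tuples (running total: " + seen + ")");
    }

    public long total() {
        return total.get();
    }
}
```

The point of stripping the updater down this far was to rule out the Elasticsearch client itself: if even this no-op version stops being called, the problem is upstream of the State implementation.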
I doubt that the implementation is the source of my problem, though. Even if I reduce the updateState function in the BaseStateUpdater to simply logging the number of tuples it's called with, I still see the same behavior: the function is called only early in the topology's life. Afterwards I continue to see data flowing out of the spout and through the cluster, but updateState is never called again. It's pretty odd.

My Elasticsearch cluster is pretty beefy: 20 servers with 8 CPUs/64 GB of RAM apiece. Prior to Kafka/Storm, we did some heavy-duty Elasticsearch loads on a smaller cluster and achieved load rates of 300k records/second, so I'm fairly familiar with that side of the tuning. That said, those files were all loaded directly onto the cluster and were read in parallel straight from the Elasticsearch disks, which will likely be difficult to compete with. My Kafka/Storm cluster, on the other hand, is only 5 servers with 4 CPUs/8 GB of RAM apiece.

Right now I'm just doing preliminary testing with a static input: roughly 500,000 records are ingested from files on the file system into Kafka. Records are pushed at approximately 40,000 records per second (due to bandwidth limitations). My topology runs with 4 workers, and the initial processing runs with a parallelismHint(8). I then do a shuffle followed by a partitionPersist with a parallelismHint of 16. (My understanding is that my initial parallelism should match the number of Kafka partitions in my topic; I would like to hit Elasticsearch with additional parallelism.)

I've added some logging, and here is what I'm seeing on loading 500,000 records:

- There are indeed 16 total ElasticsearchStateUpdater instances executing. Using counters on these, I've validated that the total number of tuples counted on an initial load of 500,000 records is indeed 500,000. So that's a good sign in this case.
- After the initial load, I let the cluster sit still for a while while I monitor the logs.
Tuples continue to roll into the cluster at an irregular rate, even though no more data has been ingested into Kafka. It seems like the Trident spout continues to emit data even though it has read and "persisted" every tuple in the log. After a few minutes it looks like my cluster has processed over 1 million records when only 500k were sent initially, yet the total number of updates remains static at 500k.

- If I keep the cluster running and kick off another load into Kafka, data does begin to flow into the cluster again, but the state update counters are not incremented. No additional data appears to be "persisted". This is in line with the behavior I've seen previously, where, on a much bigger load, the StateUpdaters mysteriously stopped working.
- Nodes begin to crash intermittently. I see errors like a FileNotFoundException stating that '/supervisor/stormdist/..../stormconf.ser' does not exist. Other nodes report intermittent errors like "Remote address is not reachable. We will close this client". The cluster does not appear to fully recover. (This does not always happen.)
- Additional loads show records trickling or flowing in at a very inconsistent and halting rate.
- On a final test run with a restarted cluster and another 500,000-record load, updateState stops firing after approximately 115,880 records.

I'm sure that something is either configured wrong or there is an incompatibility somewhere, but I'm at a loss as to where it might be. Hopefully this wasn't TL;DR. Any input would be greatly appreciated!

On Wed, Nov 19, 2014 at 2:54 PM, P. Taylor Goetz <[email protected]> wrote:

> What are you using for your partitionPersist to ES? Is it something you
> implemented yourself, or an open source library?
>
> With Kafka —> Storm —> Elastic Search, ES is likely going to be your
> bottleneck, since indexing is comparatively expensive. So you will likely
> have to spend a fair amount of effort tuning ES and Storm/Trident.
>
> I have accomplished this with solid throughput and reliability, but it
> took a lot of work to get ES tuned. Chances are your ES cluster will have
> to be larger than your storm cluster.
>
> Any additional information you could add about your environment and use
> case would help.
>
> -Taylor
>
> On Nov 19, 2014, at 2:40 PM, Elliott Bradshaw <[email protected]> wrote:
>
> I feel like there have to be people out there doing State updates with a
> Trident-Kafka topology. Has anyone successfully accomplished this with
> solid throughput and reliability?
>
> On Tue, Nov 18, 2014 at 2:30 PM, Elliott Bradshaw <[email protected]> wrote:
>
>> My apologies if I wasn't clear.
>>
>> PartitionPersist is a Trident stream operation that persists a batch of
>> Trident tuples to a stateful destination, in this case, Elasticsearch.
>> UpdateState is a function in the BaseStateUpdater class that should be
>> called when a batch of tuples arrives.
>>
>> On Tue, Nov 18, 2014 at 1:26 PM, Itai Frenkel <[email protected]> wrote:
>>
>>> Could you please elaborate what is the relation between "updateState"
>>> and "partitionPersist"? Are those two consecutive topology bolts?
>>>
>>> ------------------------------
>>> *From:* Elliott Bradshaw <[email protected]>
>>> *Sent:* Tuesday, November 18, 2014 5:25 PM
>>> *To:* [email protected]
>>> *Subject:* Fwd: Issues with State updates in Kafka-Trident-Elasticsearch topology
>>>
>>> Hi All,
>>>
>>> I'm currently attempting to get a topology running for data into
>>> Elasticsearch. Tuples go through some minimal marshalling and
>>> preprocessing before being sent to partitionPersist, where they are
>>> transformed into JSON and indexed in Elasticsearch.
>>>
>>> The cluster appears to work properly in local mode, but when deployed
>>> to my 4 node cluster, state updates do not seem to fire correctly
>>> (sometimes they don't fire at all). Tuple counter filters show data
>>> flowing through the topology at a healthy rate (approx 80,000 rec/second);
>>> however, the updateState function only rarely appears to be called. After
>>> a brief period of time, no further calls to updateState are seen.
>>>
>>> As a test, I wrote a filter that queues up tuples and batch-sends them
>>> to Elasticsearch once a certain threshold is reached. This works perfectly
>>> fine and is capable of managing the processing load.
>>>
>>> I've seen discussion of this behavior before, but have not managed to
>>> find an explanation or solution. Has anybody else had similar issues or
>>> have a solution?
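The threshold-batching filter mentioned above (the workaround that did keep up with the load) amounts to a queue-and-flush buffer. A minimal sketch in plain Java, with no Storm or Elasticsearch dependency; the flush callback stands in for the Elasticsearch bulk index request, and all names are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of the queue-up-and-batch-send filter described above.
// In the real topology this runs inside a Trident filter, and the
// flush callback would issue an Elasticsearch bulk index request.
public class BatchingBuffer {
    private final int threshold;
    private final Consumer<List<String>> flush;  // stands in for the ES bulk call
    private final List<String> pending = new ArrayList<>();

    public BatchingBuffer(int threshold, Consumer<List<String>> flush) {
        this.threshold = threshold;
        this.flush = flush;
    }

    // Called once per tuple; emits a full batch downstream when the
    // threshold is reached, then starts accumulating again.
    public void add(String tuple) {
        pending.add(tuple);
        if (pending.size() >= threshold) {
            flush.accept(new ArrayList<>(pending));
            pending.clear();
        }
    }
}
```

One caveat with this workaround: tuples sitting below the threshold are never flushed unless you add an explicit flush on a timeout or at shutdown, and writes made from a filter sit outside Trident's transactional state guarantees, which is exactly what partitionPersist is supposed to provide.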
