Also, I'm using the following versions:

- Kafka: kafka_2.9.2 - 0.8.1.1 (I'm using this as a compile-time dependency, although I'm just noticing that we're running kafka_2.10 on the server. Hopefully that isn't the problem...)
- Storm: 0.9.2-incubating
- Zookeeper: 3.4.5
- Storm-Kafka: 0.9.2-incubating
On Thu, Nov 20, 2014 at 2:54 PM, Elliott Bradshaw <[email protected]> wrote:

> I implemented the Elasticsearch State code myself. As far as I can tell, it is logically similar to the other Trident-Elasticsearch packages out there.
>
> I doubt that the implementation is the source of my problem, though. Even if I set the updateState function in the BaseStateUpdater to simply log the number of tuples it's called with, I still see the same behavior. The function is called only early in the topology's run. Afterwards, I continue to see data flowing out of the spout and through the cluster, but the updateState function is not called again. It's pretty odd.
>
> My Elasticsearch cluster is pretty beefy: 20 servers with 8 CPUs/64 GB of RAM apiece. Prior to Kafka/Storm, we did some pretty heavy-duty Elasticsearch loads on a smaller cluster and were able to achieve load rates of 300k records/second, so I'm fairly familiar with that side of the tuning. That said, the files were all loaded directly onto the cluster then, and were read in parallel directly from the Elasticsearch disks. That will likely be difficult to compete with.
>
> My Kafka/Storm cluster, on the other hand, is only 5 servers with 4 CPUs/8 GB of RAM apiece. Right now I'm just doing preliminary testing with a static input: roughly 500,000 records are ingested from files on the file system into Kafka. Records are pushed at a rate of approximately 40,000 records per second (due to bandwidth limitations).
>
> My topology runs with 4 workers, and the initial processing runs with a parallelismHint(8). I then do a shuffle followed by a partitionPersist with a parallelismHint of 16. (My understanding is that my initial parallelism should match the number of Kafka partitions in my topic. I would like to hit Elasticsearch with additional parallelism.)
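[For readers following along: the wiring described above could be sketched roughly as below, against the storm-kafka 0.9.2-incubating Trident API. The Elasticsearch state factory/updater and the preprocessing function are hypothetical stand-ins for the poster's own code, and the Zookeeper address, topic name, and field names are placeholders; this is not a runnable, standalone program.]

    // Trident topology sketch: Kafka spout at parallelism 8 (one task per
    // Kafka partition), shuffle, then partitionPersist to ES at parallelism 16.
    BrokerHosts zk = new ZkHosts("zkhost:2181");                 // placeholder
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "mytopic");

    TridentTopology topology = new TridentTopology();
    topology.newStream("kafka", new OpaqueTridentKafkaSpout(spoutConf))
            .parallelismHint(8)                    // match Kafka partition count
            .each(new Fields("bytes"), new Preprocess(), new Fields("doc"))
            .shuffle()                             // redistribute before persist
            .partitionPersist(new ElasticsearchStateFactory(),   // hypothetical
                              new Fields("doc"),
                              new ElasticsearchStateUpdater())   // hypothetical
            .parallelismHint(16);                  // ES-side parallelism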
> I've added some logging, and here is what I'm seeing on loading 500,000 records:
>
> - There are indeed 16 total ElasticsearchStateUpdater instances executing. Using counters on these, I've validated that the total number of tuples counted on an initial load of 500,000 records is indeed 500,000. So that's a good sign in this case.
> - After the initial load, I let the cluster sit still for a while while I monitor the logs. Tuples continue to roll into the cluster at an irregular rate, even though no more data has been ingested into Kafka. It seems like the Trident spout continues to emit data even though it has read and "persisted" every tuple in the log. After a few minutes, it looks like my cluster has processed over 1 million records when only 500k had been sent initially. Total updates remain static at 500k.
> - If I keep the cluster running and kick off another load into Kafka, data does begin to flow into the cluster again, but the state updates are not incremented. No additional data appears to be "persisted". This is in line with the behavior I've seen previously, where, when doing a much bigger load, the StateUpdaters mysteriously stop working.
> - Nodes begin to crash intermittently. I see errors like a FileNotFoundException saying '/supervisor/stormdist/..../stormconf.ser' does not exist. Other nodes report intermittent errors like "Remote address is not reachable. We will close this client". The cluster does not appear to fully recover. (This does not always happen.)
> - Subsequent loads also show records trickling or flowing in at a very inconsistent and halting rate.
> - On a final test run with a restarted cluster and another 500,000-record load, updateState stops firing after loading approximately 115,880 records.
>
> I'm sure that something is either configured wrong or there is an incompatibility somewhere, but I'm at a loss for where it might be. Hopefully this wasn't TL;DR.
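[Context on the replay symptom above: Trident retries batches, and its exactly-once guarantee depends on the State storing the batch transaction id alongside the value and skipping any batch whose txid it has already applied. The snippet below is a minimal plain-Java illustration of that idempotence rule, simplified to a counter; the class and method names are hypothetical, not Storm API.]

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the idempotence rule behind Trident transactional state:
// a batch is applied only if its txid is newer than the last one seen for
// that partition, so replayed batches do not double-count.
public class TxIdempotentCounter {
    private long count = 0;
    private final Map<Integer, Long> lastTxidByPartition = new HashMap<>();

    // Returns true if the batch was applied, false if it was a replay.
    public boolean applyBatch(int partition, long txid, long tupleCount) {
        Long last = lastTxidByPartition.get(partition);
        if (last != null && txid <= last) {
            return false;            // already-seen txid: skip the update
        }
        count += tupleCount;
        lastTxidByPartition.put(partition, txid);
        return true;
    }

    public long getCount() {
        return count;
    }
}
```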
> Any input would be greatly appreciated!
>
> On Wed, Nov 19, 2014 at 2:54 PM, P. Taylor Goetz <[email protected]> wrote:
>
>> What are you using for your partitionPersist to ES? Is it something you implemented yourself, or an open-source library?
>>
>> With Kafka -> Storm -> Elasticsearch, ES is likely going to be your bottleneck, since indexing is comparatively expensive. So you will likely have to spend a fair amount of effort tuning ES and Storm/Trident.
>>
>> I have accomplished this with solid throughput and reliability, but it took a lot of work to get ES tuned. Chances are your ES cluster will have to be larger than your Storm cluster.
>>
>> Any additional information you could add about your environment and use case would help.
>>
>> -Taylor
>>
>> On Nov 19, 2014, at 2:40 PM, Elliott Bradshaw <[email protected]> wrote:
>>
>> I feel like there have to be people out there doing State updates with a Trident-Kafka topology. Has anyone successfully accomplished this with solid throughput and reliability?
>>
>> On Tue, Nov 18, 2014 at 2:30 PM, Elliott Bradshaw <[email protected]> wrote:
>>
>>> My apologies if I wasn't clear.
>>>
>>> partitionPersist is a Trident stream operation that persists a batch of Trident tuples to a stateful destination, in this case Elasticsearch. updateState is a function in the BaseStateUpdater class that should be called when a batch of tuples arrives.
>>>
>>> On Tue, Nov 18, 2014 at 1:26 PM, Itai Frenkel <[email protected]> wrote:
>>>
>>>> Could you please elaborate on the relation between "updateState" and "partitionPersist"? Are those two consecutive topology bolts?
>>>> ------------------------------
>>>> *From:* Elliott Bradshaw <[email protected]>
>>>> *Sent:* Tuesday, November 18, 2014 5:25 PM
>>>> *To:* [email protected]
>>>> *Subject:* Fwd: Issues with State updates in Kafka-Trident-Elasticsearch topology
>>>>
>>>> Hi All,
>>>>
>>>> I'm currently attempting to get a topology running that loads data into Elasticsearch. Tuples go through some minimal marshalling and preprocessing before being sent to partitionPersist, where they are transformed into JSON and indexed in Elasticsearch.
>>>>
>>>> The cluster appears to work properly in local mode, but when deployed to my 4-node cluster, state updates do not seem to fire correctly (sometimes they don't fire at all). Tuple counter filters show data flowing through the topology at a healthy rate (approx. 80,000 records/second); however, the updateState function only rarely appears to be called. After a brief period of time, no further calls to updateState are seen.
>>>>
>>>> As a test, I wrote a filter that queues up tuples and batch-sends them to Elasticsearch once a certain threshold is reached. This works perfectly fine and is capable of managing the processing load.
>>>>
>>>> I've seen discussion of this behavior before, but have not managed to find an explanation or solution. Has anybody else had similar issues or have a solution?
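[The filter-side workaround described in the original message above, queueing tuples and bulk-sending once a threshold is reached, can be sketched as a small self-contained buffer class. Names are illustrative; in the real filter, the flush callback would invoke Elasticsearch's bulk API instead of a generic consumer.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Sketch of a threshold-based batcher: documents accumulate in a buffer
// and are handed off in bulk once the threshold is hit, amortizing the
// per-request cost of indexing.
public class ThresholdBatcher<T> {
    private final int threshold;
    private final Consumer<List<T>> flusher;   // e.g. an ES bulk-index call
    private final List<T> buffer = new ArrayList<>();

    public ThresholdBatcher(int threshold, Consumer<List<T>> flusher) {
        this.threshold = threshold;
        this.flusher = flusher;
    }

    public void add(T doc) {
        buffer.add(doc);
        if (buffer.size() >= threshold) {
            flush();
        }
    }

    // Hands off a copy of the buffered batch and clears the buffer.
    public void flush() {
        if (!buffer.isEmpty()) {
            flusher.accept(new ArrayList<>(buffer));
            buffer.clear();
        }
    }
}
```

One caveat with this approach compared to partitionPersist: a plain filter buffer loses Trident's batch/txid bookkeeping, so a worker crash can drop or duplicate whatever is sitting in the buffer.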
