Thanks John for the details on the Storm design. I am planning the approach below for our design. Keep the messages in Redis and increment the message count. Have a key for the total count, which comes from the last message; that message can arrive at any time since ordering is not guaranteed. Each time a message is received, check whether the total count matches the received-message count. If it matches, push the messages to the database. Since we have 1 million row updates in a single transaction, I am planning to compress the message DTOs while putting them into the cache. When the counts match, get the list of DTOs, take sublists, and insert them into the database, calling commit after the last sublist is processed. Do you see any issues with this approach?
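For concreteness, here is a rough, untested sketch of what I have in mind, assuming Jedis as the Redis client, plain JDBC against the target, and DTOs compressed with java.util.zip on the way into the cache. TxnAccumulator, the Redis key names, and target_table are placeholders for illustration, not our actual code:

import redis.clients.jedis.Jedis;

import javax.sql.DataSource;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.zip.InflaterInputStream;

public class TxnAccumulator {

    private static final int BATCH_SIZE = 1000; // sublist size; to be tuned for the target DB

    private final Jedis jedis;
    private final DataSource targetDb;

    public TxnAccumulator(Jedis jedis, DataSource targetDb) {
        this.jedis = jedis;
        this.targetDb = targetDb;
    }

    /**
     * Called for every CDC message. compressedDto is the Deflater-compressed DTO;
     * totalIfLast is non-null only on the transaction's last message, which carries
     * the total count and may arrive in any position.
     */
    public void onMessage(String txnId, byte[] compressedDto, Long totalIfLast) {
        byte[] listKey = ("txn:" + txnId + ":msgs").getBytes();
        String totalKey = "txn:" + txnId + ":total";

        jedis.rpush(listKey, compressedDto);                  // store compressed DTO
        if (totalIfLast != null) {
            jedis.set(totalKey, String.valueOf(totalIfLast)); // remember expected total
        }

        String total = jedis.get(totalKey);
        long received = jedis.llen(listKey);
        if (total != null && received == Long.parseLong(total)) {
            flush(txnId, jedis.lrange(listKey, 0, -1));       // all messages are in
            jedis.del(listKey);
            jedis.del(totalKey);
        }
    }

    /** Decompress, insert in sublists via JDBC batches, commit once after the last one. */
    private void flush(String txnId, List<byte[]> compressedDtos) {
        // target_table/payload are made-up names; the real bolt would map DTO fields to columns
        String sql = "INSERT INTO target_table (payload) VALUES (?)";
        try (Connection conn = targetDb.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            conn.setAutoCommit(false);
            int inBatch = 0;
            for (byte[] blob : compressedDtos) {
                ps.setBytes(1, decompress(blob));
                ps.addBatch();
                if (++inBatch % BATCH_SIZE == 0) {
                    ps.executeBatch();                        // sublist insert, no commit yet
                }
            }
            ps.executeBatch();                                // remaining rows
            conn.commit();                                    // single commit for the whole txn
        } catch (Exception e) {
            // in the real bolt this is where we would fail the tuples instead
            throw new RuntimeException("Flush failed for txn " + txnId, e);
        }
    }

    private static byte[] decompress(byte[] compressed) throws IOException {
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(compressed));
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) > 0) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }
}

One thing I am not sure about is the count check itself: with multiple bolt executors I would probably have to make the "counts match" decision atomic on the Redis side (a Lua script or WATCH/MULTI) so that only one executor performs the flush.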
On Thu, Apr 14, 2016 at 8:28 PM, John Bush <[email protected]> wrote:
> We follow a pattern like this, to batch them in memory:
> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/
>
> Biggest gotcha I just ran into was the collector isn't thread safe, so make sure you don't ever ack/fail in another thread. I can tell you how I got around that if you have that problem, just read my blog:
> https://scalalala.wordpress.com/2016/04/09/async-stormy-weather/
>
> The biggest table we've copied doing this was about 10 million rows, with 100 or so columns. It works great. I actually am pulling from 3 different data sources this way, SQL Server, PostgreSQL, and Mongo, and then feed into Cassandra and Elasticsearch.
>
> We use a thread-safe counter, AtomicCounter in Guava; there is also AtomicInteger in core Java, not really sure the difference. We have a UI that gives visibility into where things are with the loading, so now we flush the counts out to Cassandra as well to serve the UI, but really completion is detected using the AtomicCounter. Experiment with fetch size on the JDBC read and batch size on the write; it probably depends on your specific environment. The memory overhead is super low for us, because we keep our batch size on writes fairly small, since Cassandra is async on the writes anyway; if you are going into an RDBMS that would be different.
>
> On Thu, Apr 14, 2016 at 6:09 PM, pradeep s <[email protected]> wrote:
>> Thanks John. Ours is exactly a similar use case. We thought of using a Redis cache as the storage layer, but since we can get bulk updates from the source, the cache value size can become pretty large. For getting the total count of messages in a transaction we are keeping a counter against the transaction id key, and this is sent as part of the last message in the transaction. There is an is-last flag available from GoldenGate, but it can arrive in any order in Storm, so we are using the count which comes in the last message from GoldenGate.
>> In the database bolt we can continue to hold the messages in a store till we get the total count.
>> The main challenge is maintaining and updating the count in multithreaded bolt processing, and storing sometimes a GB worth of messages in the case of bulk updates. Any suggestions?
>> On Apr 14, 2016 9:22 AM, "John Bush" <[email protected]> wrote:
>>> We do something kinda similar. I think you will need another store to keep track of these; not sure about Storm's distributed cache, we use Cassandra, but you could use ZooKeeper or some other store. The issue is, since rows come out of Storm in no guaranteed order, you really don't know when you are done. You need to know when you are complete in order to remove the messages from your source (or otherwise update stuff over there).
>>>
>>> So what we do is keep track of how many rows are on the read side (in some store). Then as we process we update, on the write side, how many rows we wrote. Then by checking this count, how many we updated vs how many we expected in total, we know when we are done. It sounds like your situation might be more complicated than ours if you are talking about rows from many different tables all inside the same transaction, but in any event some type of pattern like this should work.
>>>
>>> For perspective, we essentially ETL data out of 100's of tables like this into Cassandra, and it works quite well.
>>> You just need to be super careful with the completion logic; there are many edge cases to consider.
>>>
>>> On Thu, Apr 14, 2016 at 9:00 AM, Nikos R. Katsipoulakis <[email protected]> wrote:
>>> > Hello Sreekumar,
>>> >
>>> > Have you thought of using Storm's distributed cache? If not, that might be a way to cache messages before you push them to the target DB. Another way to do so is to create your own Bolt that periodically pushes messages into the database.
>>> >
>>> > I hope I helped.
>>> >
>>> > Cheers,
>>> > Nikos
>>> >
>>> > On Thu, Apr 14, 2016 at 12:54 AM, pradeep s <[email protected]> wrote:
>>> >>
>>> >> Hi,
>>> >> We are using Storm for processing CDC messages from Oracle GoldenGate. The pipeline is as below:
>>> >> Oracle GoldenGate --> Queue --> Storm --> Relational DB
>>> >> We have a requirement to hold the messages for a transaction id till all the messages for that transaction are available in Storm. There can be scenarios like 1 million updates happening in one transaction in the source Oracle system.
>>> >> Can you please suggest the best approach for holding the messages and then pushing to the target DB only when all messages for a transaction id are available in Storm?
>>> >>
>>> >> Regards
>>> >> Pradeep S
>>> >
>>> > --
>>> > Nikos R. Katsipoulakis,
>>> > Department of Computer Science
>>> > University of Pittsburgh
>>>
>>> --
>>> John Bush
>>> Trax Technologies, Inc.
>>> M: 480-227-2910
>>> TraxTech.Com
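PS John, just so I am sure I followed the counter idea in your earlier mail, this is roughly how I read the read-count vs write-count completion check. It uses a plain java.util.concurrent AtomicLong (I have not used Guava's counter), and TxnProgress and the method names are just mine, not your code:

import java.util.concurrent.atomic.AtomicLong;

/** Sketch of a count-based completion check for one transaction. */
public class TxnProgress {

    private final AtomicLong expected = new AtomicLong(-1); // -1 until the total is known
    private final AtomicLong written  = new AtomicLong(0);  // rows written so far

    /** Record the expected total (it may arrive before or after some writes). */
    public boolean onTotalKnown(long total) {
        expected.set(total);
        return isComplete(written.get());
    }

    /** Call after each successful write; true once everything expected has been written. */
    public boolean onRowWritten() {
        return isComplete(written.incrementAndGet());
    }

    private boolean isComplete(long writtenSoFar) {
        long total = expected.get();
        return total >= 0 && writtenSoFar >= total;
    }
}

Within a single worker this seems enough, although under a race both paths could report completion, so the finalization would need to be idempotent; across workers I understand the counts would still have to live in an external store (Cassandra, ZooKeeper, or Redis in our case) as you describe.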
