Why wouldn't the Storm approach provide exactly-once delivery semantics? https://github.com/nathanmarz/storm
Nathan actually credits the Kafka devs for the basic idea of transaction persisting in one of his talks.

Regards,
Milind

On Nov 3, 2012 11:51 AM, "Rohit Prasad" <rohit.prasa...@gmail.com> wrote:

> I agree that this approach only prevents duplicate messages to a partition from the producer side. There needs to be a similar approach on the consumer side too. Using ZK can be one solution, or other non-ZK approaches.
>
> Even if the consumer reads none or all of the messages of a transaction, that does not solve the transaction problem yet, because the business/application logic inside the consumer thread may execute partially and fail. So it becomes tricky to decide the point at which you want to say that you have "consumed" the message and increase the consumption offset. If your consumer thread is saving some value into a DB/HDFS/etc., ideally you want this save operation and the consumption-offset increment to happen atomically. That's why it boils down to the application logic implementing transactions and dealing with duplicates. Maybe a journalling or redo-log approach on the consumer side can help build such a system.
>
> It would be nice if eventually Kafka could be a transport which provides "exactly once" semantics for message delivery. Then consumer threads could be sure that they receive each message once, and could build application logic on top of that.
>
> I have a use case similar to what Jay mentioned in a previous mail. I want to do aggregation, but I want the aggregated data to be correct, possibly avoiding duplicates in case of failures/crashes.
>
> On Fri, Nov 2, 2012 at 4:12 PM, Tom Brown <tombrow...@gmail.com> wrote:
>
> > That approach allows a producer to prevent duplicate messages to the partition, but what about the consumer? In my case, I don't want the consumer to be able to read any of the messages unless it can read all of the messages from a transaction.
> >
> > I also like the idea of there being multiple types of Kafka transaction, though, just to accommodate different performance, reliability, and consumption patterns. Of course, the added complexity of that might just sink the whole thing.
> >
> > --Tom
> >
> > On Fri, Nov 2, 2012 at 4:11 PM, Rohit Prasad <rohit.prasa...@gmail.com> wrote:
> >
> > > Getting transactional support is quite a hard problem. There will always be corner cases where the solution will not work, unless you want to go down the path of 2PC, Paxos, etc., which of course will degrade Kafka's performance. It is best to reconcile data and deal with duplicate messages in the application layer. Having said that, it would be amazing if we can build "at most once" semantics in Kafka!!
> > >
> > > Regarding the above approaches: the producer will always have a doubt whether its commit went through, i.e. if the ack for the "commit" is not received by the producer, or if the producer dies immediately after calling commit. When it is restarted, how does it know whether the last operation went through?
> > >
> > > I suggest the following -
> > > 1. The producer should attach a timestamp at the beginning of each message and send it to the server.
> > > 2. On restarts/timeouts/re-connections, the producer should first read the last committed message from the leader of the partition.
> > > 3. From the timestamp, it can know how many messages went through before it died (or the connection was broken), and it can infer how many messages to replay.
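For illustration only, here is a minimal sketch of that replay-on-restart logic written against today's Java client, which did not exist when this thread was posted. The "<seq>:payload" value format stands in for the embedded timestamp in step 1, and the class, method, topic, and broker names are invented for the sketch:

    import java.time.Duration;
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.TopicPartition;

    public class ReplayOnRestart {
        // Read the sequence number embedded at the front of the last record in the
        // partition ("<seq>:payload"), or -1 if the partition is empty.
        static long lastCommittedSeq(String bootstrap, TopicPartition tp) {
            Properties p = new Properties();
            p.put("bootstrap.servers", bootstrap);
            p.put("enable.auto.commit", "false");
            p.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> c = new KafkaConsumer<>(p)) {
                c.assign(Collections.singletonList(tp));
                c.seekToEnd(Collections.singletonList(tp));
                long end = c.position(tp);
                if (end == 0) return -1L;
                c.seek(tp, end - 1);                       // last written record
                for (ConsumerRecord<String, String> r : c.poll(Duration.ofSeconds(5)))
                    return Long.parseLong(r.value().split(":", 2)[0]);
                return -1L;                                // nothing returned within the timeout
            }
        }

        // Resend only the buffered records whose sequence is above what the broker
        // already has; the producer keeps its outgoing buffer until acknowledged.
        static void replay(String bootstrap, TopicPartition tp,
                           SortedMap<Long, String> unacked) {
            long have = lastCommittedSeq(bootstrap, tp);
            Properties p = new Properties();
            p.put("bootstrap.servers", bootstrap);
            p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
                for (Map.Entry<Long, String> e : unacked.tailMap(have + 1).entrySet())
                    producer.send(new ProducerRecord<>(tp.topic(), tp.partition(),
                            null, e.getKey() + ":" + e.getValue()));
                producer.flush();
            }
        }
    }

As stated in the thread, this only holds together while a single producer writes to the partition; with multiple producers the broker would need the per-producer ("correlation id") bookkeeping discussed further down.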
> > > The above approach can be used with the existing Kafka libraries, since you can have a producer and a consumer thread together in an application to implement this logic. Or someone can take the initiative to write a transactional producer (which internally has both a producer and a consumer to read the last committed message). I will be developing one for Kafka 0.8 in C++.
> > >
> > > The above approach will work even if you batch messages for a single partition. It will work only if a single producer is writing to the partition. I want to hear opinions about the above approach; I'm sure there can be corner cases where it may break.
> > >
> > > If there are multiple producers to a partition, then some bookkeeping on the server side with regard to the last message committed from a "correlation id" (to identify the unique producer) may be needed.
> > >
> > > Regards,
> > > Rohit
> > >
> > > On Sun, Oct 28, 2012 at 10:31 PM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > >> If you use Kafka just as a redo log, you can't undo anything that's written to the log. Write-ahead logs in typical database systems are both redo and undo logs. Transaction commits and rollbacks are implemented on top of the logs. However, general-purpose write-ahead logs for transactions are much more complicated.
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >> On Fri, Oct 26, 2012 at 11:08 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >>
> > >> > This is an important feature and I am interested in helping out in the design and implementation, though I am working on 0.8 features for the next month so I may not be of too much use. I have thought a little bit about this, but I am not yet sure of the best approach.
> > >> >
> > >> > Here is a specific use case I think is important to address: consider a case where you are doing processing of one or more streams and producing an output stream. This processing may involve some kind of local state (say counters or other local aggregation intermediate state). This is a common scenario. The problem is to give reasonable semantics to this computation in the presence of failures. The processor effectively has a position/offset in each of its input streams as well as whatever local state. The problem is that if this process fails, it needs to restore to a state that matches the last produced messages. There are several solutions to this problem. One is to make the output somehow idempotent; this will solve some cases, but it is not a general solution, as many things cannot be made idempotent easily.
> > >> >
> > >> > I think the two proposals you give outline a couple of basic approaches:
> > >> > 1. Store the messages on the server somewhere but don't add them to the log until the commit call
> > >> > 2. Store the messages in the log but don't make them available to the consumer until the commit call
> > >> > Another option you didn't mention:
> > >> >
> > >> > I can give several subtleties to these approaches.
> > >> >
> > >> > One advantage of the second approach is that messages are in the log and can be available for reading or not. This makes it possible to support a kind of "dirty read" that allows the consumer to specify whether they want to immediately see all messages with low latency, but potentially see uncommitted messages, or only see committed messages.
> > >> > The problem with the second approach, at least in the way you describe it, is that you have to lock the log until the commit occurs, otherwise you can't roll back (because someone else may have appended their own messages and you can't truncate the log). This would have all the problems of remote locks. I think this might be a deal-breaker.
> > >> >
> > >> > Another variation on the second approach would be the following: have each producer maintain an id and generation number. Keep a schedule of valid offset/id/generation numbers on the broker and only hand these out. This solution would support non-blocking multi-writer appends but requires more participation from the producer (i.e. getting a generation number and id).
> > >> >
> > >> > Cheers,
> > >> >
> > >> > -Jay
> > >> >
> > >> > On Thu, Oct 25, 2012 at 7:04 PM, Tom Brown <tombrow...@gmail.com> wrote:
> > >> >
> > >> > > I have come up with two different possibilities, both with different trade-offs.
> > >> > >
> > >> > > The first would be to support "true" transactions by writing transactional data into a temporary file and then copying it directly to the end of the partition when the commit command is received. The upside to this approach is that individual transactions can be larger than a single batch, and more than one producer could conduct transactions at once. The downside is the extra I/O involved in writing it to and reading it from disk an extra time.
> > >> > >
> > >> > > The second would be to allow any number of messages to be appended to a topic, but not move the "end of topic" offset until the commit was received. If a rollback was received, or the producer timed out, the partition could be truncated at the most recently recognized "end of topic" offset. The upside is that there is very little extra I/O (only to store the official "end of topic" metadata), and it seems like it should be easy to implement. The downside is that the "transaction" feature is incompatible with anything but a single producer per partition.
> > >> > >
> > >> > > I am interested in your thoughts on these.
> > >> > >
> > >> > > --Tom
> > >> > >
> > >> > > On Thu, Oct 25, 2012 at 9:31 PM, Philip O'Toole <phi...@loggly.com> wrote:
> > >> > >
> > >> > > > On Thu, Oct 25, 2012 at 06:19:04PM -0700, Neha Narkhede wrote:
> > >> > > >
> > >> > > >> The closest concept of a transaction on the publisher side that I can think of is using a batch of messages in a single call to the synchronous producer.
> > >> > > >>
> > >> > > >> Precisely, you can configure a Kafka producer to use the "sync" mode and batch messages that require transactional guarantees in a single send() call. That will ensure that either all the messages in the batch are sent or none.
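A rough modern-Java rendering of that "whole batch in one synchronous send" idea, purely for illustration: the sync producer Neha refers to is the much older API, and simply blocking on the acknowledgements as below does not by itself make the batch atomic on the broker. Broker address, topic, key, and payloads are placeholders.

    import java.util.*;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.*;

    public class SyncBatchSend {
        public static void main(String[] args) throws Exception {
            Properties p = new Properties();
            p.put("bootstrap.servers", "localhost:9092");   // placeholder broker address
            p.put("acks", "all");                            // wait for full acknowledgement
            p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            List<String> batch = Arrays.asList("debit:42", "credit:42", "audit:42");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(p)) {
                // Hand the whole group to the client, then block until every record
                // is acknowledged -- the moral equivalent of the old "sync" mode.
                List<Future<RecordMetadata>> acks = new ArrayList<>();
                for (String msg : batch)
                    acks.add(producer.send(new ProducerRecord<>("transfers", "txn-7", msg)));
                for (Future<RecordMetadata> ack : acks)
                    ack.get();   // throws if any record in the group failed
            }
        }
    }

And, as the reply just below points out, even a successful write can look like a failure to the producer if the acknowledgement itself is lost.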
> > >> > > > This is an interesting feature -- something I wasn't aware of. Still, it doesn't solve the problem *completely*. As many people realise, it's still possible for the batch of messages to get into Kafka fine, but for the ack from Kafka to be lost on its way back to the producer. In that case the producer erroneously believes the messages didn't get in, and might re-send them.
> > >> > > >
> > >> > > > You guys *haven't* solved that issue, right? I believe you write about it on the Kafka site.
> > >> > > >
> > >> > > >> Thanks,
> > >> > > >> Neha
> > >> > > >>
> > >> > > >> On Thu, Oct 25, 2012 at 2:44 PM, Tom Brown <tombrow...@gmail.com> wrote:
> > >> > > >>
> > >> > > >> > Is there an accepted, or recommended, way to make writes to a Kafka queue idempotent, or within a transaction?
> > >> > > >> >
> > >> > > >> > I can configure my system such that each queue has exactly one producer.
> > >> > > >> >
> > >> > > >> > (If there are no accepted/recommended ways, I have a few ideas I would like to propose. I would also be willing to implement them if needed.)
> > >> > > >> >
> > >> > > >> > Thanks in advance!
> > >> > > >> >
> > >> > > >> > --Tom
> > >> > > >
> > >> > > > --
> > >> > > > Philip O'Toole
> > >> > > >
> > >> > > > Senior Developer
> > >> > > > Loggly, Inc.
> > >> > > > San Francisco, Calif.
> > >> > > > www.loggly.com
> > >> > > >
> > >> > > > Come join us!
> > >> > > > http://loggly.com/company/careers/
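To close the loop on what the thread is asking for: the consume-process-produce pattern Jay describes, with the input offsets and the output records committed as one unit, is sketched below using the transactional producer API that Kafka added much later (0.11, years after this thread). It is an illustration of where the idea ended up, not something available to the posters above; broker address, topics, group id, and the pass-through "aggregation" are placeholders.

    import java.time.Duration;
    import java.util.*;
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.TopicPartition;

    public class ConsumeTransformProduce {
        public static void main(String[] args) {
            Properties cp = new Properties();
            cp.put("bootstrap.servers", "localhost:9092");       // placeholder broker
            cp.put("group.id", "aggregator");                     // placeholder group
            cp.put("enable.auto.commit", "false");
            cp.put("isolation.level", "read_committed");          // never see aborted data
            cp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            cp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties pp = new Properties();
            pp.put("bootstrap.servers", "localhost:9092");
            pp.put("enable.idempotence", "true");                 // broker de-dupes retries after a lost ack
            pp.put("transactional.id", "aggregator-1");           // identity survives producer restarts
            pp.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            pp.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
                consumer.subscribe(Collections.singletonList("events"));   // placeholder input topic
                producer.initTransactions();
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    if (records.isEmpty()) continue;
                    producer.beginTransaction();
                    try {
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        for (ConsumerRecord<String, String> r : records) {
                            // placeholder "aggregation": pass the value through unchanged
                            producer.send(new ProducerRecord<>("aggregates", r.key(), r.value()));
                            offsets.put(new TopicPartition(r.topic(), r.partition()),
                                    new OffsetAndMetadata(r.offset() + 1));
                        }
                        // input offsets and output records commit or abort together
                        producer.sendOffsetsToTransaction(offsets, "aggregator");
                        producer.commitTransaction();
                    } catch (Exception e) {
                        producer.abortTransaction();              // simplified; fatal errors need a rebuild
                    }
                }
            }
        }
    }

The isolation.level setting is essentially the "dirty read" switch Jay mentions: read_uncommitted consumers see records as soon as they are written, while read_committed consumers only see them once the transaction commits, and an aborted transaction is never visible to them.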