On Mar 5, 2015, at 12:59 AM, Xiao <lixiao1...@gmail.com> wrote:

> Hi, James, 
> 
> This design regarding the restart point has a few potential issues, I think. 
> 
> - The restart point is based on the messages that you last published. The 
> message could be pruned. How large is your log.retention.hours?

That's a good point. In my case, I will be publishing this into a log compacted 
topic. My goal is that by reading the log compacted topic from beginning to 
end, and then continually applying new items as they come in, that it will 
always contain the final state of the database. So messages will only be pruned 
if they are being overwritten by later ones, and in that case, the "later" one 
is the one that I now care about.

> - If the Kafka message order is different from your log sequence, your 
> replication might lose the data. 

Good point. I think it depends on use case, and also if it's possible to 
"recover" lost data.

I don't think I'll be losing any messages, but it's possible that, due to 
network delays, that the items might arrive in Kafka out of order. And in my 
use case, that would be bad because "last items wins". In the log compaction 
case, I would be able to detect that because the "last" item in Kafka for a 
particular primary key would not match the state of the item in the mysql 
database. I could repair that item by re-publishing what is in the database, 
out to the stream.

I planned to have an auditor monitor the "correctness" of the Kafka topic, to 
ensure that it reflected the state of the database. I think I will now need to 
add a "correction" component, that will republish any items that are incorrect.

> First, I think you can maintain a local persistence media for recording the 
> last published message id. 

That works, but there is a small window of failure that can result in 
duplicates.

If you normally:
1) Write to Kafka
2) Write your "last published message id" somewhere.

You might crash between steps 1 and 2. When you come back up, you might end up 
re-publishing your last event.

-James

> Second, if you can add into each message a strictly increasing dense ID in 
> the producers, you can easily recover the sequences in the consumers. If so, 
> you can have multiple producers publish the messages at the same time. This 
> could improve your throughput and your consumers can easily identify if any 
> message is lost due to any reason. 
> 
> Best wishes, 
> 
> Xiao Li
> 
> 
> On Mar 4, 2015, at 4:59 PM, James Cheng <jch...@tivo.com> wrote:
> 
>> Another thing to think about is delivery guarantees. Exactly once, at least 
>> once, etc.
>> 
>> If you have a publisher that consumes from the database log and pushes out 
>> to Kafka, and then the publisher crashes, what happens when it starts back 
>> up? Depending on how you keep track of the database's transaction 
>> id/scn/offset, you may end up re-publishing events that you already 
>> published out to the kafka topic.
>> 
>> I am also working on database replication, namely from MySQL to Kafka. I'm 
>> using some of the ideas from http://ben.kirw.in/2014/11/28/kafka-patterns/ 
>> in order to get exactly once processing, so that I don't have any duplicates 
>> in my kafka stream.
>> 
>> Specifically, I have the publisher write messages to a single topic (I 
>> think/hope that Kafka's throughput is high enough). I include MySQL's binary 
>> log coordinates into my output messages. Upon startup, I read back the "end" 
>> of my topic to find out what messages I published. This gives me 2 pieces of 
>> information:
>> 1) The MySQL binary log coordinates, so I know where to start again.
>> 2) The messages that I last published, to make sure that I don't re-publish 
>> them.
>> 
>> That does mean that all data from all tables is in a single topic. I will 
>> probably have a consumer that will read that "all tables" topic, and split 
>> the data out into separate topics, for consumers who just want a subset of 
>> the data.
>> 
>> -James
>> 
>> On Mar 4, 2015, at 9:28 AM, Jonathan Hodges <hodg...@gmail.com> wrote:
>> 
>>> Yes you are right on the oplog per partition as well as that mapping well
>>> to the Kafka partitions.  I think we are making this harder than it is
>>> based on previous attempts and trying to leverage something like Databus
>>> for propagating log changes from MongoDB and Cassandra since it requires a
>>> scn.  Sounds like direct Kafka makes more sense for these use cases.
>>> Thanks again!
>>> 
>>> 
>>> On Wed, Mar 4, 2015 at 8:56 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>> 
>>>> Hey Josh,
>>>> 
>>>> NoSQL DBs may actually be easier because they themselves generally don't
>>>> have a global order. I.e. I believe Mongo has a per-partition oplog, is
>>>> that right? Their partitions would match our partitions.
>>>> 
>>>> -Jay
>>>> 
>>>> On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader <jrader...@gmail.com> wrote:
>>>> 
>>>>> Thanks everyone for your responses!  These are great.  It seems our cases
>>>>> matches closest to Jay's recommendations.
>>>>> 
>>>>> The one part that sounds a little tricky is point #5 'Include in each
>>>>> message the database's transaction id, scn, or other identifier '.  This
>>>> is
>>>>> pretty straightforward with the RDBMS case that I mentioned, but I could
>>>>> see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
>>>>> which might not always have a readily available monotonic id,
>>>> particularly
>>>>> in failover scenarios.  I guess in that case we can think about creating
>>>>> this id ourselves from the single producer.
>>>>> 
>>>>> Xiao,
>>>>> 
>>>>> I think in the Kafka failover cases you mention if we also store the
>>>> offset
>>>>> with replicated data we should be able to pick up where we left off since
>>>>> we are using the low level consumer.  Maybe I am missing your point
>>>>> though...
>>>>> 
>>>>> Guozhang,
>>>>> 
>>>>> Very good point that we didn't think of.  We will need to think this
>>>>> through, as you say avoid resending other messages in a batch if one is
>>>>> failed.  I wonder if we might also manage this on the consumer side too
>>>>> with idempotency.  Thanks for raising this!
>>>>> 
>>>>> Josh
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Mar 3, 2015 at 6:08 PM, Xiao <lixiao1...@gmail.com> wrote:
>>>>> 
>>>>>> Hey Josh,
>>>>>> 
>>>>>> Sorry, after reading codes, Kafka did fsync the data using a separate
>>>>>> thread. The recovery point (oldest transaction timestamp) can be got
>>>> from
>>>>>> the file recovery-point-offset-checkpoint.
>>>>>> 
>>>>>> You can adjust the value config.logFlushOffsetCheckpointIntervalMs, if
>>>>> you
>>>>>> think the speed is not quick enough. When the workloads is huge, the
>>>>>> bottleneck could be in your target side or source side. That means,
>>>> your
>>>>>> apply could have enough jobs to do.
>>>>>> 
>>>>>> Basically, you need to keep reading this file for determining the
>>>> oldest
>>>>>> timestamps of all relevant partitions. Then, apply the transactions
>>>> until
>>>>>> that timestamp.
>>>>>> 
>>>>>> Note, this does not protect the transaction consistency. This is just
>>>> for
>>>>>> ensuring the data at the target side is consistent at one timestamp
>>>> when
>>>>>> you have multiple channel to send data changes. The implementation
>>>> should
>>>>>> be simple if you can understand the concepts. I am unable to find the
>>>>> filed
>>>>>> patent application about it. This is one related paper. It covers the
>>>>> main
>>>>>> concepts about the issues you are facing. "Inter-Data-Center
>>>> Large-Scale
>>>>>> Database Replication Optimization - A Workload Driven Partitioning
>>>>> Approach"
>>>>>> 
>>>>>> Hopefully, you understood what I explained above.
>>>>>> 
>>>>>> Best wishes,
>>>>>> 
>>>>>> Xiao Li
>>>>>> 
>>>>>> Best wishes,
>>>>>> 
>>>>>> Xiao Li
>>>>>> 
>>>>>> On Mar 3, 2015, at 4:23 PM, Xiao <lixiao1...@gmail.com> wrote:
>>>>>> 
>>>>>>> Hey Josh,
>>>>>>> 
>>>>>>> If you put different tables into different partitions or topics, it
>>>>>> might break transaction ACID at the target side. This is risky for some
>>>>> use
>>>>>> cases. Besides unit of work issues, you also need to think about the
>>>> load
>>>>>> balancing too.
>>>>>>> 
>>>>>>> For failover, you have to find the timestamp for point-in-time
>>>>>> consistency. This part is tricky. You have to ensure all the changes
>>>>> before
>>>>>> a specific timestamp have been flushed to the disk. Normally, you can
>>>>>> maintain a bookmark for different partition at the target side to know
>>>>> what
>>>>>> is the oldest transactions have been flushed to the disk.
>>>> Unfortunately,
>>>>>> based on my understanding, Kafka is unable to do it because it does not
>>>>> do
>>>>>> fsync regularly for achieving better throughput.
>>>>>>> 
>>>>>>> Best wishes,
>>>>>>> 
>>>>>>> Xiao Li
>>>>>>> 
>>>>>>> 
>>>>>>> On Mar 3, 2015, at 3:45 PM, Xiao <lixiao1...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> Hey Josh,
>>>>>>>> 
>>>>>>>> Transactions can be applied in parallel in the consumer side based
>>>> on
>>>>>> transaction dependency checking.
>>>>>>>> 
>>>>>>>> http://www.google.com.ar/patents/US20080163222
>>>>>>>> 
>>>>>>>> This patent documents how it work. It is easy to understand,
>>>> however,
>>>>>> you also need to consider the hash collision issues. This has been
>>>>>> implemented in IBM Q Replication since 2001.
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Xiao Li
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Mar 3, 2015, at 3:36 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>>> Hey Josh,
>>>>>>>>> 
>>>>>>>>> As you say, ordering is per partition. Technically it is generally
>>>>>> possible
>>>>>>>>> to publish all changes to a database to a single
>>>> partition--generally
>>>>>> the
>>>>>>>>> kafka partition should be high throughput enough to keep up.
>>>> However
>>>>>> there
>>>>>>>>> are a couple of downsides to this:
>>>>>>>>> 1. Consumer parallelism is limited to one. If you want a total
>>>> order
>>>>>> to the
>>>>>>>>> consumption of messages you need to have just 1 process, but often
>>>>> you
>>>>>>>>> would want to parallelize.
>>>>>>>>> 2. Often what people want is not a full stream of all changes in
>>>> all
>>>>>> tables
>>>>>>>>> in a database but rather the changes to a particular table.
>>>>>>>>> 
>>>>>>>>> To some extent the best way to do this depends on what you will do
>>>>>> with the
>>>>>>>>> data. However if you intend to have lots
>>>>>>>>> 
>>>>>>>>> I have seen pretty much every variation on this in the wild, and
>>>> here
>>>>>> is
>>>>>>>>> what I would recommend:
>>>>>>>>> 1. Have a single publisher process that publishes events into Kafka
>>>>>>>>> 2. If possible use the database log to get these changes (e.g.
>>>> mysql
>>>>>>>>> binlog, Oracle xstreams, golden gate, etc). This will be more
>>>>> complete
>>>>>> and
>>>>>>>>> more efficient than polling for changes, though that can work too.
>>>>>>>>> 3. Publish each table to its own topic.
>>>>>>>>> 4. Partition each topic by the primary key of the table.
>>>>>>>>> 5. Include in each message the database's transaction id, scn, or
>>>>> other
>>>>>>>>> identifier that gives the total order within the record stream.
>>>> Since
>>>>>> there
>>>>>>>>> is a single publisher this id will be monotonic within each
>>>>> partition.
>>>>>>>>> 
>>>>>>>>> This seems to be the best set of tradeoffs for most use cases:
>>>>>>>>> - You can have parallel consumers up to the number of partitions
>>>> you
>>>>>> chose
>>>>>>>>> that still get messages in order per ID'd entity.
>>>>>>>>> - You can subscribe to just one table if you like, or to multiple
>>>>>> tables.
>>>>>>>>> - Consumers who need a total order over all updates can do a
>>>> "merge"
>>>>>> across
>>>>>>>>> the partitions to reassemble the fully ordered set of changes
>>>> across
>>>>>> all
>>>>>>>>> tables/partitions.
>>>>>>>>> 
>>>>>>>>> One thing to note is that the requirement of having a single
>>>> consumer
>>>>>>>>> process/thread to get the total order isn't really so much a Kafka
>>>>>>>>> restriction as it just is a restriction about the world, since if
>>>> you
>>>>>> had
>>>>>>>>> multiple threads even if you delivered messages to them in order
>>>>> their
>>>>>>>>> processing might happen out of order (just do to the random timing
>>>> of
>>>>>> the
>>>>>>>>> processing).
>>>>>>>>> 
>>>>>>>>> -Jay
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader <jrader...@gmail.com>
>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Kafka Experts,
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> We have a use case around RDBMS replication where we are
>>>>> investigating
>>>>>>>>>> Kafka.  In this case ordering is very important.  Our
>>>> understanding
>>>>> is
>>>>>>>>>> ordering is only preserved within a single partition.  This makes
>>>>>> sense as
>>>>>>>>>> a single thread will consume these messages, but our question is
>>>> can
>>>>>> we
>>>>>>>>>> somehow parallelize this for better performance?   Is there maybe
>>>>> some
>>>>>>>>>> partition key strategy trick to have your cake and eat it too in
>>>>>> terms of
>>>>>>>>>> keeping ordering, but also able to parallelize the processing?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> I am sorry if this has already been asked, but we tried to search
>>>>>> through
>>>>>>>>>> the archives and couldn't find this response.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> 
>>>>>>>>>> Josh
>>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>> 
> 

Reply via email to