Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-02-01 Thread Alexey Ozeritsky


24.01.2017, 22:03, "Dong Lin" :
> Hey Alexey,
>
> Thanks. I think we agreed that the suggested solution doesn't work in
> general for Kafka users. To answer your questions:
>
> 1. I agree we need a quota to rate-limit replica movement when a broker is
> moving a "leader" replica. I will come up with a solution, probably re-using
> the replication quota config introduced in KIP-73.
>
> 2. Good point. I agree that this is a problem in general. If there is no new
> data on that broker, with the current default values of replica.fetch.wait.max.ms
> and replica.fetch.max.bytes, the replica will be moved at only 2 MBps
> throughput. I think the solution is for the broker to set
> replica.fetch.wait.max.ms to 0 in its FetchRequest if the corresponding
> ReplicaFetcherThread needs to move some replica to another disk.
>
> 3. I have updated the KIP to mention that the read size of a given
> partition is configured using replica.fetch.max.bytes when we move replicas
> between disks.
>
> Please see this
> <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=67638408&selectedPageVersions=4&selectedPageVersions=5>
> for the change of the KIP. I will come up with a solution to throttle
> replica movement when a broker is moving a "leader" replica.

Thanks. It looks great. 
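
To make the numbers in point 2 concrete, here is a small Scala sketch assuming the
default values of the time (replica.fetch.wait.max.ms = 500, replica.fetch.max.bytes
= 1 MiB); the object and method names are illustrative, not actual broker code.

  object MoveFetchMath {
    // Assumed defaults: replica.fetch.wait.max.ms = 500, replica.fetch.max.bytes = 1 MiB.
    val fetchWaitMs   = 500
    val fetchMaxBytes = 1024 * 1024

    // With no new incoming data, each fetch returns at most fetchMaxBytes per wait
    // interval, which caps the move at roughly 2 MB/s.
    val moveThroughputMBps: Double =
      (fetchMaxBytes / (1024.0 * 1024.0)) / (fetchWaitMs / 1000.0)  // = 2.0

    // The proposed fix: a fetcher that still has a local disk move pending asks for
    // an immediate response instead of waiting for new data.
    def effectiveMaxWaitMs(configuredWaitMs: Int, hasPendingDiskMove: Boolean): Int =
      if (hasPendingDiskMove) 0 else configuredWaitMs
  }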

>
> On Tue, Jan 24, 2017 at 3:30 AM, Alexey Ozeritsky 
> wrote:
>
>>  23.01.2017, 22:11, "Dong Lin" :
>>  > Thanks. Please see my comment inline.
>>  >
>>  > On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky 
>>  > wrote:
>>  >
>>  >> 13.01.2017, 22:29, "Dong Lin" :
>>  >> > Hey Alexey,
>>  >> >
>>  >> > Thanks for your review and the alternative approach. Here is my
>>  >> > understanding of your patch. Kafka's background threads are used to
>>  >> > move data between replicas. When data movement is triggered, the log
>>  >> > will be rolled, the new logs will be put in the new directory, and
>>  >> > background threads will move segments from the old directory to the
>>  >> > new directory.
>>  >> >
>>  >> > It is important to note that KIP-112 is intended to work with KIP-113
>>  >> > to support JBOD. I think your solution is definitely simpler and
>>  >> > better under the current Kafka implementation, where a broker will
>>  >> > fail if any disk fails. But I am not sure we want to allow a broker
>>  >> > to run with partial disk failure. Let's say a replica is being moved
>>  >> > from log_dir_old to log_dir_new and then log_dir_old stops working due
>>  >> > to disk failure. How would your existing patch handle it? To make the
>>  >> > scenario a bit more
>>  >>
>>  >> We will lose log_dir_old. After broker restart we can read the data
>>  >> from log_dir_new.
>>  >
>>  > No, you probably can't. This is because the broker doesn't have *all* the
>>  > data for this partition. For example, say the broker has
>>  > partition_segment_1, partition_segment_50 and partition_segment_100 on
>>  > log_dir_old. partition_segment_100, which has the latest data, has been
>>  > moved to log_dir_new, and log_dir_old fails before partition_segment_50
>>  > and partition_segment_1 are moved to log_dir_new. When the broker
>>  > restarts, it won't have partition_segment_50. This causes a problem if
>>  > the broker is elected leader and a consumer wants to consume data in
>>  > partition_segment_1.
>>
>>  Right.
>>
>>  >
>>  >> > complicated, let's say the broker is shut down, log_dir_old's disk
>>  >> > fails, and the broker starts. In this case the broker doesn't even
>>  >> > know if log_dir_new has all the data needed for this replica. It
>>  >> > becomes a problem if the broker is elected leader of this partition
>>  >> > in this case.
>>  >>
>>  >> log_dir_new contains the most recent data, so we will lose the tail of
>>  >> the partition.
>>  >> This is not a big problem for us because we already delete tails by hand
>>  >> (see https://issues.apache.org/jira/browse/KAFKA-1712).
>>  >> Also we don't use automatic leader balancing
>>  >> (auto.leader.rebalance.enable=false),
>>  >> so this partition becomes the leader with a low probability.
>>  >

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-24 Thread Alexey Ozeritsky


23.01.2017, 22:11, "Dong Lin" :
> Thanks. Please see my comment inline.
>
> On Mon, Jan 23, 2017 at 6:45 AM, Alexey Ozeritsky 
> wrote:
>
>>  13.01.2017, 22:29, "Dong Lin" :
>>  > Hey Alexey,
>>  >
>>  > Thanks for your review and the alternative approach. Here is my
>>  > understanding of your patch. Kafka's background threads are used to move
>>  > data between replicas. When data movement is triggered, the log will be
>>  > rolled, the new logs will be put in the new directory, and background
>>  > threads will move segments from the old directory to the new directory.
>>  >
>>  > It is important to note that KIP-112 is intended to work with KIP-113 to
>>  > support JBOD. I think your solution is definitely simpler and better
>>  > under the current Kafka implementation, where a broker will fail if any
>>  > disk fails. But I am not sure we want to allow a broker to run with
>>  > partial disk failure. Let's say a replica is being moved from log_dir_old
>>  > to log_dir_new and then log_dir_old stops working due to disk failure.
>>  > How would your existing patch handle it? To make the scenario a bit more
>>
>>  We will lose log_dir_old. After broker restart we can read the data from
>>  log_dir_new.
>
> No, you probably can't. This is because the broker doesn't have *all* the
> data for this partition. For example, say the broker has
> partition_segment_1, partition_segment_50 and partition_segment_100 on
> log_dir_old. partition_segment_100, which has the latest data, has been
> moved to log_dir_new, and log_dir_old fails before partition_segment_50
> and partition_segment_1 are moved to log_dir_new. When the broker restarts, it
> won't have partition_segment_50. This causes a problem if the broker is elected
> leader and a consumer wants to consume data in partition_segment_1.

Right.

>
>>  > complicated, let's say the broker is shut down, log_dir_old's disk fails,
>>  > and the broker starts. In this case the broker doesn't even know if
>>  > log_dir_new has all the data needed for this replica. It becomes a
>>  > problem if the broker is elected leader of this partition in this case.
>>
>>  log_dir_new contains the most recent data, so we will lose the tail of
>>  the partition.
>>  This is not a big problem for us because we already delete tails by hand
>>  (see https://issues.apache.org/jira/browse/KAFKA-1712).
>>  Also we don't use automatic leader balancing
>>  (auto.leader.rebalance.enable=false),
>>  so this partition becomes the leader with a low probability.
>>  I think my patch can be modified to prohibit electing this replica as
>>  leader until the partition has been moved completely.
>
> I guess you are saying that you have deleted the tails by hand in your own
> Kafka branch. But KAFKA-1712 has not been accepted into Kafka trunk and I am not

No. We just modify the segments' mtime with a cron job. This works with vanilla Kafka.
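
For readers unfamiliar with the trick: at the time, time-based retention was keyed
off the segment file's mtime, so rolling the mtime back makes old segments eligible
for deletion sooner. A rough Scala sketch of what such a job could do; the directory
layout and selection logic are illustrative assumptions, not the actual cron job.

  import java.io.File

  object AgeSegments {
    // Roll every *.log segment's mtime in `logDir` back by `shiftMs`, so that
    // mtime-based retention reclaims old segments earlier. Illustrative only.
    def run(logDir: File, shiftMs: Long): Unit =
      Option(logDir.listFiles()).getOrElse(Array.empty[File])
        .filter(_.getName.endsWith(".log"))
        .foreach(f => f.setLastModified(f.lastModified() - shiftMs))
  }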

> sure if it is the right solution. How would this solution address the
> problem mentioned above?

If you only need fresh data and you remove old data by hand, this is not a
problem. But in the general case it is a problem, of course.

>
> BTW, I am not sure the solution mentioned in KAFKA-1712 is the right way to
> address its problem. Now that we have timestamps in the messages we can use
> them to delete old segments instead of relying on the log segment mtime.
> Just an idea, and we don't have to discuss this problem here.
>
>>  >
>>  > The solution presented in the KIP attempts to handle it by replacing the
>>  > replica in an atomic fashion after the log in the new dir has fully
>>  > caught up with the log in the old dir. At any time the log can be
>>  > considered to exist in only one log directory.
>>
>>  As I understand it, your solution does not cover quotas.
>>  What happens if someone starts to transfer 100 partitions?
>
> Good point. Quota can be implemented in the future. It is currently
> mentioned as a potential future improvement in KIP-112
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>. Thanks
> for the reminder. I will move it to KIP-113.
>
>>  > If yes, it will read a ByteBufferMessageSet from topicPartition.log and
>>  > append the message set to topicPartition.move
>>
>>  i.e. processPartitionData will read data from the beginning of
>>  topicPartition.log? What is the read size?
>>  A ReplicaFetcherThread reads many partitions, so if one of them does some
>>  complicated work (= reads a lot of data from disk) everything will slow down.

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-23 Thread Alexey Ozeritsky


13.01.2017, 22:29, "Dong Lin" :
> Hey Alexey,
>
> Thanks for your review and the alternative approach. Here is my
> understanding of your patch. Kafka's background threads are used to move
> data between replicas. When data movement is triggered, the log will be
> rolled, the new logs will be put in the new directory, and background
> threads will move segments from the old directory to the new directory.
>
> It is important to note that KIP-112 is intended to work with KIP-113 to
> support JBOD. I think your solution is definitely simpler and better under
> the current Kafka implementation, where a broker will fail if any disk fails.
> But I am not sure we want to allow a broker to run with partial disk
> failure. Let's say a replica is being moved from log_dir_old to
> log_dir_new and then log_dir_old stops working due to disk failure. How
> would your existing patch handle it? To make the scenario a bit more

We will lose log_dir_old. After broker restart we can read the data from 
log_dir_new.

> complicated, let's say the broker is shut down, log_dir_old's disk fails,
> and the broker starts. In this case the broker doesn't even know if log_dir_new
> has all the data needed for this replica. It becomes a problem if the
> broker is elected leader of this partition in this case.

log_dir_new contains the most recent data, so we will lose the tail of the partition.
This is not a big problem for us because we already delete tails by hand (see
https://issues.apache.org/jira/browse/KAFKA-1712).
Also we don't use automatic leader balancing
(auto.leader.rebalance.enable=false), so this partition becomes the leader with
a low probability.
I think my patch can be modified to prohibit electing this replica as leader until
the partition has been moved completely.
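
A minimal sketch of that guard, with made-up types rather than the actual patch or
controller code: replicas whose move is still in progress are simply excluded from
the leader candidates until the move completes.

  object MoveAwareLeaderChoice {
    final case class ReplicaState(brokerId: Int, moveInProgress: Boolean)

    // Pick a leader only among in-sync replicas that are not mid-move;
    // None means "wait until some replica finishes moving".
    def chooseLeader(isr: Seq[ReplicaState]): Option[Int] =
      isr.filterNot(_.moveInProgress).headOption.map(_.brokerId)
  }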

>
> The solution presented in the KIP attempts to handle it by replacing the
> replica in an atomic fashion after the log in the new dir has fully
> caught up with the log in the old dir. At any time the log can be considered
> to exist in only one log directory.

As I understand it, your solution does not cover quotas.
What happens if someone starts to transfer 100 partitions?
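
To illustrate the kind of quota being asked about here, a toy fixed-window byte-rate
limiter that a mover could call (e.g. limiter.acquire(bytesRead)) before each read;
this is only a sketch, not KIP-73's actual implementation.

  object MoveThrottle {
    // Callers ask for permission before each read/write and are put to sleep
    // once the per-second byte budget is exhausted.
    final class ByteRateLimiter(bytesPerSecond: Long) {
      private var windowStartMs = System.currentTimeMillis()
      private var usedBytes     = 0L

      def acquire(bytes: Long): Unit = synchronized {
        val now = System.currentTimeMillis()
        if (now - windowStartMs >= 1000) { windowStartMs = now; usedBytes = 0L }
        if (usedBytes + bytes > bytesPerSecond) {
          Thread.sleep(1000 - (now - windowStartMs))  // wait out the current window
          windowStartMs = System.currentTimeMillis()
          usedBytes = 0L
        }
        usedBytes += bytes
      }
    }
  }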

> If yes, it will read a ByteBufferMessageSet from topicPartition.log and 
> append the message set to topicPartition.move

i.e. processPartitionData will read data from the beginning of
topicPartition.log? What is the read size?
A ReplicaFetcherThread reads many partitions, so if one of them does some complicated
work (= reads a lot of data from disk) everything will slow down.
I think the read size should not be very big.

On the other hand, at this point (processPartitionData) one could use only the new
data (the ByteBufferMessageSet from the parameters) and wait until
(topicPartition.move.smallestOffset <= topicPartition.log.smallestOffset &&
topicPartition.move.largestOffset == topicPartition.log.largestOffset).
In this case the write speed to topicPartition.move and topicPartition.log will
be the same, so this will allow us to move many partitions to one disk.
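
Restating that wait condition as a small predicate (illustrative names, not code
from the patch or the KIP): topicPartition.move has caught up once it covers the
full offset range of the live log.

  object MoveCatchUp {
    final case class OffsetRange(smallestOffset: Long, largestOffset: Long)

    // The .move copy has caught up once it starts at or before the log's
    // smallest offset and reaches the log's largest offset.
    def caughtUp(log: OffsetRange, move: OffsetRange): Boolean =
      move.smallestOffset <= log.smallestOffset &&
        move.largestOffset == log.largestOffset
  }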

>
> And to answer your question, yes, topicPartition.log refers to
> topic-partition/segment.log.
>
> Thanks,
> Dong
>
> On Fri, Jan 13, 2017 at 4:12 AM, Alexey Ozeritsky 
> wrote:
>
>>  Hi,
>>
>>  We have a similar solution that has been working in production since
>>  2014. You can see it here:
>>  https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
>>  The idea is very simple:
>>  1. The disk balancer runs in a separate thread inside the scheduler pool.
>>  2. It does not touch empty partitions.
>>  3. Before it moves a partition it forcibly creates a new segment on the
>>  destination disk.
>>  4. It moves segment by segment from new to old.
>>  5. The Log class works with segments on both disks.
>>
>>  Your approach seems too complicated; moreover, it means that you have to
>>  patch different components of the system.
>>  Could you clarify what you mean by topicPartition.log? Is it
>>  topic-partition/segment.log?
>>
>>  12.01.2017, 21:47, "Dong Lin" :
>>  > Hi all,
>>  >
>>  > We created KIP-113: Support replicas movement between log directories.
>>  > Please find the KIP wiki at
>>  > https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
>>  >
>>  > This KIP is related to KIP-112
>>  > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
>>  > Handle disk failure for JBOD. They are needed in order to support JBOD in
>>  > Kafka. Please help review the KIP. Your feedback is appreciated!
>>  >
>>  > Thanks,
>>  > Dong


Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-01-13 Thread Alexey Ozeritsky
Hi,

We have a similar solution that has been working in production since 2014.
You can see it here:
https://github.com/resetius/kafka/commit/20658593e246d2184906879defa2e763c4d413fb
The idea is very simple:
1. The disk balancer runs in a separate thread inside the scheduler pool.
2. It does not touch empty partitions.
3. Before it moves a partition it forcibly creates a new segment on the
destination disk.
4. It moves segment by segment from new to old.
5. The Log class works with segments on both disks.
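
A rough sketch of the segment-by-segment move in steps 3-4, assuming a segment can
be copied in the background and then published with an atomic rename inside the
destination directory; the file layout and naming are illustrative, not the actual
patch.

  import java.nio.file.{Files, Path, StandardCopyOption}

  object SegmentMover {
    // Copy one segment file to the destination log dir, atomically publish it
    // there, then drop the source copy so the old dir no longer owns the segment.
    def moveSegment(segment: Path, destDir: Path): Path = {
      val tmp  = destDir.resolve(segment.getFileName.toString + ".tmp")
      Files.copy(segment, tmp, StandardCopyOption.REPLACE_EXISTING) // bulk copy, off the hot path
      val dest = destDir.resolve(segment.getFileName)
      Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE)         // rename within destDir
      Files.delete(segment)
      dest
    }
  }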

Your approach seems too complicated; moreover, it means that you have to patch
different components of the system.
Could you clarify what you mean by topicPartition.log? Is it
topic-partition/segment.log?

12.01.2017, 21:47, "Dong Lin" :
> Hi all,
>
> We created KIP-113: Support replicas movement between log directories.
> Please find the KIP wiki at
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories
>
> This KIP is related to KIP-112
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-112%3A+Handle+disk+failure+for+JBOD>:
> Handle disk failure for JBOD. They are needed in order to support JBOD in
> Kafka. Please help review the KIP. Your feedback is appreciated!
>
> Thanks,
> Dong


Re: Prevent broker from leading topic partitions

2016-08-22 Thread Alexey Ozeritsky
Hi

22.08.2016, 19:16, "Tom Crayford" :
> Hi,
>
> I don't think I understand *why* you need this. Kafka is by default a
> distributed HA system that balances data and leadership over nodes. Why do
> you need to change this?

In fact, Kafka does not balance anything. It uses a static partition distribution.
In most cases the first replica in the replica list is always the leader.

If a replica is under heavy load (for example, a hard disk was replaced)
it is a bad idea to do an automatic leader "rebalance".

>
> You could accomplish something like this with mirror maker, that may make
> more sense.
>
> Thanks
>
> Tom Crayford
>
> Heroku Kafka
>
> On Mon, Aug 22, 2016 at 4:05 PM, Jason Aliyetti 
> wrote:
>
>>  I have a use case that requires a 2-node deployment with a Kafka-backed
>>  service with the following constraints:
>>
>>  - All data must be persisted to node 1. If node 1 fails (regardless of the
>>  status of node 2), then the system must stop.
>>  - If node 2 is up, then it must stay in sync with node 1.
>>  - If node 2 fails, then service must not be disrupted, but as soon as it
>>  comes back up and rejoins the ISR it must stay in sync.
>>
>>  The deployment is basically a primary node and a cold node with real time
>>  replication, but no failover to the cold node.
>>
>>  To achieve this I am considering adding a broker-level configuration option
>>  that would prevent a broker from becoming the leader for any topic partition
>>  it hosts - this would allow me to enforce that the cold node never takes
>>  leadership for any topics. In conjunction with manipulating a topic's
>>  "min.insync.replicas" setting at runtime, I should be able to achieve the
>>  desired behavior (2 if both brokers are up, 1 if the standby goes down).
>>
>>  I know this sounds like an edge case, but does this sound like a
>>  reasonable approach? Are there any valid use cases around such a broker- or
>>  topic-level configuration (i.e. does this sound like a feature that would
>>  make sense to open a KIP against)?
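
For illustration only, a toy sketch of what such a broker-level flag would mean for
leader choice (no such configuration exists in Kafka; the names are made up): a
flagged broker is skipped when picking a leader from the ISR, and if only flagged
brokers remain the partition stays leaderless, matching the "system must stop"
constraint above.

  object NoLeadBrokers {
    final case class Broker(id: Int, neverLead: Boolean)

    // Pick the first in-sync replica that is allowed to lead; None leaves the
    // partition offline when only "never lead" brokers (the cold node) remain.
    def pickLeader(isr: Seq[Broker]): Option[Int] =
      isr.filterNot(_.neverLead).headOption.map(_.id)
  }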