Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-07-12 Thread Satish Duggana
We already mention in the KIP that RemoteLogMetadataManager (RLMM) is
pluggable. Users have the option to plug in their own implementation of
RLMM instead of using the default implementation (which is based on topic
storage), tailored to their remote storage environment, e.g. AWS, GCP,
Azure, etc.
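
For anyone wondering what plugging in a custom implementation might look like
in practice, here is a minimal sketch. The config property and class skeleton
below follow the naming used in the KIP discussion, but the exact interface
methods are illustrative assumptions, not the final API.

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative skeleton only; the method names are assumptions based on the
    // KIP discussion, not the final RemoteLogMetadataManager interface.
    public class S3BackedRemoteLogMetadataManager implements RemoteLogMetadataManager {

        @Override
        public void configure(Map<String, ?> configs) {
            // read connection settings for the external metadata store
        }

        @Override
        public void putRemoteLogSegmentMetadata(RemoteLogSegmentMetadata metadata) {
            // persist metadata about a copied segment to the external store
        }

        @Override
        public RemoteLogSegmentMetadata remoteLogSegmentMetadata(TopicPartition tp, long offset) {
            // find the remote segment that contains the given offset
            return null;
        }
    }

The broker would then be pointed at it with something like:

    # illustrative broker config; property name as discussed for the KIP
    remote.log.metadata.manager.class.name=com.example.S3BackedRemoteLogMetadataManager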

On Sun, Jul 12, 2020 at 6:36 AM Adam Bellemare  wrote:
>
> My 2 cents -
>
> I agree with Colin. I think that it's important that the metadata not grow
> unbounded without being delegated to external storage. Indefinite long-term
> storage of entity data in Kafka can result in extremely large datasets
> where the vast majority of data is stored in the external tier. I would be
> very disappointed to have the metadata storage be a limiting factor on how
> much data I can store in Kafka. Additionally, as an example,
> I think it's very reasonable that an AWS metadata store could be
> implemented with DynamoDB (key-value store) paired with S3 - faster
> random-access metadata lookup than plain S3, but without needing to rebuild
> rocksDB state locally.
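
As a rough sketch of the lookup side of what Adam describes, assume a
hypothetical DynamoDB table that maps (topic-partition, base offset) to the S3
object holding that segment. Using the AWS SDK v2 (table, attribute, and object
key names are made up for illustration):

    import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
    import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
    import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
    import java.util.Map;

    public class SegmentMetadataLookup {
        private final DynamoDbClient dynamo = DynamoDbClient.create();

        // Resolve which S3 object holds the segment that starts at the given base offset.
        public String s3KeyFor(String topicPartition, long baseOffset) {
            Map<String, AttributeValue> item = dynamo.getItem(GetItemRequest.builder()
                    .tableName("remote-log-segment-metadata")    // hypothetical table
                    .key(Map.of(
                            "topicPartition", AttributeValue.builder().s(topicPartition).build(),
                            "baseOffset", AttributeValue.builder().n(Long.toString(baseOffset)).build()))
                    .build()).item();
            return item.get("s3ObjectKey").s();   // e.g. "segments/orders-0/00001500000.log"
        }
    }

The segment bytes themselves still live in S3; DynamoDB only serves as a fast
random-access index of what lives where, which avoids rebuilding rocksDB state
locally.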
>
>
>
> On Fri, Jul 10, 2020 at 3:57 PM Colin McCabe  wrote:
>
> > Hi all,
> >
> > Thanks for the KIP.
> >
> > I took a look and one thing that stood out to me is that the more metadata
> > we have, the more storage we will need on local disk for the rocksDB
> > database.  This seems like it contradicts some of the goals of the
> > project.  Ideally the space we need on local disk should be related only to
> > the size of the hot set, not the size of the cold set.  It also seems like
> > it could lead to extremely long rocksdb rebuild times if we somehow lose a
> > broker's local storage and have to rebuild it.
> >
> > Instead, I think it would be more reasonable to store cold metadata in the
> > "remote" storage (HDFS, s3, etc.).  Not only does this free up space on the
> > local and avoid long rebuild times, but it also gives us more control over
> > the management of our cache.  With rocksDB we are delegating cache
> > management to an external library that doesn't really understand our
> > use-case.
> >
> > To give a concrete example of how this is bad, imagine that we have 10
> > worker threads and we get 10 requests for something that requires us to
> > fetch cold tiered storage metadata.  Now every worker thread is blocked
> > inside rocksDB and the broker can do nothing until it finishes fetching
> > from disk.  When accessing a remote service like HDFS or S3, in contrast,
> > we would be able to check if the data was in our local cache first.  If it
> > wasn't, we could put the request in a purgatory and activate a background
> > thread to fetch the needed data, and then release the worker thread to be
> > used by some other request.  Having control of our own caching strategy
> > increases observability, maintainability, and performance.
> >
> > I can anticipate a possible counter-argument here: the size of the
> > metadata should be small and usually fully resident in memory anyway.
> > While this is true today, I don't think it will always be true.  The
> > current low limit of a few thousand partitions is not competitive in the
> > long term and needs to be lifted.  We'd like to get to at least a million
> > partitions with KIP-500, and much more later.  Also, when you give people
> > the ability to have unlimited retention, they will want to make use of it.
> > That means lots of historical log segments to track.  This scenario is by
> > no means hypothetical.  Even with the current software, it's easy to think
> > of cases where someone misconfigured the log segment roll settings and
> > overwhelmed the system with segments.  So overall, I'd like to understand why
> > we want to store metadata on local disk rather than remote, and what the
> > options are for the future.
> >
> > best,
> > Colin
> >
> >
> > On Thu, Jul 9, 2020, at 09:55, Harsha Chintalapani wrote:
> > > Hi Jun,
> > >   Thanks for the replies and for your feedback and input on the design.
> > > We are coming close to finishing the implementation.
> > > We also ran several perf tests at our peak production loads, and with
> > > tiered storage we didn't see any degradation in write throughputs or
> > > latencies.
> > > Ying already added some of the perf test results to the KIP itself.
> > >  It will be great if we can get design and code reviews from you
> > > and others in the community as we make progress.
> > > Thanks,
> > > Harsha
> > >
> > > On Tue, Jul 7, 2020 at 10:34 AM Jun Rao  wrote:
> > >
> > > > Hi, Ying,
> > > >
> > > > Thanks for the update. It's good to see the progress on this. Please let
> > > > us know when you are done updating the KIP wiki.
> > > >
> > > > Jun
> > > >
> > > > On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng  wrote:
> > > >
> > > >> Hi Jun,
> > > >>
> > > >> Satish and I have added more design details in the KIP, including how to
> > > >> keep consistency between replicas (especially when there are leadership
> > > >> 

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-07-12 Thread Satish Duggana
Hi Colin,
Thanks for looking into the KIP.

I guess you are talking about the cache option mentioned for the default
implementation of RemoteLogMetadataManager.

RocksDB is offered as a cache option for the initial version of the
default implementation because it is simple and works well with the
number of partitions generally deployed on brokers. Each remote log
segment's metadata should not take more than about 200 bytes. With a
segment size of 500MB (the default is 1GB), 10PB of segments is about 20
million segments, so the metadata comes to around 4GB. Generally, large
internet-scale companies have data lakes of around 100s of PBs.
RLMM is not accessed in hot paths; it is used by background threads
to copy/delete remote log segments.

Our main focus in the initial version is to get the overall
architecture working well while keeping it extensible for future
improvements. We plan to improve this further in a future KIP, storing
this metadata in remote storage and fetching it only when it is needed.

Thanks,
Satish.

On Sat, Jul 11, 2020 at 1:27 AM Colin McCabe  wrote:
>
> Hi all,
>
> Thanks for the KIP.
>
> I took a look and one thing that stood out to me is that the more metadata we 
> have, the more storage we will need on local disk for the rocksDB database.  
> This seems like it contradicts some of the goals of the project.  Ideally the 
> space we need on local disk should be related only to the size of the hot 
> set, not the size of the cold set.  It also seems like it could lead to 
> extremely long rocksdb rebuild times if we somehow lose a broker's local 
> storage and have to rebuild it.
>
> Instead, I think it would be more reasonable to store cold metadata in the 
> "remote" storage (HDFS, s3, etc.).  Not only does this free up space on the 
> local and avoid long rebuild times, but it also gives us more control over 
> the management of our cache.  With rocksDB we are delegating cache management 
> to an external library that doesn't really understand our use-case.
>
> To give a concrete example of how this is bad, imagine that we have 10 worker 
> threads and we get 10 requests for something that requires us to fetch cold 
> tiered storage metadata.  Now every worker thread is blocked inside rocksDB 
> and the broker can do nothing until it finishes fetching from disk.  When 
> accessing a remote service like HDFS or S3, in contrast, we would be able to 
> check if the data was in our local cache first.  If it wasn't, we could put 
> the request in a purgatory and activate a background thread to fetch the 
> needed data, and then release the worker thread to be used by some other 
> request.  Having control of our own caching strategy increases observability, 
> maintainability, and performance.
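
A rough sketch of the non-blocking pattern described above (the names are
hypothetical, not an existing Kafka API): check a local cache, and on a miss
hand the fetch to a background thread and park the request on the returned
future, so the handler thread is freed immediately.

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class RemoteMetadataCache {
        private final Map<String, SegmentMetadata> cache = new ConcurrentHashMap<>();
        private final ExecutorService fetcher = Executors.newSingleThreadExecutor();

        // Called from a request handler thread; never blocks on remote I/O.
        CompletableFuture<SegmentMetadata> get(String segmentId) {
            SegmentMetadata cached = cache.get(segmentId);
            if (cached != null) {
                return CompletableFuture.completedFuture(cached);  // hot path: answer immediately
            }
            // Miss: the caller parks the client request in a purgatory keyed by this
            // future and moves on; the fetch completes on the background thread.
            return CompletableFuture.supplyAsync(() -> {
                SegmentMetadata fetched = fetchFromRemoteStore(segmentId);  // e.g. an HDFS/S3 read
                cache.put(segmentId, fetched);
                return fetched;
            }, fetcher);
        }

        private SegmentMetadata fetchFromRemoteStore(String segmentId) {
            return new SegmentMetadata();  // placeholder for the actual remote read
        }

        static class SegmentMetadata {}
    }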
>
> I can anticipate a possible counter-argument here: the size of the metadata 
> should be small and usually fully resident in memory anyway.  While this is 
> true today, I don't think it will always be true.  The current low limit of a 
> few thousand partitions is not competitive in the long term and needs to be 
> lifted.  We'd like to get to at least a million partitions with KIP-500, and 
> much more later.  Also, when you give people the ability to have unlimited 
> retention, they will want to make use of it.  That means lots of historical 
> log segments to track.  This scenario is by no means hypothetical.  Even with 
> the current software, it's easy to think of cases where someone misconfigured 
> the log segment roll settings and overwhelmed the system with segments.  So 
> overall, I'd like to understand why we want to store metadata on local disk 
> rather than remote, and what the options are for the future.
>
> best,
> Colin
>
>
> On Thu, Jul 9, 2020, at 09:55, Harsha Chintalapani wrote:
> > Hi Jun,
> >   Thanks for the replies and for your feedback and input on the design.
> > We are coming close to finishing the implementation.
> > We also ran several perf tests at our peak production loads, and with
> > tiered storage we didn't see any degradation in write throughputs or
> > latencies.
> > Ying already added some of the perf test results to the KIP itself.
> >  It will be great if we can get design and code reviews from you
> > and others in the community as we make progress.
> > Thanks,
> > Harsha
> >
> > On Tue, Jul 7, 2020 at 10:34 AM Jun Rao  wrote:
> >
> > > Hi, Ying,
> > >
> > > Thanks for the update. It's good to see the progress on this. Please let
> > > us know when you are done updating the KIP wiki.
> > >
> > > Jun
> > >
> > > On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng  wrote:
> > >
> > >> Hi Jun,
> > >>
> > >> Satish and I have added more design details in the KIP, including how to
> > >> keep consistency between replicas (especially when there are leadership
> > >> changes / log truncations) and new metrics. We also made some other minor
> > >> changes in the doc. We will finish the KIP changes in the next couple of
> > >> days. We will let you know when we are done. Most of the changes are
> > >> already updated to the wiki KIP. 

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-07-12 Thread Colin McCabe
Hi Unmesh,

That's an interesting idea, but I think it would be best to strive for single 
metadata events that are complete in themselves, rather than trying to do 
something transactional or EOS-like.  For example, we could have a create event 
that contains all the partitions to be created.
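
To make that concrete, a self-contained create event could carry the topic and
every partition assignment in one record, along the lines of the sketch below
(field names are illustrative, not the KIP-631 record schema):

    import java.util.List;
    import java.util.UUID;

    // Sketch of a single, complete metadata event for topic creation.
    record PartitionAssignment(int partitionId, int leaderId, List<Integer> replicaIds) {}

    record TopicCreateRecord(String name, UUID topicId, List<PartitionAssignment> partitions) {}

Because one committed record describes the whole topic, there is never a point
in the log where a TopicRecord exists without its partitions.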

best,
Colin


On Fri, Jul 10, 2020, at 04:12, Unmesh Joshi wrote:
> I was thinking that we might need something like a multi-operation
>  record in zookeeper
> to atomically create topic and partition records when this multi record is
> committed.  This way the metadata will always have both the TopicRecord and
> PartitionRecord together, and in no situation can we have a
> TopicRecord without a PartitionRecord. Not sure if there are other situations
> where multi-operation is needed.
> 
> 
> Thanks,
> Unmesh
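
For reference, the ZooKeeper multi-operation Unmesh mentions looks roughly
like this (paths and payloads are made up for the example); either both znodes
are created or neither is:

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.Op;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import java.util.List;

    public class MultiOpExample {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, event -> { });
            byte[] topicBytes = "{\"name\":\"foo\"}".getBytes();
            byte[] partitionBytes = "{\"leader\":1,\"isr\":[1,2,3]}".getBytes();
            // Both creates are committed atomically in a single multi() call.
            zk.multi(List.of(
                    Op.create("/brokers/topics/foo", topicBytes,
                              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
                    Op.create("/brokers/topics/foo/partitions/0/state", partitionBytes,
                              ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)));
            zk.close();
        }
    }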
> 
> On Fri, Jul 10, 2020 at 11:32 AM Colin McCabe  wrote:
> 
> > Hi Unmesh,
> >
> > Yes, once the last stable offset advanced, we would consider the topic
> > creation to be done, and then we could return success to the client.
> >
> > best,
> > Colin
> >
> > On Thu, Jul 9, 2020, at 19:44, Unmesh Joshi wrote:
> > > It still needs HighWaterMark / LastStableOffset to be advanced by two
> > > > records? Something like the following?
> > >
> > >
> > >||
> > > <--||   HighWaterMark
> > >Response|PartitionRecord |
> > >||
> > >-|
> > >| TopicRecord|  -
> > >||
> > > --->   --   Previous HighWaterMark
> > >CreateTopic ||
> > >||
> > >||
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > On Fri, Jul 10, 2020 at 1:30 AM Colin McCabe  wrote:
> > >
> > > > On Thu, Jul 9, 2020, at 04:37, Unmesh Joshi wrote:
> > > > > I see that, when a new topic is created, two metadata records, a
> > > > > TopicRecord (just the name and id of the topic) and a PartitionRecord
> > > > > (more like LeaderAndIsr, with leader id and replica ids for the
> > > > > partition) are created.
> > > > > While creating the topic, log entries for both the records need to be
> > > > > committed in RAFT core. Will it need something like a
> > > > > MultiOperationRecord in zookeeper. Then, we can have a single log entry
> > > > > with both the records, and the create topic request can be fulfilled
> > > > > atomically when both the records are committed?
> > > > >
> > > >
> > > > Hi Unmesh,
> > > >
> > > > Since the active controller is the only node writing to the log, there is
> > > > no need for any kind of synchronization or access control at the log
> > > > level.
> > > >
> > > > best,
> > > > Colin
> > > >
> > > > >
> > > > > Thanks,
> > > > > Unmesh
> > > > >
> > > > > On Wed, Jul 8, 2020 at 6:57 AM Ron Dagostino  wrote:
> > > > >
> > > > > > Hi Colin.  Thanks for the KIP.  Here is some feedback and various
> > > > > > questions.
> > > > > >
> > > > > > "*Controller processes will listen on a separate port from brokers.
> > > > > > This will be true even when the broker and controller are co-located
> > > > > > in the same JVM*". I assume it is possible that the port numbers could
> > > > > > be the same when using separate JVMs (i.e. broker uses port 9192 and
> > > > > > controller also uses port 9192).  I think it would be clearer to state
> > > > > > this along these lines: "Controller nodes will listen on a port, and
> > > > > > the controller port must differ from any port that a broker in the
> > > > > > same JVM is listening on.  In other words, a controller and a broker
> > > > > > node, when in the same JVM, do not share ports"
> > > > > >
> > > > > > I think the sentence "*In the realm of ACLs, this translates to
> > > > > > controllers requiring CLUSTERACTION on CLUSTER for all operations*" is
> > > > > > confusing.  It feels to me that you can just delete it.  Am I missing
> > > > > > something here?
> > > > > >
> > > > > > The KIP states "*The metadata will be stored in memory on all the
> > > > > > active controllers.*"  Can there be multiple active controllers?
> > > > > > Should it instead read "The metadata will be stored in memory on all
> > > > > > potential controllers." (or something like that)?
> > > > > >
> > > > > > KIP-595 states "*we have assumed the name __cluster_metadata for this
> > > > > > topic, but this is not a formal part of this proposal*".  This KIP-631
> > > > > > states "*Metadata changes need to be persisted to the __metadata log
> > > > > > before we propagate 

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-07-12 Thread Colin McCabe
On Fri, Jul 10, 2020, at 18:15, Guozhang Wang wrote:
> Hello Colin,
> 
> Thanks for the nice written KIP. A few meta comments:
> 
> 1) We need to talk a bit about broker failure detection: is that piggybacked
> with fencing? i.e. should the controller immediately migrate leader
> partitions from the fenced brokers? On one side, when a broker is fenced it
> cannot take any client requests including produce / fetch, so it seems we'd
> need to let other brokers take over those hosted partitions asap; on the
> other side, if the broker is fenced just because of stale metadata that will
> soon be caught up, then maybe we only want it to stop handling metadata
> requests from clients, and if we treat it as a failed broker then we may
> have back-and-forth partition migrations.
>

Hi Guozhang,

Yes, a fenced broker is removed from the cluster.  It is the same as being 
disconnected from ZooKeeper in the current system.  A broker which isn't 
processing metadata updates is a failed broker.  It may be the leader for 
partitions and not know about it.  It may not even know about the existence of 
the partitions themselves.  This clearly would impact availability if we let it 
stay in the cluster.

ISR churn is mainly because of positive feedback in the system, where bad 
conditions create worse ones.  The KIP-631 design should eliminate a lot of the 
positive feedback, although not all of it.  For example, we won't have to 
resend the full state of the cluster when the controller fails, a common source 
of performance collapses (since sending the full state itself degrades 
performance, maybe triggering more failures).  The controller itself could 
rate-limit some of the metadata changes it is making to avoid overloading the 
cluster -- unlike ZK, it can make decisions and do admission control rather 
than just passively accepting whatever is stored in it.
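
As a toy illustration of that kind of admission control (nothing in the KIP,
and using Guava's RateLimiter purely for brevity), the active controller could
meter how quickly it applies metadata changes:

    import com.google.common.util.concurrent.RateLimiter;

    class MeteredMetadataApplier {
        // Cap the rate of metadata changes so a burst cannot overwhelm the cluster.
        private final RateLimiter limiter = RateLimiter.create(100.0);  // ~100 changes/sec, made up

        void apply(Runnable metadataChange) {
            limiter.acquire();      // blocks briefly when changes are being flooded in
            metadataChange.run();   // e.g. write the record and propagate it
        }
    }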

> 
> 2) I'm a bit confused about ControllerHeartbeat v.s. BrokerHeartbeat RPC:
> in the Broker Registration and State Management section it is stated that
> "each broker sends a ControllerHeartbeat request to the active controller
> every few seconds" but later in RPC it seems that brokers are
> sending BrokerHeartbeat instead. If that's the case, what
> is ControllerHeartbeat used for?
> 

Sorry, that should have read "each broker sends a BrokerHeartbeat..."  I fixed 
the typo.

BrokerHeartbeat is sent by brokers, and ControllerHeartbeat is sent by 
controllers.

best,
Colin

> 
> Guozhang
> 
> 
> On Fri, Jul 10, 2020 at 4:12 AM Unmesh Joshi  wrote:
> 
> > I was thinking that we might need something like a multi-operation
> >  record in zookeeper
> > to atomically create topic and partition records when this multi record is
> > committed.  This way the metadata will always have both the TopicRecord and
> > PartitionRecord together, and in no situation can we have a
> > TopicRecord without a PartitionRecord. Not sure if there are other situations
> > where multi-operation is needed.
> > 
> >
> > Thanks,
> > Unmesh
> >
> > On Fri, Jul 10, 2020 at 11:32 AM Colin McCabe  wrote:
> >
> > > Hi Unmesh,
> > >
> > > Yes, once the last stable offset advanced, we would consider the topic
> > > creation to be done, and then we could return success to the client.
> > >
> > > best,
> > > Colin
> > >
> > > On Thu, Jul 9, 2020, at 19:44, Unmesh Joshi wrote:
> > > > It still needs HighWaterMark / LastStableOffset to be advanced by two
> > > > > records? Something like the following?
> > > >
> > > >
> > > >||
> > > > <--||   HighWaterMark
> > > >Response|PartitionRecord |
> > > >||
> > > >-|
> > > >| TopicRecord|  -
> > > >||
> > > > --->   --   Previous HighWaterMark
> > > >CreateTopic ||
> > > >||
> > > >||
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Jul 10, 2020 at 1:30 AM Colin McCabe  wrote:
> > > >
> > > > > On Thu, Jul 9, 2020, at 04:37, Unmesh Joshi wrote:
> > > > > > I see that, when a new topic is created, two metadata records, a
> > > > > > TopicRecord (just the name and id of the topic) and a PartitionRecord
> > > > > > (more like LeaderAndIsr, with leader id and replica ids for the
> > > > > > partition) are created.
> > > > > > While creating the topic, log entries for both the records need to be
> > > > > > committed in RAFT core. Will it need something like a
> > > > > > MultiOperationRecord in zookeeper. Then, we can have a single log
> > > > > > entry with both the records,
> > > > > > 

[VOTE] KIP-614: Add Prefix Scan support for State Stores

2020-07-12 Thread Sagar
Hi All,

I would like to start a new voting thread for the below KIP to add prefix
scan support to state stores:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores


Thanks!
Sagar.
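
For context, the API the KIP proposes lets a key-value store iterate over every
entry whose key starts with a given prefix. Usage would look roughly like this
(store name, key format, and types are made up; the exact method signature is
whatever the KIP finally lands on):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class PrefixScanExample {
        // Print every entry whose key starts with "user-42:" from a store named "orders-store".
        static void printUserOrders(KafkaStreams streams) {
            ReadOnlyKeyValueStore<String, Long> store = streams.store(
                    StoreQueryParameters.fromNameAndType("orders-store",
                            QueryableStoreTypes.<String, Long>keyValueStore()));
            try (KeyValueIterator<String, Long> it =
                         store.prefixScan("user-42:", Serdes.String().serializer())) {
                while (it.hasNext()) {
                    System.out.println(it.next());
                }
            }
        }
    }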


Build failed in Jenkins: kafka-trunk-jdk11 #1637

2020-07-12 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10247: Correctly reset state when task is corrupted (#8994)


--
[...truncated 1.87 MB...]
unit.kafka.controller.ControllerContextTest > testPartitionReplicaAssignment PASSED

unit.kafka.controller.ControllerContextTest > testUpdatePartitionFullReplicaAssignmentUpdatesReplicaAssignment STARTED

unit.kafka.controller.ControllerContextTest > testUpdatePartitionFullReplicaAssignmentUpdatesReplicaAssignment PASSED

unit.kafka.controller.ControllerContextTest > testPartitionReplicaAssignmentReturnsEmptySeqIfTopicOrPartitionDoesNotExist STARTED

unit.kafka.controller.ControllerContextTest > testPartitionReplicaAssignmentReturnsEmptySeqIfTopicOrPartitionDoesNotExist PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[0] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[0] PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[1] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[1] PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[2] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[2] PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[3] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[3] PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[4] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[4] PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[5] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[5] PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[6] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[6] PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[7] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[7] PASSED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[8] STARTED

unit.kafka.cluster.AssignmentStateTest > testPartitionAssignmentStatus[8] PASSED

kafka.integration.UncleanLeaderElectionTest > testTopicUncleanLeaderElectionEnable PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabled PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabled PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionInvalidTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionInvalidTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabledByTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionEnabledByTopicOverride PASSED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabledByTopicOverride STARTED

kafka.integration.UncleanLeaderElectionTest > testUncleanLeaderElectionDisabledByTopicOverride PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDataChange STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDataChange PASSED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperSessionStateMetric STARTED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperSessionStateMetric PASSED

kafka.zookeeper.ZooKeeperClientTest > testExceptionInBeforeInitializingSession STARTED

kafka.zookeeper.ZooKeeperClientTest > testExceptionInBeforeInitializingSession PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testConnection STARTED

kafka.zookeeper.ZooKeeperClientTest > testConnection PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForCreation STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForCreation PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetAclExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetAclExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiryDuringClose STARTED

kafka.zookeeper.ZooKeeperClientTest > testSessionExpiryDuringClose PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetAclNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetAclNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testConnectionLossRequestTermination STARTED

kafka.zookeeper.ZooKeeperClientTest > testConnectionLossRequestTermination PASSED

kafka.zookeeper.ZooKeeperClientTest >