Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-08-24 Thread Ying Zheng
> considered to be a failure and retried.
>
> 5005. When a Kafka cluster is provisioned for the first time with
> KIP-405 <https://issues.apache.org/jira/browse/KIP-405> tiered storage
> enabled, could you explain in the KIP about how the bootstrap for
> __remote_log_metadata topic will be performed in the default RLMM
> implementation?
>
> 5006. I currently do not see details on the KIP on why RocksDB was chosen
> as the default cache implementation, and how it is going to be used. Were
> alternatives compared/considered? For example, it would be useful to
> explain/evaluate the following: 1) debuggability of the RocksDB JNI
> interface, 2) performance, 3) portability across platforms and 4) interface
> parity of RocksDB's JNI api with its underlying C/C++ api.
>
> 5007. For the RocksDB cache (the default implementation of RLMM), what is
> the relationship/mapping between the following: 1) # of tiered partitions,
> 2) # of partitions of metadata topic __remote_log_metadata and 3) # of
> RocksDB instances? i.e. is the pla

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-07-28 Thread Ying Zheng
> > > > > 1001. The KIP described a few scenarios of unclean leader elections.
> > > > > This is very useful, but I am wondering if this is the best approach.
> > > > > My understanding of the proposed approach is to allow the new
> > > > > (unclean) leader to take new messages immediately. While this
> > > > > increases availability, it creates the problem that there could be
> > > > > multiple conflicting segments in the remote store for the same offset
> > > > > range. This seems to make it harder for RLMM to determine which
> > > > > archived log segments contain the correct data. For example, an
> > > > > archived log segment could at one time be the correct data, but be
> > > > > changed to incorrect data after an unclean leader election. An
> > > > > alternative approach is to let the unclean leader use the archived
> > > > > data as the source of truth. So, when the new (unclean) leader takes
> > > > > over, it first reconciles the local data based on the archived data
> > > > > before taking new messages. This makes the job of RLMM a bit easier
> > > > > since all archived data are considered correct. This increases
> > > > > availability a bit. However, since unclean leader elections are rare,
> > > > > this may be ok.
> > > > >
> > > > > 1002. RemoteStorageManager.
> > > > > 1002.1 There seems to be some inconsistencies in RemoteStorageManager.
> > > > > We pass in RemoteLogSegmentId copyLogSegment(). For all other methods,
> > > > > we pass in RemoteLogSegmentMetadata.
> > > > > 1002.2 Is endOffset in RemoteLogSegmentMetadata inclusive or exclusive?
> > > > > 1002.3 It seems that we need an api to get the leaderEpoch history for
> > > > > a partition.
> > > > > 1002.4 Could you define the type of RemoteLogSegmentContext?
> > > > >
> > > > > 1003 RemoteLogMetadataManager
> > > > > 1003.1 I am not sure why we need both of the following methods
> > > > > in RemoteLogMetadataManager. Could we combine them into one that takes
> > > > > in offset and returns RemoteLogSegmentMetadata?
> > > > > RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition,
> > > > > long offset) throws IOException;
> > > > > RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId
> > > > > remoteLogSegmentId) throws IOException;
> > > > > 1003.2 There seems to be some inconsistencies in the methods below.
> > > > > I am not sure why one takes RemoteLogSegmentMetadata and the other
> > > > > takes RemoteLogSegmentId.
> > > > > void putRemoteLogSegmentData(RemoteLogSegmentMetadata
> > > > > remoteLogSegmentMetadata) throws IOException;
> > > > > void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId
> > > > > remoteLogSegmentId) throws IOException;
> > > > > 1003.3 In void onServerStarted(final String serverEndpoint), what
> > > > > is serverEndpoint used for?
> > > > >
> > > > > 1004. It would be useful to document how all the new APIs are being
> > > > > used. For example, when is RemoteLogSegmentMetadata.markedForDeletion
> > > > > being set and used? How are
> > > > > RemoteLogMetadataManager.earliestLogOffset/highestLogOffset being used?
> > > > >
> > > > > 1005. Handling partition deletion: The KIP says "RLMM will eventually
> > > > > delete these segments by using RemoteStorageManager." Which replica
> > > > > does this logic?
> > > > >
> > > > > 1006. "If there are any failures in removing remote log segments then
> > > > > those are stored in a specific topic (default as
> > > > > __remote_segments_to_be_deleted) and user can consume the events
> > > > > (which contain remote-log-segment-id) from that topic and clean them
> > > > > up from remote storage." Not sure if it's worth

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-07-14 Thread Ying Zheng
Hi Jun,
Hi Colin,

Satish and I are still discussing some details about how to handle
transactions / producer ids. Satish is going to make some minor changes to
the RLMM API and other parts. Other than that, we have finished updating the
KIP.

I agree with Colin that the current design of using RocksDB is not
optimal. But this design is simple and should work for almost all the
existing Kafka users. RLMM is a plugin. Users can replace RocksDB with
their own RLMM implementation, if needed. So, I think we can keep RocksDB
for now. What do you think?


Thanks,
Ying



On Tue, Jul 7, 2020 at 10:35 AM Jun Rao  wrote:

> Hi, Ying,
>
> Thanks for the update. It's good to see the progress on this. Please let us
> know when you are done updating the KIP wiki.
>
> Jun
>
> On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng  wrote:
>
> > Hi Jun,
> >
> > Satish and I have added more design details in the KIP, including how to
> > keep consistency between replicas (especially when there is leadership
> > changes / log truncations) and new metrics. We also made some other minor
> > changes in the doc. We will finish the KIP changes in the next couple of
> > days. We will let you know when we are done. Most of the changes are
> > already updated to the wiki KIP. You can take a look. But it's not the
> > final version yet.
> >
> > As for the implementation, the code is mostly done and we already had
> > some feature tests / system tests. I have added the performance test
> > results in the KIP. However, the recent design changes (e.g. leader epoch
> > info management / log truncation / some of the new metrics) have not been
> > implemented yet. It will take about 2 weeks for us to implement after you
> > review and agree with those design changes.
> >
> >
> >
> > On Tue, Jul 7, 2020 at 9:23 AM Jun Rao  wrote:
> >
> > > Hi, Satish, Harsha,
> > >
> > > Any new updates on the KIP? This feature is one of the most important
> and
> > > most requested features in Apache Kafka right now. It would be helpful
> if
> > > we can make sustained progress on this. Could you share how far along
> is
> > > the design/implementation right now? Is there anything that other
> people
> > > can help to get it across the line?
> > >
> > > As for "transactional support" and "follower requests/replication", no
> > > further comments from me as long as the producer state and leader epoch
> > can
> > > be restored properly from the object store when needed.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Jun 9, 2020 at 3:39 AM Satish Duggana <
> satish.dugg...@gmail.com>
> > > wrote:
> > >
> > > > We did not want to add many implementation details in the KIP. But we
> > > > decided to add them in the KIP as appendix or sub-sections(including
> > > > follower fetch protocol) to describe the flow with the main cases.
> > > > That will answer most of the queries. I will update on this mail
> > > > thread when the respective sections are updated.
> > > >
> > > > Thanks,
> > > > Satish.
> > > >
> > > > On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez
> > > >  wrote:
> > > > >
> > > > > Hi Satish,
> > > > >
> > > > > A couple of questions specific to the section "Follower
> > > > > Requests/Replication", pages 16:17 in the design document [1].
> > > > >
> > > > > 900. It is mentioned that followers fetch auxiliary states from the
> > > > > remote storage.
> > > > >
> > > > > 900.a Does the consistency model of the external storage impact
> > > > > reads of leader epochs and other auxiliary data?
> > > > >
> > > > > 900.b What are the benefits of using a mechanism to store and access
> > > > > the leader epochs which is different from other metadata associated
> > > > > to tiered segments? What are the benefits of retrieving this
> > > > > information on-demand from the follower rather than relying on
> > > > > propagation via the topic __remote_log_metadata? What are the
> > > > > advantages over using a dedicated control structure (e.g. a new
> > > > > record type) propagated via this topic? Since in the document,
> > > > > different control paths are operating in the system, how are the
> > > > > metadata stored in

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-07-07 Thread Ying Zheng
> A.3 Caching – storing locally the segments retrieved from the remote
> storage is excluded as it does not align with the original intent and even
> defeat some of its purposes (save disk space etc.). That said, could there
> be other types of use cases where the pattern of access to the remotely
> stored segments would benefit from local caching (and potentially
> read-ahead)? Consider the use case of a large pool of consumers which start
> a backfill at the same time for one day worth of data from one year ago
> stored remotely. Caching the segments locally would allow to uncouple the
> load on the remote storage from the load on the Kafka cluster. Maybe the
> RLM could expose a configuration parameter to switch that feature on/off?
>
> I tend to agree here, caching remote segments locally and making this
> configurable sounds pretty practical to me. We should implement this,
> maybe not in the first iteration.
>
> Br,
> Ivan
>
> [1]
> https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152
>
> On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:
>
> Hi Jun,
>
> Thank you for the feedback. I am trying to understand how a push-based
> approach would work.
> In order for the metadata to be propagated (under the assumption you
> stated), would you plan to add a new API in Kafka to allow the metadata
> store to send them directly to the brokers?
>
> Thanks,
> Alexandre

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-19 Thread Ying Zheng
Please ignore my previous email.
I didn't know Apache requires all the discussions to be "open".

On Tue, Nov 19, 2019, 5:40 PM Ying Zheng  wrote:

> Hi Jun,
>
> Thank you very much for your feedback!
>
> Can we schedule a meeting in your Palo Alto office in December? I think a
> face to face discussion is much more efficient than emails. Both Harsha and
> I can visit you. Satish may be able to join us remotely.
>
> On Fri, Nov 15, 2019 at 11:04 AM Jun Rao  wrote:
>
>> Hi, Satish and Harsha,
>>
>> The following is a more detailed high level feedback for the KIP. Overall,
>> the KIP seems useful. The challenge is how to design it such that it’s
>> general enough to support different ways of implementing this feature and
>> support existing features.
>>
>> 40. Local segment metadata storage: The KIP makes the assumption that the
>> metadata for the archived log segments are cached locally in every broker
>> and provides a specific implementation for the local storage in the
>> framework. We probably should discuss this more. For example, some tier
>> storage providers may not want to cache the metadata locally and just rely
>> upon a remote key/value store if such a store is already present. If a
>> local store is used, there could be different ways of implementing it
>> (e.g., based on customized local files, an embedded local store like
>> RocksDB, etc). An alternative of designing this is to just provide an
>> interface for retrieving the tier segment metadata and leave the details
>> of
>> how to get the metadata outside of the framework.
>>
>> 41. RemoteStorageManager interface and the usage of the interface in the
>> framework: I am not sure if the interface is general enough.  For example,
>> it seems that RemoteLogIndexEntry is tied to a specific way of storing the
>> metadata in remote storage. The framework uses listRemoteSegments() api in
>> a pull based approach. However, in some other implementations, a push
>> based
>> approach may be more preferred. I don’t have a concrete proposal yet. But,
>> it would be useful to give this area some more thoughts and see if we can
>> make the interface more general.
>>
>> 42. In the diagram, the RemoteLogManager is side by side with LogManager.
>> This KIP only discussed how the fetch request is handled between the two
>> layer. However, we should also consider how other requests that touch the
>> log can be handled. e.g., list offsets by timestamp, delete records, etc.
>> Also, in this model, it's not clear which component is responsible for
>> managing the log start offset. It seems that the log start offset could be
>> changed by both RemoteLogManager and LogManager.
>>
>> 43. There are quite a few existing features not covered by the KIP. It
>> would be useful to discuss each of those.
>> 43.1 I won’t say that compacted topics are rarely used and always small.
>> For example, KStreams uses compacted topics for storing the states and
>> sometimes the size of the topic could be large. While it might be ok to
>> not
>> support compacted topics initially, it would be useful to have a high
>> level
>> idea on how this might be supported down the road so that we don’t have to
>> make incompatible API changes in the future.
>> 43.2 We need to discuss how EOS is supported. In particular, how is the
>> producer state integrated with the remote storage.
>> 43.3 Now that KIP-392 (allow consumers to fetch from closest replica) is
>> implemented, we need to discuss how reading from a follower replica is
>> supported with tier storage.
>> 43.4 We need to discuss how JBOD is supported with tier storage.
>>
>> Thanks,
>>
>> Jun
>>
>> On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley  wrote:
>>
>> > Thanks for those insights Ying.
>> >
>> > On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng 
>> wrote:
>> >
>> > > >
>> > > >
>> > > >
>> > > > Thanks, I missed that point. However, there's still a point at which
>> > the
>> > > > consumer fetches start getting served from remote storage (even if
>> that
>> > > > point isn't as soon as the local log retention time/size). This
>> > > represents
>> > > > a kind of performance cliff edge and what I'm really interested in
>> is
>> > how
>> > > > easy it is for a consumer which falls off that cliff to catch up
>> and so
>> > > its
>> > > > fetches again come from local storage. Obviously this can depend on
>

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-19 Thread Ying Zheng
Hi Jun,

Thank you very much for your feedback!

Can we schedule a meeting in your Palo Alto office in December? I think a
face to face discussion is much more efficient than emails. Both Harsha and
I can visit you. Satish may be able to join us remotely.

On Fri, Nov 15, 2019 at 11:04 AM Jun Rao  wrote:

> Hi, Satish and Harsha,
>
> The following is a more detailed high level feedback for the KIP. Overall,
> the KIP seems useful. The challenge is how to design it such that it’s
> general enough to support different ways of implementing this feature and
> support existing features.
>
> 40. Local segment metadata storage: The KIP makes the assumption that the
> metadata for the archived log segments are cached locally in every broker
> and provides a specific implementation for the local storage in the
> framework. We probably should discuss this more. For example, some tier
> storage providers may not want to cache the metadata locally and just rely
> upon a remote key/value store if such a store is already present. If a
> local store is used, there could be different ways of implementing it
> (e.g., based on customized local files, an embedded local store like
> RocksDB, etc). An alternative of designing this is to just provide an
> interface for retrieving the tier segment metadata and leave the details of
> how to get the metadata outside of the framework.
>
> 41. RemoteStorageManager interface and the usage of the interface in the
> framework: I am not sure if the interface is general enough.  For example,
> it seems that RemoteLogIndexEntry is tied to a specific way of storing the
> metadata in remote storage. The framework uses listRemoteSegments() api in
> a pull based approach. However, in some other implementations, a push based
> approach may be more preferred. I don’t have a concrete proposal yet. But,
> it would be useful to give this area some more thoughts and see if we can
> make the interface more general.
>
> 42. In the diagram, the RemoteLogManager is side by side with LogManager.
> This KIP only discussed how the fetch request is handled between the two
> layer. However, we should also consider how other requests that touch the
> log can be handled. e.g., list offsets by timestamp, delete records, etc.
> Also, in this model, it's not clear which component is responsible for
> managing the log start offset. It seems that the log start offset could be
> changed by both RemoteLogManager and LogManager.
>
> 43. There are quite a few existing features not covered by the KIP. It
> would be useful to discuss each of those.
> 43.1 I won’t say that compacted topics are rarely used and always small.
> For example, KStreams uses compacted topics for storing the states and
> sometimes the size of the topic could be large. While it might be ok to not
> support compacted topics initially, it would be useful to have a high level
> idea on how this might be supported down the road so that we don’t have to
> make incompatible API changes in the future.
> 43.2 We need to discuss how EOS is supported. In particular, how is the
> producer state integrated with the remote storage.
> 43.3 Now that KIP-392 (allow consumers to fetch from closest replica) is
> implemented, we need to discuss how reading from a follower replica is
> supported with tier storage.
> 43.4 We need to discuss how JBOD is supported with tier storage.
>
> Thanks,
>
> Jun
>
> On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley  wrote:
>
> > Thanks for those insights Ying.
> >
> > On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng 
> wrote:
> >
> > > >
> > > >
> > > >
> > > > Thanks, I missed that point. However, there's still a point at which
> > the
> > > > consumer fetches start getting served from remote storage (even if
> that
> > > > point isn't as soon as the local log retention time/size). This
> > > represents
> > > > a kind of performance cliff edge and what I'm really interested in is
> > how
> > > > easy it is for a consumer which falls off that cliff to catch up and
> so
> > > its
> > > > fetches again come from local storage. Obviously this can depend on
> all
> > > > sorts of factors (like production rate, consumption rate), so it's
> not
> > > > guaranteed (just like it's not guaranteed for Kafka today), but this
> > > would
> > > > represent a new failure mode.
> > > >
> > >
> > >  As I have explained in the last mail, it's a very rare case that a
> > > consumer
> > > need to read remote data. With our experience at Uber, this only
> happens
> > > when the consumer service had an outage fo

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-19 Thread Ying Zheng
On Fri, Nov 15, 2019 at 11:04 AM Jun Rao  wrote:

> Hi, Satish and Harsha,
>
> The following is a more detailed high level feedback for the KIP. Overall,
> the KIP seems useful. The challenge is how to design it such that it’s
> general enough to support different ways of implementing this feature and
> support existing features.
>
> 40. Local segment metadata storage: The KIP makes the assumption that the
> metadata for the archived log segments are cached locally in every broker
> and provides a specific implementation for the local storage in the
> framework. We probably should discuss this more. For example, some tier
> storage providers may not want to cache the metadata locally and just rely
> upon a remote key/value store if such a store is already present. If a
> local store is used, there could be different ways of implementing it
> (e.g., based on customized local files, an embedded local store like
> RocksDB, etc). An alternative of designing this is to just provide an
> interface for retrieving the tier segment metadata and leave the details of
> how to get the metadata outside of the framework.
>

[Ying]
Early this year, when we just started designing tiered storage, we did plan
to make RemoteLogManager a Kafka plugin, so that there could be totally
different implementations of tiered storage.

However, one piece of feedback we received from the community is that
developing RemoteLogManager implementations is too hard for most potential
users. People actually prefer one standard implementation that can satisfy
most of the requirements.

We accepted that feedback, and decided to trade some of the flexibility for
simplicity in the 1st version. It's still possible to allow users to provide
different implementations in the future.

We had discussions with different companies (e.g. Slack, Airbnb) that are
interested in tiered storage. Our conclusion is that the current design (a
standard RemoteLogManager that caches remote metadata locally + HDFS and S3
RemoteStorageManagers) is good enough for all of the companies we have
talked with.

We don't have much knowledge about the use cases outside of the Internet
industry. Do any of Confluent's customers need to manage the metadata in
different ways?





> 43. There are quite a few existing features not covered by the KIP. It
> would be useful to discuss each of those.
> 43.1 I won’t say that compacted topics are rarely used and always small.
> For example, KStreams uses compacted topics for storing the states and
> sometimes the size of the topic could be large. While it might be ok to not
> support compacted topics initially, it would be useful to have a high level
> idea on how this might be supported down the road so that we don’t have to
> make incompatible API changes in the future.
> 43.2 We need to discuss how EOS is supported. In particular, how is the
> producer state integrated with the remote storage.
> 43.3 Now that KIP-392 (allow consumers to fetch from closest replica) is
> implemented, we need to discuss how reading from a follower replica is
> supported with tier storage.
> 43.4 We need to discuss how JBOD is supported with tier storage.
>
[Ying]
The support of compacted topics and EOS is definitely possible. We will
discuss the possible design in the KIP.

But for the 1st version, we prefer to focus on a relatively small scope, and
develop a simple, just-enough solution for most users. Most features will be
gradually added in future releases.

For compacted topics, we can save a new version of the remote segment files
after each compaction. The old remote version will be deleted after the new
version is available on remote storage.

For EOS, the snapshots can also be shipped to remote storage.

KIP-392 will be supported in the 1st version of tiered storage. We will add
the design details in the KIP.

JBOD in remote storage is provided by the remote storage system
(e.g. HDFS, S3). This should be totally transparent to Kafka.

Tiered storage will make Kafka local storage much smaller, and make JBOD
for local storage less needed. We should be able to support JBOD
in local storage in the future. This shouldn't require any changes in RSM,
because only Kafka and RemoteLogManager talk with local storage.
So, there shouldn't be any compatibility issues when we support local
storage JBOD in the next version.



> Thanks,
>
> Jun
>
>
>


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-07 Thread Ying Zheng
>
>
>
> Thanks, I missed that point. However, there's still a point at which the
> consumer fetches start getting served from remote storage (even if that
> point isn't as soon as the local log retention time/size). This represents
> a kind of performance cliff edge and what I'm really interested in is how
> easy it is for a consumer which falls off that cliff to catch up and so its
> fetches again come from local storage. Obviously this can depend on all
> sorts of factors (like production rate, consumption rate), so it's not
> guaranteed (just like it's not guaranteed for Kafka today), but this would
> represent a new failure mode.
>

As I have explained in the last mail, it's a very rare case that a consumer
needs to read remote data. With our experience at Uber, this only happens
when the consumer service had an outage for several hours.

There is not a "performance cliff" as you assume. The remote storage is
even faster than local disks in terms of bandwidth. Reading from remote
storage is going to have higher latency than local disk. But since the
consumer is catching up on several hours of data, it's not sensitive to
sub-second-level latency, and each remote read request will read a large
amount of data to make the overall performance better than reading from
local disks.



> Another aspect I'd like to understand better is the effect of serving fetch
> request from remote storage has on the broker's network utilization. If
> we're just trimming the amount of data held locally (without increasing the
> overall local+remote retention), then we're effectively trading disk
> bandwidth for network bandwidth when serving fetch requests from remote
> storage (which I understand to be a good thing, since brokers are
> often/usually disk bound). But if we're increasing the overall local+remote
> retention then it's more likely that network itself becomes the bottleneck.
> I appreciate this is all rather hand wavy, I'm just trying to understand
> how this would affect broker performance, so I'd be grateful for any
> insights you can offer.
>
>
Network bandwidth is a function of produce speed; it has nothing to do with
remote retention. As long as the data is shipped to remote storage, you can
keep the data there for 1 day, 1 year, or 100 years; it doesn't consume any
network resources.


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-07 Thread Ying Zheng
On Wed, Nov 6, 2019 at 6:28 PM Tom Bentley  wrote:

> Hi Ying,
>
> Because only inactive segments can be shipped to remote storage, to be able
> > to ship log data as soon
> > as possible, we will roll log segment very fast (e.g. every half hour).
> >
>
> So that means a consumer which gets behind by half an hour will find its
> reads being served from remote storage.


No, the segments are shipped to remote storage as soon as possible. But
the local segment is not deleted until a configurable time (e.g. 6 hours).
The consumer request is served from local storage as long as the local
copy is still available. After 6 hours or longer, consumer requests will be
served from remote storage.
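
To make the interplay of these settings concrete, below is a minimal sketch
(in Java, building a Properties object) of what the topic-level configuration
could look like. remote.log.storage.enable is the property name mentioned
elsewhere in this thread; local.retention.ms is a hypothetical name for the
"keep the local copy for ~6 hours" knob, and the final KIP may use different
keys and defaults.

    import java.util.Properties;

    public class TieredTopicConfigExample {
        public static Properties exampleTopicConfig() {
            Properties config = new Properties();
            // Hypothetical/illustrative keys; the final KIP may name these differently.
            config.put("remote.log.storage.enable", "true");   // enable tiering for the topic
            config.put("segment.ms", "1800000");               // roll segments every 30 minutes
            config.put("local.retention.ms", "21600000");      // keep the local copy for ~6 hours
            config.put("retention.ms", "604800000");           // total retention (local + remote), 7 days
            return config;
        }
    }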


> And, if I understand the proposed
> algorithm, each such consumer fetch request could result in a separate
> fetch request from the remote storage. I.e. there's no mechanism to
> amortize the cost of the fetching between multiple consumers fetching
> similar ranges?
>
>
We can have a small in-memory cache on the broker. But this is not a high
priority right now. In any normal case, a Kafka consumer should not lag for
more than several hours. Only in some very extreme cases does a Kafka
consumer have to read from remote storage. It's very rare that 2 or more
consumers are reading the same piece of remote data at about the same time.

(Actually the doc for RemoteStorageManager.read() says "It will read at
> least one batch, if the 1st batch size is larger than maxBytes.". Does that
> mean the broker might have to retry with increased maxBytes if the first
> request fails to read a batch? If so, how does it know how much to increase
> maxBytes by?)
>
>
No, there is no retry, just continuously reading until a full batch is
received. The logic is exactly the same as the existing local segment read.
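
As a rough illustration of that "keep reading until the first batch is
complete" behavior, here is a minimal sketch. RemoteSegmentChannel and its
readInto() method are hypothetical stand-ins, not the actual RSM plugin API.

    import java.io.IOException;
    import java.nio.ByteBuffer;

    // Illustrative sketch only; not the actual implementation.
    final class RemoteReadExample {
        static ByteBuffer readAtLeastOneBatch(RemoteSegmentChannel channel,
                                              long startPosition,
                                              int maxBytes,
                                              int firstBatchSize) throws IOException {
            // Read up to maxBytes, but never less than the first batch, mirroring
            // how a local segment read always returns at least one complete batch.
            int toRead = Math.max(maxBytes, firstBatchSize);
            ByteBuffer buffer = ByteBuffer.allocate(toRead);
            int position = 0;
            while (position < firstBatchSize) {   // keep reading until the 1st batch is complete
                int read = channel.readInto(buffer, startPosition + position);
                if (read < 0) throw new IOException("Unexpected end of remote segment");
                position += read;
            }
            buffer.flip();
            return buffer;
        }

        interface RemoteSegmentChannel {
            int readInto(ByteBuffer dest, long position) throws IOException;
        }
    }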


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-06 Thread Ying Zheng
On Wed, Nov 6, 2019 at 4:33 PM Ying Zheng  wrote:

> 21. I am not sure that I understood the need for RemoteLogIndexEntry and
> its relationship with RemoteLogSegmentInfo. It seems
> that RemoteLogIndexEntry are offset index entries pointing to record
> batches inside a segment. That seems to be the same as the .index file?
>
> We do not assume how the data is stored in the remote storage.
> Depending on the implementation, the data of one segment may not necessarily
> be stored in a single file.
> There could be a maximum object / chunk / file size restriction on the
> remote storage. So, one Kafka
> segment could be saved in multiple chunks in remote storage.
>
> The remote log index also has a larger index interval. The default
> interval of the local .index file
> (log.index.interval.bytes) is 4KB. In the current HDFS RSM implementation,
> the default remote
> index interval (hdfs.remote.index.interval.bytes) is 256KB. The
> coarse-grained remote index saves
> some local disk space. The smaller size also makes it more likely to be
> cached in physical memory.
>

The remote log index file is also very different from the existing .index
file. With the current design, one .index file corresponds to one segment
file. But one remote log index file can correspond to many remote segments.

Because only inactive segments can be shipped to remote storage, to be able
to ship log data as soon as possible, we will roll log segments very fast
(e.g. every half hour). This will lead to a large number of small segments.
If we maintain one remote index file for each remote segment, we can easily
hit some OS limitations, like the maximum # of open files or the maximum #
of mmapped files.

So, instead of creating a new remote index file, we append
the RemoteLogIndexEntries of multiple remote segments to one local file. We
will roll the remote index file at a configurable size or time interval.
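
A minimal sketch of that append-and-roll idea is below. All class, field,
and file names here are hypothetical and only meant to illustrate the
mechanism, not the actual implementation.

    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.nio.file.Path;

    // Hypothetical sketch: entries from many remote segments are appended to
    // one local index file, which is rolled by size or age.
    final class RollingRemoteIndexWriter {
        private final Path dir;
        private final long maxBytes;     // roll when the file grows beyond this
        private final long maxAgeMs;     // ...or when it gets older than this
        private FileOutputStream current;
        private long currentBytes;
        private long currentCreatedAtMs;

        RollingRemoteIndexWriter(Path dir, long maxBytes, long maxAgeMs) throws IOException {
            this.dir = dir;
            this.maxBytes = maxBytes;
            this.maxAgeMs = maxAgeMs;
            roll();
        }

        // Entries from multiple remote segments go into the same local file.
        synchronized void append(byte[] serializedRemoteLogIndexEntry) throws IOException {
            long now = System.currentTimeMillis();
            if (currentBytes + serializedRemoteLogIndexEntry.length > maxBytes
                    || now - currentCreatedAtMs > maxAgeMs) {
                roll();
            }
            current.write(serializedRemoteLogIndexEntry);
            currentBytes += serializedRemoteLogIndexEntry.length;
        }

        private void roll() throws IOException {
            if (current != null) current.close();
            currentCreatedAtMs = System.currentTimeMillis();
            currentBytes = 0;
            current = new FileOutputStream(
                    dir.resolve("remote-index-" + currentCreatedAtMs + ".idx").toFile());
        }
    }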


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-11-06 Thread Ying Zheng
21. I am not sure that I understood the need for RemoteLogIndexEntry and
its relationship with RemoteLogSegmentInfo. It seems
that RemoteLogIndexEntry are offset index entries pointing to record
batches inside a segment. That seems to be the same as the .index file?

We do not assume how the data is stored in the remote storage.
Depending on the implementation, the data of one segment may not necessarily
be stored in a single file.
There could be a maximum object / chunk / file size restriction on the
remote storage. So, one Kafka
segment could be saved in multiple chunks in remote storage.

The remote log index also has a larger index interval. The default
interval of the local .index file
(log.index.interval.bytes) is 4KB. In the current HDFS RSM implementation,
the default remote
index interval (hdfs.remote.index.interval.bytes) is 256KB. The
coarse-grained remote index saves
some local disk space. The smaller size also makes it more likely to be
cached in physical memory.




On Thu, Oct 31, 2019 at 1:58 PM Jun Rao  wrote:

> Hi, Harsha,
>
> I am still looking at the KIP and the PR. A couple of quick
> comments/questions.
>
> 20. It's fine to keep the HDFS binding temporarily in the PR. We just need
> to remove it before it's merged to trunk. As Victor mentioned, we can
> provide a reference implementation based on a mocked version of remote
> storage.
>
> 21. I am not sure that I understood the need for RemoteLogIndexEntry and
> its relationship with RemoteLogSegmentInfo. It seems
> that RemoteLogIndexEntry are offset index entries pointing to record
> batches inside a segment. That seems to be the same as the .index file?
>
> Thanks,
>
> Jun
>
> On Mon, Oct 28, 2019 at 9:11 PM Satish Duggana 
> wrote:
>
> > Hi Viktor,
> > >1. Can we allow RLM Followers to serve read requests? After all segments
> > on
> > the cold storage are closed ones, no modification is allowed. Besides
> > KIP-392 (
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > )
> > would introduce follower fetching too, so I think it would be nice to
> > prepare RLM for this as well.
> >
> > That is a good point. We plan to support fetching remote storage from
> > followers too. Current code in the PR work fine for this scenario
> > though there may be some edge cases to be handled. We have not yet
> > tested this scenario.
> >
> > >2. I think the remote.log.storage.enable config is redundant. By
> > specifying
> >
> > remote.log.storage.manager.class.name
> one already declares that they want
> > to use remote storage. Would it make sense to remove
> > the remote.log.storage.enable config?
> >
> > I do not think it is really needed. `remote.log.storage.enable`
> > property can be removed.
> >
> > Thanks,
> > Satish.
> >
> >
> > On Thu, Oct 24, 2019 at 2:46 PM Viktor Somogyi-Vass
> >  wrote:
> > >
> > > Hi Harsha,
> > >
> > > A couple more questions:
> > > 1. Can we allow RLM Followers to serve read requests? After all
> segments
> > on
> > > the cold storage are closed ones, no modification is allowed. Besides
> > > KIP-392 (
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica
> > )
> > > would introduce follower fetching too, so I think it would be nice to
> > > prepare RLM for this as well.
> > > 2. I think the remote.log.storage.enable config is redundant. By
> > specifying
> > >
> > > remote.log.storage.manager.class.name
> one already declares that they
> > want
> > > to use remote storage. Would it make sense to remove
> > > the remote.log.storage.enable config?
> > >
> > > Thanks,
> > > Viktor
> > >
> > >
> > > On Thu, Oct 24, 2019 at 10:37 AM Viktor Somogyi-Vass <
> > > viktorsomo...@gmail.com> wrote:
> > >
> > > > Hi Jun & Harsha,
> > > >
> > > > I think it would be beneficial to at least provide one simple
> reference
> > > > implementation (file system based?) as we do with connect too.
> > > > That would as a simple example and would help plugin developers to
> > better
> > > > understand the concept and the interfaces.
> > > >
> > > > Best,
> > > > Viktor
> > > >
> > > > On Wed, Oct 23, 2019 at 8:49 PM Jun Rao  

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-10-28 Thread Ying Zheng
On Thu, Oct 24, 2019 at 7:13 AM Eno Thereska  wrote:

> Going back to initial thread with general questions on KIP. I think
> aspects of the user experience still need clarification:
>
> - if a user has a mix of compacted and non-compacted topics it will be
> hard to reason about storage needs overall. Could you give a reason
> why compacted topics are not supported? This is probably because to do
> that you'd have to go with a paging approach (like Ryanne earlier
> suggested) and that will be expensive in terms of IO. Do you want to
> discount supporting compacted topics this early in the KIP design or
> do you want to leave open the option of supporting them eventually? In
> an ideal system, Kafka figures out if the topic is compacted or not
> and for non-compacted topics it doesn't do the local copy so it goes
> through a fast path.
>
>
Hi Eno,

I think the main purpose of tiered storage is to save local disk space.
Compacted topics are just in-memory hash tables. They are anyway very small.
They can be totally ignored when you calculate local storage needs.

The segment files in remote storage are considered read-only. They are not
going to be changed until they are deleted after expiration.

The segment files of a compacted topic have to be compacted / rewritten
periodically. Doing this on remote storage can make things much more
complicated.


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2019-10-28 Thread Ying Zheng
> - Why wait until local segments expire before offloading them to cold
> storage? Why not stream to HDFS/S3 on an ongoing basis? I'd think this
> would reduce bursty behavior from periodic uploads.
>
>
I think you misunderstood the KIP. We do plan to ship the segment files to
remote storage as soon as they are closed (no longer actively appended).


> - Can we write to multiple remote stores at the same time? Can we have some
> topics go to S3 and some to HDFS? Since we're storing "RDIs" that point to
> remote locations, can we generalize this to full URIs that may be in any
> supported remote store? In particular, what happens when you want to switch
> from HDFS to S3 -- can we add a new plugin and keep going? Can we fetch
> s3:/// URIs from S3 and hdfs:/// URIs from HDFS?
>
>
This was definitely one goal when we designed the KIP. The reason we call
the field RDI (byte[]) rather than URI (string) is just to give the plugin
developers more flexibility. As we don't require the field to follow any
standard schema and don't try to interpret the field in Kafka core,
developers can put whatever information is needed by their remote storage
into the field.
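
As a purely hypothetical illustration (not the actual HDFS/S3 plugin code),
an S3-based RSM could pack whatever locator it needs into the RDI bytes and
decode them again on read:

    import java.nio.charset.StandardCharsets;

    // Hypothetical example of how an S3-based RSM plugin might use the opaque RDI bytes.
    final class S3RemoteDataIdentifier {
        final String bucket;
        final String objectKey;
        final long byteOffsetInObject;

        S3RemoteDataIdentifier(String bucket, String objectKey, long byteOffsetInObject) {
            this.bucket = bucket;
            this.objectKey = objectKey;
            this.byteOffsetInObject = byteOffsetInObject;
        }

        // Kafka core never interprets these bytes; only the plugin encodes/decodes them.
        byte[] toRdi() {
            return (bucket + "\n" + objectKey + "\n" + byteOffsetInObject)
                    .getBytes(StandardCharsets.UTF_8);
        }

        static S3RemoteDataIdentifier fromRdi(byte[] rdi) {
            String[] parts = new String(rdi, StandardCharsets.UTF_8).split("\n", 3);
            return new S3RemoteDataIdentifier(parts[0], parts[1], Long.parseLong(parts[2]));
        }
    }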

For now, we just want to keep the first version simple. We think most users
will not use 2 different remote storages in one cluster. So, this is not a
high-priority feature. We can add support for mixed remote storage types in
the next version.


> - Instead of having brokers do all this, what if we just expose an API that
> lets external tooling register a URI for a given segment? If I've copied a
> segment file to S3, say with a daily cron job, why not just tell Kafka
> where to find it? Assuming I've got a plugin to _read_ from S3, that's all
> Kafka would need to know.
>
>
I think you agree that we anyway need a plugin to read the remote storage.
So, I think your question is whether we should let the Kafka broker drive
the remote storage write, or let the connector / external tool drive it. I
think it's slightly better to let the Kafka broker drive this, because:
1. The Kafka broker has the latest topic information, such as whether the
broker is still the leader and whether a segment file is already closed
(the high watermark has moved to the next segment).
2. For users, it's also easier to manage everything in one place: the Kafka
topic configuration.



> Ryanne
>
> On Thu, Oct 24, 2019, 9:13 AM Eno Thereska  wrote:
>
> > Going back to initial thread with general questions on KIP. I think
> > aspects of the user experience still need clarification:
> >
> > - if a user has a mix of compacted and non-compacted topics it will be
> > hard to reason about storage needs overall. Could you give a reason
> > why compacted topics are not supported? This is probably because to do
> > that you'd have to go with a paging approach (like Ryanne earlier
> > suggested) and that will be expensive in terms of IO. Do you want to
> > discount supporting compacted topics this early in the KIP design or
> > do you want to leave open the option of supporting them eventually? In
> > an ideal system, Kafka figures out if the topic is compacted or not
> > and for non-compacted topics it doesn't do the local copy so it goes
> > through a fast path.
> >
> > - why do we need per topic remote retention time and bytes? Why isn't
> > per topic retention time and bytes (without the "remote" part)
> > sufficient? E.g., if I have a topic and I want retention bytes to be
> > 1TB, and I currently have 500GB local and 500GB remote, Kafka can
> > manage what segments get deleted first. This would avoid the user
> > needing to think even more about these extra configs.
> >
> > Thanks
> > Eno
> >
> >
> > On Mon, Oct 21, 2019 at 4:46 PM Harsha  wrote:
> > >
> > > Hi All,
> > >   Thanks for the initial feedback on the KIP-405.  We opened a
> > PR here
> https://github.com/apache/kafka/pull/7561.
> > > Please take a look and let us know if you have any questions.
> > > Since this feature is being developed by engineers from different
> > companies we would like to open a feature branch in apache kafka git. It
> > will allow us collaborate in open source community rather than in private
> > branches. Please let me know if you have any objections to opening a
> > feature branch in kafka's git repo.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > On Mon, Apr 8, 2019, at 10:04 PM, Harsha wrote:
> > > > Thanks, Ron. Updating the KIP. will add answers here as well
> > > >
> > > >  1) If the cold storage technology can be cross-region, is there a
> > > >  possibility for a disaster recovery Kafka cluster to share the
> > messages in
> > > >  cold storage?  My guess is the answer is no, and messages replicated
> > to the
> > > >  D/R cluster have to be migrated to cold storage from there
> > 

Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-17 Thread Ying Zheng
> > > the broker only supports min.api.version and above. So that users can
> > > see a clear message and upgrade to a newer version.
> > >
> > >
> > > Thanks,
> > > Harsha
> > >
> > >
> > > On Fri, Apr 12, 2019, at 12:19 PM, Ismael Juma wrote:
> > > > Hi Ying,
> > > >
> > > > The actual reasons are important so that people can evaluate the KIP
> (and
> > > > vote). :) Thanks for providing a few more:
> > > >
> > > > (1) force users to check pointing in Kafka instead of zookeeper
> > > > (2) forbid an old go (sarama) client library which is known to have
> some
> > > > serious bugs
> > > > (3) force kafka 1.x clients with the ability to roll back if there's
> an
> > > > issue (unlike a message format upgrade)
> > > >
> > > > Relying on min.version seems like a pretty clunky way to achieve the
> > > above
> > > > list. The challenge is that it's pretty difficult to do it in a way
> that
> > > > works for clients across languages. They each add support for new
> > > protocol
> > > > versions independently (it could even happen in a bug fix release).
> So,
> > > if
> > > > you tried to block Sarama in #2, you may block Java clients too.
> > > >
> > > > For #3, it seems simplest to have a config that requires clients to
> > > support
> > > > a given message format version (or higher). For #2, it seems like
> you'd
> > > > want clients to advertise their versions. That would be useful for
> > > multiple
> > > > reasons.
> > > >
> > > > Ismael
> > > >
> > > > On Fri, Apr 12, 2019 at 8:42 PM Ying Zheng 
> > > wrote:
> > > >
> > > > > Hi Ismael,
> > > > >
> > > > > Those are just examples. I think the administrators should be able
> to
> > > block
> > > > > certain client libraries for whatever reason. Some other possible
> > > reasons
> > > > > include, force users to check pointing in Kafka instead of
> zookeeper,
> > > > > forbid an old go (sarama) client library which is known to have
> some
> > > > > serious bugs.
> > > > >
> > > > > message.downconversion.enable does not solve our problems. We are
> now
> > > > > planning to upgrade to message format V3, and force users to
> upgrade to
> > > > > Kafka 1.x clients. With the proposed min.api.version setting, in
> case
> > > of
> > > > > there is anything wrong, we can roll back the setting. If we
> upgrade
> > > the
> > > > > file format, there is no way to rollback (Kafka doesn't support
> > > downgrading
> > > > > message format).
> > > > >
> > > > > On Thu, Apr 11, 2019 at 7:05 PM Ismael Juma 
> wrote:
> > > > >
> > > > > > Hi Ying,
> > > > > >
> > > > > > It looks to me that all the examples given in the KIP can be
> handled
> > > with
> > > > > > the existing "message.downconversion.enable" config and by
> > > configuring
> > > > > the
> > > > > > message format to be the latest:
> > > > > >
> > > > > > 1. Kafka 8 / 9 / 10 consumer hangs when the message contains
> message
> > > > > header
> > > > > > > ( KAFKA-6739 - Down-conversion fails for records with headers
> > > > > RESOLVED  )
> > > > > > > 2. LZ4 is not correctly handled in Kafka 8 and Kafka 9 (
> > > KAFKA-3160 -
> > > > > > > Kafka LZ4 framing code miscalculates header checksum RESOLVED
> )
> > > > > > > 3. Performance penalty of converting message format from V3 to
> V1
> > > or V2
> > > > > > > for the old consumers (KIP-31 - Move to relative offsets in
> > > compressed
> > > > > > > message sets)
> > > > > >
> > > > > >
> > > > > > Am I missing something? Are there other examples that are not
> > > related to
> > > > > > message conversion?
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Thu, Apr 11, 2019 at 11:53 PM Ying Zheng
> 
> > > > > > wrote:
> > > > > >
> > > > > > > Hi here,
> > > > > > >
> > > > > > > Please vote for
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers
> > > > > > >
> > > > > > > Thank you!
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-12 Thread Ying Zheng
Hi Ismael,

Those are just examples. I think the administrators should be able to block
certain client libraries for whatever reason. Some other possible reasons
include forcing users to checkpoint in Kafka instead of ZooKeeper, or
forbidding an old Go (Sarama) client library which is known to have some
serious bugs.

message.downconversion.enable does not solve our problems. We are now
planning to upgrade to message format V3, and force users to upgrade to
Kafka 1.x clients. With the proposed min.api.version setting, in case
there is anything wrong, we can roll back the setting. If we upgrade the
message format, there is no way to roll back (Kafka doesn't support
downgrading the message format).

On Thu, Apr 11, 2019 at 7:05 PM Ismael Juma  wrote:

> Hi Ying,
>
> It looks to me that all the examples given in the KIP can be handled with
> the existing "message.downconversion.enable" config and by configuring the
> message format to be the latest:
>
> 1. Kafka 8 / 9 / 10 consumer hangs when the message contains message header
> > ( KAFKA-6739 - Down-conversion fails for records with headers RESOLVED  )
> > 2. LZ4 is not correctly handled in Kafka 8 and Kafka 9 ( KAFKA-3160 -
> > Kafka LZ4 framing code miscalculates header checksum RESOLVED  )
> > 3. Performance penalty of converting message format from V3 to V1 or V2
> > for the old consumers (KIP-31 - Move to relative offsets in compressed
> > message sets)
>
>
> Am I missing something? Are there other examples that are not related to
> message conversion?
>
> Ismael
>
> On Thu, Apr 11, 2019 at 11:53 PM Ying Zheng 
> wrote:
>
> > Hi here,
> >
> > Please vote for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers
> >
> > Thank you!
> >
>


Re: [VOTE] KIP-433: Block old clients on brokers

2019-04-11 Thread Ying Zheng
Hi Gwen,

Thank you very much for the feedback! I have updated the KIP.

"reject request" means return an UNSUPPORTED_VERSION (35) error. Client
libraries after 0.10.2 will show the message "The version of API is not
supported." Client libraries before 0.10.2 will treat this error as an
unknown server error.
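
To make the behavior concrete, here is a rough sketch of the broker-side
check. The names (minApiVersions, checkRequest) are hypothetical and this is
not the actual broker code; only the error code 35 (UNSUPPORTED_VERSION)
comes from the protocol.

    import java.util.Map;

    // Illustrative sketch of rejecting a request whose API version is below the
    // configured minimum. Not the actual broker implementation.
    final class MinApiVersionCheck {
        static final short UNSUPPORTED_VERSION_ERROR_CODE = 35;

        // Hypothetical config shape: API key -> minimum accepted version.
        static short checkRequest(Map<Short, Short> minApiVersions, short apiKey, short apiVersion) {
            Short min = minApiVersions.get(apiKey);
            if (min != null && apiVersion < min) {
                // Clients on 0.10.2+ surface this as "The version of API is not supported.";
                // older clients report an unknown server error.
                return UNSUPPORTED_VERSION_ERROR_CODE;
            }
            return 0; // NONE
        }
    }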

On Thu, Apr 11, 2019 at 3:01 PM Gwen Shapira  wrote:

> In general, I support this proposal, but I'd like some more details on what
> "rejecting API requests" mean? Close the connections? Return some kind of
> error? Is there a way for the client to know what happened? Is there a way
> for the admin to know how many clients are rejected?
>
> As a nit, the "migration plan" part of the KIP still mentions the
> authorizer.
>
> Gwen
>
> On Thu, Apr 11, 2019 at 2:53 PM Ying Zheng  wrote:
>
> > Hi here,
> >
> > Please vote for
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers
> >
> > Thank you!
> >
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter <https://twitter.com/ConfluentInc> | blog
> <http://www.confluent.io/blog>
>


[VOTE] KIP-433: Block old clients on brokers

2019-04-11 Thread Ying Zheng
Hi here,

Please vote for
https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Block+old+clients+on+brokers

Thank you!


Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-04-11 Thread Ying Zheng
@Colin, Thank you for the feedback!

I have updated the KIP and added an explanation of why we use the API
version rather than the Kafka version.

I will start a vote for this KIP.


On Fri, Mar 29, 2019 at 9:47 AM Colin McCabe  wrote:

> Hi Ying,
>
> That's a fair point.  Maybe using API keys directly is reasonable here.
>
> One thing that's probably worth calling out is that if we make the name
> part of the configuration, we can't rename APIs in the future.  That's
> probably OK as long as it's documented.
>
> best,
> Colin
>
> On Thu, Mar 28, 2019, at 17:36, Ying Zheng wrote:
> > @Colin McCabe 
> >
> > I did think about that option. Yes, for most users, it's much easier to
> > understand Kafka version, rather than API version. However, the existing
> > Kafka clients only report API versions in the Kafka requests. So, the
> > brokers can only block clients by API versions rather than the real Kafka
> > versions. This can be very confusing for the users, if an API did not
> > change for a specified Kafka version.
> >
> > For example, a user sets the min Kafka version of produce request to
> Kafka
> > 1.1. She would expect the broker will reject Kafka 1.0 producers.
> However,
> > both Kafka 1.1 and Kafka 1.0 are using api version 5. The broker can't
> > distinguish the 2 version. So, Kafka 1.0 producers are still allowed.
> >
> > I think we can say this configuration is only for "advanced users". The
> > user has to know the concept of "api version", and know the function of
> > each API, to be able to use this feature. (Similarly, it's easier for the
> > users to say: "I want to block old consumers, or old admin clients.". But
> > there is no clear definition of which set of APIs are "consumer APIs".
> So,
> > we still have to use API names, rather than "client roles")
> >
> >
> >
> > On Wed, Mar 27, 2019 at 5:32 PM Colin McCabe  wrote:
> >
> > > Thanks, Ying Zheng.  Looks good overall.
> > >
> > > One question is, should the version be specified as a Kafka version
> rather
> > > than as a RPC API version?  I don't think most users are aware of RPC
> > > versions, but something like "min kafka version" would be easier to
> > > understand.  That is how we handle the minimum inter-broker protocol
> > > version and the minimum on-disk format version, after all.
> > >
> > > best,
> > > Colin
> > >
> > > On Tue, Mar 26, 2019, at 17:52, Ying Zheng wrote:
> > > > I have rewritten the KIP. The new proposal is adding a new
> configuration
> > > > min.api.version in Kafka broker.
> > > >
> > > > Please review the new KIP. Thank you!
> > > >
> > > > On Fri, Mar 1, 2019 at 11:06 AM Colin McCabe 
> wrote:
> > > >
> > > > > On Wed, Feb 27, 2019, at 15:53, Harsha wrote:
> > > > > > HI Colin,
> > > > > > Overlooked the IDEMPOTENT_WRITE ACL. This along with
> > > > > > client.min.version should solve the cases proposed in the KIP.
> > > > > > Can we turn this KIP into adding min.client.version config to
> broker
> > > > > > and it could be part of the dynamic config .
> > > > >
> > > > > +1, sounds like a good idea.
> > > > >
> > > > > Colin
> > > > >
> > > > >
> > > > > >
> > > > > > Thanks,
> > > > > > Harsha
> > > > > >
> > > > > > On Wed, Feb 27, 2019, at 12:17 PM, Colin McCabe wrote:
> > > > > > > On Tue, Feb 26, 2019, at 16:33, Harsha wrote:
> > > > > > > > Hi Colin,
> > > > > > > >
> > > > > > > > "> I think Ismael and Gwen here bring up a good point.  The
> > > version
> > > > > of the
> > > > > > > > > request is a technical detail that isn't really related to
> > > > > > > > > authorization.  There are a lot of other technical details
> like
> > > > > this
> > > > > > > > > like the size of the request, the protocol it came in on,
> etc.
> > > > > None of
> > > > > > > > > them are passed to the authorizer-- they all have
> configuration
> > > > > knobs
> > > > > > > > > to control how we handle them.  If we add this technical
> > > detail,
> > > >

Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-03-28 Thread Ying Zheng
@Colin McCabe 

I did think about that option. Yes, for most users it's much easier to
understand a Kafka version than an API version. However, the existing Kafka
clients only report API versions in their requests, so the brokers can only
block clients by API version rather than by the real Kafka version. This can
be very confusing for users when an API did not change in a particular Kafka
version.

For example, a user sets the min Kafka version of the produce request to Kafka
1.1. She would expect the broker to reject Kafka 1.0 producers. However, both
Kafka 1.1 and Kafka 1.0 producers use produce API version 5, so the broker
can't distinguish the two versions and Kafka 1.0 producers are still allowed.

I think we can say this configuration is only for "advanced users". The user
has to know the concept of "API version", and the function of each API, to be
able to use this feature. (Similarly, it's easier for users to say "I want to
block old consumers, or old admin clients." But there is no clear definition
of which set of APIs are "consumer APIs", so we still have to use API names
rather than "client roles".)
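
For concreteness, here is a minimal sketch of the kind of per-API check this
implies; the config shape, class name, and API names are illustrative only,
not the final KIP-433 design:

import java.util.Map;

class MinApiVersionGate {
    // e.g. {"Produce" -> 5, "Fetch" -> 7}; built from a dynamic broker config (hypothetical).
    private final Map<String, Short> minApiVersionByApi;

    MinApiVersionGate(Map<String, Short> minApiVersionByApi) {
        this.minApiVersionByApi = minApiVersionByApi;
    }

    /** True if the apiVersion from the request header is below the configured minimum for that API. */
    boolean isBlocked(String apiName, short requestApiVersion) {
        Short min = minApiVersionByApi.get(apiName);
        return min != null && requestApiVersion < min;
    }
}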



On Wed, Mar 27, 2019 at 5:32 PM Colin McCabe  wrote:

> Thanks, Ying Zheng.  Looks good overall.
>
> One question is, should the version be specified as a Kafka version rather
> than as a RPC API version?  I don't think most users are aware of RPC
> versions, but something like "min kafka version" would be easier to
> understand.  That is how we handle the minimum inter-broker protocol
> version and the minimum on-disk format version, after all.
>
> best,
> Colin
>
> On Tue, Mar 26, 2019, at 17:52, Ying Zheng wrote:
> > I have rewritten the KIP. The new proposal is adding a new configuration
> > min.api.version in Kafka broker.
> >
> > Please review the new KIP. Thank you!
> >
> > On Fri, Mar 1, 2019 at 11:06 AM Colin McCabe  wrote:
> >
> > > On Wed, Feb 27, 2019, at 15:53, Harsha wrote:
> > > > HI Colin,
> > > > Overlooked the IDEMPOTENT_WRITE ACL. This along with
> > > > client.min.version should solve the cases proposed in the KIP.
> > > > Can we turn this KIP into adding min.client.version config to broker
> > > > and it could be part of the dynamic config .
> > >
> > > +1, sounds like a good idea.
> > >
> > > Colin
> > >
> > >
> > > >
> > > > Thanks,
> > > > Harsha
> > > >
> > > > On Wed, Feb 27, 2019, at 12:17 PM, Colin McCabe wrote:
> > > > > On Tue, Feb 26, 2019, at 16:33, Harsha wrote:
> > > > > > Hi Colin,
> > > > > >
> > > > > > "> I think Ismael and Gwen here bring up a good point.  The
> version
> > > of the
> > > > > > > request is a technical detail that isn't really related to
> > > > > > > authorization.  There are a lot of other technical details like
> > > this
> > > > > > > like the size of the request, the protocol it came in on, etc.
> > > None of
> > > > > > > them are passed to the authorizer-- they all have configuration
> > > knobs
> > > > > > > to control how we handle them.  If we add this technical
> detail,
> > > > > > > logically we'll have to start adding all the others, and the
> > > authorizer
> > > > > > > API will get really bloated.  It's better to keep it focused on
> > > > > > > authorization, I think."
> > > > > >
> > > > > > probably my previous email is not clear but I am agreeing with
> > > Gwen's point.
> > > > > > I am not in favor of extending authorizer to support this.
> > > > > >
> > > > > >
> > > > > > "> Another thing to consider is that if we add a new broker
> > > configuration
> > > > > > > that lets us set a minimum client version which is allowed,
> that
> > > could
> > > > > > > be useful to other users as well.  On the other hand, most
> users
> > > are
> > > > > > > not likely to write a custom authorizer to try to take
> advantage
> > > of
> > > > > > > version information being passed to the authorizer.  So, I
> think
> > > using> a configuration is clearly the better way to go here.  Perhaps
> it
> > > can
> > > > > > > be a KIP-226 dynamic configuration to make this easier to
> deploy?"
> > > &

Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-03-26 Thread Ying Zheng
> > > > > API will get really bloated.  It's better to keep it focused on
> > > > > authorization, I think.
> > > > >
> > > > > Another thing to consider is that if we add a new broker
> configuration
> > > > > that lets us set a minimum client version which is allowed, that
> could
> > > > > be useful to other users as well.  On the other hand, most users
> are
> > > > > not likely to write a custom authorizer to try to take advantage
> of
> > > > > version information being passed to the authorizer.  So, I think
> using
> > > > > a configuration is clearly the better way to go here.  Perhaps it
> can
> > > > > be a KIP-226 dynamic configuration to make this easier to deploy?
> > > > >
> > > > > cheers,
> > > > > Colin
> > > > >
> > > > >
> > > > > On Mon, Feb 25, 2019, at 15:43, Harsha wrote:
> > > > > > Hi Ying,
> > > > > > I think the question is can we add a module in the core
> which
> > > > > > can take up the dynamic config and does a block certain APIs.
> This
> > > > > > module will be called in each of the APIs like the authorizer
> does
> > > > > > today to check if the API is supported for the client.
> > > > > > Instead of throwing AuthorizationException like the authorizer
> does
> > > > > > today it can throw UnsupportedException.
> > > > > > Benefits are,  we are keeping the authorizer interface as is and
> adding
> > > > > > the flexibility based on dynamic configs without the need for
> > > > > > categorizing broker APIs and it will be easy to extend to do
> additional
> > > > > > options,  like turning off certain features which might be in
> interest
> > > > > > to the service providers.
> > > > > > One drawback,  It will introduce another call to check instead
> of
> > > > > > centralizing everything around Authorizer.
> > > > > >
> > > > > > Thanks,
> > > > > > Harsha
> > > > > >
> > > > > > On Mon, Feb 25, 2019, at 2:43 PM, Ying Zheng wrote:
> > > > > > > If you guys don't like the extension of authorizer interface,
> I will just
> > > > > > > propose a single broker dynamic configuration:
> client.min.api.version, to
> > > > > > > keep things simple.
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > On Mon, Feb 25, 2019 at 2:23 PM Ying Zheng 
> wrote:
> > > > > > >
> > > > > > > > @Viktor Somogyi-Vass, @Harsha, It seems the biggest concern
> is the
> > > > > > > > backward-compatibility to the existing authorizers. We can
> put the new
> > > > > > > > method into a new trait / interface:
> > > > > > > > trait AuthorizerEx extends Authorizer {
> > > > > > > >def authorize(session: Session, operation: Operation,
> resource: Resource,
> > > > > > > > apiVersion: Short): Boolean
> > > > > > > > }
> > > > > > > >
> > > > > > > > When loading an authorizer class, broker will check if the
> class
> > > > > > > > implemented AuthorizerEx interface. If not, broker will
> wrapper the
> > > > > > > > Authorizer object with an Adapter class, in which
> authorizer(...
> > > > > > > > apiVersion) call is translated to the old authorizer() call.
> So that, both
> > > > > > > > old and new Authorizer is supported and can be treated as
> AuthorizerEx in
> > > > > > > > the new broker code.
> > > > > > > >
> > > > > > > > As for the broker dynamic configuration approach, I'm not
> sure how to
> > > > > > > > correctly categorize the 40+ broker APIs into a few
> categories.
> > > > > > > > For example, describe is used by producer, consumer, and
> admin. Should it
> > > > > > > > be controlled by producer.min.api.version or
> consumer.min.api.version?
> > > > > > > > Should producer.min.api.version apply to transaction
> operations?
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 25, 2019 at 10:33 AM Harsha 
> wrote:
> > > > > > > >
> > > > > > > >> I think the motivation of the KIP is to configure which API
> we want to
> > > > > > > >> allow for a broker.
> > > > > > > >> This is challenging for a hosted service where you have
> customers with
> > > > > > > >> different versions of clients.
> > > > > > > >> It's not just about down conversion but for example
> transactions, there
> > > > > > > >> is a case where we do not want to allow users to start
> using transactions
> > > > > > > >> and there is no way to disable to this right now and as
> specified in the
> > > > > > > >> KIP, having a lock on which client versions we support.
> > > > > > > >> Authorizer's original purpose is to allow policies to be
> enforced for
> > > > > > > >> each of the Kafka APIs, specifically in the context of
> security.
> > > > > > > >> Extending this to a general purpose gatekeeper might not be
> suitable and
> > > > > > > >> as mentioned in the thread every implementation of
> authorizer needs to
> > > > > > > >> re-implement to provide the same set of functionality.
> > > > > > > >> I think it's better to add an implementation which will use
> a broker's
> > > > > > > >> dynamic config as mentioned in approach 1.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >> Harsha
> > > > > > > >>
> > > > > > > >> On Sat, Feb 23, 2019, at 6:21 AM, Ismael Juma wrote:
> > > > > > > >> > Thanks for the KIP. Have we considered the existing topic
> config that
> > > > > > > >> makes
> > > > > > > >> > it possible to disallow down conversions? That's the
> biggest downside in
> > > > > > > >> > allowing older clients.
> > > > > > > >> >
> > > > > > > >> > Ismael
> > > > > > > >> >
> > > > > > > >> > On Fri, Feb 22, 2019, 2:11 PM Ying Zheng
> 
> > > > > > > >> wrote:
> > > > > > > >> >
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-02-25 Thread Ying Zheng
If you guys don't like the extension of authorizer interface, I will just
propose a single broker dynamic configuration: client.min.api.version, to
keep things simple.

What do you think?

On Mon, Feb 25, 2019 at 2:23 PM Ying Zheng  wrote:

> @Viktor Somogyi-Vass, @Harsha, It seems the biggest concern is the
> backward-compatibility to the existing authorizers. We can put the new
> method into a new trait / interface:
> trait AuthorizerEx extends Authorizer {
>def authorize(session: Session, operation: Operation, resource: Resource,
> apiVersion: Short): Boolean
> }
>
> When loading an authorizer class, broker will check if the class
> implemented AuthorizerEx interface. If not, broker will wrapper the
> Authorizer object with an Adapter class, in which authorizer(...
> apiVersion) call is translated to the old authorizer() call. So that, both
> old and new Authorizer is supported and can be treated as AuthorizerEx in
> the new broker code.
>
> As for the broker dynamic configuration approach, I'm not sure how to
> correctly categorize the 40+ broker APIs into a few categories.
> For example, describe is used by producer, consumer, and admin. Should it
> be controlled by producer.min.api.version or consumer.min.api.version?
> Should producer.min.api.version apply to transaction operations?
>
>
> On Mon, Feb 25, 2019 at 10:33 AM Harsha  wrote:
>
>> I think the motivation of the KIP is to configure which API we want to
>> allow for a broker.
>> This is challenging for a hosted service where you have customers with
>> different versions of clients.
>> It's not just about down conversion but for example transactions, there
>> is a case where we do not want to allow users to start using transactions
>> and there is no way to disable to this right now and as specified in the
>> KIP, having a lock on which client versions we support.
>> Authorizer's original purpose is to allow policies to be enforced for
>> each of the Kafka APIs, specifically in the context of security.
>> Extending this to a general purpose gatekeeper might not be suitable and
>> as mentioned in the thread every implementation of authorizer needs to
>> re-implement to provide the same set of functionality.
>> I think it's better to add an implementation which will use a broker's
>> dynamic config as mentioned in approach 1.
>>
>> Thanks,
>> Harsha
>>
>> On Sat, Feb 23, 2019, at 6:21 AM, Ismael Juma wrote:
>> > Thanks for the KIP. Have we considered the existing topic config that
>> makes
>> > it possible to disallow down conversions? That's the biggest downside in
>> > allowing older clients.
>> >
>> > Ismael
>> >
>> > On Fri, Feb 22, 2019, 2:11 PM Ying Zheng 
>> wrote:
>> >
>> > >
>> > >
>> >
>>
>


Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-02-25 Thread Ying Zheng
@Viktor Somogyi-Vass, @Harsha, It seems the biggest concern is the
backward-compatibility to the existing authorizers. We can put the new
method into a new trait / interface:
trait AuthorizerEx extends Authorizer {
   def authorize(session: Session, operation: Operation, resource: Resource,
apiVersion: Short): Boolean
}

When loading an authorizer class, the broker will check whether the class
implements the AuthorizerEx interface. If not, the broker will wrap the
Authorizer object with an adapter class, in which the authorize(...,
apiVersion) call is translated to the old authorize() call. That way, both
old and new Authorizers are supported and can be treated as AuthorizerEx in
the new broker code.
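
For illustration only, a rough Java rendering of that wrapping logic (the real
Authorizer is a Scala trait, so the types and names below are hypothetical
placeholders, not the actual broker code):

// Placeholder types; the real ones live in the Kafka broker.
class Session {}
class Operation {}
class Resource {}

interface Authorizer {
    boolean authorize(Session session, Operation operation, Resource resource);
}

interface AuthorizerEx extends Authorizer {
    boolean authorize(Session session, Operation operation, Resource resource, short apiVersion);
}

final class AuthorizerAdapter implements AuthorizerEx {
    private final Authorizer delegate;

    AuthorizerAdapter(Authorizer delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean authorize(Session session, Operation operation, Resource resource) {
        return delegate.authorize(session, operation, resource);
    }

    @Override
    public boolean authorize(Session session, Operation operation, Resource resource, short apiVersion) {
        // Old-style authorizers simply ignore the apiVersion argument.
        return delegate.authorize(session, operation, resource);
    }

    /** Wrap only when the configured class does not already implement the extended interface. */
    static AuthorizerEx adapt(Authorizer configured) {
        return configured instanceof AuthorizerEx
                ? (AuthorizerEx) configured
                : new AuthorizerAdapter(configured);
    }
}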

As for the broker dynamic configuration approach, I'm not sure how to
correctly categorize the 40+ broker APIs into a few categories.
For example, describe is used by producer, consumer, and admin. Should it
be controlled by producer.min.api.version or consumer.min.api.version?
Should producer.min.api.version apply to transaction operations?


On Mon, Feb 25, 2019 at 10:33 AM Harsha  wrote:

> I think the motivation of the KIP is to configure which API we want to
> allow for a broker.
> This is challenging for a hosted service where you have customers with
> different versions of clients.
> It's not just about down conversion but for example transactions, there is
> a case where we do not want to allow users to start using transactions and
> there is no way to disable to this right now and as specified in the KIP,
> having a lock on which client versions we support.
> Authorizer's original purpose is to allow policies to be enforced for each
> of the Kafka APIs, specifically in the context of security.
> Extending this to a general purpose gatekeeper might not be suitable and
> as mentioned in the thread every implementation of authorizer needs to
> re-implement to provide the same set of functionality.
> I think it's better to add an implementation which will use a broker's
> dynamic config as mentioned in approach 1.
>
> Thanks,
> Harsha
>
> On Sat, Feb 23, 2019, at 6:21 AM, Ismael Juma wrote:
> > Thanks for the KIP. Have we considered the existing topic config that
> makes
> > it possible to disallow down conversions? That's the biggest downside in
> > allowing older clients.
> >
> > Ismael
> >
> > On Fri, Feb 22, 2019, 2:11 PM Ying Zheng  wrote:
> >
> > >
> > >
> >
>


Re: [DISCUSS] KIP-433: Provide client API version to authorizer

2019-02-22 Thread Ying Zheng
Hi Gwen,

Thank you for the quick feedback!

It's a good point that broker configuration can be dynamic and is more
convenient. Technically, anything inside the authorizer can also be made
dynamic. For example, the SimpleAclAuthorizer in Kafka stores ACLs in
ZooKeeper, which can be changed dynamically with the CLI.





On Fri, Feb 22, 2019 at 2:41 PM Gwen Shapira  wrote:

> Link, for convenience:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-433%3A+Provide+client+API+version+to+authorizer
>
> I actually prefer the first rejected alternative (add a
> configuration). While you are right that configuration is inherently
> less flexible, putting the logic in the authorizer means that an admin
> that wants to limit the allowed client API versions has to implement
> an authorizer. This is more challenging than changing a config (and
> AFAIK, can't be done dynamically - configs can be dynamic and the
> admin can avoid a restart).
>
> Would be interested to hear what others think.
>
> Gwen
>
> On Fri, Feb 22, 2019 at 2:11 PM Ying Zheng  wrote:
> >
> >
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


[DISCUSS] KIP-433: Provide client API version to authorizer

2019-02-22 Thread Ying Zheng



[jira] [Created] (KAFKA-7975) Provide client API version to authorizer

2019-02-21 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-7975:
-

 Summary: Provide client API version to authorizer
 Key: KAFKA-7975
 URL: https://issues.apache.org/jira/browse/KAFKA-7975
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Ying Zheng
Assignee: Ying Zheng






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7142) Rebalancing large consumer group can block the coordinator broker for several seconds

2018-07-09 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-7142:
-

 Summary: Rebalancing large consumer group can block the 
coordinator broker for several seconds
 Key: KAFKA-7142
 URL: https://issues.apache.org/jira/browse/KAFKA-7142
 Project: Kafka
  Issue Type: Improvement
Reporter: Ying Zheng






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6432) Lookup indices may cause unnecessary page fault

2018-01-07 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-6432:
-

 Summary: Lookup indices may cause unnecessary page fault
 Key: KAFKA-6432
 URL: https://issues.apache.org/jira/browse/KAFKA-6432
 Project: Kafka
  Issue Type: Improvement
  Components: core, log
Reporter: Ying Zheng
 Attachments: Binary Search - Diagram 1.png, Binary Search - Diagram 
2.png

For each topic-partition, the Kafka broker maintains two indices: one for message 
offsets, one for message timestamps. By default, a new index entry is appended to 
each index for every 4 KB of messages. Index lookup is a simple binary search. The 
indices are mmapped files, cached by the Linux page cache.

Both consumer fetches and follower fetches have to do an offset lookup before 
accessing the actual message data. The simple binary search algorithm used for 
looking up the index is not cache-friendly, and may cause page faults even on 
high-QPS topic-partitions.

For example (diagram 1), when looking up an index entry in page 12, the binary 
search algorithm has to read pages 0, 6, 9 and 11. After new messages are 
appended to the topic-partition, the index grows to 13 pages. Now, if a 
follower fetch request looks up the 1st index entry of page 13, the binary 
search will go to pages 0, 7, 10 and 12. Among those pages, pages 7 and 10 have 
not been used for a long time, and may already have been swapped out to disk.

In practice, on a normal Kafka broker, all follower fetch requests and most 
consumer fetch requests only look up the last few entries of the index. We can 
make the index lookup more cache-friendly by searching the last one or two 
pages of the index first. (Diagram 2)
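
A minimal sketch of that tail-first lookup, assuming a plain sorted long[] stands 
in for the mmapped index (entry and page sizes below are illustrative, not the 
broker's actual index layout):

import java.util.Arrays;

class TailFirstIndexLookup {
    // 4 KB pages holding 8-byte entries, purely for illustration.
    private static final int ENTRIES_PER_PAGE = 4096 / 8;

    /** Returns the slot of the largest entry <= target, or -1 if none exists. */
    static int lookup(long[] index, long target) {
        if (index.length == 0) {
            return -1;
        }
        // Most follower and consumer fetches hit the last one or two pages,
        // so probe that warm region first and only fall back to the full range.
        int warmStart = Math.max(0, index.length - 2 * ENTRIES_PER_PAGE);
        if (target >= index[warmStart]) {
            return slotFor(index, warmStart, index.length, target);
        }
        return slotFor(index, 0, warmStart, target);
    }

    private static int slotFor(long[] index, int from, int to, long target) {
        int pos = Arrays.binarySearch(index, from, to, target);
        // binarySearch returns -(insertionPoint) - 1 on a miss; the largest
        // entry <= target sits just before the insertion point.
        return pos >= 0 ? pos : -(pos + 2);
    }
}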



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6431) Lock contention in Purgatory

2018-01-07 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-6431:
-

 Summary: Lock contention in Purgatory
 Key: KAFKA-6431
 URL: https://issues.apache.org/jira/browse/KAFKA-6431
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Ying Zheng
Priority: Minor


Purgatory is the data structure in the Kafka broker that manages delayed 
operations. A ConcurrentHashMap (Kafka Pool) maps each operation key to the 
operations (in a ConcurrentLinkedQueue) that are interested in that key.

When an operation is done or expired, it is removed from the list 
(ConcurrentLinkedQueue). When the list becomes empty, it is removed from the 
ConcurrentHashMap. This second removal has to be protected by a lock, to avoid 
adding new operations into a list that is being removed. This is currently done 
with a globally shared ReentrantReadWriteLock: all read operations on purgatory 
have to acquire the read permission of this lock, and the list-removing 
operations need the write permission.

Our profiling results show that the Kafka broker spends a nontrivial amount of 
time on this read-write lock.

The problem is exacerbated when there are a large number of short-lived 
operations. For example, when we are doing sync produce operations (acks=all), a 
DelayedProduce operation is added and then removed for each message. If the QPS 
of the topic is not high, it is very likely that, when the operation is done and 
removed, the list for that key (topic-partition) becomes empty, and has to be 
removed while holding the write lock. This blocks all the read / write 
operations on purgatory for a while. As there are tens of I/O threads accessing 
purgatory concurrently, this shared lock can easily become a bottleneck.

Actually, we only want to avoid concurrent read / write on the same key. The 
operations on different keys do not conflict with each other.

I suggest sharding purgatory into smaller partitions, and locking each individual 
partition independently.

Assuming there are 10 I/O threads actively accessing purgatory, sharding 
purgatory into 512 partitions reduces the probability of 2 threads accessing 
the same partition at the same time to about 2%. We can also use ReentrantLock 
instead of ReentrantReadWriteLock: when reads do not greatly outnumber writes, 
ReentrantLock has lower overhead than ReentrantReadWriteLock.
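
A minimal sketch of the sharding idea, with illustrative names (not Kafka's 
actual DelayedOperationPurgatory code): each operation key hashes to one of N 
independent locks, so a watcher-list removal only blocks other operations on 
the same shard.

import java.util.concurrent.locks.ReentrantLock;

class ShardedPurgatoryLocks {
    private final ReentrantLock[] shards;

    ShardedPurgatoryLocks(int numShards) {
        shards = new ReentrantLock[numShards];
        for (int i = 0; i < numShards; i++) {
            shards[i] = new ReentrantLock();
        }
    }

    private ReentrantLock lockFor(Object key) {
        // Non-negative hash spreads keys across the shards.
        return shards[(key.hashCode() & 0x7fffffff) % shards.length];
    }

    /** Runs a watcher-list update for one key while holding only that key's shard lock. */
    void withKeyLock(Object key, Runnable action) {
        ReentrantLock lock = lockFor(key);
        lock.lock();
        try {
            action.run();
        } finally {
            lock.unlock();
        }
    }
}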







--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6430) Improve Kafka GZip compression performance

2018-01-07 Thread Ying Zheng (JIRA)
Ying Zheng created KAFKA-6430:
-

 Summary: Improve Kafka GZip compression performance
 Key: KAFKA-6430
 URL: https://issues.apache.org/jira/browse/KAFKA-6430
 Project: Kafka
  Issue Type: Improvement
  Components: clients, core
Reporter: Ying Zheng
Priority: Minor


To compress messages, Kafka uses DataOutputStream on top of GZIPOutputStream:
    new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
To decompress messages, Kafka uses DataInputStream on top of GZIPInputStream:
    new DataInputStream(new GZIPInputStream(buffer));
This is straightforward, but actually inefficient. For each message, in 
addition to the key and value data, Kafka has to write about 30 metadata 
bytes (the exact number varies slightly across Kafka versions), including the 
magic byte, checksum, timestamp, offset, key length, value length, etc. For each 
of these bytes, Java's DataOutputStream has to call write(byte) once. Here is the 
awkward writeInt() method in DataOutputStream, which writes 4 bytes separately in 
big-endian order:
    public final void writeInt(int v) throws IOException {
        out.write((v >>> 24) & 0xFF);
        out.write((v >>> 16) & 0xFF);
        out.write((v >>>  8) & 0xFF);
        out.write((v >>>  0) & 0xFF);
        incCount(4);
    }

Unfortunately, GZIPOutputStream does not implement a single-byte write(byte) 
method. Instead, it only provides a write(byte[], offset, len) method, which 
calls the corresponding JNI zlib function. The write(byte) calls from 
DataOutputStream are therefore translated into write(byte[], offset, len) calls 
in a very inefficient way (Oracle JDK 1.8 code):
class DeflaterOutputStream {
    public void write(int b) throws IOException {
        byte[] buf = new byte[1];
        buf[0] = (byte)(b & 0xff);
        write(buf, 0, 1);
    }

    public void write(byte[] b, int off, int len) throws IOException {
        if (def.finished()) {
            throw new IOException("write beyond end of stream");
        }
        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return;
        }
        if (!def.finished()) {
            def.setInput(b, off, len);
            while (!def.needsInput()) {
                deflate();
            }
        }
    }
}

class GZIPOutputStream extends DeflaterOutputStream {
    public synchronized void write(byte[] buf, int off, int len)
        throws IOException
    {
        super.write(buf, off, len);
        crc.update(buf, off, len);
    }
}

class Deflater {
    private native int deflateBytes(long addr, byte[] b, int off, int len, int flush);
}

class CRC32 {
    public void update(byte[] b, int off, int len) {
        if (b == null) {
            throw new NullPointerException();
        }
        if (off < 0 || len < 0 || off > b.length - len) {
            throw new ArrayIndexOutOfBoundsException();
        }
        crc = updateBytes(crc, b, off, len);
    }

    private native static int updateBytes(int crc, byte[] b, int off, int len);
}
For each metadata byte, the code above has to allocate a single-byte array, 
acquire several locks, and call two native JNI methods (Deflater.deflateBytes 
and CRC32.updateBytes). Each Kafka message carries about 30 such metadata bytes.
The call stack of Deflater.deflateBytes():
DeflaterOutputStream.write(int b) -> GZIPOutputStream.write(byte[] buf, int off, 
int len) -> DeflaterOutputStream.write(byte[] b, int off, int len) -> 
DeflaterOutputStream.deflate() -> Deflater.deflate(byte[] b, int off, int len) -> 
Deflater.deflate(byte[] b, int off, int len, int flush) -> 
Deflater.deflateBytes(long addr, byte[] b, int off, int len, int flush)

The call stack of CRC32.updateBytes():
DeflaterOutputStream.write(int b) -> GZIPOutputStream.write(byte[] buf, int off, 
int len) -> CRC32.update(byte[] b, int off, int len) -> 
CRC32.updateBytes(int crc, byte[] b, int off, int len)

At Uber, we found that adding a small buffer between DataOutputStream and 
GZIPOutputStream can speed up Kafka GZip compression by about 60% on average.
-return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
+return new DataOutputStream(new BufferedOutputStream(new GZIPOutputStream(buffer, bufferSize), 1 << 14));

A similar fix also applies to GZip decompression.
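
For illustration, the decompression-side change would look roughly like this, 
assuming the same 16 KB buffer size (the class and method names are just for 
the sketch, not Kafka's actual code path):

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

class BufferedGzipInput {
    static DataInputStream wrap(InputStream buffer) throws IOException {
        // The BufferedInputStream batches the many small metadata reads, so the
        // native inflate path in GZIPInputStream is invoked far less often.
        return new DataInputStream(new BufferedInputStream(new GZIPInputStream(buffer), 1 << 14));
    }
}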

Here are the test results using production traffic at Uber:
|| Topic || Avg Message Size (bytes) || Vanilla Kafka Throughput (MB/s) || Kafka w/ GZip Buffer Throughput (MB/s) || Speed Up ||
| topic 1 | 197 | 10.9 | 21.9 | 2.0 |
| topic 2 | 208 | 8.5 | 15.9 | 1.9 |
| topic 3 | 624 | 15.3 | 20.2 | 1.3 |
| topic 4 | 766 | 28.0 | 43.7 | 1.6 |