[ https://issues.apache.org/jira/browse/KAFKA-15776?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809592#comment-17809592 ]

Jorge Esteban Quilcate Otoya commented on KAFKA-15776:
------------------------------------------------------

Agree with [~fvisconte] that tweaking an existing config on the consumer side 
is undesirable, given that Tiered Storage aims to be transparent to clients.

An additional issue, even when caching fetch requests, is that a remote fetch 
doesn't only fetch the log segment but potentially also the offset index. 
Since RemoteIndexCache is a synchronous cache, interrupted fetches are not 
cached and can block consumers' progress. This has [pushed 
us|https://github.com/Aiven-Open/tiered-storage-for-apache-kafka/pull/472] to 
build an additional async cache for indexes as a workaround.
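
To illustrate the shape of that workaround (a minimal sketch only, not the 
actual RemoteIndexCache or the Aiven plugin code): an async cache runs the 
download on its own executor, so a caller that gives up doesn't discard the 
in-flight work.

{code:java}
import java.util.concurrent.*;

// Minimal sketch of an async index cache (illustrative only). The download runs on a
// dedicated executor, so a caller that times out or is interrupted does not discard
// the in-flight work: the future completes in the background and stays cached.
public class AsyncIndexCache<K, V> {
    private final ConcurrentHashMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
    private final ExecutorService downloader = Executors.newFixedThreadPool(4);

    public CompletableFuture<V> get(K key, Callable<V> fetchIndex) {
        CompletableFuture<V> future = cache.computeIfAbsent(key, k ->
            CompletableFuture.supplyAsync(() -> {
                try {
                    return fetchIndex.call(); // e.g. download the offset index from remote storage
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, downloader));
        // Drop failed downloads so the next lookup retries instead of caching the error.
        future.whenComplete((v, err) -> { if (err != null) cache.remove(key, future); });
        return future;
    }
}
{code}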

 

Some additional thoughts on how to approach this issue:

On not interrupting the thread:

This would help remove the flood of exceptions, but it would also pile up 
threads: retries could leave more than one thread per consumer fetching the 
same partition, potentially exhausting the reader thread pool (default size = 
10) and causing other issues.
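
As a toy illustration of the pile-up (not Kafka code): with a fixed-size 
reader pool and reads that are never interrupted, each retry just submits 
another long-running task.

{code:java}
import java.util.concurrent.*;

// Toy illustration: remote.log.reader.threads defaults to 10, so once ten
// un-interrupted remote reads are in flight, every further retry queues up and
// no other partition can be served from remote storage.
public class ReaderPoolPileUp {
    public static void main(String[] args) {
        ExecutorService readerPool = Executors.newFixedThreadPool(10);
        for (int retry = 0; retry < 50; retry++) {
            readerPool.submit(() -> {
                Thread.sleep(60_000); // simulates a slow remote read that is never cancelled
                return null;
            });
        }
        readerPool.shutdown();
    }
}
{code}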

By the way, I can see that the delayed remote fetch operation has a fixed purge 
interval of 1000 with no config. Should we add a config for this one? Or, since 
the thread pool size is already configurable, is there no need for this 
configuration?
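
If we do add one, it could mirror the existing purgatory purge-interval 
configs; a sketch, with a hypothetical config name and the current hard-coded 
value as default:

{code:java}
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Hypothetical config definition, mirroring fetch.purgatory.purge.interval.requests.
// Neither the name nor its presence here is part of Kafka today.
public class RemoteFetchPurgatoryConfig {
    public static final String REMOTE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG =
        "remote.fetch.purgatory.purge.interval.requests";

    public static ConfigDef define(ConfigDef configDef) {
        return configDef.define(
            REMOTE_FETCH_PURGATORY_PURGE_INTERVAL_REQUESTS_CONFIG,
            Type.INT,
            1000,              // current hard-coded purge interval
            Importance.LOW,
            "Number of requests after which the delayed remote fetch purgatory is purged.");
    }
}
{code}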

 

On the timeout configuration semantics:

Based on [https://github.com/apache/kafka/pull/14778#issuecomment-1820588080]

We should update our docs to make the expectations about `fetch.max.wait.ms` 
explicit: it should only apply to data available in the local log, and if 
topics are tiered, then larger latencies may apply.
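
Something like the following captures the semantics we would be documenting 
(sketch only, no new behaviour implied):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchWaitSemantics {
    public static void main(String[] args) {
        Properties props = new Properties();
        // fetch.max.wait.ms bounds how long the broker waits for *local* data to reach
        // fetch.min.bytes; reads served from tiered storage may take longer than this.
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
        // request.timeout.ms still caps the whole fetch request, remote reads included.
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
    }
}
{code}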

We could also consider adding a new exception type for interrupted remote fetch 
operations; this way the RLM can use it to choose the proper logging level. 
We would need to document this in the RSM interface and request that 
implementations report interrupted exceptions properly.
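
For example (the exception name below is hypothetical, not an existing Kafka 
class):

{code:java}
// Hypothetical exception type that RSM implementations would throw (or wrap an
// InterruptedException in) when a remote read is cancelled because the delayed
// fetch expired, so the RLM can tell cancellations apart from real failures.
public class RemoteStorageInterruptedException extends Exception {
    public RemoteStorageInterruptedException(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}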

 

On the remote fetch timeout configuration:

An additional configuration certainly seems needed to redefine this delayed 
operation's timeout. But a separate configuration with just a larger default 
value would only help up to a point.

Instead of a fixed timeout config, we could consider backoff configs that set 
the boundaries for when to start interrupting remote fetches, bumping the 
timeout (e.g. +100ms) on each retry up to an upper bound where failures start 
to be reported to consumers. This would give operators better knobs to tune, 
e.g. a lower bound around 2 seconds to start interrupting remote fetches and 
10 seconds to start failing consumer requests.
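
A sketch of how that backoff could be computed, with illustrative 
(hypothetical) config names and defaults:

{code:java}
// Illustrative backoff for remote fetches. Hypothetical settings:
//   remote.fetch.backoff.min.ms  = 2000   -> when to first interrupt a remote fetch
//   remote.fetch.backoff.step.ms = 100    -> how much longer to wait on each retry
//   remote.fetch.backoff.max.ms  = 10000  -> when to give up and fail the consumer fetch
public class RemoteFetchBackoff {
    private final long minMs, stepMs, maxMs;

    public RemoteFetchBackoff(long minMs, long stepMs, long maxMs) {
        this.minMs = minMs;
        this.stepMs = stepMs;
        this.maxMs = maxMs;
    }

    /** Timeout (ms) applied to the given retry attempt, 0-based. */
    public long timeoutForAttempt(int attempt) {
        return Math.min(minMs + (long) attempt * stepMs, maxMs);
    }

    /** Once the upper bound is reached, the failure is surfaced to the consumer. */
    public boolean shouldFailConsumerFetch(int attempt) {
        return timeoutForAttempt(attempt) >= maxMs;
    }
}
{code}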

Having both these configs and a new exception type would enable proper handling 
of these exceptions: report them at e.g. WARN/DEBUG level while below the max 
timeout, and fail consumer requests and log at WARN/ERROR when hitting the 
upper bound.
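
Roughly, the handling could look like this (sketch only, building on the 
hypothetical pieces above):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the logging/failure policy: quiet logs while retrying below the upper
// bound, loud logs plus a consumer-visible failure once the upper bound is hit.
public class RemoteFetchErrorPolicy {
    private static final Logger log = LoggerFactory.getLogger(RemoteFetchErrorPolicy.class);

    void onInterrupted(long appliedTimeoutMs, long upperBoundMs, Exception cause) {
        if (appliedTimeoutMs >= upperBoundMs) {
            log.error("Remote fetch exceeded the upper bound, failing the consumer request", cause);
            // ... complete the delayed fetch with an error so the consumer sees it
        } else {
            log.debug("Remote fetch interrupted after {} ms, will retry with a larger timeout",
                appliedTimeoutMs, cause);
        }
    }
}
{code}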

Also, as the degradation happens between the broker and remote storage, this 
configuration should not be a consumer one, since consumers can't have all the 
context on how to tune these values. Instead, these configurations can live on 
the broker side for operators to set.

 

cc [~showuon] [~satishd] 

> Update delay timeout for DelayedRemoteFetch request
> ---------------------------------------------------
>
>                 Key: KAFKA-15776
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15776
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Kamal Chandraprakash
>            Assignee: Kamal Chandraprakash
>            Priority: Major
>
> We are reusing the {{fetch.max.wait.ms}} config as the delay timeout for 
> DelayedRemoteFetchPurgatory. The purpose of {{fetch.max.wait.ms}} is to wait 
> for the given amount of time when there is no data available to serve the 
> FETCH request.
> {code:java}
> The maximum amount of time the server will block before answering the fetch 
> request if there isn't sufficient data to immediately satisfy the requirement 
> given by fetch.min.bytes.
> {code}
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala#L41]
> Using the same timeout in the DelayedRemoteFetchPurgatory can confuse users 
> about how to configure an optimal value for each purpose. Moreover, the config 
> is of *LOW* importance, and most users won't configure it and will use the 
> default value of 500 ms.
> Having a delay timeout of 500 ms in DelayedRemoteFetchPurgatory can lead to a 
> higher number of expired delayed remote fetch requests when the remote 
> storage has any degradation.
> We should introduce a {{fetch.remote.max.wait.ms}} config (preferably a 
> server config) to define the delay timeout for DelayedRemoteFetch requests, 
> or take it from the client similar to {{request.timeout.ms}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
