[jira] [Created] (KAFKA-15169) Add a test to make sure the remote index file is overwritten for the earlier existing(corrupted) files.

2023-07-09 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-15169:
--

 Summary: Add a test to make sure the remote index file is 
overwritten for the earlier existing(corrupted) files.
 Key: KAFKA-15169
 URL: https://issues.apache.org/jira/browse/KAFKA-15169
 Project: Kafka
  Issue Type: Test
Reporter: Satish Duggana


Re: [DISCUSS] KIP-932: Queues for Kafka

2023-07-09 Thread Matthias J. Sax

Daniel, sure.

To allow the client to filter aborted messages, the broker currently 
attaches metadata that tells the client which records were aborted. But 
the first message after the LSO is a message in pending state, i.e., it 
was neither committed nor aborted yet, so it's not possible to filter or 
deliver it. Thus, the broker cannot provide this metadata (and I am not 
sure the client could filter without this metadata?).
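
For reference, the existing read_committed knob sits on the consumer 
side; a minimal sketch (bootstrap servers, group id, and topic are 
placeholder values):

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class ReadCommittedExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // read_committed: fetches never return data beyond the LSO, and aborted
            // records below the LSO are dropped client side using the
            // aborted-transaction metadata the broker attaches to the fetch response.
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("example-topic"));                     // placeholder
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.printf("offset=%d value=%s%n", r.offset(), r.value()));
            }
        }
    }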


The main reason why this happens broker side is to avoid the client 
having to buffer pending messages "indefinitely" until the TX might 
eventually commit or abort, which would put a lot of memory pressure on 
the client. For the "classic" case, the situation is more severe as we 
guarantee ordered delivery, and thus the client would need to buffer 
everything after the LSO. -- While it's relaxed for queuing, as we might 
not guarantee order (i.e., instead of buffering everything, only pending 
messages would need to be buffered), it would still imply a huge 
additional burden on metadata tracking (for both the broker and the 
consumer) and the wire protocol, and I am already worried about the 
metadata we might need to track for queuing in general.


Does this make sense?


-Matthias



On 7/7/23 01:35, Dániel Urbán wrote:

Hi Matthias,
Can you please elaborate on this: "First, you need to understand that
aborted records are filtered client side, and thus for "read-committed" we
can never read beyond the LSO, and the same seems to apply for queuing."
I don't understand the connection here - what does skipping aborted records
have to do with the LSO? As you said, aborted message filtering is done on
the client side (in consumers, yes, but not sure if it has to be the same
for queues), but being blocked on the LSO is the responsibility of the
broker, isn't it? My thought was that the broker could act differently when
working with queues and read_committed isolation.
Thanks,
Daniel

On Thu, Jul 6, 2023 at 7:26 PM Matthias J. Sax  wrote:


Thanks for the KIP.

It seems we are at a very early stage, and some very important sections 
in the KIP are still marked as TODO. In particular, I am curious about 
the protocol changes, how the "queuing state" will be represented and 
made durable, and all the error edge cases / fail-over / fencing 
(broker/clients) handling that we need to put in place.


A few other comments/question from my side:

(1) Fetch from follower: this was already touched on, but the point is 
really that the consumer does not decide about it, but the broker does. 
When a consumer sends its first fetch request it will always go to the 
leader, and the broker would reply to the consumer "go and fetch from 
this other broker". -- I think it's ok to exclude fetch from follower in 
the first version of the KIP, but it would need a broker change such 
that the broker knows it's a "queue fetch" request. -- It would also be 
worth exploring how fetch from follower could work in the future and 
ensuring that our initial design allows for it and is future-proof.
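
For reference, the existing rack-aware fetch-from-follower path 
(KIP-392) is driven by configs along these lines; just a sketch, all 
values are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class FetchFromFollowerConfigSketch {
        public static Properties consumerProps() {
            // Broker side (server.properties):
            //   broker.rack=us-east-1a
            //   replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
            // The consumer only advertises its rack; its first fetch still goes
            // to the leader, which may answer with a preferred read replica that
            // subsequent fetches are sent to.
            props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "us-east-1a");           // placeholder
            return props;
        }
    }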


(2) Why do we not allow pattern subscription and what happens if
different consumers subscribe to different topics? It's not fully
explained in the KIP.


(3) auto.offset.reset and SPSO/SPSE -- I don't understand why we would 
not allow auto.offset.reset? In the discussion, you mentioned that 
"first consumer would win, if two consumers have a different config" -- 
while this is correct, it's the same for a consumer group right now. 
Maybe we should not try to solve a "non-problem"? -- In general, my 
impression is that we are going to do Kafkaesque Queuing, which is fine, 
but it might be to our advantage to carry over as many established 
concepts as we can? And if not, we should have a very good reason not to.

In the end, I find it very clumsy to only have an admin API to change 
the starting point of a consumer.
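
For comparison, auto.offset.reset today is just a per-consumer client 
config; a minimal sketch (values are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class AutoOffsetResetSketch {
        public static Properties consumerProps() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
            // With no committed offset for the group, the first consumer to
            // fetch effectively decides the starting point via this setting
            // ("earliest" or "latest"), even if other members differ.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return props;
        }
    }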

(3B) What happens if lag grows and data is purged broker side?

(3C) What happens if the broker released records (based on timeout / 
exceeding delivery count), and the "ack/reject" comes afterwards?

(3D) How do we find out what records got archived but were not acked 
(i.e., lost) for re-processing/debugging purposes? The question was 
already asked and the answer was "not supported", but I think it would 
be a must-have before the feature is usable in production? We can of 
course also do it only in a future release and not in the first "MVP" 
implementation, but the KIP should address it. In the end, the overall 
group monitoring story is missing.


(4) I am also wondering about the overall design with regard to "per 
record" vs "per batch" granularity. In the end, queuing usually aims for 
"per record" semantics, but "per record" implies keeping track of a lot 
of metadata. Kafka is designed on a "per batch" granularity, and it's 
unclear to me how both will go together?

(4A) Do we keep "ack/reject/..." state per record, or per batch? It 
seems per record, but it would require holding a lot of metadata. Also, 
how does it work with the current protocol if a batch is partially