Re: Question about KRaft
For questions related to Confluent, you'd better ask in their channel.

Luke

On Fri, Mar 10, 2023 at 12:54 PM sunil chaudhari <sunilmchaudhar...@gmail.com> wrote:
> Hi Luke,
> This docu is good.
> Does it apply for confluent as well?
Re: Question about KRaft
Hi Luke,
This docu is good. Does it apply for confluent as well?
Re: Question about KRaft
Hi Zhenyu,

Answering your question:

> Should I simply
> 1. download 3.4 binary
> 2. stop ZK & Kafka service
> 3. upgrade Kafka to 3.4
> 4. start only Kafka service with KRaft server.properties

That is not migrating, actually. That is just creating another Kafka cluster in KRaft mode. The point of migration is to move the metadata in ZK into the KRaft controllers. You can follow the guide here to do the migration:
https://kafka.apache.org/documentation/#kraft_zk_migration

Thank you.
Luke

On Tue, Mar 7, 2023 at 11:07 PM Zhenyu Wang wrote:
> Hi Sunil,
>
> As mentioned earlier in my question, I have only one "combined" node as both controller and broker, and I totally accept downtime (stop service).
>
> So just want to ask, for my case, single node: if I want to upgrade to 3.4 and then start the service under KRaft (get rid of ZK), what would be the steps?
>
> Thanks~
>
> On Mon, Mar 6, 2023 at 11:49 PM sunil chaudhari <sunilmchaudhar...@gmail.com> wrote:
> > How will you achieve zero downtime if you stop zookeeper and kafka?
> > There must be some standard steps, so that you stop zookeeper one by one and start KRaft at the same time, so that it is migrated gradually.
> >
> > On Tue, 7 Mar 2023 at 9:26 AM, Zhenyu Wang wrote:
> > > Hi team,
> > >
> > > Here is a question about KRaft from a normal user, who started to use and learn Kafka at 3.2.
> > >
> > > Last month Kafka 3.4, the first bridge release, became available, and I am considering a plan to move to KRaft (get rid of ZK) from this version.
> > >
> > > Currently I am using 3.3.2 (upgraded from 3.2) with only one node, which is both controller & broker; even ZK is installed on this node too (sorry, I know it is not distributed and I will try to improve it with more knowledge learned in the future).
> > >
> > > When I read KIP-866, ZK to KRaft migration, the Migration Overview section seems to be written for multi-node clusters with no or almost no downtime, enabling KRaft node by node; however my case accepts downtime (one node -_-!!). I just want to upgrade Kafka to 3.4, then start the service under KRaft mode, and make sure everything works well and no log is lost.
> > >
> > > Should I simply
> > > 1. download 3.4 binary
> > > 2. stop ZK & Kafka service
> > > 3. upgrade Kafka to 3.4
> > > 4. start only Kafka service with KRaft server.properties
> > >
> > > Or is there anything else I need to pay attention to?
> > >
> > > If there is documentation as a guide, that would be quite helpful.
> > >
> > > Really appreciate it.
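[Editor's note: for readers following the migration guide linked above, the preparation steps boil down to configs roughly like the following. This is a hedged sketch based on the Kafka 3.4 ZK-to-KRaft migration documentation; the node IDs, ports, and host names are made up for illustration, and exact property names and values should be checked against the official guide.]

```
# (1) A new KRaft controller quorum with migration enabled
# controller.properties -- illustrative values only
process.roles=controller
node.id=3000
controller.quorum.voters=3000@localhost:9093
listeners=CONTROLLER://:9093
controller.listener.names=CONTROLLER
zookeeper.connect=localhost:2181
zookeeper.metadata.migration.enable=true

# (2) Each existing ZK-mode broker, reconfigured to enter migration mode
# server.properties -- illustrative values only
broker.id=0
zookeeper.connect=localhost:2181
zookeeper.metadata.migration.enable=true
controller.quorum.voters=3000@localhost:9093
controller.listener.names=CONTROLLER
inter.broker.protocol.version=3.4
```

Once the controllers have copied the ZK metadata and the brokers are restarted in KRaft mode, the ZK-related properties are removed and ZooKeeper can be shut down, per the guide.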
Re: [ANNOUNCE] New Kafka PMC Member: Chris Egerton
Congrats!

On 3/9/23 2:59 PM, José Armando García Sancio wrote:
> Congrats Chris.

On Thu, Mar 9, 2023 at 2:01 PM Kowshik Prakasam wrote:
> Congrats Chris!

On Thu, Mar 9, 2023 at 1:33 PM Divij Vaidya wrote:
> Congratulations Chris! I am in awe of the amount of effort you put into code reviews and helping out the community members. Very well deserved.
> -- Divij Vaidya

On Thu, Mar 9, 2023 at 9:49 PM Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> So well deserved! Congratulations Chris!!!

On Thu, 9 Mar 2023 at 22:09, Lucas Brutschy wrote:
> Congratulations!

On Thu, Mar 9, 2023 at 8:48 PM Roman Schmitz wrote:
> Congratulations Chris!

On Thu, 9 Mar 2023 at 20:33, Chia-Ping Tsai <chia7...@gmail.com> wrote:
> Congratulations Chris!

On Mar 10, 2023 at 2:21 AM, Mickael Maison wrote:
> Congratulations Chris!

On Thu, Mar 9, 2023 at 7:17 PM Bill Bejeck wrote:
> Congratulations Chris!

On Thu, Mar 9, 2023 at 1:12 PM Jun Rao wrote:
> Hi, Everyone,
>
> Chris Egerton has been a Kafka committer since July 2022. He has been very instrumental to the community since becoming a committer. It's my pleasure to announce that Chris is now a member of the Kafka PMC.
>
> Congratulations, Chris!
>
> Jun, on behalf of the Apache Kafka PMC
Re: [ANNOUNCE] New Kafka PMC Member: David Arthur
Congrats!

On 3/9/23 2:59 PM, José Armando García Sancio wrote:
> Congrats David!

On Thu, Mar 9, 2023 at 2:00 PM Kowshik Prakasam wrote:
> Congrats David!

On Thu, Mar 9, 2023 at 12:09 PM Lucas Brutschy wrote:
> Congratulations!

On Thu, Mar 9, 2023 at 8:37 PM Manikumar wrote:
> Congrats David!

On Fri, Mar 10, 2023 at 12:24 AM Josep Prat wrote:
> Congrats David!
> ---
> Josep Prat
> Aiven Deutschland GmbH
> Alexanderufer 3-7, 10117 Berlin
> Amtsgericht Charlottenburg, HRB 209739 B
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> m: +491715557497 | w: aiven.io | e: josep.p...@aiven.io

On Thu, Mar 9, 2023, 19:22 Mickael Maison wrote:
> Congratulations David!

On Thu, Mar 9, 2023 at 7:20 PM Chris Egerton wrote:
> Congrats David!

On Thu, Mar 9, 2023 at 1:17 PM Bill Bejeck wrote:
> Congratulations David!

On Thu, Mar 9, 2023 at 1:12 PM Jun Rao wrote:
> Hi, Everyone,
>
> David Arthur has been a Kafka committer since 2013. He has been very instrumental to the community since becoming a committer. It's my pleasure to announce that David is now a member of the Kafka PMC.
>
> Congratulations, David!
>
> Jun, on behalf of the Apache Kafka PMC
Re: Exactly once kafka connect query
Hi Chris,

We have a use case where we commit the previous successful records, stop processing the current file, and move on to the next file. To achieve that I called commitTransaction when I reached the first error record, but the commit is not happening for me. Kafka Connect tries to abort the transaction automatically; I checked the __transaction_state topic and the states are marked as PrepareAbort and CompleteAbort. Do you know why Kafka Connect automatically invokes an abort instead of the commit I called?

Then, as a result, when I try to parse the next file - say ABC - I see the log "Aborting incomplete transaction" and the ERROR "Failed to send record to topic", and we lose the first batch of records of the current transaction for file ABC.

> Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)?

Yes, that case is possible for us. There is a case where the first record itself is an error record.

Thanks,
Nitty
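[Editor's note: the approach Chris suggests in this thread - call the no-args commitTransaction() when the error record is hit, and return an empty batch from SourceTask::poll - can be sketched with stubbed-out types. Everything below (the StubTransactionContext class, the use of null as the "error record" marker, and the pollOnce helper) is hypothetical scaffolding for illustration, not the real org.apache.kafka.connect.source API.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for org.apache.kafka.connect.source.TransactionContext;
// it only records whether the no-args commit was requested.
class StubTransactionContext {
    boolean commitRequested = false;
    void commitTransaction() { commitRequested = true; }
}

public class CommitOnErrorSketch {
    // Hypothetical body of SourceTask::poll: a null entry marks an
    // unparseable record. On hitting it, request a commit of the records
    // returned from earlier polls and return an empty batch, so the
    // already-polled records are committed and the bad record is never
    // added to the transaction.
    static List<String> pollOnce(List<String> parsed, StubTransactionContext ctx) {
        List<String> batch = new ArrayList<>();
        for (String rec : parsed) {
            if (rec == null) {
                ctx.commitTransaction();   // no-args variant: commit prior records
                return new ArrayList<>();  // an empty batch is valid for poll()
            }
            batch.add(rec);
        }
        return batch;
    }

    public static void main(String[] args) {
        StubTransactionContext ctx = new StubTransactionContext();
        List<String> out = pollOnce(Arrays.asList("r5", null), ctx);
        System.out.println(out.size() + " " + ctx.commitRequested);
    }
}
```

In the real task, the rest of the file would then be skipped rather than polled again.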
Re: Exactly once kafka connect query
Hi Nitty,

Thanks for the code examples and the detailed explanations, this is really helpful!

> Say if I have a file with 5 records and batch size is 2, and in my 3rd batch I have one error record, then in that batch I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?

An important thing to keep in mind with the TransactionContext API is that all records a task returns from SourceTask::poll are implicitly included in a transaction. Invoking SourceTaskContext::transactionContext doesn't alter this or cause transactions to start being used; everything is already in a transaction, and the Connect runtime automatically begins a transaction for any records it sees from the task if it hasn't already begun one.

It's also valid to return a null or empty list of records from SourceTask::poll. So in this case, you can invoke transactionContext.commitTransaction() (the no-args variant) and return an empty batch from SourceTask::poll, which will cause the transaction containing the 4 valid records returned in the last 2 batches to be committed.

FWIW, I would be a little cautious about this approach. Many times it's better to fail fast on invalid data; it might be worth it to at least allow users to configure whether the connector fails on invalid data, or silently skips over it (which is what happens when transactions are aborted).

> Why is abort not working without adding the last record to the list?

Is it possible that there's a case where an abort is being requested while the current transaction is empty (i.e., the task hasn't returned any records from SourceTask::poll since the last transaction was committed/aborted)? I think this may be a bug in the Connect framework, where we don't check whether a transaction is already open when a task requests that a transaction be aborted, which can cause tasks to fail (see https://issues.apache.org/jira/browse/KAFKA-14799 for more details).

Cheers,

Chris

On Wed, Mar 8, 2023 at 6:44 PM NITTY BENNY wrote:
> Hi Chris,
>
> I am not sure if you are able to see the images I shared with you.
> Copying the code snippet below:
>
>     if (expectedRecordCount >= 0) {
>       int missingCount = expectedRecordCount - (int) this.recordOffset() - 1;
>       if (missingCount > 0) {
>         if (transactionContext != null) {
>           isMissedRecords = true;
>         } else {
>           throw new DataException(String.format("Missing %d records (expecting %d, actual %d)",
>               missingCount, expectedRecordCount, this.recordOffset()));
>         }
>       } else if (missingCount < 0) {
>         if (transactionContext != null) {
>           isMissedRecords = true;
>         } else {
>           throw new DataException(String.format("Too many records (expecting %d, actual %d)",
>               expectedRecordCount, this.recordOffset()));
>         }
>       }
>     }
>     addLastRecord(records, null, value);
>
>     // asn1 or binary abort
>     if ((config.parseErrorThreshold != null && parseErrorCount >= config.parseErrorThreshold
>         && lastbatch && transactionContext != null)
>         || (isMissedRecords && transactionContext != null && lastbatch)) {
>       log.info("Transaction is aborting");
>       log.info("records = {}", records);
>       if (!records.isEmpty()) {
>         log.info("with record");
>         transactionContext.abortTransaction(records.get(records.size() - 1));
>       } else {
>         log.info("without record");
>         transactionContext.abortTransaction();
>       }
>     }
>
> Thanks,
> Nitty
>
> On Wed, Mar 8, 2023 at 11:38 PM NITTY BENNY wrote:
> > Hi Chris,
> > Sorry for the typo in my previous email.
> >
> > Regarding point 2, *the task returns a batch of records from SourceTask::poll (and, if using the per-record API provided by the TransactionContext class, includes at least one record that should trigger a transaction commit/abort in that batch)*
> >
> > What if I am using the API without passing a record? We have 2 types of use cases: one where, on encountering an error record, we want to commit the previous successful batches and disregard the failed record and upcoming batches. In this case we created the transactionContext just before reading the file (the file is our transaction boundary). Say I have a file with 5 records and batch size is 2, and in my 3rd batch I have one error record; then in that batch I don't have a valid record to call commit or abort. But I want to commit all the previous batches that were successfully parsed. How do I do that?
> >
> > The second use case is where I want to abort a transaction if the record count doesn't match.
> > Code snippet:
> > [image: image.png]
> > There are no error records in this case. If you see I added the
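[Editor's note: until KAFKA-14799 is resolved, a task can work around the empty-transaction abort failure described in this thread by tracking whether it has returned any records since the last commit/abort, and only requesting an abort when it has. The RecordingContext stub and all method names below are illustrative scaffolding, not the real org.apache.kafka.connect.source API.]

```java
import java.util.List;

// Stand-in for org.apache.kafka.connect.source.TransactionContext;
// it only counts how many aborts were actually requested.
class RecordingContext {
    int aborts = 0;
    void abortTransaction() { aborts++; }
}

public class AbortGuardSketch {
    private final RecordingContext ctx;
    // Records returned from poll() since the last commit/abort. While this
    // is zero the open transaction is empty, and requesting an abort would
    // trip the KAFKA-14799 failure mode, so we skip the request.
    private int recordsInTransaction = 0;

    AbortGuardSketch(RecordingContext ctx) { this.ctx = ctx; }

    // Hypothetical poll wrapper: count what we hand to the runtime.
    List<String> poll(List<String> batch) {
        recordsInTransaction += batch.size();
        return batch;
    }

    // Request an abort only when the open transaction actually has records.
    boolean maybeAbort() {
        if (recordsInTransaction == 0) {
            return false; // nothing to abort; avoid failing the task
        }
        ctx.abortTransaction();
        recordsInTransaction = 0;
        return true;
    }

    public static void main(String[] args) {
        RecordingContext ctx = new RecordingContext();
        AbortGuardSketch task = new AbortGuardSketch(ctx);
        task.poll(List.of("r1", "r2"));
        System.out.println(task.maybeAbort() + " " + ctx.aborts);
    }
}
```

This directly addresses the "first record itself is an error record" case above: with no records polled yet, the guard skips the abort instead of letting the framework fail the task.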