Kafka Topic partitions losing replication factor

2023-01-23 Thread Dima Brodsky
Hello,

I was wondering if anybody has seen the following:

I have a topic with about 200 partitions and a replication factor of
3.  Once in a while I am seeing a partition or two lose a replica; that
is, the replica list for those partitions goes from 3 brokers to 2.
Kafka does not report anything wrong: all the brokers seem healthy, and
I don't see any errors in the logs.  The topic is still configured with
a replication factor of 3.  To fix this I have to create a partition
reassignment plan and add the third replica back.
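
For reference, the fix looks something like the following (the topic
name, partition number, and broker IDs below are placeholders for my
actual values), executed with the stock reassignment tool:

    {
      "version": 1,
      "partitions": [
        { "topic": "my-topic", "partition": 7, "replicas": [1, 2, 3] }
      ]
    }

    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file reassign.json --execute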

Has anybody seen anything similar?

Thanks!
ttyl
Dima


-- 
ddbrod...@gmail.com

"The price of reliability is the pursuit of the utmost simplicity.
It is a price which the very rich find the most hard to pay."
   (Sir Antony Hoare, 1980)


Re: Custom Kafka Streams State Restore Logic

2023-01-23 Thread Upesh Desai
Yeah, the workaround we've come up with is to use the Processor.init() method.
We confirmed that this is only called once all state stores used by that
processor have been fully restored. From there, we iterate over the entire
store and (re)build our tracking map.
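
A minimal sketch of the shape this takes (the store name, types, and the
bucket-derivation logic below are simplified placeholders for our actual
code):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class BucketTrackingProcessor
            implements Processor<String, Double, String, Double> {

        private KeyValueStore<String, Double> store;
        // in-memory map: series key -> bucket-start timestamp
        private final Map<String, Long> bucketTracker = new HashMap<>();

        @Override
        public void init(final ProcessorContext<String, Double> context) {
            // init() runs only after all state stores used by this processor
            // have been fully restored, so a full scan sees every entry.
            store = context.getStateStore("bucket-store");
            try (final KeyValueIterator<String, Double> iter = store.all()) {
                while (iter.hasNext()) {
                    final KeyValue<String, Double> entry = iter.next();
                    // placeholder: derive the bucket start for this entry
                    bucketTracker.put(entry.key, bucketStartFor(entry));
                }
            }
        }

        @Override
        public void process(final Record<String, Double> record) {
            store.put(record.key(), record.value());
            // bucketTracker is only touched when a bucket rolls over
        }

        private long bucketStartFor(final KeyValue<String, Double> entry) {
            return 0L; // application-specific derivation elided
        }
    }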

It seems to work well and is performant enough thus far! Thanks for your help.

Cheers,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Matthias J. Sax 
Date: Monday, January 23, 2023 at 8:05 PM
To: users@kafka.apache.org 
Subject: Re: Custom Kafka Streams State Restore Logic
Thanks.

I agree. Seems your options are limited. The API is not really a good
fit for what you want to do... Sorry.

-Matthias

On 1/18/23 7:48 AM, Upesh Desai wrote:
> Hi Matthias, thanks for your reply! Sure, so the use case is as follows.
>
> We currently store some time series data in the state store, and it is
> stored to a changelog as well. The time series data is bucketed (5
> minutes, 1 hour, and 1 day). Our goal was to always only have a max of 2
> time buckets in the store at once. As we receive new timeseries data, we
> figure out what time bucket it belongs to, and add it to its respective
> bucket. We have a “grace period” which allows for late arriving data to
> be processed even after a time bucket has ended. That’s the reason why
> we have this constraint of 2 time buckets max within the store; 1 for
> the previous bucket in its grace period, 1 for the current bucket.
>
> So we wanted to extend the base state store and add a simple in-memory
> map to track the 2 time buckets per timeseries (that’s the store key). A
> couple of reasons why we don't want to add this as a separate state store
> or to the existing store are:
> 1. There is a ton of serialization / deserialization that happens behind
> the scenes
>
> 2. This new time bucket tracking map would only be updated a couple
> times per time bucket, and does not need to be updated on every message
> read.
>
> 3. There’s no API on the included stores that allows us to do so
>
> Therefore, I thought it best to try to use the existing store
> functionality, create a “new state store” that really just instantiates
> one of the included stores within, add this in memory map, and then plug
> into/alter/extend the restore functionality to populate the time bucket
> tracking map during restore time.
>
> It sounds like I will either have to 1) create a custom state store from
> scratch, or 2) see if there is a post-restore hook that can then call a
> method to scan the whole store and build up the time bucket map before
> starting to process.
>
> Any advice on Kafka streams / state store logic would be appreciated!
>
> -Upesh
>
> Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
> www.itrsgroup.com
>
> From: Matthias J. Sax
> Date: Wednesday, January 18, 2023 at 12:50 AM
> To: users@kafka.apache.org
> Subject: Re: Custom Kafka Streams State Restore Logic
>
> Guess it depends what you actually want to achieve?
>
> Also note: `InMemoryWindowStore` is an internal class, and thus might
> change at any point, and it was never designed to be extended...
>
>
> -Matthias
>
> On 1/13/23 2:55 PM, Upesh Desai wrote:
>> Hello all,
>>
>> I am currently working on creating a new InMemoryWindowStore, by
>> extending the default in memory window store. One of the roadblocks I’ve
>> run into is finding a way to add some custom logic when the state store
>> is being restored from the changelog. I know that this is possible if I
>> completely write the store logic from scratch, but we really only want
>> to add a tiny bit of custom logic, and do not want to have to replicate
>> all the existing logic.
>>
>> Is there a simple way for this to be done? I see the default
>> implementation in the InMemoryWindowStore:
>>
>> context.register(
>>     root,
>>     (RecordBatchingStateRestoreCallback) records -> {
>>         for (final ConsumerRecord<byte[], byte[]> record : records) {
>>             put(
>>                 Bytes.wrap(extractStoreKeyBytes(record.key())),
>>                 record.value(),
>>                 extractStoreTimestamp(record.key())
>>             );
>>             ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
>>                 record,
>>                 consistencyEnabled,
>>                 position
>>             );
>>         }
>>     }
>> );
>>
>> Thanks in advance!
>>
>> Upesh
>>
>>
>> Upesh Desai
>> Senior Software Developer
>> ude...@itrsgroup.com
>> www.itrsgroup.com
>>
>> Internet communications are not secure and therefore the ITRS Group does
>> not accept legal responsibility for the contents of this message. Any
>> view or opinions presented are solely those of the author and do not
>> necessarily represent those of the ITRS Group unless otherwise
>> specifically stated.

Re: Custom Kafka Streams State Restore Logic

2023-01-23 Thread Matthias J. Sax

Thanks.

I agree. Seems your options are limited. The API is not really a good
fit for what you want to do... Sorry.


-Matthias


Kafka Broker (with no leader partitions) : JVM heap usage fluctuates wildly in less than a minute.

2023-01-23 Thread Neeraj Vaidya
Hi All,

I've posted this question on SO about JVM heap usage wildly fluctuating
for a broker which has no leader partitions. If anyone has a clue, please
assist:

[Link preview] Kafka Broker JVM heap usage fluctuates wildly
"I have 8 kafka brokers in a production environment. We have set the
following flag to false for some reason. aut..."


Regards,
Neeraj




[ANNOUNCE] Apache Kafka 3.3.2

2023-01-23 Thread Chris Egerton
The Apache Kafka community is pleased to announce the release for Apache
Kafka 3.3.2

Apache Kafka 3.3.2 is a bugfix release and it contains, among other things,
fixes for 20 issues reported since 3.3.1.

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/3.3.2/RELEASE_NOTES.html


You can download the source and binary release (Scala 2.12 and 2.13) from:
https://kafka.apache.org/downloads#3.3.2

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.
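
As a quick illustration, publishing a single record with the Producer API
takes only a few lines of Java (the broker address and topic name below
are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              StringSerializer.class.getName());

    // send() is asynchronous; closing the producer flushes outstanding records
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
        producer.send(new ProducerRecord<>("my-topic", "key", "value"));
    }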


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you to the following 39 contributors to this release!

A. Sophie Blee-Goldman, Alyssa Huang, Artem Livshits, Bill Bejeck, Calvin
Liu, Chia-Ping Tsai, Chris Egerton, Christo Lolov, Colin Patrick McCabe,
Dan Stelljes, David Arthur, David Jacot, Divij Vaidya, FUNKYE, Greg Harris,
Huilin Shi, Igor Soarez, Ismael Juma, Jason Gustafson, Jeff Kim, Jorge
Esteban Quilcate Otoya, José Armando García Sancio, Justine Olshan, Kirk
True, liuzhuang2017, Lucas Brutschy, Luke Chen, Matthias J. Sax, Mickael
Maison, Niket, Pratim SC, Purshotam Chauhan, Rohan, Ron Dagostino, Shawn,
srishti-saraswat, Sushant Mahajan, Vicky Papavasileiou, zou shengfu

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!


Regards,

Chris


Re: JIRA access

2023-01-23 Thread Chris Egerton
Hi Titouan,

I've added you to our Jira project; you should be good to go now.

Cheers,

Chris

On Mon, Jan 23, 2023 at 10:41 AM Titouan Chary 
wrote:

> Hi,
>
> It seems that JIRA access is disabled by default. Is this the right email to
> reach out to in order to get a new JIRA account? I would like to participate
> and give additional data points on the following ticket:
> https://issues.apache.org/jira/browse/KAFKA-13077
>
> Thanks in advance,
> Regards,
> Titouan Chary
>


JIRA access

2023-01-23 Thread Titouan Chary
Hi,

It seems that JIRA access is disabled by default. Is this the right email to
reach out to in order to get a new JIRA account? I would like to participate
and give additional data points on the following ticket:
https://issues.apache.org/jira/browse/KAFKA-13077

Thanks in advance,
Regards,
Titouan Chary


Re: Kafka issues related 3.2.3 version zookeeper less cluster

2023-01-23 Thread Kunal Jadhav
Ok, thanks Divij, I will check it.
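
For reference, on our Kubernetes setup something like the following
should pull the active controller's log for that window (the pod name,
namespace, and timestamp are placeholders for our actual values):

    kubectl logs kafka-controller-0 -n kafka \
      --since-time=2023-01-12T04:40:00Z | grep -iE 'error|producerid'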

---
Thanks & Regards,
Kunal Jadhav


On Mon, Jan 16, 2023 at 7:47 PM Divij Vaidya 
wrote:

> Hey Kunal
>
> The stack trace you provided for the broker is from a non-controller node.
> The actual error occurs at the controller while processing the
> AllocateProducerIds API. Could you please check the controller at that time
> and let us know what kind of errors you see there that correlate
> with this time frame?
>
> Thanks!
>
> Regards,
> Divij Vaidya
>
>
>
> On Thu, Jan 12, 2023 at 3:13 PM Kunal Jadhav
>  wrote:
>
> > Hello All,
> >
> > We are facing some issues with a Kafka 3.2.3 cluster. We have
> > deployed a 3-broker cluster on a single-node server in a Kubernetes
> > environment. Every week or so we are facing the issues listed below,
> > so please help us to resolve them. Thanks to all in advance.
> >
> > *Issues we faced on brokers :*
> >
> > [2023-01-12 04:43:52,101] ERROR [KafkaApi-0] Unexpected error handling
> > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4,
> >
> >
> clientId=dev-prp-clz-com-v2.1.105-MHM_CLZ_COM-DEV_123-3e0cf839-9c81-400c-94d2-975d330be75c-StreamThread-1-producer,
> > correlationId=2) -- InitProducerIdRequestData(transactionalId=null,
> > transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) with
> > context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID,
> > apiVersion=4,
> >
> clientId=dev-prp-clz-com-v2.1.105-MHM_CLZ_COM-DEV_123-3e0cf839-9c81-400c-94d2-975d330be75c-StreamThread-1-producer,
> > correlationId=2),
> > connectionId='10.233.80.1:9092-10.233.104.6:35952-208286',
> > clientAddress=/10.233.104.6, principal=User:ANONYMOUS,
> > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT,
> > clientInformation=ClientInformation(softwareName=apache-kafka-java,
> > softwareVersion=3.2.3), fromPrivilegedListener=true,
> >
> >
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@3a6b516c
> > ])
> > (kafka.server.KafkaApis)
> > org.apache.kafka.common.errors.UnknownServerException: The server
> > experienced an unexpected error when processing the request.
> > [2023-01-12 04:43:52,159] WARN [RPC ProducerId Manager 0]: Had an
> > unknown error from the controller, giving up.
> > (kafka.coordinator.transaction.RPCProducerIdManager)
> > [2023-01-12 04:43:52,159] ERROR [KafkaApi-0] Unexpected error handling
> > request RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=4,
> >
> >
> clientId=dev-prp-clz-com-v2.1.105-SCN_CLZ_COM_POST_PROCESS-DEV_123-4a353282-d8cf-4ac9-b2ea-a9c0e1163587-StreamThread-1-producer,
> > correlationId=2) -- InitProducerIdRequestData(transactionalId=null,
> > transactionTimeoutMs=2147483647, producerId=-1, producerEpoch=-1) with
> > context RequestContext(header=RequestHeader(apiKey=INIT_PRODUCER_ID,
> > apiVersion=4,
> >
> clientId=dev-prp-clz-com-v2.1.105-SCN_CLZ_COM_POST_PROCESS-DEV_123-4a353282-d8cf-4ac9-b2ea-a9c0e1163587-StreamThread-1-producer,
> > correlationId=2),
> > connectionId='10.233.80.1:9092-10.233.104.6:35946-208286',
> > clientAddress=/10.233.104.6, principal=User:ANONYMOUS,
> > listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT,
> > clientInformation=ClientInformation(softwareName=apache-kafka-java,
> > softwareVersion=3.2.3), fromPrivilegedListener=true,
> >
> >
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@3042f6ba
> > ])
> > (kafka.server.KafkaApis)
> > org.apache.kafka.common.errors.UnknownServerException: The server
> > experienced an unexpected error when processing the request.
> > [2023-01-12 04:43:52,194] WARN [RPC ProducerId Manager 0]: Had an
> > unknown error from the controller, giving up.
> > (kafka.coordinator.transaction.RPCProducerIdManager)
> >
> >
> > *Issues we faced on client :*
> >
> > 06:09:22.752 ERROR org.apache.kafka.clients.producer.internals.Sender
> > - [Producer clientId=javaproducer11675630941604] Aborting producer
> > batches due to fatal error
> > org.apache.kafka.common.KafkaException: Unexpected error in
> > InitProducerIdResponse; The server experienced an unexpected error
> > when processing the request.
> > at
> >
> org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1418)
> > ~[kafka-clients-3.2.3.jar:?]
> > at
> >
> org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1322)
> > ~[kafka-clients-3.2.3.jar:?]
> > at
> >
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
> > ~[kafka-clients-3.2.3.jar:?]
> > at
> >
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
> > ~[kafka-clients-3.2.3.jar:?]
> > at
> > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
> > ~[kafka-clients-3.2.3.jar:?]
> > at
> >
> org.apache.kaf