Re: Query regarding kafka controller shutdown

2022-06-05 Thread Liam Clarke-Hutchinson
Off the top of my head, it looks like it lost network connectivity to some
extent.

Question - what settings were used for topics like efGamePlay? What are
min.insync.replicas and the replication factor, and what acks setting is
the producer using?
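
If it's handy, here's a rough sketch (using the Java AdminClient; the
bootstrap address below is just a placeholder) of pulling the topic-level
settings off the cluster. Note that acks is a producer-side setting, so that
one has to be read from the producing application's config rather than from
the brokers.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class TopicSettingsCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder address - point this at your cluster.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            String topic = "efGamePlay";
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topic);

            // Topic-level config, including min.insync.replicas.
            Config config = admin.describeConfigs(Collections.singleton(resource))
                    .all().get().get(resource);
            System.out.println("min.insync.replicas = "
                    + config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).value());

            // Replication factor is the number of replicas of a partition.
            TopicDescription description = admin.describeTopics(Collections.singleton(topic))
                    .all().get().get(topic);
            System.out.println("replication factor = "
                    + description.partitions().get(0).replicas().size());
        }
    }
}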

Cheers,

Liam

On Fri, 3 Jun 2022 at 22:55, dhiraj prajapati  wrote:

> Hi all,
> Recently we faced an issue with one of our production kafka clusters:
>  - It is a 3 node cluster
>  - kafka server version is 1.0
>
> *Issue*:
> One of the brokers had some problem resulting in the following:
> 1. The broker lost leadership of all of the topic-partitions
> 2. However the kafka server process did *NOT* get stopped/killed.
>
> I have attached the exception traces from controller and server logs at
> that time at the end of this email.
>
> *Questions:*
> 1. What could be the reason behind this happening?
> 2. How can I reproduce this exact scenario in our test environment?
>
> P.S. We did not see any GC logs, or general network blips at around that
> time.
>
> *Exception trace in controller log:*
>
> [2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting
> (kafka.controller.RequestSendThread)
> [2022-05-21 09:07:45,914] INFO [RequestSendThread controllerId=2] Starting
> (kafka.controller.RequestSendThread)
> [2022-05-21 09:07:48,354] ERROR [Controller id=2] Error while electing or
> becoming controller on broker 2 (kafka.controller.KafkaController)
> kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either
> before or while waiting for connection
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:233)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:221)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at
>
> kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:221)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:215)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
> at
>
> kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:215)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at
>
> kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:214)
> at
>
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1461)
> at
>
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1437)
> at
> kafka.zk.KafkaZkClient.getPartitionReassignment(KafkaZkClient.scala:671)
> at
>
> kafka.controller.KafkaController.initializePartitionReassignment(KafkaController.scala:698)
> at
>
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:655)
> at
>
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:232)
> at
>
> kafka.controller.KafkaController.kafka$controller$KafkaController$$elect(KafkaController.scala:1203)
> at
>
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1479)
> at
>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> at
>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at
>
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
> at
>
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> [2022-05-21 09:07:48,358] DEBUG [Controller id=2] Resigning
> (kafka.controller.KafkaController)
> [2022-05-21 09:07:48,358] DEBUG [Controller id=2] Unregister
> BrokerModifications handler for Set(2, 1)
> (kafka.controller.KafkaController)
> [2022-05-21 09:07:48,360] INFO [PartitionStateMachine controllerId=2]
> Stopped partition state machine (kafka.controller.PartitionStateMachine)
> [2022-05-21 09:07:48,361] INFO [ReplicaStateMachine controllerId=2] Stopped
> replica state machine (kafka.controller.ReplicaStateMachine)
> [2022-05-21 09:07:48,361] INFO [RequestSendThread controllerId=2] Shutting
> down (kafka.controller.RequestSendThread)
> [2022-05-21 09:07:48,364] INFO [RequestSendThread controllerId=2] Stopped
> (kafka.controller.RequestSendThread)
> [2022-05-21 09:07:48,364] INFO [RequestSendThread 

Re: Event Grouping

2022-05-02 Thread Liam Clarke-Hutchinson
Hi Dave,

Okay, fair enough. Given that you've got a key to group on, and a
sensors_per_cycle field to tell you how many events to expect, then
(depending on data volumes) you could easily aggregate in a regular ol'
Python dict of key to a list of events, using the Python clients to consume
the events and then produce the aggregated results. You'd have to answer
the question of "what if the expected number of sensor readings for a cycle
never arrives?", and evict incomplete groups as needed.
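
Purely to illustrate the shape of that consume-group-evict loop, here's a
rough sketch. It's written against the Java consumer only because that's the
reference client; a Python consumer loop would have exactly the same
structure. The topic name, the key format and the hard-coded expected count
are all assumptions made up for the example.

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CycleAggregator {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "cycle-aggregator");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Assumption: events are keyed by "<iot_device_id>:<plc_cycle_id>".
        Map<String, List<String>> openCycles = new HashMap<>();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("sensor-events")); // hypothetical topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    List<String> events =
                            openCycles.computeIfAbsent(record.key(), k -> new ArrayList<>());
                    events.add(record.value());
                    // Hard-coded to 3 here; real code would read sensors_per_cycle from the event.
                    if (events.size() >= 3) {
                        // All readings for this cycle arrived: sort by seq_number, produce the
                        // aggregate to an output topic, then evict the completed group.
                        openCycles.remove(record.key());
                    }
                }
                // TODO: also evict cycles that have been open too long (incomplete cycles).
            }
        }
    }
}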

Alternatively, if the scale of volume is such that you need to go towards a
distributed solution, both Spark Streaming and Flink have Python APIs :)

Cheers,

Liam



On Tue, 3 May 2022 at 15:24, Dave Greeko 
wrote:

>  Hi Liam,
> Thanks for the answer, and you are right; the question is how to aggregate
> input events into another topic. I have looked into Streams, but my initial
> understanding is that the Streams API is Java-only. Our development
> environment is strictly Python. While we were at it, we also looked into
> Faust, but we would prefer something that is pure Kafka without Java
> development.
>
>
>
>  On Monday, May 2, 2022, 07:11:55 PM PDT, Liam Clarke-Hutchinson <
> lclar...@redhat.com> wrote:
>
>  Hi Dave,
>
> I think you forgot to ask your question :D  However, if you're looking to
> group those events from the input topic and then putting the aggregated
> event onto another topic, have you looked into Kafka Streams?
> https://kafka.apache.org/31/documentation/streams/
>
> Cheers,
>
> Liam
>
> On Tue, 3 May 2022 at 12:05, Dave Greeko 
> wrote:
>
> > Hi there,
> > We have a number of IoT devices that send multiple related events in
> > separate TCP packets to a Kafka broker. The IoT devices have access to a
> > PLC system that monitors a set of sensors for an industrial steam boiler.
> > The PLC raises multiple independent events for a random number of sensors
> > per cycle, where each cycle is uniquely identified by an id. Each PLC
> > cycle may have readings from 1 to 10 different sensors, each of which is
> > sent separately to an IoT device. The IoT device simply relays this event
> > (one per sensor) to a Kafka broker. It’s a one-to-many relation (1 PLC
> > cycle with a unique id = many events from multiple sensors). Our goal is
> > to group the events per IoT device and plc_cycle_id, with the order
> > constrained by a sequence number.
> >
> > Here is a data sample:
> > TCP packet #1:
> > {plc_cycle_id:”AABB”, sensors_per_cycle:3, seq_number:1,
> > iot_device_id:1, sensor_data: {name:”pressure”, data:{value:16.3,
> > si_unit:”bar”}}}
> >
> > TCP packet #2:
> > {plc_cycle_id:”AABB”, sensors_per_cycle:3, seq_number:2,
> > iot_device_id:1, sensor_data: {name:”watter_level”, data:{value:3.8,
> > si_unit:”m”}}}
> >
> > TCP packet #3:
> > {plc_cycle_id:”AABB”, sensors_per_cycle:3, seq_number:3,
> > iot_device_id:1, sensor_data: {name:”steam_drum_temp”, data:{value:99.6,
> > si_unit:”c”}}}
> >
> > Each Kafka event has a reference to the total number of events to be
> > grouped (sensors_per_cycle), and the order in which those events will be
> > grouped is taken from seq_number.
> >
> > Regards,
> > Dave
> >
> >
>


Re: Event Grouping

2022-05-02 Thread Liam Clarke-Hutchinson
Hi Dave,

I think you forgot to ask your question :D  However, if you're looking to
group those events from the input topic and then putting the aggregated
event onto another topic, have you looked into Kafka Streams?
https://kafka.apache.org/31/documentation/streams/

Cheers,

Liam

On Tue, 3 May 2022 at 12:05, Dave Greeko 
wrote:

> Hi there,
> We have a number of IoT devices that send multiple related events in
> separate TCP packets to a Kafka broker. The IoT devices have access to a
> PLC system that monitors a set of sensors for an industrial steam boiler.
> The PLC raises multiple independent events for a random number of sensors
> per cycle, where each cycle is uniquely identified by an id. Each PLC cycle
> may have readings from 1 to 10 different sensors, each of which is sent
> separately to an IoT device. The IoT device simply relays this event (one
> per sensor) to a Kafka broker. It’s a one-to-many relation (1 PLC cycle
> with a unique id = many events from multiple sensors). Our goal is to group
> the events per IoT device and plc_cycle_id, with the order constrained by a
> sequence number.
>
> Here is a data sample:
> TCP packet #1:
> {plc_cycle_id:”AABB”, sensors_per_cycle:3, seq_number:1,
> iot_device_id:1, sensor_data: {name:”pressure”, data:{value:16.3,
> si_unit:”bar”}}}
>
> TCP packet #2:
> {plc_cycle_id:”AABB”, sensors_per_cycle:3, seq_number:2,
> iot_device_id:1, sensor_data: {name:”watter_level”, data:{value:3.8,
> si_unit:”m”}}}
>
> TCP packet #3:
> {plc_cycle_id:”AABB”, sensors_per_cycle:3, seq_number:3,
> iot_device_id:1, sensor_data: {name:”steam_drum_temp”, data:{value:99.6,
> si_unit:”c”}}}
>
> Each Kafka event has a reference to the total number of events to be
> grouped (sensors_per_cycle), and the order in which those events will be
> grouped is taken from seq_number.
>
> Regards,
> Dave
>
>


Re: Kafka Question

2022-05-02 Thread Liam Clarke-Hutchinson
Hi Emily,

Nope, Kafka doesn't have that capability built in; it's just a distributed
log that's great for streaming events. However, you can easily write a
program that consumes those events from Kafka and then does what you want
:)
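
For example, here's a bare-bones sketch of such a program. The topic name,
the thresholds and the sendAlert() hook are all made up for illustration;
Kafka itself only stores and delivers the records.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TemperatureAlerter {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "temperature-alerter");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("daily-temperature")); // hypothetical topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // Assumes the record value is just the temperature as a string.
                    double temperature = Double.parseDouble(record.value());
                    if (temperature > 80 || temperature < 50) {
                        sendAlert(temperature); // plug in your email/SMS provider of choice here
                    }
                }
            }
        }
    }

    private static void sendAlert(double temperature) {
        System.out.println("Temperature outside control limits: " + temperature);
    }
}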

Cheers,

Liam

On Tue, 3 May 2022 at 06:30, Emily Schepisi
 wrote:

> Hello,
>
> I have a question about Kafka. If I put an upper and lower control limit on
> the data, and the log records an event where the upper or lower control
> limit is breached, will Kafka be able to send a notification via email or
> text message to the user?
>
> Example: I'm tracking the daily temperature and set the upper control limit
> at 80 degrees and the lower control limit at 50 degrees. The event log on
> Kafka recorded the temperature on Monday at 90 degrees, so it's higher than
> the upper control limit. Does Kafka have the capability to send a text
> message or email to let me know that the temperature is outside of the
> control limit?
>
> Thank you,
>
> Emily
>


Re: kafka and Kube,

2022-05-02 Thread Liam Clarke-Hutchinson
Hi David,

There's a couple of operators you can use that I'm aware of:

Confluent for K8s:
https://docs.confluent.io/operator/current/co-deploy-cfk.html
Strimzi: https://strimzi.io/

I haven't used CFK, and big disclaimer, I work on Strimzi, so any
particular opinion I'd have on it is totally biased. But in general, I've
grown to quite like using an operator to deploy and manage Kafka in K8s, it
reduces maintenance overhead without going all the way to paying for a
managed Kafka service.

Cheers,

Liam



On Tue, 3 May 2022 at 08:30, David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> Hi guys,
>
> I have a question, do any of you run kafka in Kubernetes? Are you using a
> kafka operator?
>
> If so, what are you running and what's your opinion?
>
> I'm trying to evaluate a kafka operator so any tips would be greatly
> appreciated.
>
> thanks!!
>


Re: How to deal with the Error: KAFKA_STORAGE_ERROR

2022-04-28 Thread Liam Clarke-Hutchinson
Hi Peter,

Firstly, I'd check disk health, then I'd check owners and permissions on
files in your log dir, to eliminate those as issues.
Secondly, are you tracking metrics on offline log dirs?
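
For reference, the broker exposes an offline log directory count over JMX as
kafka.log:type=LogManager,name=OfflineLogDirectoryCount. A quick sketch for
reading it, assuming remote JMX is enabled on the broker (the host and port
below are placeholders):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class OfflineLogDirCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder host/port - requires the broker to be started with remote JMX enabled.
        JMXServiceURL url =
                new JMXServiceURL("service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection connection = connector.getMBeanServerConnection();
            ObjectName offlineDirs =
                    new ObjectName("kafka.log:type=LogManager,name=OfflineLogDirectoryCount");
            System.out.println("Offline log dirs: " + connection.getAttribute(offlineDirs, "Value"));
        }
    }
}

Anything above zero there lines up with KAFKA_STORAGE_ERROR responses.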

Cheers,

Liam

On Thu, 28 Apr 2022 at 15:55, Dong, Peter A. (NSB - CN/Qingdao) <
peter.a.d...@nokia-sbell.com> wrote:

>
> Greetings, Kafka specialists
>
> A strange issue in my Kafka instance has blocked me for a couple of days.
>
> 1.   Cannot produce events to [topic-test] due to a KAFKA_STORAGE_ERROR.
>
> 2.   The log segment files do not seem to have any problem: I can dump the
> log, index and timeindex files with kafka-dump-log without seeing any
> error.
>
> 3.   No error when I tried to produce events to other topics. The Kafka
> log files are in the same disk partition.
>
> 4.   Restarting the Kafka instance and the ZooKeeper instance did not help.
>
> 5.   I cannot find useful information about the error in server.log, even
> at TRACE level.
>
>
>
> Could you please let me know whether a similar issue has ever happened
> before? Where should I dig further to continue my investigation?
>
> Thanks a lot!
>
> Peter
>
>
>
>
> The kafka client log
> kafka-console-producer --bootstrap-server 135.251.236.162:9092 --topic
> topic-test
>
> >[2022-04-28 11:12:10,925] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 5 on topic-partition
> topic-test-0, retrying (2 attempts left). Error: KAFKA_STORAGE_ERROR
> (org.apache.kafka.clients.producer.internals.Sender)
> [2022-04-28 11:12:10,925] WARN [Producer clientId=console-producer]
> Received invalid metadata error in produce request on partition
> topic-test-0 due to org.apache.kafka.common.errors.KafkaStorageException:
> Disk error when trying to access log file on the disk.. Going to request
> metadata update now (org.apache.kafka.clients.producer.internals.Sender)
> [2022-04-28 11:12:11,024] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 7 on topic-partition
> topic-test-0, retrying (1 attempts left). Error: KAFKA_STORAGE_ERROR
> (org.apache.kafka.clients.producer.internals.Sender)
> [2022-04-28 11:12:11,024] WARN [Producer clientId=console-producer]
> Received invalid metadata error in produce request on partition
> topic-test-0 due to org.apache.kafka.common.errors.KafkaStorageException:
> Disk error when trying to access log file on the disk.. Going to request
> metadata update now (org.apache.kafka.clients.producer.internals.Sender)
> [2022-04-28 11:12:11,127] WARN [Producer clientId=console-producer] Got
> error produce response with correlation id 9 on topic-partition
> topic-test-0, retrying (0 attempts left). Error: KAFKA_STORAGE_ERROR
> (org.apache.kafka.clients.producer.internals.Sender)
> [2022-04-28 11:12:11,127] WARN [Producer clientId=console-producer]
> Received invalid metadata error in produce request on partition
> topic-test-0 due to org.apache.kafka.common.errors.KafkaStorageException:
> Disk error when trying to access log file on the disk.. Going to request
> metadata update now (org.apache.kafka.clients.producer.internals.Sender)
> [2022-04-28 11:12:11,231] ERROR Error when sending message to topic
> topic-test with key: null, value: 0 bytes with error:
> (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
> org.apache.kafka.common.errors.KafkaStorageException: Disk error when
> trying to access log file on the disk.
> [2022-04-28 11:12:11,233] WARN [Producer clientId=console-producer]
> Received invalid metadata error in produce request on partition
> topic-test-0 due to org.apache.kafka.common.errors.KafkaStorageException:
> Disk error when trying to access log file on the disk.. Going to request
> metadata update now (org.apache.kafka.clients.producer.internals.Sender)
>
>
> The server.log
> [2022-04-27 07:41:51,203] TRACE [KafkaApi-0] Handling
> request:RequestHeader(apiKey=METADATA, apiVersion=11,
> clientId=console-producer, correlationId=9) --
> MetadataRequestData(topics=[MetadataRequestTopic(topicId=AA,
> name='topic-test')], allowAutoTopicCreation=true,
> includeClusterAuthorizedOperations=false,
> includeTopicAuthorizedOperations=false) from connection 135.251.236.162:9092
> -135.251.236.162:44194-2;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
> (kafka.server.KafkaApis)
> [2022-04-27 07:41:51,203] TRACE [KafkaApi-0] Sending topic metadata
> MetadataResponseTopic(errorCode=0, name='topic-tst',
> topicId=mgS7D7-9RZSgeEUJ3XXErw, isInternal=false,
> partitions=[MetadataResponsePartition(errorCode=0, partitionIndex=0,
> leaderId=0, leaderEpoch=0, replicaNodes=[0], isrNodes=[0],
> offlineReplicas=[])], topicAuthorizedOperations=-2147483648) and brokers
> baijin162-vnfprov:9092 (id: 0 rack: null) for correlation id 9 to client
> console-producer (kafka.server.KafkaApis)
> [2022-04-27 07:41:51,297] TRACE [KafkaApi-0] Handling
> request:RequestHeader(apiKey=PRODUCE, apiVersion=9,
> clientId=console-producer, 

Re: Requesting Pricing for Kafka Support

2022-04-26 Thread Liam Clarke-Hutchinson
Hi Lavanya,

Apache Kafka is a free open source software project and, as such, doesn't
provide paid support. However, numerous companies do offer a supported
Kafka product; you will be able to find them by googling "kafka support"
or similar.

Cheers,

Liam

On Wed, 27 Apr 2022 at 02:20, Lavanya Voruganti 
wrote:

> Dear Kafka Team,
>
> *Greetings From Dhanyaayai..!!!*
>
> One of our clients has a requirement for Kafka support with a quantity of
> 04.
>
> Could you please help me with the reseller discounts for the same.
>
> Regards
> Lavanya Reddy
> +91 9701328158
>


Re: Thank you!

2022-04-19 Thread Liam Clarke-Hutchinson
Hi Matt,

Apache Kafka is a free open source software project, not too much swag on
hand ;)

Cheers,

Liam

On Tue, 19 Apr 2022 at 02:39, Matt Gilbert 
wrote:

>  I just wanted to reach out and thank you for all you do. I am a Senior
> Test Engineer and wanted to reach out to see if you had any swag or gear, I
> would love to represent your company!
>


Fwd: RuntimeMBeanException: getMBeanInfo threw RuntimeException caused by ConcurrentModificationException

2022-04-06 Thread Liam Clarke-Hutchinson
Hi Peter,

Are you running the Prometheus exporter on a broker? Or on a Kafka client?

Also, what version?

Cheers,

Liam Clarke-Hutchinson

On Wed, 6 Apr 2022 at 19:14, Peter Schrott (Extern)
 wrote:

> Hi users,
>
> I am getting the following exceptions occasionally:
> JMX scrape failed: javax.management.RuntimeMBeanException: getMBeanInfo
> threw RuntimeException
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBeanInfo(DefaultMBeanServerInterceptor.java:1384)
> at
> com.sun.jmx.mbeanserver.JmxMBeanServer.getMBeanInfo(JmxMBeanServer.java:920)
> at io.prometheus.jmx.shaded.io
> .prometheus.jmx.JmxScraper.scrapeBean(JmxScraper.java:130)
> at io.prometheus.jmx.shaded.io
> .prometheus.jmx.JmxScraper.doScrape(JmxScraper.java:117)
> at io.prometheus.jmx.shaded.io
> .prometheus.jmx.JmxCollector.collect(JmxCollector.java:542)
> at io.prometheus.jmx.shaded.io
> .prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.findNextElement(CollectorRegistry.java:190)
> at io.prometheus.jmx.shaded.io
> .prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:223)
> at io.prometheus.jmx.shaded.io
> .prometheus.client.CollectorRegistry$MetricFamilySamplesEnumeration.nextElement(CollectorRegistry.java:144)
> at io.prometheus.jmx.shaded.io
> .prometheus.client.exporter.common.TextFormat.write004(TextFormat.java:22)
> at io.prometheus.jmx.shaded.io
> .prometheus.client.exporter.HTTPServer$HTTPMetricHandler.handle(HTTPServer.java:68)
> at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
> at sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:83)
> at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:82)
> at
> sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:675)
> at com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:79)
> at sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:647)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.ConcurrentModificationException
> at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
> at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
> at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
> at
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getMBeanInfo(JmxReporter.java:279)
> at
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getMBeanInfo(DefaultMBeanServerInterceptor.java:1378)
> ... 18 more
>
> Looking at the code here:
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java#L279
> it is obviously happening while looping over the private final
> Map<String, KafkaMetric> metrics. The metrics attribute is not „locked“ or
> of type ConcurrentHashMap.
> Prometheus JMX Exporter tries to scrape the Beans while (I assume) Kafka
> Consumer is updating them.
>
> Am I getting something wrong, or am I holding it wrong? Does anyone have
> an idea?
>
> Thanks & Best Peter
>
>
>


Re: Request was for a topic or partition that does not exist on this broker error when using Kafka in Kraft mode

2022-04-04 Thread Liam Clarke-Hutchinson
Hi Francis,

The rebalancing is your consumer group; it looks like your consumer is
repeatedly entering and leaving when it hits that error. Do you have logs
from the consumer side?

Cheers,

Liam

On Tue, 5 Apr 2022 at 11:30, Francis Chuang 
wrote:

> Not sure if this is due to the broker rebalancing multiple times every
> second for the topic. Relevant logs for the 3 brokers below:
>
> Broker 1:
> [2022-04-04 23:14:24,232] INFO Sent auto-creation request for
> Set(test-bounded-context-1-6cdaff21-6562-418a-92e8-1f2d8fa0c704) to the
> active controller. (kafka.server.DefaultAutoTopicCreationManager)
> [2022-04-04 23:14:24,234] INFO [Controller 2] CreateTopics result(s):
> CreatableTopic(name='test-bounded-context-1-6cdaff21-6562-418a-92e8-1f2d8fa0c704',
>
> numPartitions=1, replicationFactor=1, assignments=[], configs=[]):
> SUCCESS (org.apache.kafka.controller.ReplicationControlManager)
> [2022-04-04 23:14:24,234] INFO [Controller 2] Created topic
> test-bounded-context-1-6cdaff21-6562-418a-92e8-1f2d8fa0c704 with topic
> ID JujvHxuwSF65PjxapwU4KA.
> (org.apache.kafka.controller.ReplicationControlManager)
> [2022-04-04 23:14:24,235] INFO [Controller 2] Created partition
> test-bounded-context-1-6cdaff21-6562-418a-92e8-1f2d8fa0c704-0 with topic
> ID JujvHxuwSF65PjxapwU4KA and PartitionRegistration(replicas=[3],
> isr=[3], removingReplicas=[], addingReplicas=[], leader=3,
> leaderEpoch=0, partitionEpoch=0).
> (org.apache.kafka.controller.ReplicationControlManager)
>
> Broker 2 (there are a lot more rebalancing entries in the log, so this
> is just a sample):
> [2022-04-04 23:14:23,787] INFO [GroupCoordinator 3]: Dynamic Member with
> unknown member id joins group
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71 in Empty
> state. Created a new member id
> sarama-f36324b2-ec03-4c44-a8dc-67188c1986c8 for this member and add to
> the group. (kafka.coordinator.group.GroupCoordinator)
> [2022-04-04 23:14:23,799] INFO [GroupCoordinator 3]: Preparing to
> rebalance group
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71 in state
> PreparingRebalance with old generation 0 (__consumer_offsets-15)
> (reason: Adding new member sarama-f36324b2-ec03-4c44-a8dc-67188c1986c8
> with group instance id None) (kafka.coordinator.group.GroupCoordinator)
> [2022-04-04 23:14:23,808] INFO [GroupCoordinator 3]: Stabilized group
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71 generation 1
> (__consumer_offsets-15) with 1 members
> (kafka.coordinator.group.GroupCoordinator)
> [2022-04-04 23:14:23,818] INFO [GroupCoordinator 3]: Assignment received
> from leader sarama-f36324b2-ec03-4c44-a8dc-67188c1986c8 for group
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71 for
> generation 1. The group has 1 members, 0 of which are static.
> (kafka.coordinator.group.GroupCoordinator)
> [2022-04-04 23:14:24,030] INFO [Controller 3] Created topic
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71 with topic
> ID zVBVJQfOSrGmpjlYPEvv0w.
> (org.apache.kafka.controller.ReplicationControlManager)
> [2022-04-04 23:14:24,030] INFO [Controller 3] Created partition
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71-0 with topic
> ID zVBVJQfOSrGmpjlYPEvv0w and PartitionRegistration(replicas=[2],
> isr=[2], removingReplicas=[], addingReplicas=[], leader=2,
> leaderEpoch=0, partitionEpoch=0).
> (org.apache.kafka.controller.ReplicationControlManager)
> [2022-04-04 23:14:24,208] INFO [GroupCoordinator 3]: Preparing to
> rebalance group
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71 in state
> PreparingRebalance with old generation 1 (__consumer_offsets-15)
> (reason: Removing member sarama-f36324b2-ec03-4c44-a8dc-67188c1986c8 on
> LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
> [2022-04-04 23:14:24,210] INFO [GroupCoordinator 3]: Group
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71 with
> generation 2 is now empty (__consumer_offsets-15)
> (kafka.coordinator.group.GroupCoordinator)
> [2022-04-04 23:14:24,216] INFO [GroupCoordinator 3]: Member
> MemberMetadata(memberId=sarama-f36324b2-ec03-4c44-a8dc-67188c1986c8,
> groupInstanceId=None, clientId=sarama, clientHost=/172.21.0.6,
> sessionTimeoutMs=1, rebalanceTimeoutMs=6,
> supportedProtocols=List(range)) has left group
> test-bounded-context-1-c42b0ec0-7646-4f9e-b82c-17f380e07f71 through
> explicit `LeaveGroup` request (kafka.coordinator.group.GroupCoordinator)
> [2022-04-04 23:14:24,494] INFO [GroupCoordinator 3]: Dynamic Member with
> unknown member id joins group
> test-bounded-context-1-6cdaff21-6562-418a-92e8-1f2d8fa0c704 in Empty
> state. Created a new member id
> sarama-9affaa64-3233-4deb-978e-771fc790cf34 for this member and add to
> the group. (kafka.coordinator.group.GroupCoordinator)
> [2022-04-04 23:14:24,495] INFO [GroupCoordinator 3]: Preparing to
> rebalance group
> test-bounded-context-1-6cdaff21-6562-418a-92e8-1f2d8fa0c704 in state
> PreparingRebalance with old generation 0 

Re: kafka producer exception due to TimeoutException: Expiring records for topic 120000ms has passed since batch creation

2022-04-04 Thread Liam Clarke-Hutchinson
Hi Pushkar,

Could be a lot of things. What's your linger.ms configured for?
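
For what it's worth, 120000 ms is the default delivery.timeout.ms, i.e. the
total time a record may spend in the producer (batching, in-flight time and
retries) before it's expired, so anything that stalls sending for that long
can surface as this error. A sketch of the relevant producer settings, with
purely illustrative values:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutConfig {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // How long a record may wait in the accumulator to be batched with others.
        props.put(ProducerConfig.LINGER_MS_CONFIG, "5");
        // Per-request timeout for a single produce request to a broker.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        // Upper bound on the whole send: batching + in-flight time + retries.
        // 120000 is the default, which matches the 120000 ms in the exception.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
        return props;
    }
}

Raising delivery.timeout.ms mostly hides the symptom, though; the more
useful question is why sends are stalling for two minutes in the first
place.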

Cheers,

Liam

On Tue, 5 Apr 2022 at 05:39, Pushkar Deole  wrote:

> Hi All,
>
> We are intermittently seeing KafkaProducerException. The nested exception
> is as below:
>
> org.springframework.kafka.core.KafkaProducerException: Failed to send;
> nested exception is org.apache.kafka.common.errors.TimeoutException:
> Expiring 10 record(s) for analytics.mpe.passthrough-0:120000 ms has passed
> since batch creation\n\tat
> org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(
> KafkaTemplate.java:602 )
>
> Kafka version is 2.5
> Can someone give some ideas as to what would cause this and how it can be
> resolved?
>


Re: Kafka Producer Record Error Rate

2022-04-04 Thread Liam Clarke-Hutchinson
Hi Neeraj,

Sorry, I was wrong: the error rate only counts records that failed to send
after all retries were exhausted (or where retries weren't possible). So if
that's gone up, then records have been dropped by the producer's sender.
Cheers,

Liam



On Tue, 5 Apr 2022 at 10:35, Neeraj Vaidya
 wrote:

>  Thanks Liam.
> Yes, I do believe the following should really help:
> A producer metric which shows records which did not make their way to the
> topic because of retries being exhausted or timeout being exhausted .
>
> If the metric is at a batch level, then we will need to work out the math
> to calculate exactly how many records were dropped.
>
> Regards,
> Neeraj
>  On Tuesday, 5 April, 2022, 07:47:21 am GMT+10, Liam Clarke-Hutchinson
>  wrote:
>
>  Hi Neeraj,
>
> However, I am unclear what the record-error-rate|total metric for a
> > producer means,
> > Does the metric get incremented only when the record could not make it to
> > the topic or even when there was a transient/retriable error trying to
> send
> > the message to the topic ?
>
>
> The latter - so in your example, the error rate and retry rate metrics
> would both show an increase, but the records were eventually successfully
> sent. Would a metric for "batches that exhausted retries and so were
> dropped" be of any use to you? If so, I can propose adding one, and see
> what people think.
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
> On Mon, 4 Apr 2022 at 19:29, Neeraj Vaidya
>  wrote:
>
> >  Thank you David and Liam for your excellent responses.
> > Checking in the consumer will be extremely difficult.
> > However, I am unclear what the record-error-rate|total metric for a
> > producer means,
> > Does the metric get incremented only when the record could not make it to
> > the topic or even when there was a transient/retriable error trying to
> send
> > the message to the topic ?
> >
> > I am posting below the producer properties that I am using.
> >
> >  acks = -1
> >  batch.size = 16384
> >  bootstrap.servers = [##MASKED##]
> >  buffer.memory = 23622320128
> >  client.dns.lookup = use_all_dns_ips
> >  client.id = producer-1
> >  compression.type = none
> >  connections.max.idle.ms = 54
> >  delivery.timeout.ms = 288
> >  enable.idempotence = true
> >  interceptor.classes = []
> >  internal.auto.downgrade.txn.commit = false
> >  key.serializer = class
> > org.apache.kafka.common.serialization.StringSerializer
> >  linger.ms = 0
> >  max.block.ms = 144
> >  max.in.flight.requests.per.connection = 5
> >  max.request.size = 1048576
> >  metadata.max.age.ms = 720
> >  metadata.max.idle.ms = 720
> >  metric.reporters = []
> >  metrics.num.samples = 2
> >  metrics.recording.level = INFO
> >  metrics.sample.window.ms = 3
> >  partitioner.class = class
> > org.apache.kafka.clients.producer.internals.DefaultPartitioner
> >  receive.buffer.bytes = 32768
> >  reconnect.backoff.max.ms = 1000
> >  reconnect.backoff.ms = 50
> >  request.timeout.ms = 3
> >  retries = 2147483647
> >  retry.backoff.ms = 100
> >  sasl.client.callback.handler.class = null
> >  sasl.jaas.config = null
> >  sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >  sasl.kerberos.min.time.before.relogin = 6
> >  sasl.kerberos.service.name = null
> >  sasl.kerberos.ticket.renew.jitter = 0.05
> >  sasl.kerberos.ticket.renew.window.factor = 0.8
> >  sasl.login.callback.handler.class = null
> >  sasl.login.class = null
> >  sasl.login.refresh.buffer.seconds = 300
> >  sasl.login.refresh.min.period.seconds = 60
> >  sasl.login.refresh.window.factor = 0.8
> >  sasl.login.refresh.window.jitter = 0.05
> >  sasl.mechanism = GSSAPI
> >  security.protocol = PLAINTEXT
> >  security.providers = null
> >  send.buffer.bytes = 131072
> >  ssl.cipher.suites = null
> >  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
> >  ssl.endpoint.identification.algorithm = https
> >  ssl.engine.factory.class = null
> >  ssl.key.password = null
> >  ssl.keymanager.algorithm = SunX509
> >  ssl.keystore.location = null
> >  ssl.keystore.password = null
> >  ssl.keystore.type = JKS
> >  ssl.protocol = TLSv1.3
> >  ssl.provider = null
> >  ssl.secure.random.implementation = null
> >  ssl.trustmanager.algorithm = PKIX
> >  ssl.truststore.location = null
> >  ssl.truststore.password = null
> >  ssl.truststore.type = JKS
> >  transaction.timeout.ms = 6
> >  transactional.id = null
> >  value.serializer = clas

Re: Kafka Producer Record Error Rate

2022-04-04 Thread Liam Clarke-Hutchinson
Hi Neeraj,

However, I am unclear what the record-error-rate|total metric for a
> producer means,
> Does the metric get incremented only when the record could not make it to
> the topic or even when there was a transient/retriable error trying to send
> the message to the topic ?


The latter - so in your example, the error rate and retry rate metrics
would both show an increase, but the records were eventually successfully
sent. Would a metric for "batches that exhausted retries and so were
dropped" be of any use to you? If so, I can propose adding one, and see
what people think.
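
If it helps while digging, both counters can also be read straight off a
running producer. A rough sketch (it assumes an existing producer instance):

import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ProducerErrorMetrics {
    // Sketch only: pass in your existing producer instance.
    public static void logErrorAndRetryTotals(KafkaProducer<?, ?> producer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            boolean interesting = "record-error-total".equals(name.name())
                    || "record-retry-total".equals(name.name());
            if ("producer-metrics".equals(name.group()) && interesting) {
                System.out.println(name.name() + " = " + entry.getValue().metricValue());
            }
        }
    }
}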

Cheers,

Liam Clarke-Hutchinson

On Mon, 4 Apr 2022 at 19:29, Neeraj Vaidya
 wrote:

>  Thank you David and Liam for your excellent responses.
> Checking in the consumer will be extremely difficult.
> However, I am unclear what the record-error-rate|total metric for a
> producer means,
> Does the metric get incremented only when the record could not make it to
> the topic or even when there was a transient/retriable error trying to send
> the message to the topic ?
>
> I am posting below the producer properties that I am using.
>
>  acks = -1
>  batch.size = 16384
>  bootstrap.servers = [##MASKED##]
>  buffer.memory = 23622320128
>  client.dns.lookup = use_all_dns_ips
>  client.id = producer-1
>  compression.type = none
>  connections.max.idle.ms = 54
>  delivery.timeout.ms = 288
>  enable.idempotence = true
>  interceptor.classes = []
>  internal.auto.downgrade.txn.commit = false
>  key.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
>  linger.ms = 0
>  max.block.ms = 144
>  max.in.flight.requests.per.connection = 5
>  max.request.size = 1048576
>  metadata.max.age.ms = 720
>  metadata.max.idle.ms = 720
>  metric.reporters = []
>  metrics.num.samples = 2
>  metrics.recording.level = INFO
>  metrics.sample.window.ms = 3
>  partitioner.class = class
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>  receive.buffer.bytes = 32768
>  reconnect.backoff.max.ms = 1000
>  reconnect.backoff.ms = 50
>  request.timeout.ms = 3
>  retries = 2147483647
>  retry.backoff.ms = 100
>  sasl.client.callback.handler.class = null
>  sasl.jaas.config = null
>  sasl.kerberos.kinit.cmd = /usr/bin/kinit
>  sasl.kerberos.min.time.before.relogin = 6
>  sasl.kerberos.service.name = null
>  sasl.kerberos.ticket.renew.jitter = 0.05
>  sasl.kerberos.ticket.renew.window.factor = 0.8
>  sasl.login.callback.handler.class = null
>  sasl.login.class = null
>  sasl.login.refresh.buffer.seconds = 300
>  sasl.login.refresh.min.period.seconds = 60
>  sasl.login.refresh.window.factor = 0.8
>  sasl.login.refresh.window.jitter = 0.05
>  sasl.mechanism = GSSAPI
>  security.protocol = PLAINTEXT
>  security.providers = null
>  send.buffer.bytes = 131072
>  ssl.cipher.suites = null
>  ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
>  ssl.endpoint.identification.algorithm = https
>  ssl.engine.factory.class = null
>  ssl.key.password = null
>  ssl.keymanager.algorithm = SunX509
>  ssl.keystore.location = null
>  ssl.keystore.password = null
>  ssl.keystore.type = JKS
>  ssl.protocol = TLSv1.3
>  ssl.provider = null
>  ssl.secure.random.implementation = null
>  ssl.trustmanager.algorithm = PKIX
>  ssl.truststore.location = null
>  ssl.truststore.password = null
>  ssl.truststore.type = JKS
>  transaction.timeout.ms = 6
>  transactional.id = null
>  value.serializer = class
> io.vertx.kafka.client.serialization.JsonObjectSerializer
>
> Regards,
> Neeraj On Monday, 4 April, 2022, 03:19:08 pm GMT+10, David Finnie <
> david.fin...@infrasoft.com.au> wrote:
>
>  Hi Neeraj,
>
> I don't know what might be causing the first Produce error. Is the
> OUT_OF_ORDER_SEQUENCE_NUMBER the first Produce error? From the error
> that you included (Invalid sequence number for new epoch) it would seem
> that the broker doesn't (yet) know about the Producer's epoch - possibly
> because it is still catching up after you restarted it? Note that the
> first sequence number for a new epoch must be 0, so if the broker thinks
> that it is a new epoch, but the sequence number is 3, it will cause this
> error.
>
> I can explain more about the relationship of Producer ID, Producer Epoch
> and Sequence Number if you want.
>
> With 5 in-flight requests per connection, if any Produce is rejected,
> all other in-flight Produce requests will be rejected with
> OUT_OF_ORDER_SEQUENCE_NUMBER because the first rejected Produce
> request's sequence number range never got stored, so all subsequent
> in-flight Produce requests are out of sequence. (e.g. if a Produce with
> sequence number 2 is rejected, 

Re: Kafka Producer Record Error Rate

2022-04-03 Thread Liam Clarke-Hutchinson
Thanks Neeraj,

From reading the code, I am reasonably certain that no data loss occurred -
the producer reset the batch sequence numbers, and then tried again.

I refer you to this comment in the code of the producer's Sender:

// tell the user the result of their request. We only adjust sequence
// numbers if the batch didn't exhaust its retries -- if it did, we don't
// know whether the sequence number was accepted or not, and thus it is not
// safe to reassign the sequence.

So yeah, the fact that you see it tweaking sequence numbers is because the
batch hasn't exhausted its retry limit, so the sequence number is adjusted,
and then sending the batch is tried again.

That said, if you want to be sure and can replicate this issue, you could
perhaps modify your code to use the producer send method overload that
accepts a callback, which is invoked when a record either completes
successfully or fails fatally, and then log any exceptions occurring
there.
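
For reference, that's the send(record, callback) overload. A minimal sketch
of wiring it up (the producer and record would come from your existing
sending code, and the logging here is simplified):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CallbackExample {
    public static void sendWithLogging(KafkaProducer<String, String> producer,
                                       ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                // Only invoked with an exception once the send has fatally failed
                // (retries exhausted, batch expired, etc.).
                System.err.println("Send to " + record.topic() + " failed: " + exception);
            }
        });
    }
}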

Disclaimer - I'm unfamiliar with these internals of producers, so my
reading of the code could be wrong, so I strongly welcome correction from
other mailing list members if that's the case.

Cheers,

Liam Clarke


On Mon, 4 Apr 2022 at 16:29, Neeraj Vaidya
 wrote:

>  Hi Liam,
> Brokers are on Apache Kafka v2.7.0
> However, the Producer client is using the v2.6 libraries.
>
> Regards,
> Neeraj On Monday, 4 April, 2022, 02:17:42 pm GMT+10, Liam
> Clarke-Hutchinson  wrote:
>
>  Hi Neeraj,
>
> Not sure just yet, I'm diving into the code to find out. Oh, what version
> Kafka are you running please?
>
> Cheers,
>
> Liam
>
> On Mon, 4 Apr 2022 at 14:50, Neeraj Vaidya
>  wrote:
>
> >  Hi Liam,
> > Thanks for getting back.
> >
> > 1) Producer settings ( I am guessing these are the ones you are
> interested
> > in)
> > enable.idempotence=true
> > max.in.flight.requests.per.connection=5
> >
> > 2) Sample broker logs corresponding to the timestamp in the application
> > logs of the Producer
> >
> > [2022-04-03 15:56:39,587] ERROR [ReplicaManager broker=5] Error
> processing
> > append operation on partition input-topic-114
> (kafka.server.ReplicaManager)
> > org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid
> > sequence number for new epoch at offset 967756 in partition
> > input-topic-114: 158 (request epoch), 3 (seq. number)
> >
> > Do the producer errors indicate that these messages never made it to the
> > Kafka topic at all ?
> >
> > Regards,
> > Neeraj
> >  On Monday, 4 April, 2022, 12:23:30 pm GMT+10, Liam
> Clarke-Hutchinson <
> > lclar...@redhat.com> wrote:
> >
> >  Hi Neeraj,
> >
> > First off, what are your producer settings?
> > Secondly, do you have brokers logs for the leaders of some of your
> affected
> > topics on hand at all?
> >
> > Cheers,
> >
> > Liam Clarke-Hutchinson
> >
> > On Mon, 4 Apr 2022 at 14:04, Neeraj Vaidya
> >  wrote:
> >
> > > Hi All,
> > > For one of the Kafka producers that I have, I see that the Producer
> > Record
> > > Error rate is non-zero i.e. out of the expected 3000 messages per
> second
> > > which I expect to be producing to the topic, I can see that this
> metric
> > > shows a rate of about 200.
> > > Does this indicate that the records failed to be sent to the Kafka
> topic,
> > > or does this metric show up even for each retry in the Producer.Send
> > > operation ?
> > >
> > > Notes :
> > > 1) I have distributed  8 brokers equally across 2 sites. Using
> > > rack-awareness, I am making Kafka position replicas equally across both
> > > sites. My min.isr=2 and replication factor = 4. This makes 2 replicas
> to
> > be
> > > located in each site.
> > > 2) The scenario I am testing is that of shutting down a set of 4
> brokers
> > > in one site (out of 8) for an extended period of time and then bringing
> > > them back up after say 2 hours. This causes the follower replicas
> on
> > > those brokers to try and catch-up with the leader replicas on the other
> > > brokers. The error rate that I am referring to shows up under this
> > scenario
> > > of restarting the brokers. It does not show up when I have just the
> other
> > > set of (4) brokers.
> > >
> > > To be specific, here are the errors that I see in the Kafka producer
> log
> > > file:
> > >
> > > 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> > > o.a.k.c.p.i.Sender

Re: Kafka Producer Record Error Rate

2022-04-03 Thread Liam Clarke-Hutchinson
Hi Neeraj,

Not sure just yet, I'm diving into the code to find out. Oh, what version
of Kafka are you running, please?

Cheers,

Liam

On Mon, 4 Apr 2022 at 14:50, Neeraj Vaidya
 wrote:

>  Hi Liam,
> Thanks for getting back.
>
> 1) Producer settings ( I am guessing these are the ones you are interested
> in)
> enable.idempotence=true
> max.in.flight.requests.per.connection=5
>
> 2) Sample broker logs corresponding to the timestamp in the application
> logs of the Producer
>
> [2022-04-03 15:56:39,587] ERROR [ReplicaManager broker=5] Error processing
> append operation on partition input-topic-114 (kafka.server.ReplicaManager)
> org.apache.kafka.common.errors.OutOfOrderSequenceException: Invalid
> sequence number for new epoch at offset 967756 in partition
> input-topic-114: 158 (request epoch), 3 (seq. number)
>
> Do the producer errors indicate that these messages never made it to the
> Kafka topic at all ?
>
> Regards,
> Neeraj
>  On Monday, 4 April, 2022, 12:23:30 pm GMT+10, Liam Clarke-Hutchinson <
> lclar...@redhat.com> wrote:
>
>  Hi Neeraj,
>
> First off, what are your producer settings?
> Secondly, do you have brokers logs for the leaders of some of your affected
> topics on hand at all?
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
> On Mon, 4 Apr 2022 at 14:04, Neeraj Vaidya
>  wrote:
>
> > Hi All,
> > For one of the Kafka producers that I have, I see that the Producer
> Record
> > Error rate is non-zero i.e. out of the expected 3000 messages per second
> > which I expect to be producing to the topic, I can see that this metric
> > shows a rate of about 200.
> > Does this indicate that the records failed to be sent to the Kafka topic,
> > or does this metric show up even for each retry in the Producer.Send
> > operation ?
> >
> > Notes :
> > 1) I have distributed  8 brokers equally across 2 sites. Using
> > rack-awareness, I am making Kafka position replicas equally across both
> > sites. My min.isr=2 and replication factor = 4. This makes 2 replicas to
> be
> > located in each site.
> > 2) The scenario I am testing is that of shutting down a set of 4 brokers
> > in one site (out of 8) for an extended period of time and then bringing
> > them back up after say 2 hours. This causes the follower replicas on
> > those brokers to try and catch-up with the leader replicas on the other
> > brokers. The error rate that I am referring to shows up under this
> scenario
> > of restarting the brokers. It does not show up when I have just the other
> > set of (4) brokers.
> >
> > To be specific, here are the errors that I see in the Kafka producer log
> > file:
> >
> > 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> > o.a.k.c.p.i.Sender  : [Producer clientId=producer-1]
> > Got error produce response with correlation id 16512434 on
> topic-partition
> > input-topic-114, retrying (2147483646 attempts left). Error:
> > OUT_OF_ORDER_SEQUENCE_NUMBER
> > 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> > o.a.k.c.p.i.Sender  : [Producer clientId=producer-1]
> > Got error produce response with correlation id 16512434 on
> topic-partition
> > input-topic-58, retrying (2147483646 attempts left). Error:
> > OUT_OF_ORDER_SEQUENCE_NUMBER
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.TransactionManager  : [Producer clientId=producer-1]
> > ProducerId set to 2040 with epoch 159
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.ProducerBatch: Resetting sequence number of
> > batch with current sequence 3 for partition input-topic-114 to 0
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.ProducerBatch: Resetting sequence number of
> > batch with current sequence 5 for partition input-topic-114 to 2
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.ProducerBatch: Resetting sequence number of
> > batch with current sequence 6 for partition input-topic-114 to 3
> > 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> > o.a.k.c.p.i.ProducerBatch: Resetting sequence number of
> > batch with current sequence 1 for partition input-topic-58 to 0
> > 2022-04-03 15:56:39.739  WARN --- [-thread | producer-1]
> > o.a.k.c.p.i.Sender  : [Producer clientId=producer-1]
> > Got error produce response with correlation id 16512436 on
> topic-partition
> > input-topic-82, retrying (2147483646 attempts left). Error:
> > OUT_OF_ORDER_SEQUENCE_NUMBER
> >
> > Regards,
> > Neeraj
> >
> >
>


Re: Kafka Producer Record Error Rate

2022-04-03 Thread Liam Clarke-Hutchinson
Hi Neeraj,

First off, what are your producer settings?
Secondly, do you have brokers logs for the leaders of some of your affected
topics on hand at all?

Cheers,

Liam Clarke-Hutchinson

On Mon, 4 Apr 2022 at 14:04, Neeraj Vaidya
 wrote:

> Hi All,
> For one of the Kafka producers that I have, I see that the Producer Record
> Error rate is non-zero i.e. out of the expected 3000 messages per second
> which I a expect to be producing to the topic, I can see that this metric
> shows a rate of about 200.
> Does this indicate that the records failed to be sent to the Kafka topic,
> or does this metric show up even for each retry in the Producer.Send
> operation ?
>
> Notes :
> 1) I have distributed  8 brokers equally across 2 sites. Using
> rack-awareness, I am making Kafka position replicas equally across both
> sites. My min.isr=2 and replication factor = 4. This makes 2 replicas to be
> located in each site.
> 2) The scenario I am testing is that of shutting down a set of 4 brokers
> in one site (out of 8) for an extended period of time and then bringing
> them back up after say 2 hours. This causes the follower replicas on
> those brokers to try and catch-up with the leader replicas on the other
> brokers. The error rate that I am referring to shows up under this scenario
> of restarting the brokers. It does not show up when I have just the other
> set of (4) brokers.
>
> To be specific, here are the errors that I see in the Kafka producer log
> file:
>
> 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> o.a.k.c.p.i.Sender   : [Producer clientId=producer-1]
> Got error produce response with correlation id 16512434 on topic-partition
> input-topic-114, retrying (2147483646 attempts left). Error:
> OUT_OF_ORDER_SEQUENCE_NUMBER
> 2022-04-03 15:56:39.613  WARN --- [-thread | producer-1]
> o.a.k.c.p.i.Sender   : [Producer clientId=producer-1]
> Got error produce response with correlation id 16512434 on topic-partition
> input-topic-58, retrying (2147483646 attempts left). Error:
> OUT_OF_ORDER_SEQUENCE_NUMBER
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.TransactionManager   : [Producer clientId=producer-1]
> ProducerId set to 2040 with epoch 159
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.ProducerBatch: Resetting sequence number of
> batch with current sequence 3 for partition input-topic-114 to 0
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.ProducerBatch: Resetting sequence number of
> batch with current sequence 5 for partition input-topic-114 to 2
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.ProducerBatch: Resetting sequence number of
> batch with current sequence 6 for partition input-topic-114 to 3
> 2022-04-03 15:56:39.613  INFO --- [-thread | producer-1]
> o.a.k.c.p.i.ProducerBatch: Resetting sequence number of
> batch with current sequence 1 for partition input-topic-58 to 0
> 2022-04-03 15:56:39.739  WARN --- [-thread | producer-1]
> o.a.k.c.p.i.Sender   : [Producer clientId=producer-1]
> Got error produce response with correlation id 16512436 on topic-partition
> input-topic-82, retrying (2147483646 attempts left). Error:
> OUT_OF_ORDER_SEQUENCE_NUMBER
>
> Regards,
> Neeraj
>
>


Re: Newbie looking for a connector I can configure on my mac

2022-03-30 Thread Liam Clarke-Hutchinson
Thanks Andy, and glad I could help somewhat :) I'll start writing something
up over the next week, and will share the link here for feedback :)

Just a caveat, the Filestream Connector could be considered a security risk
in a production environment given its direct access to the filesystem. Not
that you're looking to use this in prod, but just wanted to be sure I
mentioned it.

On Thu, 31 Mar 2022 at 09:24, andrew davidson 
wrote:

> Thanks Liam.
>
> What is 'FOSS Kafka'? google did not find any useful definitions
>
> A tutorial would be great! I would be happy to provide feed back
>
>
>
> https://docs.confluent.io/platform/current/connect/filestream_connector.html
> looks really helpful.
>
> Your list of connectors is very helpful. I wonder if your email would be a
> good thing to add to the kafka documentation quick start guide or FAQ?
>
> Kind regards
>
> Andy
>
> On 3/29/22, 5:27 PM, "Liam Clarke-Hutchinson" 
> wrote:
>
> Hi Andrew,
>
> So if you've downloaded Apache Kafka, you can run a standalone connect
> instance using the bin/connect-standalone.sh script mentioned. And
> while a
> lot of the connector documentation is on the Confluent website, you can
> still use them with FOSS Kafka so long as you're in line with the
> Confluent
> Community Licence (basically, IIRC, you can use them for free, but not
> to
> run a SAAS or similar that competes with Confluent, but IANAL).
>
> I agree that there's not much useful documentation for your use case. I
> will look into writing a tutorial for your use case, would you be
> happy to
> give me feedback on it as I go?
>
> The most important configuration initially is the plugin.path, where
> your
> standalone KC process will look for those JARs. You can see an example
> properties file for standalone Connect under the config/ dir in the
> Kafka
> you downloaded. Note that it has the plugin path commented out
> initially.
>
> So, Kafka ships with a connector that exposes a file source and file
> sink,
> which is good for testing out KC and getting used to it. You can either
> build it from source, or download it from here:
> https://mvnrepository.com/artifact/org.apache.kafka/connect-file -
> choose
> the version that matches the version of Kafka you've downloaded, and
> then
> you can download the JAR under files up the top. This documentation
> from
> Confluent is useful:
>
> https://docs.confluent.io/platform/current/connect/filestream_connector.html
>
> Note that if you don't provide a file property, (this isn't documented
> either(!)) it will use standard input for the file source, and standard
> output for the file sink. You can see example configs for this
> connector
> reading from a file or console under that same config/ directory, and
> ditto
> for writing.
>
> These connectors might also be useful for playing with KC, and are all
> free
> and downloadable:
> https://www.confluent.io/hub/confluentinc/kafka-connect-datagen <-
> generates a stream of test data
> https://www.confluent.io/hub/jcustenborder/kafka-connect-twitter <-
> disregard, I saw you mentioned not having Twitter
> https://www.confluent.io/hub/C0urante/kafka-connect-reddit <- I
> haven't
> used this, but could be interesting?
>
>
> I hope this helps you get started, and please let me know if I can help
> with anything else :)
>
> Cheers,
>
> Liam Clarke
>
>
>
> On Wed, 30 Mar 2022 at 11:54, andrew davidson <
> a...@santacruzanalytics.com>
> wrote:
>
> > I found the quick start https://kafka.apache.org/quickstart example
> very
> > helpful. It made it really easy to understand how download, start up,
> > create topic, push some data through the Kafka. I did not find
> > https://kafka.apache.org/quickstart#quickstart_kafkaconnect useful.
> >
> > I am looking for something very simple to  learning how to configure
> and
> > use connectors using Apache Kafka distribution, not Confluent. I can
> run on
> > my mac or Linux server. Being a newbie I want to keep things super
> simple.
> > I do not want to have to debug firewalls, ACL, …
> >
> > I do not have a data base, access to twitter, …
> >
> > I thought maybe something some sort source/sink using the local file
> > system?
> >
> > Any suggestions?
> >
> > Kind regards
> >
> > Andy
> >
> > p.s. I have read a lot of documentation most of it is very high
> level. Can
> > anyone recommend a “hand on” tutorial?
> >
> >
> >
>


Re: Newbie looking for a connector I can configure on my mac

2022-03-29 Thread Liam Clarke-Hutchinson
Hi Andrew Otto (Andrew 2? :D ),

Well noted. I should have worded it as "You don't need a Confluent
subscription to use them" :)

Cheers,

Liam


On Wed, 30 Mar 2022 at 13:49, Andrew Otto  wrote:

> > And while a lot of the connector documentation is on the Confluent
> website, you can still use them with FOSS Kafka so long as you're in line
> with the Confluent Community Licence
>
> *Drive by unhelpful comment:*
> While this is true legally, the fact that (most?) actual connector
> implementations are CCL and not FOSS means that, for organizations that use
> purely FOSS software (like the Wikimedia Foundation), Kafka Connect is
> effectively unusable.
>
> Okay carry on! :)
>
> - Andrew Otto
>
>
>
>
>
> On Tue, Mar 29, 2022 at 8:27 PM Liam Clarke-Hutchinson <
> lclar...@redhat.com>
> wrote:
>
> > Hi Andrew,
> >
> > So if you've downloaded Apache Kafka, you can run a standalone connect
> > instance using the bin/connect-standalone.sh script mentioned. And while
> a
> > lot of the connector documentation is on the Confluent website, you can
> > still use them with FOSS Kafka so long as you're in line with the
> Confluent
> > Community Licence (basically, IIRC, you can use them for free, but not to
> > run a SAAS or similar that competes with Confluent, but IANAL).
> >
> > I agree that there's not much useful documentation for your use case. I
> > will look into writing a tutorial for your use case, would you be happy
> to
> > give me feedback on it as I go?
> >
> > The most important configuration initially is the plugin.path, where your
> > standalone KC process will look for those JARs. You can see an example
> > properties file for standalone Connect under the config/ dir in the Kafka
> > you downloaded. Note that it has the plugin path commented out initially.
> >
> > So, Kafka ships with a connector that exposes a file source and file
> sink,
> > which is good for testing out KC and getting used to it. You can either
> > build it from source, or download it from here:
> > https://mvnrepository.com/artifact/org.apache.kafka/connect-file -
> choose
> > the version that matches the version of Kafka you've downloaded, and then
> > you can download the JAR under files up the top. This documentation from
> > Confluent is useful:
> >
> >
> https://docs.confluent.io/platform/current/connect/filestream_connector.html
> >
> > Note that if you don't provide a file property, (this isn't documented
> > either(!)) it will use standard input for the file source, and standard
> > output for the file sink. You can see example configs for this connector
> > reading from a file or console under that same config/ directory, and
> ditto
> > for writing.
> >
> > These connectors might also be useful for playing with KC, and are all
> free
> > and downloadable:
> > https://www.confluent.io/hub/confluentinc/kafka-connect-datagen <-
> > generates a stream of test data
> > https://www.confluent.io/hub/jcustenborder/kafka-connect-twitter <-
> > disregard, I saw you mentioned not having Twitter
> > https://www.confluent.io/hub/C0urante/kafka-connect-reddit <- I haven't
> > used this, but could be interesting?
> >
> >
> > I hope this helps you get started, and please let me know if I can help
> > with anything else :)
> >
> > Cheers,
> >
> > Liam Clarke
> >
> >
> >
> > On Wed, 30 Mar 2022 at 11:54, andrew davidson <
> a...@santacruzanalytics.com
> > >
> > wrote:
> >
> > > I found the quick start https://kafka.apache.org/quickstart example
> very
> > > helpful. It made it really easy to understand how download, start up,
> > > create topic, push some data through the Kafka. I did not find
> > > https://kafka.apache.org/quickstart#quickstart_kafkaconnect useful.
> > >
> > > I am looking for something very simple to  learning how to configure
> and
> > > use connectors using Apache Kafka distribution, not Confluent. I can
> run
> > on
> > > my mac or Linux server. Being a newbie I want to keep things super
> > simple.
> > > I do not want to have to debug firewalls, ACL, …
> > >
> > > I do not have a data base, access to twitter, …
> > >
> > > I thought maybe something some sort source/sink using the local file
> > > system?
> > >
> > > Any suggestions?
> > >
> > > Kind regards
> > >
> > > Andy
> > >
> > > p.s. I have read a lot of documentation most of it is very high level.
> > Can
> > > anyone recommend a “hand on” tutorial?
> > >
> > >
> > >
> >
>


Re: Newbie looking for a connector I can configure on my mac

2022-03-29 Thread Liam Clarke-Hutchinson
Hi Andrew,

So if you've downloaded Apache Kafka, you can run a standalone connect
instance using the bin/connect-standalone.sh script mentioned. And while a
lot of the connector documentation is on the Confluent website, you can
still use them with FOSS Kafka so long as you're in line with the Confluent
Community Licence (basically, IIRC, you can use them for free, but not to
run a SaaS or similar that competes with Confluent, but IANAL).

I agree that there's not much useful documentation for your use case. I
will look into writing a tutorial for your use case, would you be happy to
give me feedback on it as I go?

The most important configuration initially is the plugin.path, where your
standalone KC process will look for those JARs. You can see an example
properties file for standalone Connect under the config/ dir in the Kafka
you downloaded. Note that it has the plugin path commented out initially.

So, Kafka ships with a connector that exposes a file source and file sink,
which is good for testing out KC and getting used to it. You can either
build it from source, or download it from here:
https://mvnrepository.com/artifact/org.apache.kafka/connect-file - choose
the version that matches the version of Kafka you've downloaded, and then
you can download the JAR from the Files section at the top of the page. This documentation from
Confluent is useful:
https://docs.confluent.io/platform/current/connect/filestream_connector.html

Note that if you don't provide a file property (this isn't documented
either(!)) it will use standard input for the file source, and standard
output for the file sink. You can see example configs for this connector
reading from a file or console under that same config/ directory, and ditto
for writing.
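
For example, a bare-bones standalone run ends up looking something like this
(paths, topic and file names below are only illustrative - adjust them to
where you unpacked Kafka):

  # config/connect-standalone.properties (just the relevant bits)
  bootstrap.servers=localhost:9092
  key.converter=org.apache.kafka.connect.storage.StringConverter
  value.converter=org.apache.kafka.connect.storage.StringConverter
  offset.storage.file.filename=/tmp/connect.offsets
  plugin.path=/path/to/dir-containing-connect-file-jar

  # my-file-source.properties (a hypothetical connector config)
  name=local-file-source
  connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
  tasks.max=1
  file=/tmp/test-input.txt
  topic=connect-test

  # then start it with:
  bin/connect-standalone.sh config/connect-standalone.properties my-file-source.properties

Lines appended to /tmp/test-input.txt should then show up on the
connect-test topic.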

These connectors might also be useful for playing with KC, and are all free
and downloadable:
https://www.confluent.io/hub/confluentinc/kafka-connect-datagen <-
generates a stream of test data
https://www.confluent.io/hub/jcustenborder/kafka-connect-twitter <-
disregard, I saw you mentioned not having Twitter
https://www.confluent.io/hub/C0urante/kafka-connect-reddit <- I haven't
used this, but could be interesting?


I hope this helps you get started, and please let me know if I can help
with anything else :)

Cheers,

Liam Clarke



On Wed, 30 Mar 2022 at 11:54, andrew davidson 
wrote:

> I found the quick start https://kafka.apache.org/quickstart example very
> helpful. It made it really easy to understand how to download, start up,
> create a topic, and push some data through Kafka. I did not find
> https://kafka.apache.org/quickstart#quickstart_kafkaconnect useful.
>
> I am looking for something very simple for learning how to configure and
> use connectors using the Apache Kafka distribution, not Confluent. I can run on
> my Mac or a Linux server. Being a newbie I want to keep things super simple.
> I do not want to have to debug firewalls, ACLs, …
>
> I do not have a database, access to Twitter, …
>
> I thought maybe some sort of source/sink using the local file
> system?
>
> Any suggestions?
>
> Kind regards
>
> Andy
>
> p.s. I have read a lot of documentation; most of it is very high level. Can
> anyone recommend a “hands-on” tutorial?
>
>
>


Re: Setting up the CooperativeStickyAssignor in Java

2022-03-19 Thread Liam Clarke-Hutchinson
Hi Richard,

Yeah, what I think might be the issue is the old 2.8.1 version of
kafka-clients used by trunk fs2-kafka, not the wrapper itself; sorry if I
was unclear on that.

Please let us know how your testing with the latest fs2-kafka that's using
3.1.0 goes. :)

Kind regards,

Liam Clarke-Hutchinson

On Sun, 20 Mar 2022 at 07:19, Richard Ney  wrote:

> I am using the kafka-clients through the fs2-kafka wrapper. Though the log
> message I posted and copied again here
>
> Notifying assignor about the new Assignment(partitions=[
> platform-data.appquery-platform.aoav3.backfill-28,
> platform-data.appquery-platform.aoav3.backfill-31,
> platform-data.appquery-platform.aoav3.backfill-34,
> platform-data.appquery-platform.aoav3.backfill-37,
> platform-data.appquery-platform.aoav3.backfill-40,
> platform-data.appquery-platform.aoav3.backfill-43,
> platform-data.appquery-platform.aoav3.backfill-46,
>
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>
> is not generated by the fs2-kafka wrapper. Based on the kafka-client code,
> these are the log messages generated during the initial partition
> assignment when my application connects to the Kafka brokers. If this list
> contained the full list of partitions listed in the kafka-consumer-groups
> output but the lag was increasing on 2 partitions, I would immediately
> suspect the fs2-kafka wrapper as the issue. The fact that the notification
> messages from the kafka-clients library to the fs2-kafka library are
> missing two partitions makes me suspect the issue is in the kafka-clients
> library. In this occurrence this happened on 2 of the 5 consumer instances.
> The version of the kafka-clients library used by the version of the
> fs2-kafka library for this test is *2.8.1*. I'm currently running another
> test with the latest fs2-kafka library which is consuming the *3.1.0*
> version of the kafka-clients library. Initial partition assignment was
> successful. On Monday I'll do a large number of scale-up/scale-down tests
> to force rebalancing of partitions to see if I can replicate the issue
> using the latest version.
>
> On Sat, Mar 19, 2022 at 2:06 AM Liam Clarke-Hutchinson <
> lclar...@redhat.com>
> wrote:
>
> > So to clarify, you're using kafka-clients directly? Or via fs2-kafka? If
> > it's kafka-clients directly, what version please?
> >
> > On Sat, 19 Mar 2022 at 19:59, Richard Ney 
> wrote:
> >
> > > Hi Liam,
> > >
> > > Sorry for the mis-identification in the last email. The fun of
> answering
> > an
> > > email on a phone instead of a desktop. I confirmed the upper log
> > messages I
> > > included in the message come from this location in the `kafka-clients`
> > > library
> > >
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L422
> > >
> > > And it's only including 8 of the 10 partitions that were assigned to
> that
> > > consumer instance.
> > >
> > > -Richard
> > >
> > > On Fri, Mar 18, 2022 at 11:15 PM Richard Ney 
> > > wrote:
> > >
> > > > Hi Ngā mihi,
> > > >
> > > > I believe the log entry I included was from the underlying
> > kafka-clients
> > > > library given that the logger identified is
> > > > “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”.
> I’ll
> > > > admit at first I thought it also might be the fs2-kafka wrapper given
> > > that
> > > > the 2.4.0 version is the first version that has correct support for
> the
> > > > messaging from the ConsumerCoordinator. I am planning to do a test
> with
> > > the
> > > > 3.0.0-M5 version which is built on the updated 3.1.0 kafka-clients
> > > library
> > > > and will let the list know.
> > > >
> > > > -Richard Ney
> > > >
> > > > Sent from my iPhone
> > > >
> > > > > On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson <
> > > > lclar...@redhat.com> wrote:
> > > > >
> > > > > Kia ora Richard,
> > > > >
> > > > > I see support for the Cooperative Sticky Assignor in fs2-kafka is
> > quite
> > > > > new. Have you discussed this issue with the community of that
> client
> > at
> > > > > a

Re: Setting up the CooperativeStickyAssignor in Java

2022-03-19 Thread Liam Clarke-Hutchinson
So to clarify, you're using kafka-clients directly? Or via fs2-kafka? If
it's kafka-clients directly, what version please?

On Sat, 19 Mar 2022 at 19:59, Richard Ney  wrote:

> Hi Liam,
>
> Sorry for the mis-identification in the last email. The fun of answering an
> email on a phone instead of a desktop. I confirmed the upper log messages I
> included in the message come from this location in the `kafka-clients`
> library
>
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L422
>
> And it's only including 8 of the 10 partitions that were assigned to that
> consumer instance.
>
> -Richard
>
> On Fri, Mar 18, 2022 at 11:15 PM Richard Ney 
> wrote:
>
> > Hi Ngā mihi,
> >
> > I believe the log entry I included was from the underlying kafka-clients
> > library given that the logger identified is
> > “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”. I’ll
> > admit at first I thought it also might be the fs2-kafka wrapper given
> that
> > the 2.4.0 version is the first version that has correct support for the
> > messaging from the ConsumerCoordinator. I am planning to do a test with
> the
> > 3.0.0-M5 version which is built on the updated 3.1.0 kafka-clients
> library
> > and will let the list know.
> >
> > -Richard Ney
> >
> > Sent from my iPhone
> >
> > > On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson <
> > lclar...@redhat.com> wrote:
> > >
> > > Kia ora Richard,
> > >
> > > I see support for the Cooperative Sticky Assignor in fs2-kafka is quite
> > > new. Have you discussed this issue with the community of that client at
> > > all? I ask because I see on GitHub that fs2-kafka is using
> kafka-clients
> > > 2.8.1 as the underlying client, and there's been a fair few bugfixes
> > around
> > > the cooperative sticky assignor since that version.
> > >
> > > Could you perhaps try overriding the kafka-clients dependency of
> > fs2-kafka
> > > and try a higher version, perhaps 3.1.0, and see if the issue remains?
> > I'm
> > > not sure how well that'll work, but might help narrow down the issue.
> > >
> > > Ngā mihi,
> > >
> > > Liam Clarke-Hutchinson
> > >
> > >> On Sat, 19 Mar 2022 at 14:24, Richard Ney 
> > wrote:
> > >>
> > >> Thanks for the additional information Bruno. Does this look like a
> > possible
> > >> bug in the CooperativeStickyAssignor? I have 5 consumers reading from
> a
> > 50
> > >> partition topic. Based on the log messages this application instance
> is
> > >> only getting assigned 8 partitions but when I ask the consumer group
> for
> > >> LAG information the consumer group thinks the correct number of 10
> > >> partitions were assigned but as should 2 partitions aren't getting
> read
> > due
> > >> to the application not knowing it has them.
> > >>
> > >> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
> > >> instanceId=i-0e89c9bee06f71f68,
> > >>
> > clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > >> groupId=app-query-platform-aoa-backfill-v7] Updating assignment
> with\n\t
> > >> Assigned partitions: [
> > >> platform-data.appquery-platform.aoav3.backfill-28,
> > >> platform-data.appquery-platform.aoav3.backfill-43,
> > >> platform-data.appquery-platform.aoav3.backfill-31,
> > >> platform-data.appquery-platform.aoav3.backfill-46,
> > >> platform-data.appquery-platform.aoav3.backfill-34,
> > >> platform-data.appquery-platform.aoav3.backfill-49,
> > >> platform-data.appquery-platform.aoav3.backfill-40,
> > >> platform-data.appquery-platform.aoav3.backfill-37] \n\t
> > >> Current owned partitions:  []\n\t
> > >>
> > >> Added partitions (assigned - owned):   [
> > >> platform-data.appquery-platform.aoav3.backfill-28,
> > >> platform-data.appquery-platform.aoav3.backfill-43,
> > >> platform-data.appquery-platform.aoav3.backfill-31,
> > >> platform-data.appquery-platform.aoav3.backfill-46,
> > >> platform-data.appquery-platform.aoav3.backfill-34,
> > >> platform-data.appquery-platform.aoav3.backfill-49,
> > >> platform-data.appquery-platform.aoav3.backfill-40,
> > >> platform-data.appquery-platform.aoav

Re: Setting up the CooperativeStickyAssignor in Java

2022-03-18 Thread Liam Clarke-Hutchinson
Kia ora Richard,

I see support for the Cooperative Sticky Assignor in fs2-kafka is quite
new. Have you discussed this issue with the community of that client at
all? I ask because I see on GitHub that fs2-kafka is using kafka-clients
2.8.1 as the underlying client, and there's been a fair few bugfixes around
the cooperative sticky assignor since that version.

Could you perhaps try overriding the kafka-clients dependency of fs2-kafka
and try a higher version, perhaps 3.1.0, and see if the issue remains? I'm
not sure how well that'll work, but might help narrow down the issue.
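
For reference, forcing the newer client under fs2-kafka might look roughly
like this if you're on sbt (the build tool and version here are assumptions
on my part - check against your build):

  // build.sbt - override the kafka-clients version fs2-kafka pulls in
  dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "3.1.0"

and it's worth double checking that the consumer config is actually picking
up the cooperative assignor, i.e.

  partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor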

Ngā mihi,

Liam Clarke-Hutchinson

On Sat, 19 Mar 2022 at 14:24, Richard Ney  wrote:

> Thanks for the additional information Bruno. Does this look like a possible
> bug in the CooperativeStickyAssignor? I have 5 consumers reading from a 50
> partition topic. Based on the log messages this application instance is
> only getting assigned 8 partitions, but when I ask the consumer group for
> LAG information the consumer group thinks the correct number of 10
> partitions were assigned, yet 2 partitions aren't getting read due
> to the application not knowing it has them.
>
> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
> instanceId=i-0e89c9bee06f71f68,
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> groupId=app-query-platform-aoa-backfill-v7] Updating assignment with\n\t
> Assigned partitions: [
>  platform-data.appquery-platform.aoav3.backfill-28,
>  platform-data.appquery-platform.aoav3.backfill-43,
>  platform-data.appquery-platform.aoav3.backfill-31,
>  platform-data.appquery-platform.aoav3.backfill-46,
>  platform-data.appquery-platform.aoav3.backfill-34,
>  platform-data.appquery-platform.aoav3.backfill-49,
>  platform-data.appquery-platform.aoav3.backfill-40,
>  platform-data.appquery-platform.aoav3.backfill-37] \n\t
> Current owned partitions:  []\n\t
>
> Added partitions (assigned - owned):   [
> platform-data.appquery-platform.aoav3.backfill-28,
> platform-data.appquery-platform.aoav3.backfill-43,
> platform-data.appquery-platform.aoav3.backfill-31,
> platform-data.appquery-platform.aoav3.backfill-46,
> platform-data.appquery-platform.aoav3.backfill-34,
> platform-data.appquery-platform.aoav3.backfill-49,
> platform-data.appquery-platform.aoav3.backfill-40,
> platform-data.appquery-platform.aoav3.backfill-37]\n\t
>
> Revoked partitions (owned - assigned):
>
> []\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>
>
> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
> instanceId=i-0e89c9bee06f71f68,
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> groupId=app-query-platform-aoa-backfill-v7]
>
> Notifying assignor about the new Assignment(partitions=[
> platform-data.appquery-platform.aoav3.backfill-28,
> platform-data.appquery-platform.aoav3.backfill-31,
> platform-data.appquery-platform.aoav3.backfill-34,
> platform-data.appquery-platform.aoav3.backfill-37,
> platform-data.appquery-platform.aoav3.backfill-40,
> platform-data.appquery-platform.aoav3.backfill-43,
> platform-data.appquery-platform.aoav3.backfill-46,
>
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>
> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
> instanceId=i-0e89c9bee06f71f68,
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> groupId=app-query-platform-aoa-backfill-v7]
>
> Adding newly assigned partitions:
> platform-data.appquery-platform.aoav3.backfill-28,
> platform-data.appquery-platform.aoav3.backfill-43,
> platform-data.appquery-platform.aoav3.backfill-31,
> platform-data.appquery-platform.aoav3.backfill-46,
> platform-data.appquery-platform.aoav3.backfill-34,
> platform-data.appquery-platform.aoav3.backfill-49,
> platform-data.appquery-platform.aoav3.backfill-40,
>
> platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>
> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
>
> GROUP  TOPIC
>PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> CONSUMER-ID 

Re: Compression - producer vs topic?

2022-03-15 Thread Liam Clarke-Hutchinson
Oh, and I meant to say: zstd is a good compromise between CPU and compression
ratio; IIRC it was far less costly on CPU than gzip.

So yeah, I generally recommend setting your topic's compression to
"producer", and then going from there.

On Wed, 16 Mar 2022 at 11:49, Liam Clarke-Hutchinson 
wrote:

> Sounds like a goer then :) Those strings in the protobuf always get ya,
> can't use clever encodings for them like you can with numbers.
>
> On Wed, 16 Mar 2022 at 11:29, Dan Hill  wrote:
>
>> We're using protos but there are still a bunch of custom fields where
>> clients specify redundant strings.
>>
>> My local test is showing 75% reduction in size if I use zstd or gzip.  I
>> care the most about Kafka storage costs right now.
>>
>> On Tue, Mar 15, 2022 at 2:25 PM Liam Clarke-Hutchinson <
>> lclar...@redhat.com>
>> wrote:
>>
>> > Hi Dan,
>> >
>> > Okay, so if you're looking for low latency, I'm guessing that you're
>> using
>> > a very low linger.ms in the producers? Also, what format are the
>> records?
>> > If they're already in a binary format like Protobuf or Avro, unless
>> they're
>> > composed largely of strings, compression may offer little benefit.
>> >
>> > With your small records, I'd suggest running some tests with your
>> current
>> > config with different compression settings - none, snappy, lz4, (don't
>> > bother with gzip unless that's all you have) and checking producer
>> metrics
>> > (available via JMX if you're using the Java clients) for avg-batch-size
>> and
>> > compression-ratio.
>> >
>> > You may just wish to start with no compression, and then consider
>> moving to
>> > it if/when network bandwidth becomes a bottleneck.
>> >
>> > Regards,
>> >
>> > Liam
>> >
>> > On Tue, 15 Mar 2022 at 17:05, Dan Hill  wrote:
>> >
>> > > Thanks, Liam!
>> > >
>> > > I have a mixture of Kafka record size.  10% are large (>100kbs) and
>> 90%
>> > of
>> > > the records are smaller than 1kb.  I'm working on a streaming
>> analytics
>> > > solution that streams impressions, user actions and serving info and
>> > > combines them together.  End-to-end latency is more important than
>> > storage
>> > > size.
>> > >
>> > >
>> > > On Mon, Mar 14, 2022 at 3:27 PM Liam Clarke-Hutchinson <
>> > > lclar...@redhat.com>
>> > > wrote:
>> > >
>> > > > Hi Dan,
>> > > >
>> > > > Decompression generally only happens in the broker if the topic has
>> a
>> > > > particular compression algorithm set, and the producer is using a
>> > > different
>> > > > one - then the broker will decompress records from the producer,
>> then
>> > > > recompress it using the topic's configured algorithm. (The
>> LogCleaner
>> > > will
>> > > > also decompress then recompress records when compacting compressed
>> > > topics).
>> > > >
>> > > > The consumer decompresses compressed record batches it receives.
>> > > >
>> > > > In my opinion, using topic compression instead of producer
>> compression
>> > > > would only make sense if the overhead of a few more CPU cycles
>> > > compression
>> > > > uses was not tolerable for the producing app. In all of my use
>> cases,
>> > > > network throughput becomes a bottleneck long before producer
>> > compression
>> > > > CPU cost does.
>> > > >
>> > > > For your "if X, do Y" formulation I'd say - if your producer is
>> sending
>> > > > tiny batches, do some analysis of compressed vs. uncompressed size
>> for
>> > > your
>> > > > given compression algorithm - you may find that compression overhead
>> > > > increases batch size for tiny batches.
>> > > >
>> > > > If you're sending a large amount of data, do tune your batching and
>> use
>> > > > compression to reduce data being sent over the wire.
>> > > >
>> > > > If you can tell us more about what your problem domain, there might
>> be
>> > > more
>> > > > advice that's applicable :)
>> > > >
>> > > > Cheers,
>> > > >
>> > > > Liam Clarke-Hutchinson

Re: Compression - producer vs topic?

2022-03-15 Thread Liam Clarke-Hutchinson
Sounds like a goer then :) Those strings in the protobuf always get ya,
can't use clever encodings for them like you can with numbers.

On Wed, 16 Mar 2022 at 11:29, Dan Hill  wrote:

> We're using protos but there are still a bunch of custom fields where
> clients specify redundant strings.
>
> My local test is showing 75% reduction in size if I use zstd or gzip.  I
> care the most about Kafka storage costs right now.
>
> On Tue, Mar 15, 2022 at 2:25 PM Liam Clarke-Hutchinson <
> lclar...@redhat.com>
> wrote:
>
> > Hi Dan,
> >
> > Okay, so if you're looking for low latency, I'm guessing that you're
> using
> > a very low linger.ms in the producers? Also, what format are the
> records?
> > If they're already in a binary format like Protobuf or Avro, unless
> they're
> > composed largely of strings, compression may offer little benefit.
> >
> > With your small records, I'd suggest running some tests with your current
> > config with different compression settings - none, snappy, lz4, (don't
> > bother with gzip unless that's all you have) and checking producer
> metrics
> > (available via JMX if you're using the Java clients) for avg-batch-size
> and
> > compression-ratio.
> >
> > You may just wish to start with no compression, and then consider moving
> to
> > it if/when network bandwidth becomes a bottleneck.
> >
> > Regards,
> >
> > Liam
> >
> > On Tue, 15 Mar 2022 at 17:05, Dan Hill  wrote:
> >
> > > Thanks, Liam!
> > >
> > > I have a mixture of Kafka record size.  10% are large (>100kbs) and 90%
> > of
> > > the records are smaller than 1kb.  I'm working on a streaming analytics
> > > solution that streams impressions, user actions and serving info and
> > > combines them together.  End-to-end latency is more important than
> > storage
> > > size.
> > >
> > >
> > > On Mon, Mar 14, 2022 at 3:27 PM Liam Clarke-Hutchinson <
> > > lclar...@redhat.com>
> > > wrote:
> > >
> > > > Hi Dan,
> > > >
> > > > Decompression generally only happens in the broker if the topic has a
> > > > particular compression algorithm set, and the producer is using a
> > > different
> > > > one - then the broker will decompress records from the producer, then
> > > > recompress it using the topic's configured algorithm. (The LogCleaner
> > > will
> > > > also decompress then recompress records when compacting compressed
> > > topics).
> > > >
> > > > The consumer decompresses compressed record batches it receives.
> > > >
> > > > In my opinion, using topic compression instead of producer
> compression
> > > > would only make sense if the overhead of a few more CPU cycles
> > > compression
> > > > uses was not tolerable for the producing app. In all of my use cases,
> > > > network throughput becomes a bottleneck long before producer
> > compression
> > > > CPU cost does.
> > > >
> > > > For your "if X, do Y" formulation I'd say - if your producer is
> sending
> > > > tiny batches, do some analysis of compressed vs. uncompressed size
> for
> > > your
> > > > given compression algorithm - you may find that compression overhead
> > > > increases batch size for tiny batches.
> > > >
> > > > If you're sending a large amount of data, do tune your batching and
> use
> > > > compression to reduce data being sent over the wire.
> > > >
> > > > If you can tell us more about what your problem domain, there might
> be
> > > more
> > > > advice that's applicable :)
> > > >
> > > > Cheers,
> > > >
> > > > Liam Clarke-Hutchinson
> > > >
> > > > On Tue, 15 Mar 2022 at 10:05, Dan Hill 
> wrote:
> > > >
> > > > > Hi.  I looked around for advice about Kafka compression.  I've seen
> > > mixed
> > > > > and conflicting advice.
> > > > >
> > > > > Is there any sorta "if X, do Y" type of documentation around Kafka
> > > > > compression?
> > > > >
> > > > > Any advice?  Any good posts to read that talk about this trade off?
> > > > >
> > > > > *Detailed comments*
> > > > > I tried looking for producer vs topic compression.  I didn't find
> > much.
> > > > > Some of the information I see is back from 2011 (which I'm guessing
> > is
> > > > > pretty stale).
> > > > >
> > > > > I can guess some potential benefits but I don't know if they are
> > > actually
> > > > > real.  I've also seen some sites claim certain trade offs but it's
> > > > unclear
> > > > > if they're true.
> > > > >
> > > > > It looks like I can modify an existing topic's compression.  I
> don't
> > > know
> > > > > if that actually works.  I'd assume it'd just impact data going
> > > forward.
> > > > >
> > > > > I've seen multiple sites say that decompression happens in the
> broker
> > > and
> > > > > multiple that say it happens in the consumer.
> > > > >
> > > >
> > >
> >
>


Re: Compression - producer vs topic?

2022-03-15 Thread Liam Clarke-Hutchinson
Hi Dan,

Okay, so if you're looking for low latency, I'm guessing that you're using
a very low linger.ms in the producers? Also, what format are the records?
If they're already in a binary format like Protobuf or Avro, unless they're
composed largely of strings, compression may offer little benefit.

With your small records, I'd suggest running some tests with your current
config with different compression settings - none, snappy, lz4 (don't
bother with gzip unless that's all you have) - and checking producer metrics
(available via JMX if you're using the Java clients) for avg-batch-size and
compression-ratio.
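
If you'd rather check those numbers from inside the app than via JMX, a
small sketch like this works with the Java client - note I'm quoting the
metric names ("batch-size-avg" and "compression-rate-avg" in the
"producer-metrics" group) from memory, so verify them against what
producer.metrics() actually returns:

  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.common.Metric;
  import org.apache.kafka.common.MetricName;
  import java.util.Map;

  public class ProducerMetricsDump {
      // Print the two producer metrics most relevant to compression tuning.
      static void dumpCompressionMetrics(KafkaProducer<String, String> producer) {
          Map<MetricName, ? extends Metric> metrics = producer.metrics();
          for (Map.Entry<MetricName, ? extends Metric> e : metrics.entrySet()) {
              MetricName name = e.getKey();
              if ("producer-metrics".equals(name.group())
                      && ("batch-size-avg".equals(name.name())
                          || "compression-rate-avg".equals(name.name()))) {
                  System.out.println(name.name() + " = " + e.getValue().metricValue());
              }
          }
      }
  }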

You may just wish to start with no compression, and then consider moving to
it if/when network bandwidth becomes a bottleneck.

Regards,

Liam

On Tue, 15 Mar 2022 at 17:05, Dan Hill  wrote:

> Thanks, Liam!
>
> I have a mixture of Kafka record sizes.  10% are large (>100 KB) and 90% of
> the records are smaller than 1 KB.  I'm working on a streaming analytics
> solution that streams impressions, user actions and serving info and
> combines them together.  End-to-end latency is more important than storage
> size.
>
>
> On Mon, Mar 14, 2022 at 3:27 PM Liam Clarke-Hutchinson <
> lclar...@redhat.com>
> wrote:
>
> > Hi Dan,
> >
> > Decompression generally only happens in the broker if the topic has a
> > particular compression algorithm set, and the producer is using a
> different
> > one - then the broker will decompress records from the producer, then
> > recompress it using the topic's configured algorithm. (The LogCleaner
> will
> > also decompress then recompress records when compacting compressed
> topics).
> >
> > The consumer decompresses compressed record batches it receives.
> >
> > In my opinion, using topic compression instead of producer compression
> > would only make sense if the overhead of a few more CPU cycles
> compression
> > uses was not tolerable for the producing app. In all of my use cases,
> > network throughput becomes a bottleneck long before producer compression
> > CPU cost does.
> >
> > For your "if X, do Y" formulation I'd say - if your producer is sending
> > tiny batches, do some analysis of compressed vs. uncompressed size for
> your
> > given compression algorithm - you may find that compression overhead
> > increases batch size for tiny batches.
> >
> > If you're sending a large amount of data, do tune your batching and use
> > compression to reduce data being sent over the wire.
> >
> > If you can tell us more about what your problem domain, there might be
> more
> > advice that's applicable :)
> >
> > Cheers,
> >
> > Liam Clarke-Hutchinson
> >
> > On Tue, 15 Mar 2022 at 10:05, Dan Hill  wrote:
> >
> > > Hi.  I looked around for advice about Kafka compression.  I've seen
> mixed
> > > and conflicting advice.
> > >
> > > Is there any sorta "if X, do Y" type of documentation around Kafka
> > > compression?
> > >
> > > Any advice?  Any good posts to read that talk about this trade off?
> > >
> > > *Detailed comments*
> > > I tried looking for producer vs topic compression.  I didn't find much.
> > > Some of the information I see is back from 2011 (which I'm guessing is
> > > pretty stale).
> > >
> > > I can guess some potential benefits but I don't know if they are
> actually
> > > real.  I've also seen some sites claim certain trade offs but it's
> > unclear
> > > if they're true.
> > >
> > > It looks like I can modify an existing topic's compression.  I don't
> know
> > > if that actually works.  I'd assume it'd just impact data going
> forward.
> > >
> > > I've seen multiple sites say that decompression happens in the broker
> and
> > > multiple that say it happens in the consumer.
> > >
> >
>


Re: Compression - producer vs topic?

2022-03-14 Thread Liam Clarke-Hutchinson
Hi Dan,

Decompression generally only happens in the broker if the topic has a
particular compression algorithm set, and the producer is using a different
one - then the broker will decompress records from the producer, then
recompress them using the topic's configured algorithm. (The LogCleaner will
also decompress then recompress records when compacting compressed topics).

The consumer decompresses compressed record batches it receives.

In my opinion, using topic compression instead of producer compression
would only make sense if the overhead of a few more CPU cycles compression
uses was not tolerable for the producing app. In all of my use cases,
network throughput becomes a bottleneck long before producer compression
CPU cost does.

For your "if X, do Y" formulation I'd say - if your producer is sending
tiny batches, do some analysis of compressed vs. uncompressed size for your
given compression algorithm - you may find that compression overhead
increases batch size for tiny batches.

If you're sending a large amount of data, do tune your batching and use
compression to reduce data being sent over the wire.
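
As a rough starting point, the producer-side knobs I'm talking about are
these (the values below are purely illustrative - tune them against your own
measurements):

  # producer config sketch - illustrative values only
  compression.type=lz4   # or zstd/snappy; gzip costs the most CPU
  linger.ms=10           # small batching delay so batches are worth compressing
  batch.size=65536       # max bytes per partition batch
  acks=all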

If you can tell us more about your problem domain, there might be more
advice that's applicable :)

Cheers,

Liam Clarke-Hutchinson

On Tue, 15 Mar 2022 at 10:05, Dan Hill  wrote:

> Hi.  I looked around for advice about Kafka compression.  I've seen mixed
> and conflicting advice.
>
> Is there any sorta "if X, do Y" type of documentation around Kafka
> compression?
>
> Any advice?  Any good posts to read that talk about this trade off?
>
> *Detailed comments*
> I tried looking for producer vs topic compression.  I didn't find much.
> Some of the information I see is back from 2011 (which I'm guessing is
> pretty stale).
>
> I can guess some potential benefits but I don't know if they are actually
> real.  I've also seen some sites claim certain trade offs but it's unclear
> if they're true.
>
> It looks like I can modify an existing topic's compression.  I don't know
> if that actually works.  I'd assume it'd just impact data going forward.
>
> I've seen multiple sites say that decompression happens in the broker and
> multiple that say it happens in the consumer.
>


Re: Few partitions stuck in under replication

2022-03-07 Thread Liam Clarke-Hutchinson
Hi Dhirendra, so after looking into your stack trace further, it shows that
the AlterIsr request is failing when trying to interact with ZooKeeper, but
it doesn't currently tell us why.

Can you please set some loggers to DEBUG level to help analyse the issue
further?

These ones:

- kafka.zk.KafkaZkClient
- kafka.utils.ReplicationUtils
- kafka.server.ZkIsrManager


If you can do that and then share any log output that would be great :)
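
On a stock Apache Kafka install that should just be a matter of adding
something like the following to config/log4j.properties on the affected
broker and restarting it (assuming you haven't relocated the log4j config):

  log4j.logger.kafka.zk.KafkaZkClient=DEBUG
  log4j.logger.kafka.utils.ReplicationUtils=DEBUG
  log4j.logger.kafka.server.ZkIsrManager=DEBUG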

Cheers,

Liam

On Tue, 8 Mar 2022 at 17:50, Dhirendra Singh  wrote:

> Hi Thomas,
> I see the IllegalStateException but as i pasted earlier it is
> java.lang.IllegalStateException: Failed to enqueue `AlterIsr` request
> with state LeaderAndIsr(leader=1, leaderEpoch=2728, isr=List(1,
> 2), zkVersion=4719) for partition __consumer_offsets-4
>
> I upgraded to version 2.8.1 but issue is not resolved.
>
> Thanks,
> Dhirendra.
>
> On Mon, Mar 7, 2022 at 5:45 AM Liam Clarke-Hutchinson  >
> wrote:
>
> > Ah, I may have seen this error before. Dhirendra Singh, If you grep your
> > logs, you may find an IllegalStateException or two.
> >
> > https://issues.apache.org/jira/browse/KAFKA-12948
> >
> > You need to upgrade to 2.7.2 if this is the issue you're hitting.
> >
> > Kind regards,
> >
> > Liam Clarke-Hutchinson
> >
> > On Sun, 6 Mar 2022 at 04:30, Mailbox - Dhirendra Kumar Singh <
> > dhirendr...@gmail.com> wrote:
> >
> > > Let me rephrase my issue.
> > > Issue occur when broker loose connectivity to zookeeper server.
> > > Connectivity loss can happen due to many reasons…zookeeper servers
> > getting
> > > bounced, due to some network glitch etc…
> > >
> > > After the brokers reconnect to zookeeper server I expect the kafka
> > cluster
> > > to come back in stable state by itself without any manual intervention.
> > > but instead few partitions remain under replicated due to the error I
> > > pasted earlier.
> > >
> > > I feel this is some kind of bug. I am going to file a bug.
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Dhirendra.
> > >
> > >
> > >
> > > From: Thomas Cooper 
> > > Sent: Friday, March 4, 2022 7:01 PM
> > > To: Dhirendra Singh 
> > > Cc: users@kafka.apache.org
> > > Subject: Re: Few partitions stuck in under replication
> > >
> > >
> > >
> > > Do you roll the controller last?
> > >
> > > I suspect this is more to do with the way you are rolling the cluster
> > > (which I am still not clear on the need for) rather than some kind of
> bug
> > > in Kafka (though that could of course be the case).
> > >
> > > Tom
> > >
> > > On 04/03/2022 01:59, Dhirendra Singh wrote:
> > >
> > > Hi Tom,
> > > During the rolling restart we check for under replicated partition
> count
> > > to be zero in the readiness probe before restarting the next POD in
> > order.
> > > This issue never occurred before. It started after we upgraded kafka
> > > version from 2.5.0 to 2.7.1.
> > > So i suspect some bug introduced in the version after 2.5.0.
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Dhirendra.
> > >
> > >
> > >
> > > On Thu, Mar 3, 2022 at 11:09 PM Thomas Cooper  >  > > c...@tomcooper.dev> > wrote:
> > >
> > > I suspect this nightly rolling will have something to do with your
> > issues.
> > > If you are just rolling the stateful set in order, with no dependence
> on
> > > maintaining minISR and other Kafka considerations you are going to hit
> > > issues.
> > >
> > > If you are running on Kubernetes I would suggest using an Operator like
> > > Strimzi <https://strimzi.io/>  which will do a lot of the Kafka admin
> > > tasks like this for you automatically.
> > >
> > > Tom
> > >
> > > On 03/03/2022 16:28, Dhirendra Singh wrote:
> > >
> > > Hi Tom,
> > >
> > > Doing the nightly restart is the decision of the cluster admin. I have
> no
> > > control on it.
> > > We have implementation using stateful set. restart is triggered by
> > > updating a annotation in the pod.
> > > Issue is not triggered by kafka cluster restart but the zookeeper
> servers
> > > restart.
> > >
> > > Thanks,
> > >
> > > Dhirendra.
> > >
> > >
> > >
> > > O

Re: consumer hpa autoscaling

2022-03-06 Thread Liam Clarke-Hutchinson
> I was trying to see what the goals of enabling Hpa on the consumer would
be. Since like you say there is a partition upper limit which will limit
the consumer throughput. so in the end you have to tweak partitions on
kafka and then reassess the maxReplicas config of hpa.
It seems hpa in this scenario would help more around costs than operations
around the app.

Yep, the goal of using an HPA with 1 to N instances of a consuming app is
to scale consumers out at peak load, and then scale them down when load's a
lot lower.
It helps meet any data timeliness requirements you might have during high
load, and as you said, reduces costs during low load.

On Sat, 5 Mar 2022 at 07:09, David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> HI Liam,
>
> I was trying to see what the goals of enabling Hpa on the consumer would
> be. Since like you say there is a partition upper limit which will limit
> the consumer throughput. so in the end you have to tweak partitions on
> kafka and then reassess the maxReplicas config of hpa.
> It seems hpa in this scenario would help more around costs than operations
> around the app.
>
> Maybe there is a way to build your own algorithm to figure out max/min
> replicas and other fanciness depending on partitions (via an operator) etc.
>
> But I wonder if you would still end up in the same boat, plus does it make
> sense to over engineer this when in the end you might have to add
> partitions manually? That is why I like HPA, since it's "simple" and you
> can easily understand the behaviour.
> The behaviour of this app like you say is seasonal. so it has peaks and
> troughs everyday so there are some benefits to running Hpa there.
>
> About consumer group rebalances, yeah I get what you mean. I did tweak some
> scale up/down policies to make it smoother. The app seems fine but I might
> enable cooperative-sticky just to see if that helps a bit more. but so far
> I am not seeing a negative impact on the app.
>
> this is what i am using on hpa so far, nothing complex:
>
> spec:
>   scaleTargetRef:
> apiVersion: apps/v1
> kind: Deployment
> name: app-staging-test
>   minReplicas: 56
>   maxReplicas: 224
>   behavior:
> scaleUp:
>   stabilizationWindowSeconds: 60
>   policies:
>   - type: Percent
> value: 100
> periodSeconds: 60
>   metrics:
> - resource:
> name: cpu
>     target:
>   averageUtilization: 30
>   type: Utilization
>   type: Resource
>
>
>
> Thanks!
>
> On Wed, Mar 2, 2022 at 12:15 AM Liam Clarke-Hutchinson <
> lclar...@redhat.com>
> wrote:
>
> > Hi David,
> >
> > Scaling on CPU can be fine, what you scale on depends on what resource
> > constrains your consuming application. CPU is a good proxy for "I'm
> working
> > really hard", so not a bad one to start with.
> >
> > Main thing to be aware of is tuning the HPA to minimise scaling that
> causes
> > "stop-the-world" consumer group rebalances, the documentation I linked
> > earlier offers good advice. But you'll need to determine what is the best
> > way to configure your HPA based on your particular workloads - in other
> > words, a lot of trial and error. :)
> >
> > In terms of "everything is tied to partition number", there is an obvious
> > upper limit when scaling consumers in a consumer group - if you have 20
> > partitions on a topic, a consumer group consuming from that topic will
> only
> > increase throughput when scaling up to 20 instances. If you have 30
> > instances, 10 instances won't be assigned partitions unless some of the
> > other instances fail.
> >
> > However, the real advantage of an HPA is in reducing cost / load,
> > especially in a cloud environment - if the throughput on a given topic is
> > low, and one consumer can easily handle all 20 partitions, then you're
> > wasting money running 19 other instances. But if throughput suddenly
> > increases, the HPA will let your consumer instances scale up
> automatically,
> > and then scal down when the throughput drops again.
> >
> > It really depends on how throughput on your topic varies - if you're
> > working in a domain where throughtput shows high seasonality over the day
> > (e.g., at 4am in the morning, no-one is using your website, at 8pm,
> > everyone is using it) then an HPA approach is ideal. But, as I said,
> you'll
> > need to tune how your HPA scales to prevent repeated scaling up and down
> > that interferes with the consumer group over all.
> >
> > If you have any more details on what problem you're try

Re: Few partitions stuck in under replication

2022-03-06 Thread Liam Clarke-Hutchinson
Ah, I may have seen this error before. Dhirendra Singh, if you grep your
logs, you may find an IllegalStateException or two.

https://issues.apache.org/jira/browse/KAFKA-12948

You need to upgrade to 2.7.2 if this is the issue you're hitting.

Kind regards,

Liam Clarke-Hutchinson

On Sun, 6 Mar 2022 at 04:30, Mailbox - Dhirendra Kumar Singh <
dhirendr...@gmail.com> wrote:

> Let me rephrase my issue.
> The issue occurs when a broker loses connectivity to the ZooKeeper server.
> Connectivity loss can happen for many reasons… zookeeper servers getting
> bounced, some network glitch, etc…
>
> After the brokers reconnect to the ZooKeeper server I expect the Kafka cluster
> to come back to a stable state by itself without any manual intervention,
> but instead a few partitions remain under-replicated due to the error I
> pasted earlier.
>
> I feel this is some kind of bug. I am going to file a bug.
>
>
>
> Thanks,
>
> Dhirendra.
>
>
>
> From: Thomas Cooper 
> Sent: Friday, March 4, 2022 7:01 PM
> To: Dhirendra Singh 
> Cc: users@kafka.apache.org
> Subject: Re: Few partitions stuck in under replication
>
>
>
> Do you roll the controller last?
>
> I suspect this is more to do with the way you are rolling the cluster
> (which I am still not clear on the need for) rather than some kind of bug
> in Kafka (though that could of course be the case).
>
> Tom
>
> On 04/03/2022 01:59, Dhirendra Singh wrote:
>
> Hi Tom,
> During the rolling restart we check for under replicated partition count
> to be zero in the readiness probe before restarting the next POD in order.
> This issue never occurred before. It started after we upgraded kafka
> version from 2.5.0 to 2.7.1.
> So i suspect some bug introduced in the version after 2.5.0.
>
>
>
> Thanks,
>
> Dhirendra.
>
>
>
> On Thu, Mar 3, 2022 at 11:09 PM Thomas Cooper  c...@tomcooper.dev> > wrote:
>
> I suspect this nightly rolling will have something to do with your issues.
> If you are just rolling the stateful set in order, with no dependence on
> maintaining minISR and other Kafka considerations you are going to hit
> issues.
>
> If you are running on Kubernetes I would suggest using an Operator like
> Strimzi <https://strimzi.io/>  which will do a lot of the Kafka admin
> tasks like this for you automatically.
>
> Tom
>
> On 03/03/2022 16:28, Dhirendra Singh wrote:
>
> Hi Tom,
>
> Doing the nightly restart is the decision of the cluster admin. I have no
> control on it.
> We have implementation using stateful set. restart is triggered by
> updating a annotation in the pod.
> Issue is not triggered by kafka cluster restart but the zookeeper servers
> restart.
>
> Thanks,
>
> Dhirendra.
>
>
>
> On Thu, Mar 3, 2022 at 7:19 PM Thomas Cooper  c...@tomcooper.dev> > wrote:
>
> Hi Dhirenda,
>
> Firstly, I am interested in why are you restarting the ZK and Kafka
> cluster every night?
>
> Secondly, how are you doing the restarts. For example, in [Strimzi](
> https://strimzi.io/), when we roll the Kafka cluster we leave the
> designated controller broker until last. For each of the other brokers we
> wait until all the partitions they are leaders for are above their minISR
> and then we roll the broker. In this way we maintain availability and make
> sure leadership can move off the rolling broker temporarily.
>
> Cheers,
>
> Tom Cooper
>
> [@tomncooper](https://twitter.com/tomncooper) | https://tomcooper.dev
>
> On 03/03/2022 07:38, Dhirendra Singh wrote:
>
> > Hi All,
> >
> > We have kafka cluster running in kubernetes. kafka version we are using
> is
> > 2.7.1.
> > Every night zookeeper servers and kafka brokers are restarted.
> > After the nightly restart of the zookeeper servers some partitions remain
> > stuck in under replication. This happens randomly but not at every
> nightly
> > restart.
> > Partitions remain under replicated until kafka broker with the partition
> > leader is restarted.
> > For example partition 4 of consumer_offsets topic remain under replicated
> > and we see following error in the log...
> >
> > [2022-02-28 04:01:20,217] WARN [Partition __consumer_offsets-4 broker=1]
> > Controller failed to update ISR to PendingExpandIsr(isr=Set(1),
> > newInSyncReplicaId=2) due to unexpected UNKNOWN_SERVER_ERROR. Retrying.
> > (kafka.cluster.Partition)
> > [2022-02-28 04:01:20,217] ERROR [broker-1-to-controller] Uncaught error
> in
> > request completion: (org.apache.kafka.clients.NetworkClient)
> > java.lang.IllegalStateException: Failed to enqueue `AlterIsr` request
> with
> > state LeaderAndIsr

Re: consumer hpa autoscaling

2022-03-02 Thread Liam Clarke-Hutchinson
Hi David,

Scaling on CPU can be fine; what you scale on depends on what resource
constrains your consuming application. CPU is a good proxy for "I'm working
really hard", so not a bad one to start with.

Main thing to be aware of is tuning the HPA to minimise scaling that causes
"stop-the-world" consumer group rebalances, the documentation I linked
earlier offers good advice. But you'll need to determine what is the best
way to configure your HPA based on your particular workloads - in other
words, a lot of trial and error. :)

In terms of "everything is tied to partition number", there is an obvious
upper limit when scaling consumers in a consumer group - if you have 20
partitions on a topic, a consumer group consuming from that topic will only
increase throughput when scaling up to 20 instances. If you have 30
instances, 10 instances won't be assigned partitions unless some of the
other instances fail.

However, the real advantage of an HPA is in reducing cost / load,
especially in a cloud environment - if the throughput on a given topic is
low, and one consumer can easily handle all 20 partitions, then you're
wasting money running 19 other instances. But if throughput suddenly
increases, the HPA will let your consumer instances scale up automatically,
and then scale down when the throughput drops again.

It really depends on how throughput on your topic varies - if you're
working in a domain where throughput shows high seasonality over the day
(e.g., at 4am in the morning, no-one is using your website, at 8pm,
everyone is using it) then an HPA approach is ideal. But, as I said, you'll
need to tune how your HPA scales to prevent repeated scaling up and down
that interferes with the consumer group over all.
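
Concretely, the first knob I reach for is the scale-down stabilization
window. A sketch of what I mean (the numbers are arbitrary - tune them for
your workload):

  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300   # wait out 5 min of low load before removing pods
      policies:
      - type: Pods
        value: 1                        # then drop at most one consumer per minute
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60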

If you have any more details on what problem you're trying to solve, I
might be able to give more specific advice.

TL;DR - I've found using HPAs to scale applications in the same consumer
group is very useful, but it needs to be tuned to minimise scaling that can
cause pauses in consumption.

Kind regards,

Liam Clarke-Hutchinson



On Wed, 2 Mar 2022 at 13:14, David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> Thanks Liam,
>
> I am trying hpa but using cpu utilization, but since everything is tied to
> partition number etc i wonder what the benefits of running on hpa really
> are.
>
> thanks!
>
> On Mon, Feb 28, 2022 at 12:59 PM Liam Clarke-Hutchinson <
> lclar...@redhat.com>
> wrote:
>
> > I've used HPAs scaling on lag before by feeding lag metrics from
> Prometheus
> > into the K8s metrics server as custom metrics.
> >
> > That said, you need to carefully control scaling frequency to avoid
> > excessive consumer group rebalances. The cooperative sticky assignor can
> > minimise pauses, but not remove them entirely.
> >
> > There's a lot of knobs you can use to tune HPAs these days:
> >
> >
> https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#configurable-scaling-behavior
> >
> > Good luck :)
> >
> >
> >
> > On Tue, 1 Mar 2022 at 08:49, David Ballano Fernandez <
> > dfernan...@demonware.net> wrote:
> >
> > > Hello Guys,
> > >
> > > I was wondering how you guys do autoscaling of you consumers in
> > kubernetes
> > > if you do any.
> > >
> > > We have a mirrormaker-like app that mirrors data from cluster to
> cluster
> > at
> > > the same time does some topic routing.  I would like to add hpa to the
> > app
> > > in order to scale up/down depending on avg cpu. but as you know  a
> > consumer
> > > app has lots of variables being partitions of topics being consumed  a
> > > pretty important one.
> > >
> > > Since kubernetes checks cpu avg, there are chances that pods/consumers
> > > won't be scaled up to the  number of partitions possibly creating some
> > hot
> > > spots.
> > >
> > > Anyways i would like to know how you deal if you do at all with this.
> > >
> > > thanks!
> > >
> >
>


Re: consumer hpa autoscaling

2022-02-28 Thread Liam Clarke-Hutchinson
I've used HPAs scaling on lag before by feeding lag metrics from Prometheus
into the K8s metrics server as custom metrics.

That said, you need to carefully control scaling frequency to avoid
excessive consumer group rebalances. The cooperative sticky assignor can
minimise pauses, but not remove them entirely.
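
For the lag-based approach, the HPA metrics section ends up looking roughly
like this - the metric name and target below are made up for illustration,
since they depend entirely on how you expose lag through your metrics
adapter:

  metrics:
  - type: External
    external:
      metric:
        name: kafka_consumergroup_lag        # hypothetical metric from a Prometheus adapter
        selector:
          matchLabels:
            consumergroup: my-consumer-group
      target:
        type: AverageValue
        averageValue: "1000"                 # aim for roughly 1000 records of lag per pod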

There's a lot of knobs you can use to tune HPAs these days:
https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#configurable-scaling-behavior

Good luck :)



On Tue, 1 Mar 2022 at 08:49, David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> Hello Guys,
>
> I was wondering how you guys do autoscaling of you consumers in kubernetes
> if you do any.
>
> We have a mirrormaker-like app that mirrors data from cluster to cluster at
> the same time does some topic routing.  I would like to add hpa to the app
> in order to scale up/down depending on avg cpu. but as you know  a consumer
> app has lots of variables being partitions of topics being consumed  a
> pretty important one.
>
> Since kubernetes checks cpu avg, there are chances that pods/consumers
> won't be scaled up to the  number of partitions possibly creating some hot
> spots.
>
> Anyways i would like to know how you deal if you do at all with this.
>
> thanks!
>


Re: Huge latency at consumer side ,testing performance for production and consumption

2022-01-28 Thread Liam Clarke-Hutchinson
Hi Jigar,

Your image attachment didn't come through again.

Thanks,

Liam

On Fri, 28 Jan 2022, 5:35 pm Jigar Shah,  wrote:

> Hello again,
> Could someone please provide feedback on these findings ?
> Thank you in advance for feedback.
>
> *Regards,*
> *Jigar*
>
>
>
> On Mon, 17 Jan 2022 at 13:24, Jigar Shah  wrote:
>
>> Hello again,
>> I had performed a few more tests on producer and consumer again and I
>> observed a pattern in Kafka Producer creating large latency.
>> Could you please confirm that my understanding is correct about the
>> producer protocol?
>>
>> The configurations are the same as above.
>>
>> The producer is continuously producing messages into kafka topic, using
>> the default producer partitioner creating messages in random
>> topic-partitions
>>
>> The workflow of protocol according to my understanding is:
>> 1. First connection from producer to a broker (1 out of 3) in the cluster
>> to fetch metadata.
>> 2. If the partition to produce is located on the same broker then
>>a. Re-use the existing connection to produce messages.
>> 3. Else if the partition to produce is located on one of other brokers
>> then
>>a. Create a new connection
>>b. Fetch again metadata.
>>c. Produce the message using the new connection
>>
>> After analysis, I assume the latency is caused at step *3.a & 3.b *when
>> the partition selected is on the other two brokers.  Such peaks are
>> observed during initial part of test only
>> [image: image.png]
>> Thank you in advance for feedback.
>>
>> *Regards,*
>> *Jigar*
>>
>>
>> On Wed, 15 Dec 2021 at 10:53, Jigar Shah 
>> wrote:
>>
>>> Hello,
>>> I agree with time taken for consumer initialization processes
>>> But actually in the test I am taking care of that and I am waiting for
>>> the consumer to be initiated and only then starting the producer to
>>> discount the initialization delay.
>>> So, are there any more processes happening during the poll of consumers
>>> for the first few messages?
>>>
>>> Thank you
>>>
>>> On Mon, 13 Dec 2021 at 18:33, Luke Chen  wrote:
>>>
>>>> Hi Jigar,
>>>>
>>>> As Liam mentioned, those are necessary consumer initialization
>>>> processes.
>>>> So, I don't think you can speed it up by altering some timeouts/interval
>>>> properties.
>>>> Is there any reason why you need to care about the initial delay?
>>>> If, like you said, the delay won't happen later on, I think the cost
>>>> will
>>>> be amortized.
>>>>
>>>>
>>>> Thank you.
>>>> Luke
>>>>
>>>>
>>>> On Mon, Dec 13, 2021 at 4:59 PM Jigar Shah 
>>>> wrote:
>>>>
>>>> > Hello ,
>>>> > Answering your first mail, indeed I am using consumer groups using
>>>> > group.id
>>>> > , I must have missed to add it in mentioned properties
>>>> > Also, thank you for information regarding the internal processes
>>>> working
>>>> > behind creating a KafkaConsumer.
>>>> > I agree that following steps do add latency during initial connection
>>>> > creation.But can it be somehow optimised(reduced) ,by altering some
>>>> > timeouts/interval properties, could you please suggest those?
>>>> >
>>>> > Thank you
>>>> >
>>>> > On Mon, 13 Dec 2021 at 12:05, Liam Clarke-Hutchinson <
>>>> lclar...@redhat.com>
>>>> > wrote:
>>>> >
>>>> > > I realise that's a silly question, you must be if you're using auto
>>>> > commit.
>>>> > >
>>>> > > When a consumer starts, it needs to do a few things.
>>>> > >
>>>> > > 1) Connect to a bootstrap server
>>>> > >
>>>> > > 2) Join an existing consumer group, or create a new one, if it
>>>> doesn't
>>>> > > exist. This may cause a stop the world rebalance as partitions are
>>>> > > reassigned within the group.
>>>> > >
>>>> > > 3) Acquire metadata - which brokers are the partition leaders for my
>>>> > > assigned partitions on? And what offsets am I consuming from?
>>>> > >
>>>> > > 4) Establish the long lived connections to those brokers.

Re: Huge latency at consumer side ,testing performance for production and consumption

2021-12-12 Thread Liam Clarke-Hutchinson
I realise that's a silly question - you must be, if you're using auto commit.

When a consumer starts, it needs to do a few things.

1) Connect to a bootstrap server

2) Join an existing consumer group, or create a new one, if it doesn't
exist. This may cause a stop the world rebalance as partitions are
reassigned within the group.

3) Acquire metadata - which brokers are the partition leaders for my
assigned partitions on? And what offsets am I consuming from?

4) Establish the long lived connections to those brokers.

5) Send fetch requests

(I might not have the order correct)

So yeah, this is why you're seeing that initial delay before consuming
records.
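
If you want to see where that time goes, a crude sketch like this (Java
client; the bootstrap server, group and topic names are placeholders)
separates the one-off join/metadata cost from steady-state consumption by
timing the first non-empty poll:

  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import java.time.Duration;
  import java.util.List;
  import java.util.Properties;

  public class FirstPollTimer {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092");
          props.put("group.id", "latency-test");
          props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
          props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

          try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
              consumer.subscribe(List.of("test-topic"));
              long start = System.currentTimeMillis();
              int polls = 0;
              while (true) {
                  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                  polls++;
                  if (!records.isEmpty()) {
                      // The first non-empty poll includes group join, metadata and fetch setup.
                      System.out.printf("first records after %d ms (%d polls)%n",
                              System.currentTimeMillis() - start, polls);
                      break;
                  }
              }
          }
      }
  }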

Kind regards,

Liam Clarke-Hutchinson

On Mon, 13 Dec 2021, 7:19 pm Liam Clarke-Hutchinson, 
wrote:

> Hi,
>
> I'm assuming you're using consumer groups? E.g., group.id=X
>
> Cheers,
>
> Liam
>
> On Mon, 13 Dec 2021, 6:30 pm Jigar Shah,  wrote:
>
>> Hello,
>> I am trying to test the latency between message production and message
>> consumption using Java Kafka-Client*(2.7.2)* library.
>> The configuration of cluster is 3 KafkaBrokers*(2.7.2, Scala 2.13)*, 3
>> Zookeeper*(3.5.9)*
>> Here is a pattern what I have observed
>> Reference:
>>  ConsumerReadTimeStamp: Timestamp when record received in Kafka Consumer
>>  ProducerTimeStamp: Timestamp added before producer.send record
>>  RecordTimeStamp: CreateTimeStamp inside the record obtained at consumer
>>
>> [image: kafka1.png]
>>
>> For 100 Messages | ConsumerReadTimeStamp-ProducerTimeStamp(ms) | ConsumerReadTimeStamp-RecordTimeStamp(ms)
>> Average          | 252.56                                      | 238.85
>> Max              | 2723                                        | 2016
>> Min              | 125                                         | 125
>>
>>
>> On the consumer side it takes too much time for initial few messages but
>> later on it is quite consistent.
>> I have executed the above same test for large number of messages :
>> 100,1000,1,etc. and the pattern seems to be same
>> Here are the configurations, mostly using default properties.
>> Topic:
>>   partitions=16
>>   min.insync.replica=2
>>   replication.factor=3
>>
>>
>> Consumer:
>>   security.protocol=PLAINTEXT
>>   enable.auto.commit=true
>>
>>
>> Producer:
>>   security.protocol=PLAINTEXT
>>   compression.type=gzip
>>   acks=all
>>
>>
>> Is there any reason why there is huge latency at the beginning when a
>> consumer is created please?
>> Also please suggest some way to optimise configurations to have some
>> better consistent results ?
>>
>> Thank you in advance for your feedback.
>>
>


Re: Huge latency at consumer side ,testing performance for production and consumption

2021-12-12 Thread Liam Clarke-Hutchinson
Hi,

I'm assuming you're using consumer groups? E.g., group.id=X

Cheers,

Liam

On Mon, 13 Dec 2021, 6:30 pm Jigar Shah,  wrote:

> Hello,
> I am trying to test the latency between message production and message
> consumption using Java Kafka-Client*(2.7.2)* library.
> The configuration of cluster is 3 KafkaBrokers*(2.7.2, Scala 2.13)*, 3
> Zookeeper*(3.5.9)*
> Here is a pattern what I have observed
> Reference:
>  ConsumerReadTimeStamp: Timestamp when record received in Kafka Consumer
>  ProducerTimeStamp: Timestamp added before producer.send record
>  RecordTimeStamp: CreateTimeStamp inside the record obtained at consumer
>
> [image: kafka1.png]
>
> For 100 Messages | ConsumerReadTimeStamp-ProducerTimeStamp(ms) | ConsumerReadTimeStamp-RecordTimeStamp(ms)
> Average          | 252.56                                      | 238.85
> Max              | 2723                                        | 2016
> Min              | 125                                         | 125
>
>
> On the consumer side it takes too much time for initial few messages but
> later on it is quite consistent.
> I have executed the above same test for large number of messages :
> 100,1000,1,etc. and the pattern seems to be same
> Here are the configurations, mostly using default properties.
> Topic:
>   partitions=16
>   min.insync.replica=2
>   replication.factor=3
>
>
> Consumer:
>   security.protocol=PLAINTEXT
>   enable.auto.commit=true
>
>
> Producer:
>   security.protocol=PLAINTEXT
>   compression.type=gzip
>   acks=all
>
>
> Is there any reason why there is huge latency at the beginning when a
> consumer is created please?
> Also please suggest some way to optimise configurations to have some
> better consistent results ?
>
> Thank you in advance for your feedback.
>


Re: New on security on Apache Kafka

2021-11-14 Thread Liam Clarke-Hutchinson
Hi Giuseppe,

That class was replaced in Kafka 2.4, I think, with
kafka.security.authorizer.AclAuthorizer.
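
In server.properties that would be something like (a sketch, for a
ZooKeeper-based cluster):

  authorizer.class.name=kafka.security.authorizer.AclAuthorizer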

Cheers,

Liam Clarke-Hutchinson

On Sat, Nov 13, 2021 at 1:15 AM Giuseppe Ricci Sysman 
wrote:

> Hi,
>
>
>
> I'm new on security in Apache Kafka. I have Apache Kafka (v. 2.13-3.0.0)
> installed on a remote Ubuntu server. I need to secure the communications
> with producer-kafka broker and kafka broker-consumer.
>
> I try to follow the tutorial on Kafka documentation:
>
>
>
> https://kafka.apache.org/documentation/#security_overview
>
>
>
> and this tutorial which is more detailed:
>
>
>
>
> https://medium.com/egen/securing-kafka-cluster-using-sasl-acl-and-ssl-dec15b439f9d
>
>
>
> but when I try to restart kafka server with the commands:
>
>
>
> export KAFKA_OPTS=-Djava.security.auth.login.config=/home/kafka/Downloads/kafka_2.13-3.0.0/config/kafka_server_jaas.conf
>
> ./bin/kafka-server-start.sh ./config/server.properties
>
>
>
> I receive the error:
>
>
>
> kafka@kafka2:~/Downloads/kafka_2.13-3.0.0$ sudo ./bin/kafka-server-start.sh ./config/server.properties
>
> [2021-11-12 11:45:46,995] INFO Registered kafka:type=kafka.Log4jController
> MBean (kafka.utils.Log4jControllerRegistration$)
>
> [2021-11-12 11:45:47,183] INFO Setting -D
> jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated
> TLS renegotiation (org.apache.zookeeper.common.X509Util)
>
> [2021-11-12 11:45:47,192] ERROR Exiting Kafka due to fatal exception
> (kafka.Kafka$)
>
> java.lang.ClassNotFoundException: kafka.security.auth.SimpleAclAuthorizer
>
> at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:606)
>
> at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:168)
>
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>
> at java.base/java.lang.Class.forName0(Native Method)
>
> at java.base/java.lang.Class.forName(Class.java:468)
>
> at org.apache.kafka.common.utils.Utils.loadClass(Utils.java:417)
>
> at org.apache.kafka.common.utils.Utils.newInstance(Utils.java:406)
>
> at kafka.security.authorizer.AuthorizerUtils$.createAuthorizer(AuthorizerUtils.scala:31)
>
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1583)
>
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1394)
>
> at kafka.Kafka$.buildServer(Kafka.scala:67)
>
> at kafka.Kafka$.main(Kafka.scala:87)
>
> at kafka.Kafka.main(Kafka.scala)
>
>
>
> It seems the class SimpleAclAuthorizer is not found.
>
> Can it be to a wrong configuration?
>
>
>
> These are my SSL configs in the file server.properties:
>
>
>
> ### SECURITY using SCRAM-SHA-512 and SSL ###
>
>
> listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SASL_SSL://localhost:9094
>
>
> advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SASL_SSL://localhost:9094
>
> security.inter.broker.protocol=SASL_SSL
>
> ssl.endpoint.identification.algorithm=
>
> ssl.client.auth=required
>
> sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
>
> sasl.enabled.mechanisms=SCRAM-SHA-512
>
>
>
> # Broker security settings
>
>
> ssl.truststore.location=/home/kafka/Downloads/kafka_2.13-3.0.0/config/truststore/kafka.truststore.jks
>
> ssl.truststore.password=giuseppe
>
>
> ssl.keystore.location=/home/kafka/Downloads/kafka_2.13-3.0.0/config/keystore/kafka.keystore.jks
>
> ssl.keystore.password=giuseppe
>
> ssl.key.password=giuseppe
>
>
>
> # ACLs
>
> authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
>
> super.users=User:admin
>
>
>
> #zookeeper SASL
>
> zookeeper.set.acl=false
>
> ### SECURITY using SCRAM-SHA-512 and SSL ###
>
>
>
> Any help is appreciated.
>
> Thanks.
>
>
>
> PhD Giuseppe Ricci
>
> R Senior Software Developer
>
> Sysman Progetti & Servizi S.r.l.
>
>
> http://www.sys-man.it
>
>
>
> e-mail:<mailto:ri...@sys-man.it> ri...@sys-man.it
>
>
>
>
>
>


Re: topic replicas unbalanced

2021-11-10 Thread Liam Clarke-Hutchinson
Good luck :)

On Thu, Nov 11, 2021 at 12:50 PM David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> Thanks Liam, I will try my best but due to some prod shenanigans I won't be
> able to test this until next week, i will reply once I have more info
> thanks for your help!
>
> On Wed, Nov 10, 2021 at 3:22 PM Liam Clarke-Hutchinson <
> lclar...@redhat.com>
> wrote:
>
> > Hi David, those log messages are INFO level logged to controller.log when
> > the cluster starts up and selects a broker to act as a controller, or a
> new
> > controller is elected.
> >
> > Reason I'm asking about those log messages is that they reflect the
> cached
> > state of "alive" brokers that the controller knows about. When a topic is
> > created, this cached state is used to assign replicas in a rather
> > straightforward (when there's no rack awareness involved) round robin
> > fashion across all brokers the controller knows about.
> >
> > But when you run a replica reassignment, it requires you to explicitly
> > identify which broker id a replica should move to, and looking at the
> code,
> > this forcibly updates the cache of broker metadata for each broker id you
> > specify.  So I'm wondering if the cached "alive" broker state when you
> > initially created the topic doesn't reflect all the actual brokers in
> your
> > cluster.
> >
> > So, if you are able to a) set the logging level for
> > kafka.controller.KafkaController (at the very least) to INFO and b) stop
> > then restart your entire cluster, those logging messages would confirm or
> > eliminate the question of that cached broker state being a factor.
> >
> > Admittedly I could be barking up an entirely wrong tree, and if anyone
> who
> > understands the replica assignment algorithm better than I is reading,
> > please do correct me!
> >
> > Cheers,
> >
> > Liam Clarke-Hutchinson
> >
> > On Thu, 11 Nov 2021, 5:16 am David Ballano Fernandez, <
> > dfernan...@demonware.net> wrote:
> >
> > > Hi Liam,
> > >
> > > I tried set all loggers to  DEBUG on the controller
> > >
> > > this are the only messages that i can see when i create a topic,
> > couldn't
> > > find the logs you mention but got this:
> > >
> > > ==> controller.log <==
> > > [2021-11-10 05:06:19,042] INFO [Controller id=103] New topics:
> > > [HashSet(davidballano20)], deleted topics: [HashSet()], new partition
> > > replica assignment [HashMap(davidballano20-3 ->
> > > ReplicaAssignment(replicas=112,111,121, addingReplicas=,
> > > removingReplicas=), davidballano20-1 ->
> > > ReplicaAssignment(replicas=107,101,116, addingReplicas=,
> > > removingReplicas=), davidballano20-2 ->
> > > ReplicaAssignment(replicas=113,116,111, addingReplicas=,
> > > removingReplicas=), davidballano20-4 ->
> > > ReplicaAssignment(replicas=120,121,122, addingReplicas=,
> > > removingReplicas=), davidballano20-0 ->
> > > ReplicaAssignment(replicas=100,106,101, addingReplicas=,
> > > removingReplicas=))] (kafka.controller.KafkaController)
> > > [2021-11-10 05:06:19,042] INFO [Controller id=103] New partition
> creation
> > > callback for
> > >
> > >
> >
> davidballano20-3,davidballano20-1,davidballano20-2,davidballano20-4,davidballano20-0
> > > (kafka.controller.KafkaController)
> > > ...
> > > ...
> > > ==> state-change.log <==
> > > [2021-11-10 05:06:19,054] INFO [Controller id=103 epoch=11] Sending
> > > LeaderAndIsr request to broker 122 with 0 become-leader and 1
> > > become-follower partitions (state.change.logger)
> > > [2021-11-10 05:06:19,054] INFO [Controller id=103 epoch=11] Sending
> > > UpdateMetadata request to brokers HashSet(100, 101, 102, 103, 104, 105,
> > > 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119,
> > 120,
> > > 121, 122, 123) for 5 partitions (state.change.logger)
> > > ...
> > > ...
> > >
> > > thanks!
> > >
> > > On Tue, Nov 9, 2021 at 5:04 PM Liam Clarke-Hutchinson <
> > lclar...@redhat.com
> > > >
> > > wrote:
> > >
> > > > Sorry forgot to mention they'll usually be under $KAFKA_DIR/logs.
> > > >
> > > > On Wed, 10 Nov 2021, 1:53 pm Liam Clarke-Hutchinson, <
> > > lclar...@redhat.com>
> > > > wrote:
> > > >
> > > > > Thanks :)
> >

Re: topic replicas unbalanced

2021-11-10 Thread Liam Clarke-Hutchinson
Hi David, those log messages are INFO level logged to controller.log when
the cluster starts up and selects a broker to act as a controller, or a new
controller is elected.

Reason I'm asking about those log messages is that they reflect the cached
state of "alive" brokers that the controller knows about. When a topic is
created, this cached state is used to assign replicas in a rather
straightforward (when there's no rack awareness involved) round robin
fashion across all brokers the controller knows about.

But when you run a replica reassignment, it requires you to explicitly
identify which broker id a replica should move to, and looking at the code,
this forcibly updates the cache of broker metadata for each broker id you
specify.  So I'm wondering if the cached "alive" broker state when you
initially created the topic doesn't reflect all the actual brokers in your
cluster.
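
(For reference, an explicit reassignment looks roughly like this - the file
name, topic and broker ids below are just illustrative, and older tooling
takes --zookeeper instead of --bootstrap-server:

  reassign.json:
  {"version": 1,
   "partitions": [
     {"topic": "topic1", "partition": 0, "replicas": [101, 102, 103]}
   ]}

  bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file reassign.json --execute
)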

So, if you are able to a) set the logging level for
kafka.controller.KafkaController (at the very least) to INFO and b) stop
then restart your entire cluster, those logging messages would confirm or
eliminate the question of that cached broker state being a factor.

Admittedly I could be barking up an entirely wrong tree, and if anyone who
understands the replica assignment algorithm better than I is reading,
please do correct me!

Cheers,

Liam Clarke-Hutchinson

On Thu, 11 Nov 2021, 5:16 am David Ballano Fernandez, <
dfernan...@demonware.net> wrote:

> Hi Liam,
>
> I tried set all loggers to  DEBUG on the controller
>
> this are the only messages that i can see when i create a topic,  couldn't
> find the logs you mention but got this:
>
> ==> controller.log <==
> [2021-11-10 05:06:19,042] INFO [Controller id=103] New topics:
> [HashSet(davidballano20)], deleted topics: [HashSet()], new partition
> replica assignment [HashMap(davidballano20-3 ->
> ReplicaAssignment(replicas=112,111,121, addingReplicas=,
> removingReplicas=), davidballano20-1 ->
> ReplicaAssignment(replicas=107,101,116, addingReplicas=,
> removingReplicas=), davidballano20-2 ->
> ReplicaAssignment(replicas=113,116,111, addingReplicas=,
> removingReplicas=), davidballano20-4 ->
> ReplicaAssignment(replicas=120,121,122, addingReplicas=,
> removingReplicas=), davidballano20-0 ->
> ReplicaAssignment(replicas=100,106,101, addingReplicas=,
> removingReplicas=))] (kafka.controller.KafkaController)
> [2021-11-10 05:06:19,042] INFO [Controller id=103] New partition creation
> callback for
>
> davidballano20-3,davidballano20-1,davidballano20-2,davidballano20-4,davidballano20-0
> (kafka.controller.KafkaController)
> ...
> ...
> ==> state-change.log <==
> [2021-11-10 05:06:19,054] INFO [Controller id=103 epoch=11] Sending
> LeaderAndIsr request to broker 122 with 0 become-leader and 1
> become-follower partitions (state.change.logger)
> [2021-11-10 05:06:19,054] INFO [Controller id=103 epoch=11] Sending
> UpdateMetadata request to brokers HashSet(100, 101, 102, 103, 104, 105,
> 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120,
> 121, 122, 123) for 5 partitions (state.change.logger)
> ...
> ...
>
> thanks!
>
> On Tue, Nov 9, 2021 at 5:04 PM Liam Clarke-Hutchinson  >
> wrote:
>
> > Sorry forgot to mention they'll usually be under $KAFKA_DIR/logs.
> >
> > On Wed, 10 Nov 2021, 1:53 pm Liam Clarke-Hutchinson, <
> lclar...@redhat.com>
> > wrote:
> >
> > > Thanks :)
> > >
> > > If you grep for "broker epochs cache" in the controller.log.* files,
> are
> > > you seeing all of your brokers listed?
> > > Should see log messages like "Initialized|Updated broker epochs cache:
> > > HashMap( -> epoch,  -> epoch...)"
> > >
> > > This is to check if the controller knows that all of your brokers are
> > live
> > > at the time of topic creation. If their id is in that hashmap, they're
> > > alive.
> > >
> > > Cheers,
> > >
> > > Liam
> > >
> > > On Wed, Nov 10, 2021 at 1:21 PM David Ballano Fernandez <
> > > dfernan...@demonware.net> wrote:
> > >
> > >> We are using Kafka with zookeeper
> > >>
> > >> On Tue, Nov 9, 2021 at 4:12 PM Liam Clarke-Hutchinson <
> > >> lclar...@redhat.com>
> > >> wrote:
> > >>
> > >> > Yeah, it's broker side, just wanted to eliminate the obscure edge
> > case.
> > >> >
> > >> > Oh, and are you using Zookeeper or KRaft?
> > >> >
> > >> > Cheers,
> > >> >
> > >> > Liam
> > >> >
> > >> > On Wed, Nov 10, 2

Re: Compatibility issue between kafka-client 2.4.1 and Kafka cluster 2.6.1

2021-11-10 Thread Liam Clarke-Hutchinson
Hi Davide, your inline image didn't arrive intact. Are you able to upload
it to an image host and provide a link?

On Wed, Nov 10, 2021 at 10:26 PM Davide Restivo
 wrote:

> Hi,
>
> I'm currently facing an integration issue between two different kafka
> versions and I was wondering if someone has already faced a similar problem.
>
> I have a kafka client (version 2.4.1) connecting to a Kafka cluster
> (version 2.6.1).  After a successful SSL handshake I get the following
> error (sorry but I had to attach a screenshot because I do not have the
> logs in text version anymore, my apologies for this):
>
> [image: Inline image]
>
>
> Has anyone faced a similar issue?
>
> Thanks in advance and regards,
> Davide
>


Re: Please add me to JIRA contributor list

2021-11-09 Thread Liam Clarke-Hutchinson
Thanks Matthias :)

On Wed, 10 Nov 2021, 5:31 pm Matthias J. Sax,  wrote:

> Done.
>
> On 11/9/21 5:06 PM, Liam Clarke-Hutchinson wrote:
> > Hi,
> >
> > My JIRA username is lclarkenz.
> >
> > Many thanks,
> >
> > Liam Clarke-Hutchinson
> >
>
>


Please add me to JIRA contributor list

2021-11-09 Thread Liam Clarke-Hutchinson
Hi,

My JIRA username is lclarkenz.

Many thanks,

Liam Clarke-Hutchinson


Re: topic replicas unbalanced

2021-11-09 Thread Liam Clarke-Hutchinson
Sorry forgot to mention they'll usually be under $KAFKA_DIR/logs.

On Wed, 10 Nov 2021, 1:53 pm Liam Clarke-Hutchinson, 
wrote:

> Thanks :)
>
> If you grep for "broker epochs cache" in the controller.log.* files, are
> you seeing all of your brokers listed?
> Should see log messages like "Initialized|Updated broker epochs cache:
> HashMap( -> epoch,  -> epoch...)"
>
> This is to check if the controller knows that all of your brokers are live
> at the time of topic creation. If their id is in that hashmap, they're
> alive.
>
> Cheers,
>
> Liam
>
> On Wed, Nov 10, 2021 at 1:21 PM David Ballano Fernandez <
> dfernan...@demonware.net> wrote:
>
>> We are using Kafka with zookeeper
>>
>> On Tue, Nov 9, 2021 at 4:12 PM Liam Clarke-Hutchinson <
>> lclar...@redhat.com>
>> wrote:
>>
>> > Yeah, it's broker side, just wanted to eliminate the obscure edge case.
>> >
>> > Oh, and are you using Zookeeper or KRaft?
>> >
>> > Cheers,
>> >
>> > Liam
>> >
>> > On Wed, Nov 10, 2021 at 1:00 PM David Ballano Fernandez <
>> > dfernan...@demonware.net> wrote:
>> >
>> > > I don't seem to have that config in any of our clusters. Is that
>> broker
>> > > config?
>> > >
>> > >
>> > > On Tue, Nov 9, 2021 at 3:50 PM Liam Clarke-Hutchinson <
>> > lclar...@redhat.com
>> > > >
>> > > wrote:
>> > >
>> > > > Thanks David,
>> > > >
>> > > > Hmm, is the property create.topic.policy.class.name set in
>> > > > server.properties at all?
>> > > >
>> > > > Cheers,
>> > > >
>> > > > Liam
>> > > >
>> > > > On Wed, Nov 10, 2021 at 12:21 PM David Ballano Fernandez <
>> > > > dfernan...@demonware.net> wrote:
>> > > >
>> > > > > Hi Liam,
>> > > > >
>> > > > > I did a test creating topics with kafka-topics.sh and admin API
>> from
>> > > > > confluent kafka python.
>> > > > > The same happened for both.
>> > > > >
>> > > > > thanks!
>> > > > >
>> > > > > On Tue, Nov 9, 2021 at 2:58 PM Liam Clarke-Hutchinson <
>> > > > lclar...@redhat.com
>> > > > > >
>> > > > > wrote:
>> > > > >
>> > > > > > Hi David,
>> > > > > >
>> > > > > > What tool(s) are you using to create new topics? Is it the
>> > > > > kafka-topics.sh
>> > > > > > that ships with Apache Kafka?
>> > > > > >
>> > > > > > Cheers,
>> > > > > >
>> > > > > > Liam Clarke-Hutchinson
>> > > > > >
>> > > > > > On Wed, Nov 10, 2021 at 11:41 AM David Ballano Fernandez <
>> > > > > > dfernan...@demonware.net> wrote:
>> > > > > >
>> > > > > > > Hi All,
>> > > > > > > Trying to figure out why my brokers have some disk imbalance I
>> > have
>> > > > > found
>> > > > > > > that Kafka (maybe this is the way it is supposed to work?) is
>> not
>> > > > > > spreading
>> > > > > > > all replicas to all available brokers.
>> > > > > > >
>> > > > > > > I have been trying to figure out how a topic with 5 partitions
>> > with
>> > > > > > > replication_factor=3  (15 replicas) could endup having all
>> > replicas
>> > > > > > spread
>> > > > > > > over 9 brokers instead of 15, especially when there are more
>> > > brokers
>> > > > > than
>> > > > > > > the total replicas for that specific topic.
>> > > > > > >
>> > > > > > > cluster has 48 brokers.
>> > > > > > >
>> > > > > > > # topics.py describe -topic topic1
>> > > > > > > {145: 1, 148: 2, *101: 3*, 146: 1, 102: 2, 147: 1, 103: 2,
>> 104:
>> > 2,
>> > > > 105:
>> > > > > > 1}
>> > > > > > > the keys are the brokerid and the values is how many replicas
>> > they
>> > > > > have.
>> > > > > > >
>> > > > > > > As you can see brokerid 101 has 3 replicas. which make the
>> disk
>> > > > > > unbalanced
>> > > > > > > compared to other brokers.
>> > > > > > >
>> > > > > > > I created a brand new topic in a test cluster with 24 brokers.
>> > > topic
>> > > > > has
>> > > > > > 5
>> > > > > > > partitions with replication factor 3
>> > > > > > > topics.py describe -topic test
>> > > > > > > {119: 1, 103: 1, 106: 2, 109: 1, 101: 2, 114: 1, 116: 2, 118:
>> 1,
>> > > 111:
>> > > > > 2,
>> > > > > > > 104: 1, 121: 1}
>> > > > > > >
>> > > > > > > This time kafka decided to spread the replicas over 11 brokers
>> > > > instead
>> > > > > of
>> > > > > > > 15.
>> > > > > > > just for fun i ran a partition reassignment  for  topic test,
>> > > > spreading
>> > > > > > all
>> > > > > > > replicas to all brokers, result:
>> > > > > > >
>> > > > > > > # topics.py describe -topic test
>> > > > > > > {110: 1, 111: 1, 109: 1, 108: 1, 112: 1, 103: 1, 107: 1, 105:
>> 1,
>> > > 104:
>> > > > > 1,
>> > > > > > > 106: 1, 102: 1, 118: 1, 116: 1, 113: 1, 117: 1}
>> > > > > > >
>> > > > > > > Now all replicas are spread across 15 brokers.
>> > > > > > >
>> > > > > > > Is there something I am missing? Maybe the reason is to keep
>> > > network
>> > > > > > > chatter down?. By the way, I don't have any rack awareness
>> > > > configured.
>> > > > > > > Thanks!
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>


Re: topic replicas unbalanced

2021-11-09 Thread Liam Clarke-Hutchinson
Thanks :)

If you grep for "broker epochs cache" in the controller.log.* files, are
you seeing all of your brokers listed?
Should see log messages like "Initialized|Updated broker epochs cache:
HashMap( -> epoch,  -> epoch...)"

This is to check if the controller knows that all of your brokers are live
at the time of topic creation. If their id is in that hashmap, they're
alive.
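
For example (the path is an assumption - point it at wherever your
controller.log files actually live):

  grep -h "broker epochs cache" /path/to/kafka/logs/controller.log*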

Cheers,

Liam

On Wed, Nov 10, 2021 at 1:21 PM David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> We are using Kafka with zookeeper
>
> On Tue, Nov 9, 2021 at 4:12 PM Liam Clarke-Hutchinson  >
> wrote:
>
> > Yeah, it's broker side, just wanted to eliminate the obscure edge case.
> >
> > Oh, and are you using Zookeeper or KRaft?
> >
> > Cheers,
> >
> > Liam
> >
> > On Wed, Nov 10, 2021 at 1:00 PM David Ballano Fernandez <
> > dfernan...@demonware.net> wrote:
> >
> > > I don't seem to have that config in any of our clusters. Is that broker
> > > config?
> > >
> > >
> > > On Tue, Nov 9, 2021 at 3:50 PM Liam Clarke-Hutchinson <
> > lclar...@redhat.com
> > > >
> > > wrote:
> > >
> > > > Thanks David,
> > > >
> > > > Hmm, is the property create.topic.policy.class.name set in
> > > > server.properties at all?
> > > >
> > > > Cheers,
> > > >
> > > > Liam
> > > >
> > > > On Wed, Nov 10, 2021 at 12:21 PM David Ballano Fernandez <
> > > > dfernan...@demonware.net> wrote:
> > > >
> > > > > Hi Liam,
> > > > >
> > > > > I did a test creating topics with kafka-topics.sh and admin API
> from
> > > > > confluent kafka python.
> > > > > The same happened for both.
> > > > >
> > > > > thanks!
> > > > >
> > > > > On Tue, Nov 9, 2021 at 2:58 PM Liam Clarke-Hutchinson <
> > > > lclar...@redhat.com
> > > > > >
> > > > > wrote:
> > > > >
> > > > > > Hi David,
> > > > > >
> > > > > > What tool(s) are you using to create new topics? Is it the
> > > > > kafka-topics.sh
> > > > > > that ships with Apache Kafka?
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Liam Clarke-Hutchinson
> > > > > >
> > > > > > On Wed, Nov 10, 2021 at 11:41 AM David Ballano Fernandez <
> > > > > > dfernan...@demonware.net> wrote:
> > > > > >
> > > > > > > Hi All,
> > > > > > > Trying to figure out why my brokers have some disk imbalance I
> > have
> > > > > found
> > > > > > > that Kafka (maybe this is the way it is supposed to work?) is
> not
> > > > > > spreading
> > > > > > > all replicas to all available brokers.
> > > > > > >
> > > > > > > I have been trying to figure out how a topic with 5 partitions
> > with
> > > > > > > replication_factor=3  (15 replicas) could endup having all
> > replicas
> > > > > > spread
> > > > > > > over 9 brokers instead of 15, especially when there are more
> > > brokers
> > > > > than
> > > > > > > the total replicas for that specific topic.
> > > > > > >
> > > > > > > cluster has 48 brokers.
> > > > > > >
> > > > > > > # topics.py describe -topic topic1
> > > > > > > {145: 1, 148: 2, *101: 3*, 146: 1, 102: 2, 147: 1, 103: 2, 104:
> > 2,
> > > > 105:
> > > > > > 1}
> > > > > > > the keys are the brokerid and the values is how many replicas
> > they
> > > > > have.
> > > > > > >
> > > > > > > As you can see brokerid 101 has 3 replicas. which make the disk
> > > > > > unbalanced
> > > > > > > compared to other brokers.
> > > > > > >
> > > > > > > I created a brand new topic in a test cluster with 24 brokers.
> > > topic
> > > > > has
> > > > > > 5
> > > > > > > partitions with replication factor 3
> > > > > > > topics.py describe -topic test
> > > > > > > {119: 1, 103: 1, 106: 2, 109: 1, 101: 2, 114: 1, 116: 2, 118:
> 1,
> > > 111:
> > > > > 2,
> > > > > > > 104: 1, 121: 1}
> > > > > > >
> > > > > > > This time kafka decided to spread the replicas over 11 brokers
> > > > instead
> > > > > of
> > > > > > > 15.
> > > > > > > just for fun i ran a partition reassignment  for  topic test,
> > > > spreading
> > > > > > all
> > > > > > > replicas to all brokers, result:
> > > > > > >
> > > > > > > # topics.py describe -topic test
> > > > > > > {110: 1, 111: 1, 109: 1, 108: 1, 112: 1, 103: 1, 107: 1, 105:
> 1,
> > > 104:
> > > > > 1,
> > > > > > > 106: 1, 102: 1, 118: 1, 116: 1, 113: 1, 117: 1}
> > > > > > >
> > > > > > > Now all replicas are spread across 15 brokers.
> > > > > > >
> > > > > > > Is there something I am missing? Maybe the reason is to keep
> > > network
> > > > > > > chatter down?. By the way, I don't have any rack awareness
> > > > configured.
> > > > > > > Thanks!
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: topic replicas unbalanced

2021-11-09 Thread Liam Clarke-Hutchinson
Yeah, it's broker side, just wanted to eliminate the obscure edge case.

Oh, and are you using Zookeeper or KRaft?

Cheers,

Liam

On Wed, Nov 10, 2021 at 1:00 PM David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> I don't seem to have that config in any of our clusters. Is that broker
> config?
>
>
> On Tue, Nov 9, 2021 at 3:50 PM Liam Clarke-Hutchinson  >
> wrote:
>
> > Thanks David,
> >
> > Hmm, is the property create.topic.policy.class.name set in
> > server.properties at all?
> >
> > Cheers,
> >
> > Liam
> >
> > On Wed, Nov 10, 2021 at 12:21 PM David Ballano Fernandez <
> > dfernan...@demonware.net> wrote:
> >
> > > Hi Liam,
> > >
> > > I did a test creating topics with kafka-topics.sh and admin API from
> > > confluent kafka python.
> > > The same happened for both.
> > >
> > > thanks!
> > >
> > > On Tue, Nov 9, 2021 at 2:58 PM Liam Clarke-Hutchinson <
> > lclar...@redhat.com
> > > >
> > > wrote:
> > >
> > > > Hi David,
> > > >
> > > > What tool(s) are you using to create new topics? Is it the
> > > kafka-topics.sh
> > > > that ships with Apache Kafka?
> > > >
> > > > Cheers,
> > > >
> > > > Liam Clarke-Hutchinson
> > > >
> > > > On Wed, Nov 10, 2021 at 11:41 AM David Ballano Fernandez <
> > > > dfernan...@demonware.net> wrote:
> > > >
> > > > > Hi All,
> > > > > Trying to figure out why my brokers have some disk imbalance I have
> > > found
> > > > > that Kafka (maybe this is the way it is supposed to work?) is not
> > > > spreading
> > > > > all replicas to all available brokers.
> > > > >
> > > > > I have been trying to figure out how a topic with 5 partitions with
> > > > > replication_factor=3  (15 replicas) could endup having all replicas
> > > > spread
> > > > > over 9 brokers instead of 15, especially when there are more
> brokers
> > > than
> > > > > the total replicas for that specific topic.
> > > > >
> > > > > cluster has 48 brokers.
> > > > >
> > > > > # topics.py describe -topic topic1
> > > > > {145: 1, 148: 2, *101: 3*, 146: 1, 102: 2, 147: 1, 103: 2, 104: 2,
> > 105:
> > > > 1}
> > > > > the keys are the brokerid and the values is how many replicas they
> > > have.
> > > > >
> > > > > As you can see brokerid 101 has 3 replicas. which make the disk
> > > > unbalanced
> > > > > compared to other brokers.
> > > > >
> > > > > I created a brand new topic in a test cluster with 24 brokers.
> topic
> > > has
> > > > 5
> > > > > partitions with replication factor 3
> > > > > topics.py describe -topic test
> > > > > {119: 1, 103: 1, 106: 2, 109: 1, 101: 2, 114: 1, 116: 2, 118: 1,
> 111:
> > > 2,
> > > > > 104: 1, 121: 1}
> > > > >
> > > > > This time kafka decided to spread the replicas over 11 brokers
> > instead
> > > of
> > > > > 15.
> > > > > just for fun i ran a partition reassignment  for  topic test,
> > spreading
> > > > all
> > > > > replicas to all brokers, result:
> > > > >
> > > > > # topics.py describe -topic test
> > > > > {110: 1, 111: 1, 109: 1, 108: 1, 112: 1, 103: 1, 107: 1, 105: 1,
> 104:
> > > 1,
> > > > > 106: 1, 102: 1, 118: 1, 116: 1, 113: 1, 117: 1}
> > > > >
> > > > > Now all replicas are spread across 15 brokers.
> > > > >
> > > > > Is there something I am missing? Maybe the reason is to keep
> network
> > > > > chatter down?. By the way, I don't have any rack awareness
> > configured.
> > > > > Thanks!
> > > > >
> > > >
> > >
> >
>


Re: topic replicas unbalanced

2021-11-09 Thread Liam Clarke-Hutchinson
Thanks David,

Hmm, is the property create.topic.policy.class.name set in
server.properties at all?

Cheers,

Liam

On Wed, Nov 10, 2021 at 12:21 PM David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> Hi Liam,
>
> I did a test creating topics with kafka-topics.sh and admin API from
> confluent kafka python.
> The same happened for both.
>
> thanks!
>
> On Tue, Nov 9, 2021 at 2:58 PM Liam Clarke-Hutchinson  >
> wrote:
>
> > Hi David,
> >
> > What tool(s) are you using to create new topics? Is it the
> kafka-topics.sh
> > that ships with Apache Kafka?
> >
> > Cheers,
> >
> > Liam Clarke-Hutchinson
> >
> > On Wed, Nov 10, 2021 at 11:41 AM David Ballano Fernandez <
> > dfernan...@demonware.net> wrote:
> >
> > > Hi All,
> > > Trying to figure out why my brokers have some disk imbalance I have
> found
> > > that Kafka (maybe this is the way it is supposed to work?) is not
> > spreading
> > > all replicas to all available brokers.
> > >
> > > I have been trying to figure out how a topic with 5 partitions with
> > > replication_factor=3  (15 replicas) could endup having all replicas
> > spread
> > > over 9 brokers instead of 15, especially when there are more brokers
> than
> > > the total replicas for that specific topic.
> > >
> > > cluster has 48 brokers.
> > >
> > > # topics.py describe -topic topic1
> > > {145: 1, 148: 2, *101: 3*, 146: 1, 102: 2, 147: 1, 103: 2, 104: 2, 105:
> > 1}
> > > the keys are the brokerid and the values is how many replicas they
> have.
> > >
> > > As you can see brokerid 101 has 3 replicas. which make the disk
> > unbalanced
> > > compared to other brokers.
> > >
> > > I created a brand new topic in a test cluster with 24 brokers. topic
> has
> > 5
> > > partitions with replication factor 3
> > > topics.py describe -topic test
> > > {119: 1, 103: 1, 106: 2, 109: 1, 101: 2, 114: 1, 116: 2, 118: 1, 111:
> 2,
> > > 104: 1, 121: 1}
> > >
> > > This time kafka decided to spread the replicas over 11 brokers instead
> of
> > > 15.
> > > just for fun i ran a partition reassignment  for  topic test, spreading
> > all
> > > replicas to all brokers, result:
> > >
> > > # topics.py describe -topic test
> > > {110: 1, 111: 1, 109: 1, 108: 1, 112: 1, 103: 1, 107: 1, 105: 1, 104:
> 1,
> > > 106: 1, 102: 1, 118: 1, 116: 1, 113: 1, 117: 1}
> > >
> > > Now all replicas are spread across 15 brokers.
> > >
> > > Is there something I am missing? Maybe the reason is to keep network
> > > chatter down?. By the way, I don't have any rack awareness configured.
> > > Thanks!
> > >
> >
>


Re: topic replicas unbalanced

2021-11-09 Thread Liam Clarke-Hutchinson
Hi David,

What tool(s) are you using to create new topics? Is it the kafka-topics.sh
that ships with Apache Kafka?

Cheers,

Liam Clarke-Hutchinson

On Wed, Nov 10, 2021 at 11:41 AM David Ballano Fernandez <
dfernan...@demonware.net> wrote:

> Hi All,
> Trying to figure out why my brokers have some disk imbalance I have found
> that Kafka (maybe this is the way it is supposed to work?) is not spreading
> all replicas to all available brokers.
>
> I have been trying to figure out how a topic with 5 partitions with
> replication_factor=3  (15 replicas) could endup having all replicas spread
> over 9 brokers instead of 15, especially when there are more brokers than
> the total replicas for that specific topic.
>
> cluster has 48 brokers.
>
> # topics.py describe -topic topic1
> {145: 1, 148: 2, *101: 3*, 146: 1, 102: 2, 147: 1, 103: 2, 104: 2, 105: 1}
> the keys are the brokerid and the values is how many replicas they have.
>
> As you can see brokerid 101 has 3 replicas. which make the disk unbalanced
> compared to other brokers.
>
> I created a brand new topic in a test cluster with 24 brokers. topic has 5
> partitions with replication factor 3
> topics.py describe -topic test
> {119: 1, 103: 1, 106: 2, 109: 1, 101: 2, 114: 1, 116: 2, 118: 1, 111: 2,
> 104: 1, 121: 1}
>
> This time kafka decided to spread the replicas over 11 brokers instead of
> 15.
> just for fun i ran a partition reassignment  for  topic test, spreading all
> replicas to all brokers, result:
>
> # topics.py describe -topic test
> {110: 1, 111: 1, 109: 1, 108: 1, 112: 1, 103: 1, 107: 1, 105: 1, 104: 1,
> 106: 1, 102: 1, 118: 1, 116: 1, 113: 1, 117: 1}
>
> Now all replicas are spread across 15 brokers.
>
> Is there something I am missing? Maybe the reason is to keep network
> chatter down?. By the way, I don't have any rack awareness configured.
> Thanks!
>


Re: Spark Streams vs Kafka Streams

2021-04-28 Thread Liam Clarke-Hutchinson
Spark Structured Streaming has some significant limitations compared to
Kafka Streams.

This one has always proved hard to overcome:

"Multiple streaming aggregations (i.e. a chain of aggregations on a
streaming DF) are not yet supported on streaming Datasets."
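
Whereas in Kafka Streams, chaining aggregations is straightforward. A rough
topology sketch (topic names and the bucketing logic are invented for
illustration; the surrounding KafkaStreams config/startup boilerplate is
omitted):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

// first aggregation: count events per key
KTable<String, Long> countsPerKey = builder
        .<String, String>stream("events")
        .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
        .count();

// second aggregation, chained on the first: roll the per-key counts up into
// coarser "buckets" (here just the first character of the key - purely illustrative)
KTable<String, Long> countsPerBucket = countsPerKey
        .groupBy((key, count) -> KeyValue.pair(key.substring(0, 1), count),
                 Grouped.with(Serdes.String(), Serdes.Long()))
        .reduce(Long::sum, (aggregate, oldCount) -> aggregate - oldCount);

countsPerBucket.toStream().to("bucket-counts",
        Produced.with(Serdes.String(), Serdes.Long()));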





On Thu, 29 Apr. 2021, 8:13 am Parthasarathy, Mohan, 
wrote:

> Matthias,
>
> I will create a KIP or ticket for tracking this issue.
>
> -thanks
> Mohan
>
>
> On 4/28/21, 1:01 PM, "Matthias J. Sax"  wrote:
>
> Feel free to do a KIP and contribute to Kafka!
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>
> Or create a ticket for tracking.
>
>
> -Matthias
>
> On 4/28/21 12:49 PM, Parthasarathy, Mohan wrote:
> > Andrew,
> >
> > I am not sure I understand. We have built several analytics
> applications. We typically use custom aggregations as they are not
> available directly in the library.
> >
> > -mohan
> >
> >
> > On 4/28/21, 12:12 PM, "Andrew Otto"  wrote:
> >
> > I'd assume this is because Kafka Streams is positioned for
> building
> > streaming applications, rather than doing analytics, whereas
> Spark is more
> > often used for analytics purposes.
> >
> >
>
>
>


Re: Kafka Connect: producer idempotence after increasing producer.max.in.flight.requests.per.connection

2021-04-23 Thread Liam Clarke-Hutchinson
Hi Yong Gang,

Idempotence is not guaranteed if you increase max in flight requests.
Because if 5 requests are dispatched, but 2 requests fail and are retried,
the messages in those 2 requests will arrive after the messages in the
other three requests, so ordering will be changed.

Assuming everything is happy in your cluster, network etc., you won't see
immediate negative effects on idempotence when increasing that property.

But when the cluster/network/etc. is heavily loaded, then you'll start to
see the negative effects.
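
For reference, those knobs are set at the worker level with the producer.
prefix - e.g. (values purely illustrative, and worth testing against your
ordering requirements):

  producer.enable.idempotence=true
  producer.acks=all
  producer.max.in.flight.requests.per.connection=5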

Does that make sense?

Kind regards,

Liam Clarke-Hutchinson

On Fri, 23 Apr. 2021, 11:44 am Yong Gang,  wrote:

> Hi guys,
>
> We are using Kafka Connect to do Kafka to Kafka data replication between
> DCs.
> For this config item *producer.max.in.flight.requests.per.connection* the
> default value in KafkaProducer (Kafka client) is 5 and can still
> maintain idempotence, but in Kafka Connect the default value of this is 1.
>
> I wonder if increasing *producer.max.in.flight.requests.per.connection *to
> 5 can Kafka Connect still be idempotent?
>
> Thanks,
> YongGang
>


Re: Which book to read?

2021-04-12 Thread Liam Clarke-Hutchinson
The first is definitely worth a read, even the chapters that are perhaps
somewhat out of date (e.g., cluster replication focused on Mirror Maker v1)
are still useful in how they describe the various ways to run multiple
clusters - hot/hot, hot/warm, stretch etc.

Also, the chapter on monitoring and which metrics offer the most value is
great for anyone looking to set up some decent monitoring off the back of
Prometheus' jmx_exporter or Datadog's similar JMX functionality and Grafana
etc.

I also really recommend I <3 Logs by Jay Kreps, it's short, high level, I
think I finished it on one bus ride, but gives a great description of the
log-based architecture LinkedIn built around Kafka, and was instrumental in
convincing me, and my bosses(!) that Kafka would be an ideal backbone for
our data streams in my previous company.

Cheers,

Liam Clarke-Hutchinson

On Tue, Apr 13, 2021 at 2:23 AM SuarezMiguelC
 wrote:

> Hello Apache Kafka Community!... A quick question:
>
> I am using Kafka a LOT right now (with Kafka Connect, Streams with
> Node.js...) in my architecture, and my knowledge has increased because of
> this. However, I'm looking for more experience, so I downloaded the Kafka
> book. My question is: since there are 2 volumes, is the second just an
> updated edition, or should I read the first one too?
>
> Thanks in advance,
> Miguel Suárez


Re: Kafka Definitive guide v2 states auto.leader.rebalance.enable = true is not recommended

2021-04-12 Thread Liam Clarke-Hutchinson
Ah, thanks Todd :)

Was it causing issues back in the day of consumer rebalances always being
stop the world? I was wondering if the statement had perhaps predated the
cooperative / sticky assignors we're able to run now.

Cheers,

Liam Clarke-Hutchinson



On Tue, Apr 13, 2021 at 2:34 AM Todd Palino  wrote:

> As a note, that part of the second edition has not been updated yet. This
> setting used to cause significant problems, but more recent updates to the
> controller code have made the auto leader rebalancing usable.
>
> -Todd
>
> On Mon, Apr 12, 2021 at 10:20 AM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi all,
> >
> > This question arose elsewhere, and I'm also going to fire it off to
> > O'Reilly in the hopes that they'll clarify, but on page 180 of the
> > Definitive Guide v2
> > <
> >
> https://assets.confluent.io/m/2849a76e39cda2bd/original/20201119-EB-Kafka_The_Definitive_Guide-Preview-Chapters_1_thru_6.pdf
> > >
> > it
> > states:
> >
> > *Kafka brokers do not automatically take partition leadership back
> (unless
> > auto leader rebalance is enabled, but this configuration is not
> > recommended)*
> >
> > The original commenter raised the point that this defaults to true, and
> it
> > sounds like a good idea to have auto leader rebalancing.
> >
> > So I'm curious, in anyone's war stories or experiences, has this property
> > being enabled been harmful? From the context that the paragraph was
> written
> > in, I'm assuming the writers were perhaps intending to emphasise the
> Cruise
> > Control or Confluents self-balancing-cluster / auto-balancing features
> were
> > preferable, but in my very brief Google didn't see any advice to set
> > auto.leader.rebalance.enabled to false to use those tools.
> >
> > So yeah, just curious if this rings any bells.
> >
> > Cheers,
> >
> > Liam Clarke-Hutchinson
> >
> --
> *Todd Palino*
> Senior Staff Engineer, Site Reliability
> Capacity Engineering
>
>
>
> linkedin.com/in/toddpalino
>


Kafka Definitive guide v2 states auto.leader.rebalance.enable = true is not recommended

2021-04-12 Thread Liam Clarke-Hutchinson
Hi all,

This question arose elsewhere, and I'm also going to fire it off to
O'Reilly in the hopes that they'll clarify, but on page 180 of the
Definitive Guide v2
<https://assets.confluent.io/m/2849a76e39cda2bd/original/20201119-EB-Kafka_The_Definitive_Guide-Preview-Chapters_1_thru_6.pdf>
it
states:

*Kafka brokers do not automatically take partition leadership back (unless
auto leader rebalance is enabled, but this configuration is not
recommended)*

The original commenter raised the point that this defaults to true, and it
sounds like a good idea to have auto leader rebalancing.

So I'm curious, in anyone's war stories or experiences, has this property
being enabled been harmful? From the context that the paragraph was written
in, I'm assuming the writers were perhaps intending to emphasise the Cruise
Control or Confluent's self-balancing-cluster / auto-balancing features were
preferable, but in my very brief Google didn't see any advice to set
auto.leader.rebalance.enabled to false to use those tools.

So yeah, just curious if this rings any bells.

Cheers,

Liam Clarke-Hutchinson


Re: Kafka Connect Distributed Mode Issues

2021-04-06 Thread Liam Clarke-Hutchinson
Yeah, looks like it's an issue with the plugin. I don't have any experience
of it, sorry.

On Tue, 6 Apr. 2021, 12:32 am Himanshu Shukla, 
wrote:

> bootstrap.servers=b-1:9092,b-2:9092
> group.id=connect-cluster
> key.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> offset.storage.topic=connect-offsets-2
> offset.storage.replication.factor=2
> #offset.storage.partitions=25
> config.storage.topic=connect-configs-2
> config.storage.replication.factor=2
> status.storage.topic=connect-status-2
> status.storage.replication.factor=2
> #status.storage.partitions=5
> offset.flush.interval.ms=1
> #rest.host.name=
> #rest.port=8083
> rest.advertised.host.name=172.16.234.122
> rest.advertised.port=8083
> plugin.path=/apps/libs/streamthoughts-kafka-connect-file-pulse-1.6.0/
> consumer.max.poll.records=100
> consumer.max.poll.interval.ms=60
>
>
> These are the connect-distributed.properties, I am using. I have changed
> the last two fields but still having the same issue.
>
> is it related to the source connecter(file pulse in my case). It is
> scanning around 20K files and stuck. With smaller scan directory, it is
> properly running.
>
> On Mon, Apr 5, 2021 at 2:52 PM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Himanshu,
> >
> > Have you adjusted your consumer properties as the error message
> suggested?
> >
> > Alternatively reduce your your consumer.max.poll.records in the worker
> > config.
> >
> > Basically, the sink you're using is spending too much time processing in
> > the poll loop, so either tweak the properties as mentioned in the error
> > message, or reduce the number of records processed in a batch so that it
> > doesn't hit that time out.
> >
> > If you have adjusted these properties, and still have issues, please
> > respond with your current worker properties to make it easier to debug.
> >
> > Please note that for any KC sink or source connector, response times from
> > the underlying data source/store can impact performance, so you may also
> > need to look into that aspect also.
> >
> > Cheers,
> >
> > Liam Clarke-Hutchinson
> >
> > On Thu, 1 Apr. 2021, 5:26 pm Himanshu Shukla, <
> himanshushukla...@gmail.com
> > >
> > wrote:
> >
> > > Hi,
> > > I am using kafka-connect-file-pulse connector and scanning around 20K
> > > files. After the scan step, the whole connect cluster is becoming
> > > unresponsive. I can not even access localhost:8083/connectors/ URL. It
> is
> > > giving request timeout.
> > >
> > >
> > > I have observed the below errors from the connect logs. Did anyone face
> > > this issue?
> > >
> > > Please advise if I am doing something wrong.
> > >
> > >
> > > [2021-03-31 16:21:58,920] INFO Scanning local file system directory
> > > '/apps/datafiles_1/cm_dir/QA1/'
> > >
> > >
> >
> (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:241)
> > > [2021-03-31 16:22:57,586] WARN [Worker clientId=connect-1,
> > > groupId=connect-cluster] This member will leave the group because
> > consumer
> > > poll timeout has expired. This means the time between subsequent calls
> to
> > > poll() was longer than the configured max.poll.interval.ms, which
> > > typically
> > > implies that the poll loop is spending too much time processing
> messages.
> > > You can address this either by increasing max.poll.interval.ms or by
> > > reducing the maximum size of batches returned in poll() with
> > > max.poll.records.
> > > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1051)
> > > [2021-03-31 16:22:57,586] INFO [Worker clientId=connect-1,
> > > groupId=connect-cluster] *Member
> > > connect-1-064cf0bf-b834-40d2-9e72-e61b229157c4 sending LeaveGroup
> request
> > > to coordinator URL:9092* (id: 2147483646 rack: null)
> > > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)
> > > [2021-03-31 16:23:24,562] ERROR Request to leader to reconfigure
> > connector
> > > tasks failed
> > > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1037)
> > >
> > >
> > > *org.apache.kafka.connect.runtime.rest.errors.ConnectRestException:
> > Request
> > > timed out*
> > > at
> > >
> > >

Re: Kafka Connect Distributed Mode Issues

2021-04-05 Thread Liam Clarke-Hutchinson
Hi Himanshu,

Have you adjusted your consumer properties as the error message suggested?

Alternatively reduce your your consumer.max.poll.records in the worker
config.

Basically, the sink you're using is spending too much time processing in
the poll loop, so either tweak the properties as mentioned in the error
message, or reduce the number of records processed in a batch so that it
doesn't hit that time out.
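
E.g., in the worker config, something along these lines (the numbers are
placeholders, not recommendations):

  consumer.max.poll.records=50
  consumer.max.poll.interval.ms=600000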

If you have adjusted these properties, and still have issues, please
respond with your current worker properties to make it easier to debug.

Please note that for any KC sink or source connector, response times from
the underlying data source/store can impact performance, so you may also
need to look into that aspect also.

Cheers,

Liam Clarke-Hutchinson

On Thu, 1 Apr. 2021, 5:26 pm Himanshu Shukla, 
wrote:

> Hi,
> I am using kafka-connect-file-pulse connector and scanning around 20K
> files. After the scan step, the whole connect cluster is becoming
> unresponsive. I can not even access localhost:8083/connectors/ URL. It is
> giving request timeout.
>
>
> I have observed the below errors from the connect logs. Did anyone face
> this issue?
>
> Please advise if I am doing something wrong.
>
>
> [2021-03-31 16:21:58,920] INFO Scanning local file system directory
> '/apps/datafiles_1/cm_dir/QA1/'
>
> (io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner:241)
> [2021-03-31 16:22:57,586] WARN [Worker clientId=connect-1,
> groupId=connect-cluster] This member will leave the group because consumer
> poll timeout has expired. This means the time between subsequent calls to
> poll() was longer than the configured max.poll.interval.ms, which
> typically
> implies that the poll loop is spending too much time processing messages.
> You can address this either by increasing max.poll.interval.ms or by
> reducing the maximum size of batches returned in poll() with
> max.poll.records.
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1051)
> [2021-03-31 16:22:57,586] INFO [Worker clientId=connect-1,
> groupId=connect-cluster] *Member
> connect-1-064cf0bf-b834-40d2-9e72-e61b229157c4 sending LeaveGroup request
> to coordinator URL:9092* (id: 2147483646 rack: null)
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:822)
> [2021-03-31 16:23:24,562] ERROR Request to leader to reconfigure connector
> tasks failed
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1037)
>
>
> *org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Request
> timed out*
> at
>
> org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:97)
> at
>
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$18.run(DistributedHerder.java:1034)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> [2021-03-31 16:23:24,562] ERROR* Failed to reconfigure connector's tasks,
> retrying after backoff:
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:958)*
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Request
> timed out
> at
>
> org.apache.kafka.connect.runtime.rest.RestClient.httpRequest(RestClient.java:97)
> at
>
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$18.run(DistributedHerder.java:1034)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:51
>
> --
> Regards,
> Himanshu Shukla
>


Re: Strictly one rebalancing when scaling a consumer group to a Random number on kubernetes.

2021-03-29 Thread Liam Clarke-Hutchinson
Hi Mazzen,

Have you looked into incremental cooperative rebalancing? It may help with
your issues, at least it can avoid stop the world rebalances.

https://www.confluent.io/blog/incremental-cooperative-rebalancing-in-kafka/
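
For plain Java consumers that's opted into via the assignor, e.g. (available
from Kafka 2.4 onwards):

  partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor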

Cheers,

Liam Clarke

On Mon, 29 Mar. 2021, 8:04 pm Mazen Ezzeddine, <
mazen.ezzedd...@etu.univ-cotedazur.fr> wrote:

> Hi all,
> Given a replicaset/statefulset of kafka consumers that constitute a
> consumer group running on kubernetes, if the number of replicas is x than
> sometimes x rebalancing might be triggered since not all of the
> replicas/consumers send a join group request in a timely and synced manner
> to the group coordinator... This also happens when we perform scale up/down
> of consumers which might result in multiple rebalancing rounds…
>  If we manage that each consumer in the consumer group send along with the
> join group request the number of replicas of the replicaset  in the
> subscriptionUserData (using the kubernetes client API) , and then group
> cordinator/server wait until the specified number of replicas join (or
> timeout) before launching a rebalance? would that work to strict the number
> of rebalancing to one, any hint please?
> Thank you.
>
>


Re: Error in Kafka property file contains no connector type

2021-03-12 Thread Liam Clarke-Hutchinson
I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
should already have the common conf lib in there).
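
Something like this before launching the worker (a sketch - in the stock
Apache tarball the jars live under libs/, so adjust the path to your install):

  export CLASSPATH="$CLASSPATH:$KAFKA_HOME/libs/*"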


On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, 
wrote:

> Thanks Liam for the suggestion.
>
> This is the redone sink file (plain text)
>
> name=bigquery-sink
> connector.type=bigquery-connector
> connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> defaultDataset=test
> project=axial-glow-224522
> topics=md
> autoCreateTables=false
> gcsBucketName=tmp_storage_bucket
> queueSize=-1
> bigQueryRetry=0
> bigQueryRetryWait=1000
> bigQueryMessageTimePartitioning=false
> bigQueryPartitionDecorator=true
> timePartitioningType=DAY
> keySource=FILE
> keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
> sanitizeTopics=false
>
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> threadPoolSize=10
> allBQFieldsNullable=false
> avroDataCacheSize=100
> batchLoadIntervalSec=120
> convertDoubleSpecialValues=false
> enableBatchLoad=false
> upsertEnabled=false
> deleteEnabled=false
> mergeIntervalMs=60_000L
> mergeRecordsThreshold=-1
> autoCreateBucket=true
> allowNewBigQueryFields=false
> allowBigQueryRequiredFieldRelaxation=false
> allowSchemaUnionization=false
> kafkaDataFieldName=null
> kafkaKeyFieldName=null
>
> Now when I run the command
>
> $KAFKA_HOME/bin/connect-standalone.sh \
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>
> It comes back with this error:
>
> [2021-03-12 09:23:54,523] INFO REST server listening at
> http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/
> (org.apache.kafka.connect.runtime.rest.RestServer:207)
> [2021-03-12 09:23:54,523] INFO Kafka Connect started
> (org.apache.kafka.connect.runtime.Connect:55)
> [2021-03-12 09:23:54,534] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
> at
>
> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
> at
>
> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
> at
> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
> at
>
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
> at
>
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
> at
>
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
>
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> I downloaded common-config-6.1.0.jar and added to lib directory in
>
> /wepay-kafka-connect-bigquery-2.1.0/lib
>
> But little joy I am afraid.
>
> Cheers,
>
> Mich
>
>
>
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> >*
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Mich,
> >
> > Your bigquery-sink.properties file is in a JSON format - which won't
> work.
> > It needs to follow the usual format of a Java properties file.
> >
> > Kind regards,
> >
> > Liam Clarke-Hutchinson
> >
> > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
> > mich.talebza...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > >
> > > Trying to stream from Kafka to Google BigQuery.
> > >
> > >
> > >  The connect-standalone.properties is as follows
> > >
> > >
&

Re: Error in Kafka property file contains no connector type

2021-03-11 Thread Liam Clarke-Hutchinson
Hi Mich,

Your bigquery-sink.properties file is in a JSON format - which won't work.
It needs to follow the usual format of a Java properties file.

Kind regards,

Liam Clarke-Hutchinson

On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh 
wrote:

> Hi,
>
>
> Trying to stream from Kafka to Google BigQuery.
>
>
>  The connect-standalone.properties is as follows
>
>
> key.converter=org.apache.kafka.connect.storage.StringConverter
>
> ##value.converter=org.apache.kafka.connect.storage.StringConverter
>
> value.converter=org.apache.kafka.connect.json.JsonConverter
>
> #
>
> # Converter-specific settings can be passed in by prefixing the Converter's
>
> # setting with the converter we want to apply it to
>
> key.converter.schemas.enable=true
>
> value.converter.schemas.enable=false
>
>
> # The internal converter used for offsets and config data is configurable
> and
>
> # must be specified, but most users will always want to use the built-in
>
> # default. Offset and config data is never visible outside of Kafka Connect
> in
>
> # this format.
>
> ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
>
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
>
> internal.key.converter=org.apache.kafka.connect.storage.StringConverter
>
> ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
>
> internal.key.converter.schemas.enable=false
>
> internal.value.converter.schemas.enable=false
>
>
> offset.storage.file.filename=/tmp/connect_bq.offsets
>
> # Flush much faster than normal, which is useful for testing/debugging
>
> offset.flush.interval.ms=1
>
>
> # Set to a list of filesystem paths separated by commas (,) to enable class
>
> # loading isolation for plugins (connectors, converters, transformations).
> The
>
> # list should consist of top level directories that include any combination
> of:
>
> # a) directories immediately containing jars with plugins and their
> dependencies
>
> # b) uber-jars with plugins and their dependencies
>
> # c) directories immediately containing the package directory structure of
>
> # classes of plugins and their dependencies Note: symlinks will be followed
> to
>
> # discover dependencies or plugins.
>
> # Examples:
>
> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>
>
> And bigquery-sink.properties file has this
>
>
> {
>
>  "name": "bigquery-sink",
>
>  "connector.type": "bigquery-connector",
>
>  "connector.class":
> "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
>
>  "defaultDataset": "test",
>
>  "project": "xyz",
>
>  "topics": "md",
>
>  "autoCreateTables": "false",
>
>  "gcsBucketName": "tmp_storage_bucket",
>
>  "queueSize": "-1",
>
>  "bigQueryRetry": "0",
>
>  "bigQueryRetryWait": "1000",
>
>  "bigQueryMessageTimePartitioning": "false",
>
>  "bigQueryPartitionDecorator": "true",
>
>  "timePartitioningType": "DAY",
>
>  "keySource": "FILE",
>
>  "keyfile": "/home/hduser/xyz.json",
>
>  "sanitizeTopics": "false",
>
>  "schemaRetriever":
>
> "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
>
>  "threadPoolSize": "10",
>
>  "allBQFieldsNullable": "false",
>
>  "avroDataCacheSize": "100",
>
>  "batchLoadIntervalSec": "120",
>
>  "convertDoubleSpecialValues": "false",
>
>  "enableBatchLoad": "false",
>
>  "upsertEnabled": "false",
>
>  "deleteEnabled": "false",
>
>  "mergeIntervalMs": "60_000L",
>
>  "mergeRecordsThreshold": "-1",
>
>  "autoCreateBucket": "true",
>
>  "allowNewBigQueryFields": "false",
>
>  "allowBigQueryRequiredFieldRelaxation": "false",
>
>  "allowSchemaUnionization": "false",
>
>  "kafkaDataFieldName": "null",
>
>  "kafkaKeyFieldName": "n

Re: options for kafka cluster backup?

2021-03-07 Thread Liam Clarke-Hutchinson
As Ryanne said,

MM2 "syncs" offsets - in that it maintains a mapping of "cluster A offsets"
to "cluster B offsets" in cluster B, so if you have to move a consumer
group from A to B, you can relatively easily point the consumer group at
the offsets on B that map to its offsets on A.
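
If it helps, here's a rough, untested Java sketch of that translation step using
MM2's RemoteClusterUtils - the cluster alias "A", group name and bootstrap
addresses below are all placeholders:

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class OffsetTranslationSketch {

    public static void main(String[] args) throws Exception {
        // Connection details for the target (B) cluster, where MM2 writes its checkpoints.
        Map<String, Object> checkpointProps = new HashMap<>();
        checkpointProps.put("bootstrap.servers", "cluster-b:9092");

        // Cluster A's committed offsets for the group, translated to cluster B offsets.
        Map<TopicPartition, OffsetAndMetadata> translated =
            RemoteClusterUtils.translateOffsets(checkpointProps, "A", "my-group", Duration.ofSeconds(30));

        // Commit the translated offsets on cluster B so the group can resume from there.
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "cluster-b:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.commitSync(translated);
        }
    }
}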

On Mon, Mar 8, 2021 at 7:00 PM Ryanne Dolan  wrote:

> MirrorMaker v1 does not sync offsets, but MM2 does!
>
> Ryanne
>
> On Sun, Mar 7, 2021, 10:02 PM Pushkar Deole  wrote:
>
> > Thanks you all!
> >
> > Blake, for your comment:
> >
> > It'll require having a HA cluster running in another region, of course.
> > One other caveat is that it doesn't preserve the offsets of the records
> >
> > -> I believe I can't afford to keep another cluster running due to cost
> > reasons.Can you elaborate on the offset part, if offset is not preserved
> > then how the backup cluster know where to start processing for each
> topic?
> >
> > For example, you could use a Kafka Connect s3 sink. You'd have to write
> > some disaster-recovery code to restore lost data from s3 into Kafka.
> >
> > -> again here the same question, does s3 also store offset for each topic
> > as it is modified in kafka? If not then when the back is restored back
> into
> > kafka cluster, how it will know where to process each topic from?
> >
> > On Sat, Mar 6, 2021 at 4:44 PM Himanshu Shukla <
> > himanshushukla...@gmail.com>
> > wrote:
> >
> > > Hi Pushkar,
> > >
> > > you could also look at the available Kafka-connect plugins. It provides
> > > many connectors which could be leveraged to move the data in/out from
> > > Kafka.
> > >
> > > On Sat, Mar 6, 2021 at 10:18 AM Blake Miller 
> > > wrote:
> > >
> > > > MirrorMaker is one reasonable way to do this, certainly it can
> > replicate
> > > to
> > > > another region, with most of the latency being the unavoidable kind,
> if
> > > you
> > > > give it enough resources.
> > > >
> > > > It'll require having a HA cluster running in another region, of
> course.
> > > One
> > > > other caveat is that it doesn't preserve the offsets of the records.
> > > That's
> > > > probably okay for your use-case, but you should be aware of it.
> > > >
> > > > Since what you want is a backup, there are many ways to do that which
> > > might
> > > > be cheaper than another Kafka cluster.
> > > >
> > > > For example, you could use a Kafka Connect s3 sink. You'd have to
> write
> > > > some disaster-recovery code to restore lost data from s3 into Kafka.
> > > >
> > > >
> https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
> > > >
> > > > There are many other sinks available, but s3 might be a reasonable
> > choice
> > > > for backup. It's inexpensive and reliable.
> > > >
> > > > On Fri, Mar 5, 2021, 2:48 AM Pushkar Deole 
> > wrote:
> > > >
> > > > > Yes.. so the requirement for me is to have data backed up or
> > replicated
> > > > in
> > > > > a different 'region' to cater for disaster scenarios and recover
> from
> > > > them
> > > > >
> > > > > On Fri, Mar 5, 2021 at 3:01 PM Ran Lupovich  >
> > > > wrote:
> > > > >
> > > > > > I guess that in case of avoiding data lose you would need to use
> 3
> > > > > replica
> > > > > > in different rack/sites awareness to avoid data lose, Confluent's
> > > > > > Replicator or MirrorMaker are for copying data from one cluster
> to
> > > > > another
> > > > > > usually in different dc / regions, If I am not mistaken
> > > > > >
> > > > > > בתאריך יום ו׳, 5 במרץ 2021, 11:21, מאת Pushkar Deole ‏<
> > > > > > pdeole2...@gmail.com
> > > > > > >:
> > > > > >
> > > > > > > Thanks Luke... is the mirror maker asynchronous? What will be
> > > typical
> > > > > lag
> > > > > > > between the replicated cluster and running cluster and in case
> of
> > > > > > disaster,
> > > > > > > what are the chances of data loss?
> > > > > > >
> > > > > > > On Fri, Mar 5, 2021 at 11:37 AM Luke Chen 
> > > wrote:
> > > > > > >
> > > > > > > > Hi Pushkar,
> > > > > > > > MirrorMaker is what you're looking for.
> > > > > > > > ref:
> > > > > >
> https://kafka.apache.org/documentation/#georeplication-mirrormaker
> > > > > > > >
> > > > > > > > Thanks.
> > > > > > > > Luke
> > > > > > > >
> > > > > > > > On Fri, Mar 5, 2021 at 1:50 PM Pushkar Deole <
> > > pdeole2...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi All,
> > > > > > > > >
> > > > > > > > > I was looking for some options to backup a running kafka
> > > cluster,
> > > > > for
> > > > > > > > > disaster recovery requirements. Can someone provide what
> are
> > > the
> > > > > > > > available
> > > > > > > > > options to backup and restore a running cluster in case the
> > > > entire
> > > > > > > > cluster
> > > > > > > > > goes down?
> > > > > > > > >
> > > > > > > > > Thanks..
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > > Regards,
> > > Himanshu Shukla
> > >
> >
>


Re: Window Store

2021-02-18 Thread Liam Clarke-Hutchinson
Hmmm, thanks Navneeth,

I feel like a session store set to an inactivity period of 10 minutes,
suppressed until session window closed, combined with a GlobalKTable would
be how I'd start to approach this in the DSL, with the below topology. I
have no idea if my ASCII art below will survive email formatting, so I'll
try to explain. User ids stream into the GlobalKTable, and also into the
session store. After 10 minutes of inactivity for a given user id key, the
session expires, and the session store emits the user_id -> some_value. I'd
then map the some_value to null, to take advantage of KTable semantics
where `k -> null` is treated as a delete for key k, so an inactive user
would be deleted from the ktable. You could then periodically query the
ktable's key-value store for outside emission.

That said, this is only how I'd start to explore the problem, and there are
obvious questions that need to be answered first like how much state would
you end up storing in the session store, etc. I'm hoping someone like John
Roesler who has far better insights into Kafka Streams might weigh in here.


user ids --+--> globalktable --> keyValueStore (periodically queried)
           |
           +--> session store --(10 min inactivity)--> map(user_id -> null) --> globalktable (delete)
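
And a very rough, untested Java DSL sketch of that shape - the topic names
("user-activity", "active-users"), string serdes and the 10 minute gap are all
placeholder assumptions:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.state.KeyValueStore;

public class InactiveUserSketch {

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> activity =
            builder.stream("user-activity", Consumed.with(Serdes.String(), Serdes.String()));

        // Every activity event (re)marks the user as active.
        activity.mapValues(v -> "active")
                .to("active-users", Produced.with(Serdes.String(), Serdes.String()));

        // After 10 minutes of inactivity the session closes; emit a tombstone for that user.
        activity.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(SessionWindows.with(Duration.ofMinutes(10)))
                .count()
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .map((windowedUserId, count) -> KeyValue.pair(windowedUserId.key(), (String) null))
                .to("active-users", Produced.with(Serdes.String(), Serdes.String()));

        // Materialised view of currently active users; its store can be queried periodically.
        GlobalKTable<String, String> activeUsers = builder.globalTable(
            "active-users",
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("active-users-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        return builder.build();
    }
}

The tombstones and the activity events share the one (ideally compacted) topic,
so the global store only ever holds the currently active users.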

Good luck,

Liam

On Thu, Feb 18, 2021 at 7:49 AM Navneeth Krishnan 
wrote:

> Hi Liam,
>
> The use case is stream all data and send it to storage after processing.
> Also when the user is inactive for a 10 min period then send a special
> event that marks the user as inactive. I'm trying to implement the special
> event here.
>
> Thanks
>
>
> On Tue, Feb 16, 2021 at 1:18 AM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hey Navneeth,
> >
> > So to understand your problem better - do you only want to stream users
> > active within 10 minutes to storage?
> >
> > Cheers,
> >
> > Liam
> >
> > On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan <
> > reachnavnee...@gmail.com>
> > wrote:
> >
> > > It’s just for emitting to data storage. There is no join here.
> > >
> > > Thanks
> > >
> > > On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <
> > > liam.cla...@adscale.co.nz> wrote:
> > >
> > > > Hi Navneeth,
> > > >
> > > > What is the purpose of holding these user records? Is it to join
> > against
> > > > other streams, or emit to data storage?
> > > >
> > > > Cheers,
> > > >
> > > > Liam Clarke-Hutchinson
> > > >
> > > >
> > > >
> > > > On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <
> > > reachnavnee...@gmail.com
> > > > >
> > > > wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I have a question about how I can use window stores to achieve this
> > use
> > > > > case. Thanks for all the help.
> > > > >
> > > > > A user record will be created when the user first logins and the
> > > records
> > > > > needs to be cleaned up after 10 mins of inactivity. Thus for each
> > user
> > > > > there will be a TTL but the TTL value will be updated each time
> when
> > > the
> > > > > user is active before he becomes inactive for the entire 10 min
> > period.
> > > > We
> > > > > are currently using PAPI for all our topologies and I was thinking
> of
> > > > > implementing it using a punctuator.
> > > > >
> > > > > My initial logic was to have a KV store with each user as key and
> TTL
> > > as
> > > > > the value and run a scheduled task every minute that looks at all
> the
> > > > > records which have TTL value lesser than the timestamp. But the
> > problem
> > > > in
> > > > > this approach was performance. When there are more than 1M records
> it
> > > > takes
> > > > > more than a few seconds to complete this task.
> > > > >
> > > > > Next approach is to have a window store and a KV store. Window
> store
> > > will
> > > > > have each user and corresponding TTL rounded to the nearest minute.
> > > Then
> > > > > find all keys between the current time and current time - 1min.
> Then
> > > > > iterate these keys and use the KV store to find if the TTL value is
> > > still
> > > > > the same or if we have received any updates after that. If not then
> > the
> > > > > user will be evicted.
> > > > >
> > > > > What would be a better and much more scalable solution for this.
> > > > >
> > > > > Thanks
> > > > >
> > > >
> > >
> >
>


Re: Window Store

2021-02-16 Thread Liam Clarke-Hutchinson
Hey Navneeth,

So to understand your problem better - do you only want to stream users
active within 10 minutes to storage?

Cheers,

Liam

On Tue, Feb 16, 2021 at 9:50 AM Navneeth Krishnan 
wrote:

> It’s just for emitting to data storage. There is no join here.
>
> Thanks
>
> On Mon, Feb 15, 2021 at 1:42 AM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Navneeth,
> >
> > What is the purpose of holding these user records? Is it to join against
> > other streams, or emit to data storage?
> >
> > Cheers,
> >
> > Liam Clarke-Hutchinson
> >
> >
> >
> > On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, <
> reachnavnee...@gmail.com
> > >
> > wrote:
> >
> > > Hi All,
> > >
> > > I have a question about how I can use window stores to achieve this use
> > > case. Thanks for all the help.
> > >
> > > A user record will be created when the user first logins and the
> records
> > > needs to be cleaned up after 10 mins of inactivity. Thus for each user
> > > there will be a TTL but the TTL value will be updated each time when
> the
> > > user is active before he becomes inactive for the entire 10 min period.
> > We
> > > are currently using PAPI for all our topologies and I was thinking of
> > > implementing it using a punctuator.
> > >
> > > My initial logic was to have a KV store with each user as key and TTL
> as
> > > the value and run a scheduled task every minute that looks at all the
> > > records which have TTL value lesser than the timestamp. But the problem
> > in
> > > this approach was performance. When there are more than 1M records it
> > takes
> > > more than a few seconds to complete this task.
> > >
> > > Next approach is to have a window store and a KV store. Window store
> will
> > > have each user and corresponding TTL rounded to the nearest minute.
> Then
> > > find all keys between the current time and current time - 1min. Then
> > > iterate these keys and use the KV store to find if the TTL value is
> still
> > > the same or if we have received any updates after that. If not then the
> > > user will be evicted.
> > >
> > > What would be a better and much more scalable solution for this.
> > >
> > > Thanks
> > >
> >
>


Re: Window Store

2021-02-15 Thread Liam Clarke-Hutchinson
Hi Navneeth,

What is the purpose of holding these user records? Is it to join against
other streams, or emit to data storage?

Cheers,

Liam Clarke-Hutchinson



On Mon, 15 Feb. 2021, 9:08 pm Navneeth Krishnan, 
wrote:

> Hi All,
>
> I have a question about how I can use window stores to achieve this use
> case. Thanks for all the help.
>
> A user record will be created when the user first logins and the records
> needs to be cleaned up after 10 mins of inactivity. Thus for each user
> there will be a TTL but the TTL value will be updated each time when the
> user is active before he becomes inactive for the entire 10 min period. We
> are currently using PAPI for all our topologies and I was thinking of
> implementing it using a punctuator.
>
> My initial logic was to have a KV store with each user as key and TTL as
> the value and run a scheduled task every minute that looks at all the
> records which have TTL value lesser than the timestamp. But the problem in
> this approach was performance. When there are more than 1M records it takes
> more than a few seconds to complete this task.
>
> Next approach is to have a window store and a KV store. Window store will
> have each user and corresponding TTL rounded to the nearest minute. Then
> find all keys between the current time and current time - 1min. Then
> iterate these keys and use the KV store to find if the TTL value is still
> the same or if we have received any updates after that. If not then the
> user will be evicted.
>
> What would be a better and much more scalable solution for this.
>
> Thanks
>


Re: print.partition=true not working

2021-02-10 Thread Liam Clarke-Hutchinson
Hi Gilles,

Looks like 2.5.0+ will do. As far as I can tell from the source,
print.partition came in with the changes made for KAFKA-9417, which was
released as part of 2.5.0.
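
For example, on a new enough distribution (2.5.0+ as far as I can tell),
something like this - topic name is just a placeholder - should print both fields:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic test --from-beginning \
  --property print.timestamp=true \
  --property print.partition=true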

Cheers,

Liam



On Thu, 11 Feb. 2021, 11:36 am Gilles Philippart,
 wrote:

> Hi Lorenzo
>
> It seems to have been added only recently to the Kafka console consumer
> (see https://github.com/apache/kafka/pull/9099). You need a 2.7.0 Kafka
> distrib.
>
> Gilles Philippart
> Principal Software Engineer
> Funding Circle
>
> > On 10 Feb 2021, at 21:57, Robin Moffatt  wrote:
> >
> > FWIW kafkacat will do this no sweat (I realise that doesn't help w.r.t.
> the
> > tool you're trying to use, but mentioning it in case :) )
> >
> > kafkacat -b kafka-broker:9092 \
> >  -t my_topic_name -C \
> >  -f '\nKey (%K bytes): %k
> >  Value (%S bytes): %s
> >  Timestamp: %T
> >  Partition: %p
> >  Offset: %o
> >  Headers: %h\n'
> >
> >
> > --
> >
> > Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff
> >
> >
> >> On Wed, 10 Feb 2021 at 17:59, Rovere Lorenzo  wrote:
> >>
> >> Hi
> >>
> >>
> >>
> >> We are on Kafka version: 2.2.1-kafka-4.1.0
> >>
> >> We have some issues when trying to dump some messages from a topic.
> >>
> >> Topic describe: Topic:test PartitionCount:3
> >> ReplicationFactor:2 Configs:message.timestamp.type=LogAppendTime
> >>
> >>
> >>
> >> Using the kafka-console-consumer I want to print the timestamp and the
> >> partition besides the content of the topic, so I specified --property
> >> print.partition=true --property print.timestamp=true in the
> >> kafka-console-consumer.sh but I only get the timestamp. The partition
> field
> >> is always empty. Any reason why this would happen? Thanks
> >>
> >>
> >>
> >> Lorenzo Rovere
> >>
> >>
> >>
> >>
> >> Lorenzo Rovere
> >>
> >> Technology Reply
> >> Via Avogadri, 2
> >> 31057 - Silea (TV) - ITALY
> >> phone: +39 0422 1836521
> >> l.rov...@reply.it
> >> www.reply.it
> >>
> >> [image: Technology Reply]
> >>
>
> --
>
>
>
>
> Funding Circle Limited is authorised and regulated by the Financial
> Conduct Authority under firm registration number 722513. Funding Circle is
> not covered by the Financial Services Compensation Scheme. Registered in
> England (Co. No. 06968588) with registered office at 71 Queen Victoria
> Street, London EC4V 4AY.
>


Re: Send several Serialized Java objects under one Kafka Topic

2021-01-31 Thread Liam Clarke-Hutchinson
Hi Peter,

The biggest issue with Java serialization has always been compatibility
between the versions of the classes used by the producer and the consumer.
That can be avoided via very careful management, but it's a lot more painful
(IMO) than other serialisation formats.

If you're looking to avoid JSON, I recommend Avro and/or Protobuf, as the
Schema Registry developed by Confluent and the associated Kafka producer
serializers / consumer deserializers explicitly support JSON, Avro and
Protobuf, and (once again, in my opinion) using the schema registry really
takes the pain out of schema version compatibility management.
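
As a rough sketch of what that looks like on the producer side - the Confluent
serializer class, registry URL and the Avro-generated MyRecord class are
assumptions about your setup, not something from your code:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

// MyRecord would be an Avro-generated class in your project (hypothetical here);
// the serializer registers its schema and enforces compatibility on changes.
Producer<String, MyRecord> producer = new KafkaProducer<>(props);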

Kind regards,

Liam Clarke-Hutchinson


On Mon, 1 Feb. 2021, 3:49 am Peter Penzov,  wrote:

> Hello All,
>  I'm working on proof of concept for sending several Java Objects
> under one Kafka Topic. More about the requirements:
>
> https://stackoverflow.com/questions/65811681/design-kafka-consumers-and-producers-for-scalability
>
> I managed to implement this working concept:
> https://github.com/rcbandit111/skyobject_engine
>
> In this code example I send several Java Objects under one Kafka Topic
> and response is sent back into another Kafka Topic.
>
> I would like to ask you what are the advantages and disadvantages in
> this design of sending data between Consumer and Producer? I want to
> use a serialized Java object, not JSON because I think it's more
> optimized for performance.
>
> Can you share your feedback, please?
>
> Also is there some better way to implement this? Like for example
> using Streaming API?
>
>
> BR,
> Peter
>


Re: hi,guys kafka 2.7.0 builld fail, any suggestion?

2021-01-27 Thread Liam Clarke-Hutchinson
Hi,

What version of Gradle?

Cheers,

Liam

On Wed, 27 Jan. 2021, 10:35 pm felixzh,  wrote:

> ~/Downloads/kafka-2.7.0# gradle clean
>
>
>
>
> > Configure project :
>
> Building project 'core' with Scala version 2.13.3
>
>
>
>
> FAILURE: Build failed with an exception.
>
>
>
>
> * Where:
>
> Build file '/root/Downloads/kafka-2.7.0/build.gradle' line: 471
>
>
>
>
> * What went wrong:
>
> A problem occurred evaluating root project 'kafka-2.7.0'.
>
> > Failed to apply plugin [id 'org.gradle.scala']
>
>> Could not find method scala() for arguments
> [build_97rd0aprcab26i03s589xrhv1$_run_closure5$_closure74$_closure108@30f53a84]
> on object of type org.gradle.api.plugins.scala.ScalaPlugin.
>
>
>
>
> * Try:
>
> Run with --stacktrace option to get the stack trace. Run with --info or
> --debug option to get more log output. Run with --scan to get full insights.
>
>
>
>
> * Get more help at https://help.gradle.org
>
>
>
>
> BUILD FAILED in 1s


Re: High latency for Kafka transactions

2020-12-02 Thread Liam Clarke-Hutchinson
Hi John,

The default for the C# client is actually 5ms, not that it really makes a
difference. Are you able to share any of your producer code, especially
around the configuration?

Kind regards,

Liam Clarke

On Thu, Dec 3, 2020 at 5:01 PM John Coleman  wrote:

> Hi,
>
> We are using Confluent.Kafka (dotnet) and according to the docs the
> linger.ms is default to 0.5ms which would cover a lot of serializations
> since each occurs in < 10 microseconds. A typical transaction batch would
> be 2 to 3 messages, occasionally more.
>
> Regards,
> John
>
> On 14/11/20 05:24, "Gary Russell"  wrote:
>
> Have you increased the producer linger.ms property [1] to ~1s?
> (Default is 0 so records are sent immediately; it generally shouldn't be
> more than a few ms - depending on how long your serialization takes).
>
> If not, perhaps your serialization (or compression) might be the
> problem; best to profile the app to see where the time is being spent.
>
> [1]:
> https://kafka.apache.org/documentation/#linger.ms
> 
> From: John Coleman 
> Sent: Friday, November 13, 2020 4:09 AM
> To: users@kafka.apache.org 
> Subject: High latency for Kafka transactions
>
> Hi,
>
> We have very high latency ~1 second when we send a small number of
> small messages to Kafka. The whole group of messages is less than 1k. We
> are thinking better to just send the messages together as 1 message
> containing a collection of all the messages.
>
> Is there something that we can tweak to reduce this latency? We have
> 10 producers and they publish to a topic with 5 consumers. Be nice if we
> don’t have to change our message structure.
>
> TIA
> John
>
>


Re: Large messages max.message.bytes > message.max.bytes

2020-12-01 Thread Liam Clarke-Hutchinson
Topic limits can't override broker limits.

On Tue, Dec 1, 2020 at 6:42 PM Sakke Ferraris  wrote:

> Hi all!
>
> I have questions regarding configuration for large messages. I understand
> that kafka has settings for broker and topic for message max sizes;
> message.max.bytes (broker config) and max.message.bytes (topic config).
>
> I wonder if max.message.bytes for a topic can be larger that the server
> default message.max.bytes? The problem I am facing is that now and then
> there are messages that hit the 1MB wall even compressed. It is rare that
> the messages get larger, but also there is no theoretical max size for the
> messages.
>
> I am not allowed to tamper with the general setting since it could have
> performance implications for all other users, but what could the
> performance implications be if the max.message.bytes is considerably higher
> on one topic?
>
> Best Regards, Sakke F
>
>
>


Re: Reg: Max number of partitions in a topic

2020-11-25 Thread Liam Clarke-Hutchinson
Yes, you can have 1000 partitions. But, there are implications of having a
large number. Each partition has a leader. Clients downloading metadata to
find those 1000 leaders will be a bit slower than finding 100 leaders.
Producers that are buffering messages in order to use batching create a
buffer per partition, so you may see increased memory usage. Likewise, if
one of your brokers failed, the more partition leaders on it, the more
leader elections that have to occur.

I'd suggest benchmarking your use case with different partition counts and
see where your sweet spot is. This old blog post has some good ideas:
https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster/
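
If you want a quick way to try it, the perf tooling that ships with Kafka makes
rough comparisons easy - the numbers and addresses below are just placeholders:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic perf-test --partitions 1000 --replication-factor 3

bin/kafka-producer-perf-test.sh --topic perf-test \
  --num-records 1000000 --record-size 1024 --throughput -1 \
  --producer-props bootstrap.servers=localhost:9092 acks=1

Then repeat with, say, 100 and 500 partitions and compare the reported
throughput and latencies.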

Cheers,

Liam Clarke

On Tue, Nov 24, 2020 at 10:51 PM Gowtham S  wrote:

> Hi,
> Can we have 1000 partitions in a Single topic? If not how many partitions
> will a single topic have at the max?
> Anyone, please tell.
>
> Thanks and regards,
> Gowtham S
>


Re: Many dups received by consumer (kafka_2.13)

2020-11-25 Thread Liam Clarke-Hutchinson
Can you upgrade Kafka to 2.5.1? This problem was fixed in that release.
https://issues.apache.org/jira/browse/KAFKA-9839

On Wed, Nov 25, 2020 at 9:44 PM Dev Op  wrote:

> Hello community! Hasn't anyone faced a similar problem? I see nobody can
> give me advice on what's happening with our Kafka cluster. :(
>
> пн, 9 нояб. 2020 г. в 10:57, Dev Op :
>
> > Hello all!
> >
> > Please, help me to understand why my consumer start receives the
> > duplicates. I think it is because of problems on my kafka1 node.
> >
> > Cluster consists of three nodes: kafka1 (192.168.137.19, id=1),
> > kafka2 (192.168.137.20, id=2), kafka3 ( 192.168.137.21, id=3)
> > Version of Kafka: kafka_2.13-2.4.1
> > Configs:
> > - Broker config (server.properties from kafka1):
> > https://pastebin.com/MR20rZdQ
> > - Zookeeper config (zookeeper.properties from kafka1):
> > https://pastebin.com/vCpFU0gp
> >
> > /opt/kafka_2.13-2.4.1/bin/kafka-topics.sh --describe --topic in_raw
> > --zookeeper localhost:2181
> > Topic: in_raw   PartitionCount: 1   ReplicationFactor: 3Configs:
> > Topic: in_raw   Partition: 0Leader: 1   Replicas: 1,3,2
> > Isr: 1,2,3
> >
> > Producer put one msg in `in_raw' topic msg, after it our consumer starts
> > receive many dups from that topic every 10 minutes:
> >
> > The first duplicate occurrence was at 20:01:
> > $ xzcat parsers.log-20201105.xz | perl -MData::Dumper -lne 'if
> > (/(unitId=\d+, unitDate=\d+, msgNumber=\d+)/) { ++$a->{$1}; die "$_\n" if
> > $a->{$1} > 1; }'
> > 2020-11-04 20:01:47.173
> > [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1]
> ParserService
> > - PARSE: Event{id='aeb33e8d-f7e5-4734-bc41-3c93da3cf94f', unitId=1073,
> > unitDate=1604519428552, msgNumber=6948}
> > ...
> >
> > A couple of record from log file:
> > 2020-11-04 19:54:52.740
> > [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1]
> ParserService
> > - PARSE: Event{id='86cc792b-fb5e-4ebb-be49-7a51f3a1c954', unitId=1073,
> > unitDate=1604519428552, msgNumber=6948}
> > 2020-11-04 20:01:47.173
> > [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1]
> ParserService
> > - PARSE: Event{id='aeb33e8d-f7e5-4734-bc41-3c93da3cf94f', unitId=1073,
> > unitDate=1604519428552, msgNumber=6948}
> > 2020-11-04 20:11:47.217
> > [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1]
> ParserService
> > - PARSE: Event{id='05a059e0-8002-48d0-b2da-269e42b879a0', unitId=1073,
> > unitDate=1604519428552, msgNumber=6948}
> > 2020-11-04 20:21:47.185
> > [parser-59bb945d-1e57-454e-a152-b6c92b6bf981-StreamThread-1]
> ParserService
> > - PARSE: Event{id='5b590bde-9e86-4660-8916-db4a590ba12e', unitId=1073,
> > unitDate=1604519428552, msgNumber=6948}
> > ..and so on.
> >
> > Something went wrong earlier at 19:50.
> >
> > Log from kafka1 broker:
> >
> > [2020-11-04 19:04:02,195] INFO [GroupMetadataManager brokerId=1] Removed
> 0
> > expired offsets in 0 milliseconds.
> > (kafka.coordinator.group.GroupMetadataManager)
> > [2020-11-04 19:14:02,195] INFO [GroupMetadataManager brokerId=1] Removed
> 0
> > expired offsets in 0 milliseconds.
> > (kafka.coordinator.group.GroupMetadataManager)
> > [2020-11-04 19:24:02,195] INFO [GroupMetadataManager brokerId=1] Removed
> 0
> > expired offsets in 0 milliseconds.
> > (kafka.coordinator.group.GroupMetadataManager)
> > [2020-11-04 19:34:02,195] INFO [GroupMetadataManager brokerId=1] Removed
> 0
> > expired offsets in 0 milliseconds.
> > (kafka.coordinator.group.GroupMetadataManager)
> > [2020-11-04 19:44:02,195] INFO [GroupMetadataManager brokerId=1] Removed
> 0
> > expired offsets in 0 milliseconds.
> > (kafka.coordinator.group.GroupMetadataManager)
> > [2020-11-04 19:50:19,506] WARN Client session timed out, have not heard
> > from server in 7997ms for sessionid 0x160d4310001
> > (org.apache.zookeeper.ClientCnxn)
> > [2020-11-04 19:50:19,526] INFO Client session timed out, have not heard
> > from server in 7997ms for sessionid 0x160d4310001, closing socket
> > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> > [2020-11-04 19:50:20,774] INFO Opening socket connection to server
> > kafka2.8m.local/192.168.137.20:2181. Will not attempt to authenticate
> > using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> > [2020-11-04 19:50:20,775] INFO Socket connection established, initiating
> > session, client: /192.168.137.19:57606, server: kafka2.8m.local/
> > 192.168.137.20:2181 (org.apache.zookeeper.ClientCnxn)
> > [2020-11-04 19:50:22,776] WARN Client session timed out, have not heard
> > from server in 2002ms for sessionid 0x160d4310001
> > (org.apache.zookeeper.ClientCnxn)
> > [2020-11-04 19:50:22,776] INFO Client session timed out, have not heard
> > from server in 2002ms for sessionid 0x160d4310001, closing socket
> > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> > [2020-11-04 19:50:23,360] INFO Opening socket connection to server
> > kafka3.8m.local/192.168.137.21:2181. Will not attempt to 

Re: Kafka issue (Leaders not elected)

2020-11-23 Thread Liam Clarke-Hutchinson
Hi Mohammad,

Leader elections are generally seldom and usually quick.

If a leader is unable to be elected, this could point to a) the replication
factor being set too low, or b) inter-broker communication issues, plus a
bunch of other possible problems.

You'd have to look into your Kafka broker's logs to figure out more.
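
As a quick first check (the broker address below is a placeholder), the topics
tool can list any partitions whose ISR is short, or which have no leader at all:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --unavailable-partitions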

Cheers,

Liam Clarke-Hutchinson



On Tue, 24 Nov. 2020, 4:15 am Mohmmad Ahmad,  wrote:

> Hi There,
>
> we are facing following issue in kafka. We have gone through some of the
> references and found it as a bug from kafka side. Please let us know if
> there are any solutions.
>
> │ 47] [PID:1] Producer send message failed, err:kafka server: In the
> middle of a leadership election, there is currently no leader for this
> partition and │
>
>
>
> Thanks,
> Mohmmad Ahmad
>


Re: Kafka Streams: SessionWindowedSerde Vs TimeWindowedSerde. Ambiguous implicit values

2020-11-19 Thread Liam Clarke-Hutchinson
That said John, nothing wrong with being explicit in code. :)

On Fri, 20 Nov. 2020, 1:46 pm John Roesler,  wrote:

> Oh, nice. Thanks, Daniel!
>
> That’s much nicer than my ham-handed approach.
>
> Thanks,
> John
>
> On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> > Hope this helps, I tried copying your code into a sample application. I
> got
> > it to compile with the implicits all resolving. I think the trick was
> there
> > were two implementations for Windowing Serdes.  You just need to block
> one
> > from the imports.  See if that fits with what you are doing.  Oh also, I
> > noticed that the types were not resolving when calling builder.stream,
> so I
> > put [String, String] in the builder.  Here is a gist, which formats
> better.
> >
> > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> >
> > import java.time.Duration
> > import java.util.Properties
> >
> > import org.apache.kafka.common.config.Config
> > import org.apache.kafka.streams.Topology
> > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped,
> > Materialized, Produced}
> > import org.apache.kafka.streams.scala.{ByteArraySessionStore,
> StreamsBuilder}
> >
> > class SampleStream {
> >   def createTopology(conf: Config, properties: Properties): Topology = {
> >
> > implicit val produced: Produced[Windowed[String], Long] =
> >   Produced.`with`[Windowed[String], Long]
> >
> > implicit val grouped: Grouped[String, String] =
> >   Grouped.`with`[String, String]
> >
> > implicit val consumed: Consumed[String, String] =
> >   Consumed.`with`[String, String]
> >
> > implicit val materialized: Materialized[String, Long,
> > ByteArraySessionStore] = Materialized.`with`[String, Long,
> > ByteArraySessionStore]
> >
> > val builder: StreamsBuilder = new StreamsBuilder()
> >
> > builder
> >   .stream[String, String]("streams-plaintext-input")
> >   .groupBy((_, word) => word)
> >   .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> >   .count()
> >   .toStream
> >   .to("streams-pipe-output")
> >
> > builder.build()
> >   }
> > }
> >
> >
> >
> >
> > On Thu, Nov 19, 2020 at 7:24 AM John Roesler 
> wrote:
> >
> > > Hi Eric,
> > >
> > > Sure thing. Assuming the definition of ‘produced’ you had tried in your
> > > code, it’s just:
> > >
> > > ...
> > > .toStream.to("streams-pipe-output")(produced)
> > >
> > > As far as the json serde goes, I think that I wrote an example of using
> > > Jackson to implement a serde in Confluent’s kafka-streams-examples
> repo.
> > > I’m not sure what other/better examples
> > > might be out there.
> > >
> > > Hope this helps,
> > > John
> > >
> > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> > > > Not sure what you mean by "pass it explicitly". The definition of
> 'to' is
> > > > given below. Can we pass it explicitly in this case. If yes, can you
> > > please
> > > > show me how?
> > > >
> > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> > > >   inner.to(topic, produced)
> > > >
> > > >
> > > > Also not sure how to use a self documenting format like JSON. Any
> > > > examples to share?
> > > >
> > > >
> > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler 
> > > wrote:
> > > >
> > > > > Hi Eric,
> > > > >
> > > > > Ah, that’s a bummer. The correct serde is the session windowed
> serde,
> > > as I
> > > > > can see you know. I’m afraid I’m a bit rusty on implicit resolution
> > > rules,
> > > > > so I can’t be much help there.
> > > > >
> > > > > But my general recommendation for implicits is that when things get
> > > weird,
> > > > > just don’t use them at all. For example, you can just explicitly
> pass
> > > the
> > > > > Produced in the second arg list of ‘to’.
> > > > >
> > > > > One other tip is that the serialized form produced by those serdes
> is
> > > kind
> > > > > of specialized and might not be the most convenient for your use.
> If
> > > this
> > > > > is just a POC, if suggest mapping the keys to strings, so they are
> > > > > human-readable. If this is a production use case, then you might
> want
> > > to
> > > > > use a more self-documenting format like JSON or AVRO. Just my two
> > > cents.
> > > > >
> > > > > I hope this helps!
> > > > > -John
> > > > >
> > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > > > > I keep getting '*ambiguous implicit values*' message in the
> following
> > > > > code.
> > > > > > I tried several things (as can be seen from a couple of lines
> I've
> > > > > > commented out). Any ideas on how to fix this? This is in *Scala*.
> > > > > >
> > > > > >  def createTopology(conf: Config, properties: Properties):
> Topology =
> > > > > > {//implicit val sessionSerde =
> > > > > > Serde[WindowedSerdes.SessionWindowedSerde[String]]//implicit
> val
> > > > > > 

Re: KafkaProducer.send() blocks when metadata noch present

2020-11-02 Thread Liam Clarke-Hutchinson
Hi,

So yeah, you can set the max.block.ms to 0, at the cost of perhaps some
data loss and occasional noise on the console.

But usual logging solution to this is to wrap the Kafka appender in an
async appender with an appropriately sized buffer.

Cheers,

Liam Clarke

On Mon, 2 Nov. 2020, 11:27 pm DSA SA,  wrote:

> Hello everyone!
>
> We are using the Log4j Kafka Appender to send application LogEvents to a
> Kafka topic.
>
> We are using the syncSend=false producer attribute to asynchronously send
> LogEvents to Kafka.
> This setting works well for already connected producers which are not able
> to connect to the cluster anymore. For example due to network connectivity
> issues.
> Thus, the kafka producer send() method won't wait for an answer from the
> brokers and exceptions are logged and discarded.
>
> However, for applications that can't initially reach the kafka cluster, the
> send() method will block for every LogEntry, as configured in the producer
> property max.block.ms.
>
> The following exception is thrown:
> org.apache.kafka.common.errors.TimeoutException: Topic logs not present in
> metadata after 1000 ms.
>
> This is also described in the kafka producer's max.block.ms property
> documentation:
> "The configuration controls how long KafkaProducer.send() and
> KafkaProducer.partitionsFor() will block. These methods can be blocked
> either because the buffer is full or metadata unavailable."
>
> I wonder if the send() call will also block after metadata.max.age has
> expired?
>
> Is there an out-of-the-box workaround ,thus the send() call will not block,
> without modifying max.block.ms settings?
>
> Or is the only solution an independent kafka consumer which checks the
> connectivity to the cluster. And enables/disables the log4j kafka appender
> to prevent blocking the application?
>
> Kind regards
>


Re: multi-threaded consumer configuration like stream threads?

2020-10-27 Thread Liam Clarke-Hutchinson
Hi Pushkar,

No. You'd need to combine a consumer with a thread pool or similar as you
prefer. As the docs say (from
https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
)

We have intentionally avoided implementing a particular threading model for
> processing. This leaves several options for implementing multi-threaded
> processing of records.
> 1. One Consumer Per Thread
> A simple option is to give each thread its own consumer instance. Here are
> the pros and cons of this approach:
>
>- *PRO*: It is the easiest to implement
>
>
>- *PRO*: It is often the fastest as no inter-thread co-ordination is
>needed
>
>
>- *PRO*: It makes in-order processing on a per-partition basis very
>easy to implement (each thread just processes messages in the order it
>receives them).
>
>
>- *CON*: More consumers means more TCP connections to the cluster (one
>per thread). In general Kafka handles connections very efficiently so this
>is generally a small cost.
>
>
>- *CON*: Multiple consumers means more requests being sent to the
>server and slightly less batching of data which can cause some drop in I/O
>throughput.
>
>
>- *CON*: The number of total threads across all processes will be
>limited by the total number of partitions.
>
> 2. Decouple Consumption and Processing
> Another alternative is to have one or more consumer threads that do all
> data consumption and hands off ConsumerRecords
> <https://kafka.apache.org/26/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html>
>  instances
> to a blocking queue consumed by a pool of processor threads that actually
> handle the record processing. This option likewise has pros and cons:
>
>- *PRO*: This option allows independently scaling the number of
>consumers and processors. This makes it possible to have a single consumer
>that feeds many processor threads, avoiding any limitation on partitions.
>
>
>- *CON*: Guaranteeing order across the processors requires particular
>care as the threads will execute independently an earlier chunk of data may
>actually be processed after a later chunk of data just due to the luck of
>thread execution timing. For processing that has no ordering requirements
>this is not a problem.
>
>
>- *CON*: Manually committing the position becomes harder as it
>requires that all threads co-ordinate to ensure that processing is complete
>for that partition.
>
> There are many possible variations on this approach. For example each
> processor thread can have its own queue, and the consumer threads can hash
> into these queues using the TopicPartition to ensure in-order consumption
> and simplify commit.
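
If it helps, here's a bare-bones, untested sketch of that second option - the
group, topic and pool size below are placeholders, and there's no ordering
guarantee or manual commit coordination:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DecoupledConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Worker pool that does the actual processing, decoupled from the polling thread.
        ExecutorService workers = Executors.newFixedThreadPool(8);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // Hand off to the pool; per-partition ordering is NOT preserved here.
                    workers.submit(() -> process(record));
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // Application-specific work goes here.
        System.out.printf("%s-%d@%d: %s%n",
            record.topic(), record.partition(), record.offset(), record.value());
    }
}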


Cheers,

Liam Clarke-Hutchinson

On Tue, Oct 27, 2020 at 8:04 PM Pushkar Deole  wrote:

> Hi,
>
> Is there any configuration in kafka consumer to specify multiple threads
> the way it is there in kafka streams?
> Essentially, can we have a consumer with multiple threads where the threads
> would divide partitions of topic among them?
>


Re: Kafka Naming

2020-10-20 Thread Liam Clarke-Hutchinson
Hi Jonathon,

Kafka Connect is designed to, well, connect things to Kafka.
Kafka Streams is for stream processing using Kafka.

They're rather descriptive and apt names, don't you think? If it helps, we
abbreviate Connect as KC and Streams as KStreams in our organisation.

Kind regards,

Liam Clarke

On Wed, Oct 21, 2020 at 3:18 AM Jonathan Goings 
wrote:

> I've been working through product redesign and development with product
> groups in our organization.
>
> Has anyone ever tried to explain data flow through Kafka to non-Kafka
> people and notice the inconveniently named Kafka product offerings?
>
> Please rename Connect and Streams.
>
> Jonathan Goings
>
> NOTICE: This electronic mail message and any files transmitted with it are
> intended
> exclusively for the individual or entity to which it is addressed. The
> message,
> together with any attachment, may contain confidential and/or privileged
> information.
> Any unauthorized review, use, printing, saving, copying, disclosure or
> distribution
> is strictly prohibited. If you have received this message in error, please
> immediately advise the sender by reply email and delete all copies.
>


Re: LeaderElectionNotNeededException

2020-10-20 Thread Liam Clarke-Hutchinson
Hi Tom,

You're right, I had had a complete brain fade on this.

Turns out that the script I wrote had a logical error - while trying to
ensure every zone had a replica, it went through the zones in the same order
every time, thus unintentionally ensuring that the two brokers in zone A were
quite often the first replica in the list, and hence the preferred leader for
80% of the partitions...

...and I think this incident has very much sold my team on Cruise Control.

Thanks for the reply :)

Liam Clarke


On Tue, 20 Oct. 2020, 11:49 pm Tom Bentley,  wrote:

> Hi Liam,
>
> I think you have a misunderstanding of what preferred leader election does.
> All it does is ensure that the "preferred leader" (the first in the list of
> replicas for a partition) becomes the actual leader if it can (that is, if
> it's in the ISR) and if the current leader is already the preferred leader
> you get ElectionNotNeededException. It *doesn't* change the brokers which
> are replicating a partition, or change which broker is the preferred
> leader. So if the broker assignments are unbalanced, or ignore rack
> constraints it doesn't help.
>
> Obviously you can use kafka-reassign-partitions.sh to change how partitions
> are assigned to brokers, including which is the preferred leader, in order
> to ensure the cluster is better balanced. But it's up to you to figure out
> how to assign replicas to the brokers to achieve that (i.e. for each
> partition come up with the list of brokers which should have a replica,
> with the preferred one being first in the list). As you note with the
> racks, in general there are several criteria to consider and finding a good
> assignment is an exercise in constrained optimization. So people often use
> external tools to find "good" assignments according to their criteria, and
> often to manage the reassignment process too. There are several open source
> tools available, such as Cruise Control.
>
> Hope this helps,
>
> Tom
>
> On Fri, Oct 16, 2020 at 7:05 AM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > And to follow up on this, we rolled one of the 41% brokers, partition
> > leadership was redistributed to other brokers, but a preferred leader
> > election immediately led to the exact same distribution.
> >
> > What's worse, is both of the 41% leadership brokers are in the same rack.
> >
> > Very keen for advice, or in the worst case, recommendations on how to
> > manually elect leaders for a topic-partition to get us through the
> weekend.
> >
> > Kind regards,
> >
> > Liam
> >
> > On Fri, 16 Oct. 2020, 6:22 pm Liam Clarke-Hutchinson, <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Kia ora,
> > >
> > > I am rather bemused by this one, I'll admit. Kafka version 2.4.0. We've
> > > been migrating topic-partitions from old broker machines to new
> brokers,
> > > have redistributed replicas evenly across the new brokers, removed the
> > old
> > > brokers, and tried to run a preferred leader election.
> > >
> > > And when I did that, I got a "LeaderElectionNotNeededException", which
> I
> > > really disagree with, as two brokers out of six have 41% of partition
> > > leaders.
> > >
> > > I wonder if it's rack awareness messing with us or something? We are
> > > running two brokers in three AZs/racks with correctly configured
> > rack.ids,
> > > but what are the origins of a "LeaderElectionNotNeededException"? And
> for
> > > the love of God, where is the parameter in the admin client for "I
> > > significantly disagree".
> > >
> > > Kind regards,
> > >
> > > Liam Clarke-Hutchinson
> > >
> >
>


Re: LeaderElectionNotNeededException

2020-10-15 Thread Liam Clarke-Hutchinson
And to follow up on this, we rolled one of the 41% brokers, partition
leadership was redistributed to other brokers, but a preferred leader
election immediately led to the exact same distribution.

What's worse, is both of the 41% leadership brokers are in the same rack.

Very keen for advice, or in the worst case, recommendations on how to
manually elect leaders for a topic-partition to get us through the weekend.

Kind regards,

Liam

On Fri, 16 Oct. 2020, 6:22 pm Liam Clarke-Hutchinson, <
liam.cla...@adscale.co.nz> wrote:

> Kia ora,
>
> I am rather bemused by this one, I'll admit. Kafka version 2.4.0. We've
> been migrating topic-partitions from old broker machines to new brokers,
> have redistributed replicas evenly across the new brokers, removed the old
> brokers, and tried to run a preferred leader election.
>
> And when I did that, I got a "LeaderElectionNotNeededException", which I
> really disagree with, as two brokers out of six have 41% of partition
> leaders.
>
> I wonder if it's rack awareness messing with us or something? We are
> running two brokers in three AZs/racks with correctly configured rack.ids,
> but what are the origins of a "LeaderElectionNotNeededException"? And for
> the love of God, where is the parameter in the admin client for "I
> significantly disagree".
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>


LeaderElectionNotNeededException

2020-10-15 Thread Liam Clarke-Hutchinson
Kia ora,

I am rather bemused by this one, I'll admit. Kafka version 2.4.0. We've
been migrating topic-partitions from old broker machines to new brokers,
have redistributed replicas evenly across the new brokers, removed the old
brokers, and tried to run a preferred leader election.

And when I did that, I got a "LeaderElectionNotNeededException", which I
really disagree with, as two brokers out of six have 41% of partition
leaders.

I wonder if it's rack awareness messing with us or something? We are
running two brokers in three AZs/racks with correctly configured rack.ids,
but what are the origins of a "LeaderElectionNotNeededException"? And for
the love of God, where is the parameter in the admin client for "I
significantly disagree".

Kind regards,

Liam Clarke-Hutchinson


Re: Number of topics to which provider sends

2020-10-14 Thread Liam Clarke-Hutchinson
Hi Victoria,

The obvious advantage is that you could configure producers differently.

For example, one may be sending small messages as fast as possible, while
another producer may be utilising batching with large batch sizes and a
long linger to send larger amounts of data more efficiently, but slower.

So it really depends on how you intend to use the two topics.
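
As an illustration only - all the values below are made up, and serializers are
omitted for brevity - the two producers might be configured along these lines:

// Producer for topic A: tuned for low latency.
Properties lowLatency = new Properties();
lowLatency.put("bootstrap.servers", "localhost:9092");
lowLatency.put("linger.ms", "0");            // send immediately
lowLatency.put("acks", "1");

// Producer for topic B: tuned for throughput via batching and compression.
Properties highThroughput = new Properties();
highThroughput.put("bootstrap.servers", "localhost:9092");
highThroughput.put("linger.ms", "50");       // wait up to 50 ms to fill batches
highThroughput.put("batch.size", "262144");  // 256 KB batches
highThroughput.put("compression.type", "lz4");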

Kind regards,

Liam Clarke-Hutchinson

On Thu, 15 Oct. 2020, 12:32 am Victoria Zuberman, <
victoria.zuber...@imperva.com> wrote:

> Hi,
>
> Background: Java, Kafka 2.1.0
>
> I have application that sends to two different topics.
> In theory I can use the same provider.
> Are there any advantages to having provider per topic?
> I looked for best practices for this matter but didn’t find any...
>
> Thanks,
> Victoria
> ---
> NOTICE:
> This email and all attachments are confidential, may be proprietary, and
> may be privileged or otherwise protected from disclosure. They are intended
> solely for the individual or entity to whom the email is addressed.
> However, mistakes sometimes happen in addressing emails. If you believe
> that you are not an intended recipient, please stop reading immediately. Do
> not copy, forward, or rely on the contents in any way. Notify the sender
> and/or Imperva, Inc. by telephone at +1 (650) 832-6006 and then delete or
> destroy any copy of this email and its attachments. The sender reserves and
> asserts all rights to confidentiality, as well as any privileges that may
> apply. Any disclosure, copying, distribution or action taken or omitted to
> be taken by an unintended recipient in reliance on this message is
> prohibited and may be unlawful.
> Please consider the environment before printing this email.
>


Re: Kafka Size of ISR Set(3) insufficient for min.isr 2

2020-09-26 Thread Liam Clarke-Hutchinson
Hi Franz,

The last bit of your command output shows that only one replica (broker 3) is
in the ISR for partition 0 - I've put asterisks around it, ISR being short
for In Sync Replicas.
Hence why you're seeing that exception.

Topic: FooBar Partition: 0 Leader: 3 Replicas: 2,3,1 *Isr: 3*

I'd suggest looking in the logs for brokers 1 and 2 for any ReplicaFetcher
errors as a next step.
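
Something along these lines is usually enough to spot them - the log location
below is just a guess, it varies by install:

grep -i ReplicaFetcher /var/log/kafka/server.log | tail -n 50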

Cheers,

Liam Clarke-Hutchinson



On Sun, 27 Sep. 2020, 5:54 pm Franz van Betteraey, 
wrote:

> Hi all,
> <https://stackoverflow.com/posts/64080819/timeline>
>
> I have a strange Kafka Server error when mirroring data with the
> MirrorMaker 1 in Apache Kafka 2.6.
>
> org.apache.kafka.common.errors.NotEnoughReplicasException: The size of
> the current ISR Set(3) is insufficient to satisfy the min.isr
> requirement of 2 for partition FooBar-0
>
> The strange thing is, that the |min.isr| setting is 2 and the ISR Set
> has 3 nodes. Nevertheless I get the /NotEnoughReplicasException/ Exception.
>
> Also taking a deeper look to the topic does not show any curiosities
>
> [root@LoremIpsum kafka]# /usr/lib/kafka/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic FooBar
> Topic: FooBar  PartitionCount: 1  ReplicationFactor: 3  Configs: min.insync.replicas=2,cleanup.policy=compact,segment.bytes=1073741824,max.message.bytes=5242880,min.compaction.lag.ms=60480,message.timestamp.type=LogAppendTime,unclean.leader.election.enable=false
>     Topic: FooBar  Partition: 0  Leader: 3  Replicas: 2,3,1  Isr: 3
>
> The logs of the 3 nodes look normal (as far as I can judge). Is there
> any other reason that could produce this message. What else could be
> checked?
>
> Thank you very much for any advice!
>
> I also posted this question on SO here:
>
> https://stackoverflow.com/questions/64080819/kafka-size-of-isr-set3-insufficient-for-min-isr-2
>
> Kind regards,
>
>Franz
>
>


Re: Poll Records Return Zero Records

2020-09-18 Thread Liam Clarke-Hutchinson
Hi Gowtham, that output indicates that there is no lag for the consumer
group involved. What makes you think there is?

On Fri, Sep 18, 2020 at 7:21 PM Gowtham S  wrote:

> Thanks for the reply  Liam Clarke-Hutchinson, Shaohan Yin
>
> > Hi, what is the output of kafka-consumer-groups.sh --describe for that
> > consumer's group please?
>
> GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET  LAG CONSUMER-ID   HOST
>   CLIENT-ID
>
> oneg   one04838037 48380370   fa9d1602
> localhost
> fa9d1602
>
>
> The poll doesn't return always zero if we make 100 poll calls in 100
> seconds it returns 0 records on 70% of poll calls.
>
>
> You could check if there are any throttles on the broker. The
> > metric kafka_server_quota_throttle_time might do you a favor.
>
>
> There are no quota throttles. We are using the default quota configuration.
> Please let us know if we miss any configuration/metrics to change/listen?
>
>
> With regards,
> Gowtham S, MCA
>
>
>
> On Fri, 18 Sep 2020 at 09:51, Shaohan Yin  wrote:
>
> > Hi Gowtham,
> >
> > You could check if there are any throttles on the broker. The
> > metric kafka_server_quota_throttle_time might do you a favor.
> >
> > Cheers
> >
> > On Fri, 18 Sep 2020 at 07:20, Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Hi, what is the output of kafka-consumer-groups.sh --describe for that
> > > consumer's group please?
> > >
> > > On Thu, 17 Sep. 2020, 7:37 pm Gowtham S, 
> > wrote:
> > >
> > > > Hello All,
> > > > We are consuming a topic with a single partition, the
> > consumer.poll(1000)
> > > > returns "0" records mostly event if we have more than 1 records
> in
> > > lag.
> > > > In which case it will behave like this.
> > > > We are using Kafka-2.4.0 client and 2.4.0 broker.  The single record
> > size
> > > > is 100Kb.
> > > >
> > > > Consumer configuration
> > > >
> > > >- fetch.max.bytes=102428800
> > > >- fetch.min.bytes=1
> > > >- receive.buffer.bytes=-1
> > > >- max.partition.fetch.bytes=102428800
> > > >- max.poll.records=1000
> > > >
> > > > Can anyone help us to resolve this?
> > > >
> > > > Thanks and  regards,
> > > > Gowtham S, MCA
> > > >
> > >
> >
>


Re: Poll Records Return Zero Records

2020-09-17 Thread Liam Clarke-Hutchinson
Hi, what is the output of kafka-consumer-groups.sh --describe for that
consumer's group please?

On Thu, 17 Sep. 2020, 7:37 pm Gowtham S,  wrote:

> Hello All,
> We are consuming a topic with a single partition; consumer.poll(1000)
> mostly returns "0" records even if we have more than 1 record in lag.
> In which cases will it behave like this?
> We are using Kafka-2.4.0 client and 2.4.0 broker.  The single record size
> is 100Kb.
>
> Consumer configuration
>
>- fetch.max.bytes=102428800
>- fetch.min.bytes=1
>- receive.buffer.bytes=-1
>- max.partition.fetch.bytes=102428800
>- max.poll.records=1000
>
> Can anyone help us to resolve this?
>
> Thanks and  regards,
> Gowtham S, MCA
>


Re: Amazon MSK Feeback

2020-09-07 Thread Liam Clarke-Hutchinson
Hi Himanshu,

We looked at MSK when we moved into AWS. One of the other aspects to
consider is that inter-replica network traffic isn't charged for separately
under MSK (whereas traffic between EC2 nodes is charged, and at a higher
rate when it's inter-AZ), so if you're running with a high replication
factor, MSK may offer some savings over your own Kafka deployment.
Prometheus monitoring rather simply with it. Likewise it makes upgrading a
cluster rather trivial.

A caveat - in some regions, you can only have a two AZ MSK cluster, not
three AZ clusters, I encountered this in ap-southeast-2 (Sydney). This
means that losing one AZ could prevent ZooKeeper from reaching quorum.
Also, if any of your replicas end up under-replicated or you want to
reassign partitions etc. you'll still have to do that manually. MSK doesn't
really offer anything in terms of per-node recovery more than EC2 already
does.

In the end, we decided not to use MSK because our desired failover topology
didn't quite fit how MSK works, and because we wanted to retain flexibility
- an obvious example being that we could upgrade to 2.5 when we needed it,
instead of waiting for AWS to catch up - latest supported version is 2.4.1.

I'd suggest trialling MSK alongside an existing cluster to get some ideas
of how the costs will stack up for you, to decide if it's the right path.

Kind regards,

Liam Clarke-Hutchinson


On Mon, Sep 7, 2020 at 9:27 PM Himanshu Shukla 
wrote:

> The reasons which I could think are
>
>
>1. I need not take care of the state of the cluster for scenarios when
>the broker goes down or the zookeeper goes down.
>2. They are by default deploying the cluster across multiple
>Availability zones.
>3. They have their own mirroring policy across the AZs.
>
>
>
> On Mon, Sep 7, 2020 at 2:45 PM Tom Black  wrote:
>
> >
> >
> > Himanshu Shukla wrote:
> > > We are planning to go for amazon MSK instead of having our own self
> > > launched cluster on bare EC2 machines.
> > >
> > > It would be very helpful, if someone who has used it before or know
> about
> > > it, can share the feedback/observations regarding the same.
> > >
> > > My main concerns are,
> > > Is it worth using Amazon MSK?
> >
> > They are expensive. I am curious why you'd consider using the fully
> > managed service instead of building your own cluster?
> >
> > Regards.
> >
>
>
> --
> Regards,
> Himanshu Shukla
>


Re: Kafka Streams Key-value store question

2020-08-20 Thread Liam Clarke-Hutchinson
Hi Pirow,

You can configure the auto offset reset for your stream source's consumer
to "earliest" if you want to consume all available data if no committed
offset exists. This will populate the state store on first run.
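
A minimal sketch of that configuration (application id and bootstrap servers are placeholders, and buildTopology() is a hypothetical stand-in for however the Topology with the global store is assembled):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public class DecoratorApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-decorator");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Passed through to the app's source consumers: when no committed offset
        // exists yet, start from the beginning of the topic so the store is
        // populated on first run.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Topology topology = buildTopology(); // hypothetical helper that calls addGlobalStore etc.
        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }

    private static Topology buildTopology() {
        return new Topology(); // stand-in; the real app adds its stores and processors here
    }
}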

Cheers,

Liam Clarke-Hutchinson


On Thu, 20 Aug. 2020, 11:58 pm Pirow Engelbrecht, <
pirow.engelbre...@etion.co.za> wrote:

> Hi Bill,
>
>
>
> Yes, that seems to be exactly what I need. I’ve instantiated this global
> store with:
>
> topology.addGlobalStore(storeBuilder, "KVstoreSource",
>     Serdes.String().deserializer(), Serdes.String().deserializer(),
>     this.config_kvStoreTopic, "KVprocessor", KeyValueProcessor::new);
>
>
>
>
>
> I’ve added a KeyValueProcessor that puts incoming Kafka key-value pairs
> into the store. The problem is that if the application starts for the first
> time, it does not process any key-value pairs already in the Kafka topic.
> Is there a way around this?
>
>
>
> Thanks
>
>
>
> *Pirow Engelbrecht*
> System Engineer
>
> *E.* pirow.engelbre...@etion.co.za
> *T.* +27 12 678 9740 (ext. 9879)
> *M.* +27 63 148 3376
>
> 76 Regency Drive | Irene | Centurion | 0157
> <https://goo.gl/maps/v9ZbwjqpPyL2>
> *www.etion.co.za* <https://www.parsec.co.za/>
>
> <https://www.parsec.co.za/>
>
> Facebook
> <https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr> |
> YouTube <https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A> |
> LinkedIn <https://www.linkedin.com/company/etionltd> | Twitter
> <https://twitter.com/Etionlimited> | Instagram
> <https://www.instagram.com/Etionlimited/>
>
>
>
> *From:* Bill Bejeck 
> *Sent:* Wednesday, 19 August 2020 3:53 PM
> *To:* users@kafka.apache.org
> *Subject:* Re: Kafka Streams Key-value store question
>
>
>
> Hi Pirow,
>
> If I'm understanding your requirements correctly, I think using a global
> store
> <
> https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier-
> >
> will
> work for you.
>
> HTH,
> Bill
>
> On Wed, Aug 19, 2020 at 8:53 AM Pirow Engelbrecht <
> pirow.engelbre...@etion.co.za> wrote:
>
> > Hello,
> >
> >
> >
> > We’re building a JSON decorator using Kafka Streams’ processing API.
> >
> >
> >
> > The process is briefly that a piece of JSON should be consumed from an
> > input topic (keys are null, value is the JSON). The JSON contains a field
> > (e.g. “thisField”) with a value (e.g. “someLink”). This value (and a
> > timestamp) is used to look up another piece of JSON from a key-value topic
> > (keys are all the different values of “thisField”, values are JSON). This
> > key-value topic is created by another service in Kafka. This additional
> > piece of JSON then gets appended to the input JSON and the result gets
> > written to an output topic (keys are null, value is now the original
> JSON +
> > lookup JSON).
> >
> >
> >
> > To do the query against a key-value store, ideally I want Kafka Streams
> to
> > directly create and update a window key-value store in memory (or disk)
> > from my key-value topic in Kafka, but I am unable to find a way to
> specify
> > this through the StoreBuilder interface. Does anybody know how to do
> this?
> >
> > Here is my current Storebuilder code snippet:
> >
> > StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
> >         Stores.persistentWindowStore("loopkupStore",
> >                 Duration.ofDays(14600), Duration.ofDays(14600), false),
> >         Serdes.String(),
> >         Serdes.String());
> >
> > storeBuilder.build();
> >
> >
> >
> >
> >
> > Currently my workaround is to have a sink for the key-value store and then
> > create/update this key-value store using a node in the processing topology,
> > but this has issues when restarting the service, i.e. when the service is
> > restarted, the key-value store topic needs to be consumed from the start to
> > rebuild the store in memory, but the sink would have written commit offsets
> > which prevent the topic from being consumed from the start. I also cannot use
> > streams.cleanUp() as this will reset all the sinks in my topology (my other
> > sink ingests records from the input topic).
> >
> >
> >
> > Thanks
> >
> >
> >
> > *Pirow Engelbrecht*
> > System Engineer
> >
> > *E.* pirow.engelbre...@etion.co.za
> > *T.* +27 12 678 9740 (ext. 9879)
> > *M.* +27 63 148 3376
> >
> > 76 Regency Drive | Irene | Centurion | 0157
> > <https://goo.gl/maps/v9ZbwjqpPyL2>
> > *www.etion.co.za <https://www.parsec.co.za/>*
> >
> > <https://www.parsec.co.za/>
> >
> > Facebook
> > <https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr> |
> > YouTube <https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A> |
> > LinkedIn <https://www.linkedin.com/company/etionltd> | Twitter
> > <https://twitter.com/Etionlimited> | Instagram
> > <https://www.instagram.com/Etionlimited/>
> >
> >
> >
>


Re: Quick clarification on ConsumerRecord.getOffset() and RecordMetadata.getOffset()

2020-08-17 Thread Liam Clarke-Hutchinson
Brilliant, thanks for that Matthias :)

On Tue, 18 Aug. 2020, 3:07 am Matthias J. Sax,  wrote:

> Your understanding is correct.
>
> On 8/16/20 10:25 PM, Liam Clarke-Hutchinson wrote:
> > Kia ora koutou katoa,
> >
> > Just double checking my understanding - the RecordMetadata returned by a
> > producer send returns an offset for the record - is it the actual offset
> of
> > the record on the partition, or the next offset? Reading the code
> indicates
> > it was the actual offset.
> >
> > And likewise, when I call getOffset on a ConsumerRecord, if I'm reading
> the
> > comments in the code correctly, it was the offset of that record, not the
> > next offset.
> >
> > As opposed to consumer.position which will return the next offset (and
> how
> > when offsets are committed, it's the next offset to read).
> >
> > Am I correct in my understanding?
> >
> > Kind regards,
> >
> > Liam Clarke-Hutchinson
> >
>
>


Quick clarification on ConsumerRecord.getOffset() and RecordMetadata.getOffset()

2020-08-16 Thread Liam Clarke-Hutchinson
Kia ora koutou katoa,

Just double checking my understanding - the RecordMetadata returned by a
producer send returns an offset for the record - is it the actual offset of
the record on the partition, or the next offset? Reading the code indicates
it was the actual offset.

And likewise, when I call getOffset on a ConsumerRecord, if I'm reading the
comments in the code correctly, it was the offset of that record, not the
next offset.

As opposed to consumer.position which will return the next offset (and how
when offsets are committed, it's the next offset to read).
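
To illustrate the above with the Java client (a sketch only; in the current API the accessors are offset() and position(), producer/consumer construction is assumed elsewhere, and the topic name is a placeholder):

import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetIllustration {
    static void illustrate(KafkaProducer<String, String> producer,
                           KafkaConsumer<String, String> consumer) throws Exception {
        // RecordMetadata.offset() is the offset assigned to this exact record.
        RecordMetadata md = producer.send(new ProducerRecord<>("demo-topic", "k", "v")).get();
        System.out.println("record written at offset " + md.offset());

        for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofSeconds(1))) {
            TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());
            // rec.offset() is the offset of the record itself, while
            // consumer.position(tp) is the next offset to fetch - at least
            // rec.offset() + 1 once this record has been returned.
            System.out.println("consumed offset " + rec.offset()
                    + ", position now " + consumer.position(tp));
        }
    }
}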

Am I correct in my understanding?

Kind regards,

Liam Clarke-Hutchinson


Re: Time stamp in Kafka

2020-08-15 Thread Liam Clarke-Hutchinson
One option could be to configure the topic to use log append time as the
timestamp, not producer timestamp.
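
For example, via the AdminClient (topic name and bootstrap servers are placeholders; the same change can also be made with kafka-configs.sh):

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class UseLogAppendTime {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic"); // placeholder
            AlterConfigOp setLogAppendTime = new AlterConfigOp(
                    new ConfigEntry("message.timestamp.type", "LogAppendTime"),
                    AlterConfigOp.OpType.SET);
            Map<ConfigResource, Collection<AlterConfigOp>> updates = new HashMap<>();
            updates.put(topic, Collections.singletonList(setLogAppendTime));
            // Records on this topic are then stamped with the broker's append time
            // rather than whatever the producer's clock says.
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}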

On Sun, 16 Aug. 2020, 7:23 am Matthias J. Sax,  wrote:

> What are your configs? There are many configs that interact with each other.
>
> I did a Kafka Summit talk about this topic that may help:
>
> https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/
>
>
> -Matthias
>
> On 8/15/20 11:48 AM, KhajaAsmath Mohammed wrote:
> > Hi,
> >
> > I don’t have code related to it. It is pushed by other team who uses
> .net framework to push data into Kafka topic. I am suspecting it is issue
> with library they use.
> >
> > I used java library to push data and it works as expected.
> >
> > Thanks,
> > Asmath
> >
> > Sent from my iPhone
> >
> >> On Aug 15, 2020, at 1:44 PM, pierreneter  wrote:
> >>
> >> can you show a little of your code related to that?
> >>
> >>> On Sat, Aug 15, 2020 at 11:50 PM KhajaAsmath Mohammed <
> >>> mdkhajaasm...@gmail.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> We have a producer application that has written data to Kafka topic.
> >>>
> >>> We are reading the data from Kafka topic using spark streaming but the
> >>> time stamp on Kafka is 1969-12-31 format for all the data.
> >>>
> >>> Is there a way to fix this while reading ?
> >>>
> >>> Thanks,
> >>> Asmath
> >>>
> >>> Sent from my iPhone
>
>


Re: backup

2020-08-10 Thread Liam Clarke-Hutchinson
Hi Kumar,

You can restore, but you'll lose data since your last snapshot. I use Kafka
Connect -> S3 instead for obvious reasons.

All that said, I've never had multiple Kafka nodes fail to the extent that
I need my S3 backup. But it's good to have it, for peace of mind.

Thanks,

Liam Clarke-Hutchinson

On Mon, 10 Aug. 2020, 2:27 pm kumar,  wrote:

> Hi Liam -  I did not understand cloning kafka broker volumes.
>
> If you have a 1 TB disk and the usage is 65%, the data in the volume is
> changing very fast.
> Do we take 650 GB every hour or every minute? How do we restore if there was
> a failure?
>
> Oracle database provides point-in-time recovery (incremental + full
> backup + archive logs) of their database. Is it possible to recover kafka
> like that?
> We had storage failure of the entire site. We were not confident on the
> data recovered on kafka compared to oracle database. We had 3 kafka nodes
> with no mirror maker.
>
> My understanding is that replication and mirror maker work only as long as there is no
> lag on replication. There is no guarantee against data loss.
> I never tested mirror maker with compact topics.
>
> Thanks,
> pt
>
>
>
>
> On Sun, Aug 9, 2020 at 7:52 AM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Dor,
> >
> > There are multiple approaches.
> >
> > 1) Clone your Kafka broker volumes
> > 2) Use Kafka Connect to stream all data to a different storage system
> such
> > as Hadoop, S3, etc.
> > 3) Use Mirrormaker to replicate all data to a backup cluster.
> >
> > Which approach is right for you really depends on your needs, but
> > generally, if you have enough nodes in your clusters, and a correct
> > replication setting for a topic, you won't need to backup Kafka. As a
> rule
> > of thumb, a topic with a replication factor of N can survive N - 1 node
> > failures without data loss.
> >
> > If you can provide more information about the problems you're trying to
> > solve, our advice can be more directed :)
> >
> > Kind regards,
> >
> > Liam Clarke-Hutchinson
> >
> > On Sun, Aug 9, 2020 at 11:43 PM Dor Ben Dov 
> > wrote:
> >
> > > Hi All,
> > > What is the best recommended way, and tool to backup kafka in
> production?
> > > Regards,
> > > Dor
> > >
> > > This email and the information contained herein is proprietary and
> > > confidential and subject to the Amdocs Email Terms of Service, which
> you
> > > may review at https://www.amdocs.com/about/email-terms-of-service <
> > > https://www.amdocs.com/about/email-terms-of-service>
> > >
> > >
> >
>


Re: backup

2020-08-09 Thread Liam Clarke-Hutchinson
Hi Dor,

Yep, we're using Mirrormaker 2.0 currently, migrated from MM 1 with no real
issues. Admittedly the documentation is a bit lacking currently, but
between the KIP
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0>
and
the occasional code reading, we got there fine. One caveat to my experience
- it's built on top of Kafka Connect, and as we already had a cluster of KC
workers to stream data into/out of our Kafka cluster from various sources,
MM 2 was easy to deploy, just another config. So if you're starting from
scratch, there might be some overhead around getting your Kafka Connect
workers deployed and configured correctly. But if you're looking to run a
hot/warm (main/backup) cluster scenario, MM 2 is ideal.

Kind regards,

Liam Clarke-Hutchinson

On Sun, Aug 9, 2020 at 11:56 PM Dor Ben Dov  wrote:

> Hi Liam,
> No actual problem, just wondering. Still, you answered most of the things I
> already knew, so now I am convinced that I am ok.
> Still, I am wondering about MM2. How reliable is it? Have you used it in
> production, for instance?
>
> Regards,
> Dor
>
> -----Original Message-----
> From: Liam Clarke-Hutchinson 
> Sent: Sunday, August 9, 2020 2:52 PM
> To: users@kafka.apache.org
> Subject: Re: backup
>
> Hi Dor,
>
> There are multiple approaches.
>
> 1) Clone your Kafka broker volumes
> 2) Use Kafka Connect to stream all data to a different storage system such
> as Hadoop, S3, etc.
> 3) Use Mirrormaker to replicate all data to a backup cluster.
>
> Which approach is right for you really depends on your needs, but
> generally, if you have enough nodes in your clusters, and a correct
> replication setting for a topic, you won't need to backup Kafka. As a rule
> of thumb, a topic with a replication factor of N can survive N - 1 node
> failures without data loss.
>
> If you can provide more information about the problems you're trying to
> solve, our advice can be more directed :)
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Sun, Aug 9, 2020 at 11:43 PM Dor Ben Dov 
> wrote:
>
> > Hi All,
> > What is the best recommended way, and tool to backup kafka in production?
> > Regards,
> > Dor
> >
> > This email and the information contained herein is proprietary and
> > confidential and subject to the Amdocs Email Terms of Service, which
> > you may review at
> > https://www.amdocs.com/about/email-terms-of-service <
> > https://www.amdocs.com/about/email-terms-of-service>
> >
> >
> This email and the information contained herein is proprietary and
> confidential and subject to the Amdocs Email Terms of Service, which you
> may review at https://www.amdocs.com/about/email-terms-of-service <
> https://www.amdocs.com/about/email-terms-of-service>
>
>


Re: backup

2020-08-09 Thread Liam Clarke-Hutchinson
Hi Dor,

There are multiple approaches.

1) Clone your Kafka broker volumes
2) Use Kafka Connect to stream all data to a different storage system such
as Hadoop, S3, etc.
3) Use Mirrormaker to replicate all data to a backup cluster.

Which approach is right for you really depends on your needs, but
generally, if you have enough nodes in your clusters, and a correct
replication setting for a topic, you won't need to backup Kafka. As a rule
of thumb, a topic with a replication factor of N can survive N - 1 node
failures without data loss.

If you can provide more information about the problems you're trying to
solve, our advice can be more directed :)

Kind regards,

Liam Clarke-Hutchinson

On Sun, Aug 9, 2020 at 11:43 PM Dor Ben Dov  wrote:

> Hi All,
> What is the best recommended way, and tool to backup kafka in production?
> Regards,
> Dor
>
> This email and the information contained herein is proprietary and
> confidential and subject to the Amdocs Email Terms of Service, which you
> may review at https://www.amdocs.com/about/email-terms-of-service <
> https://www.amdocs.com/about/email-terms-of-service>
>
>


Re: kafka per topic metrics

2020-07-15 Thread Liam Clarke-Hutchinson
Hi Pushkar,

Best way to see what metrics are available is to connect to a broker via
JConsole to see the exposed mbeans.

You can iterate over them programmatically by using the MBean API.
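
For example, a minimal sketch that connects to a broker's JMX port (the broker-host:9999 address is an assumption - use whatever JMX_PORT the broker was started with) and lists the per-topic MessagesInPerSec beans:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ListTopicMetrics {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi"); // placeholder host:port
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName pattern = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec,topic=*");
            for (ObjectName name : mbsc.queryNames(pattern, null)) {
                // Each matching bean is a per-topic meter; OneMinuteRate is its 1-minute average.
                Object rate = mbsc.getAttribute(name, "OneMinuteRate");
                System.out.println(name.getKeyProperty("topic") + " -> " + rate);
            }
        } finally {
            connector.close();
        }
    }
}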

Also recommend chapter 10 of Kafka: The Definitive Guide, it covers the
metrics really well.

Cheers,

Liam Clarke-Hutchinson

On Thu, 16 Jul. 2020, 5:05 pm Pushkar Deole,  wrote:

> Thanks Liam...
> Few questions: in your pattern the topic parameter is appended pattern:
> 'kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>OneMinuteRate'
> however the kafka docs don't mention that
> kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
>
>   is the topic parameter available in all BrokerTopicMetrics, and can the
> broker provide that parameter dynamically by iterating through all topics
> or selective topics? Where will the logic reside to iterate through
> available topics and export metrics for all of them?
>
> On Thu, Jul 16, 2020 at 10:04 AM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Whoops, just spotted a typo - the second $1 in the above snippet should
> of
> > course be $2.
> >
> > On Thu, Jul 16, 2020 at 4:33 PM Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Hi Pushkar,
> > >
> > > There are broker side metrics for messages in / bytes in / bytes out
> per
> > > topic per second. I use this jmx_exporter rule to export them:
> > >   - pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>OneMinuteRate'
> > > name: kafka_broker_per_topic_$1_one_minute_rate
> > > labels:
> > >   topic: $1
> > > type: GAUGE
> > >
> > > You can't get the number of messages out per topic from the broker
> > > because... I think it's somehow related to batching, or similar, it
> > doesn't
> > > count messages out, only bytes out. You can, however, get that metric
> > from
> > > the consumer if you're using the Java Kafka client, there's a per-topic
> > > messages consumed per second metric exposed as an MBean.
> > >
> > > You could either use jmx_exporter to also export that from the client
> > app,
> > > or if possible, add some code that connects to the mbean inside the JVM
> > and
> > > then exports it via any pre-existing Prometheus registry. You might
> want
> > to
> > > then use a Prometheus aggregating rule to collate all the per-consumer
> > apps
> > > into a per-consumer-group metric, unless the per consumer granularity
> is
> > of
> > > interest to you: https://prometheus.io/docs/practices/rules/
> > >
> > > Hope that helps,
> > >
> > > Kind regards,
> > >
> > > Liam Clarke-Hutchinsons
> > >
> > > On Thu, 16 Jul. 2020, 3:46 pm Pushkar Deole, 
> > wrote:
> > >
> > >> Thanks Claudia! For broker level metrics, we are also using same jmx
> > >> exporter to export those metrics to prometheus.
> > >> Are you fetching any per topic metrics from broker? e.g. messages
> > produced
> > >> on a certain topic or messages consumed from a certain topic. I am
> > mainly
> > >> interested in these metrics.
> > >>
> > >> I read in kafka docs that they are present at producer/consumer,
> > however I
> > >> am not sure how to fetch them from consumer/producer.
> > >>
> > >> On Wed, Jul 15, 2020 at 8:32 PM Claudia Kesslau 
> > >> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > I use https://github.com/prometheus/jmx_exporter for collecting
> > broker
> > >> > metrics and integrating them into prometheus.
> > >> >
> > >> > Hope this helps.
> > >> > Greetings,
> > >> > Claudia
> > >> >
> > >> > -----Original Message-----
> > >> > From: Pushkar Deole 
> > >> > Sent: Wednesday, 15 July 2020 09:07
> > >> > To: users@kafka.apache.org
> > >> > Subject: Re: kafka per topic metrics
> > >> >
> > >> > We are using prometheus as metrics collection and storage system and
> > >> > Grafana for displaying those metrics, so integration with them is
> > >> required
> > >> >
> > >> > On Wed, Jul 15, 2020 at 11:11 AM rohit garg  >
> > >> > wrote:
> > >> >
> > >> > > You can try using kafka manager and check whether it will fulfill most of
> > >>

Re: kafka per topic metrics

2020-07-15 Thread Liam Clarke-Hutchinson
Whoops, just spotted a typo - the second $1 in the above snippet should of
course be $2.

On Thu, Jul 16, 2020 at 4:33 PM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Hi Pushkar,
>
> There are broker side metrics for messages in / bytes in / bytes out per
> topic per second. I use this jmx_exporter rule to export them:
>   - pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>OneMinuteRate'
> name: kafka_broker_per_topic_$1_one_minute_rate
> labels:
>   topic: $1
> type: GAUGE
>
> You can't get the number of messages out per topic from the broker
> because... I think it's somehow related to batching, or similar, it doesn't
> count messages out, only bytes out. You can, however, get that metric from
> the consumer if you're using the Java Kafka client, there's a per-topic
> messages consumed per second metric exposed as an MBean.
>
> You could either use jmx_exporter to also export that from the client app,
> or if possible, add some code that connects to the mbean inside the JVM and
> then exports it via any pre-existing Prometheus registry. You might want to
> then use a Prometheus aggregating rule to collate all the per-consumer apps
> into a per-consumer-group metric, unless the per consumer granularity is of
> interest to you: https://prometheus.io/docs/practices/rules/
>
> Hope that helps,
>
> Kind regards,
>
> Liam Clarke-Hutchinsons
>
> On Thu, 16 Jul. 2020, 3:46 pm Pushkar Deole,  wrote:
>
>> Thanks Claudia! For broker level metrics, we are also using same jmx
>> exporter to export those metrics to prometheus.
>> Are you fetching any per topic metrics from broker? e.g. messages produced
>> on a certain topic or messages consumed from a certain topic. I am mainly
>> interested in these metrics.
>>
>> I read in kafka docs that they are present at producer/consumer, however I
>> am not sure how to fetch them from consumer/producer.
>>
>> On Wed, Jul 15, 2020 at 8:32 PM Claudia Kesslau 
>> wrote:
>>
>> > Hi,
>> >
>> > I use https://github.com/prometheus/jmx_exporter for collecting broker
>> > metrics and integrating them into prometheus.
>> >
>> > Hope this helps.
>> > Greetings,
>> > Claudia
>> >
>> > -----Original Message-----
>> > From: Pushkar Deole 
>> > Sent: Wednesday, 15 July 2020 09:07
>> > To: users@kafka.apache.org
>> > Subject: Re: kafka per topic metrics
>> >
>> > We are using prometheus as metrics collection and storage system and
>> > Grafana for displaying those metrics, so integration with them is
>> required
>> >
>> > On Wed, Jul 15, 2020 at 11:11 AM rohit garg 
>> > wrote:
>> >
>> > > You can try using kafka manager and check whether it will fulfill most of
>> > > your requirements.
>> > >
>> > > Thanks and Regards,
>> > > Rohit
>> > >
>> > > On Wed, Jul 15, 2020, 10:33 Pushkar Deole 
>> wrote:
>> > >
>> > > > Hi All,
>> > > >
>> > > > Any inputs as to how the kafka consumer and producer metrics can be
>> > > hooked
>> > > > up to a monitoring system such as prometheus ?
>> > > >
>> > > > On Tue, Jul 14, 2020 at 4:22 PM Pushkar Deole > >
>> > > > wrote:
>> > > >
>> > > > > i did find these metrics from confluent docs: however how can i
>> > > > > get
>> > > this
>> > > > > metric ? is it available at kafka broker?
>> > > > >
>> > > > > Per-Topic Metrics
>> > > > >
>> > > > > MBean:
>> > > > >
>> > > >
>> > > kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)
>> > > > record-send-rate: The average number of records sent per second for a topic.
>> > > > >
>> > > > > On Tue, Jul 14, 2020 at 3:27 PM Pushkar Deole
>> > > > > 
>> > > > > wrote:
>> > > > >
>> > > > >> Hi All,
>> > > > >>
>> > > > >> Need some help on kafka metrics, i am interested in certain
>> > > > >> metrics
>> > > e.g.
>> > > > >> i need to know the number of records published on a particular
>> > > > >> topic
>> > > and
>> > > > >> number of records consumed from that topic by a specific consumer
>> > > > group, i
>> > > > >> would need a total of these 2 and also average per second for
>> them.
>> > > > >>
>> > > > >> Are those metrics available on kafka brokers and if yes then
>> > > > >> which are those metrics that would give me above counts?
>> > > > >>
>> > > > >
>> > > >
>> > >
>> >
>>
>


Re: kafka per topic metrics

2020-07-15 Thread Liam Clarke-Hutchinson
Hi Pushkar,

There are broker side metrics for messages in / bytes in / bytes out per
topic per second. I use this jmx_exporter rule to export them:
  - pattern: 'kafka.server<type=BrokerTopicMetrics, name=(.+), topic=(.+)><>OneMinuteRate'
name: kafka_broker_per_topic_$1_one_minute_rate
labels:
  topic: $1
type: GAUGE

You can't get the number of messages out per topic from the broker
because... I think it's somehow related to batching, or similar, it doesn't
count messages out, only bytes out. You can, however, get that metric from
the consumer if you're using the Java Kafka client, there's a per-topic
messages consumed per second metric exposed as an MBean.
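
A sketch of reading that metric in-process from the consumer itself (the exact group and name - consumer-fetch-manager-metrics / records-consumed-rate - are what I'd expect from the Java client, but verify against consumer.metrics() for your version):

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class ConsumerTopicRates {
    static void printPerTopicConsumeRate(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("consumer-fetch-manager-metrics".equals(name.group())
                    && "records-consumed-rate".equals(name.name())
                    && name.tags().containsKey("topic")) {
                // One metric per subscribed topic: average records consumed per second.
                System.out.println(name.tags().get("topic") + " -> " + entry.getValue().metricValue());
            }
        }
    }
}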

You could either use jmx_exporter to also export that from the client app,
or if possible, add some code that connects to the mbean inside the JVM and
then exports it via any pre-existing Prometheus registry. You might want to
then use a Prometheus aggregating rule to collate all the per-consumer apps
into a per-consumer-group metric, unless the per consumer granularity is of
interest to you: https://prometheus.io/docs/practices/rules/

Hope that helps,

Kind regards,

Liam Clarke-Hutchinsons

On Thu, 16 Jul. 2020, 3:46 pm Pushkar Deole,  wrote:

> Thanks Claudia! For broker level metrics, we are also using same jmx
> exporter to export those metrics to prometheus.
> Are you fetching any per topic metrics from broker? e.g. messages produced
> on a certain topic or messages consumed from a certain topic. I am mainly
> interested in these metrics.
>
> I read in kafka docs that they are present at producer/consumer, however I
> am not sure how to fetch them from consumer/producer.
>
> On Wed, Jul 15, 2020 at 8:32 PM Claudia Kesslau 
> wrote:
>
> > Hi,
> >
> > I use https://github.com/prometheus/jmx_exporter for collecting broker
> > metrics and integrating them into prometheus.
> >
> > Hope this helps.
> > Greetings,
> > Claudia
> >
> > -----Original Message-----
> > From: Pushkar Deole 
> > Sent: Wednesday, 15 July 2020 09:07
> > To: users@kafka.apache.org
> > Subject: Re: kafka per topic metrics
> >
> > We are using prometheus as metrics collection and storage system and
> > Grafana for displaying those metrics, so integration with them is
> required
> >
> > On Wed, Jul 15, 2020 at 11:11 AM rohit garg 
> > wrote:
> >
> > > You can try using kafka manager and check whether it will fulfill most of
> > > your requirements.
> > >
> > > Thanks and Regards,
> > > Rohit
> > >
> > > On Wed, Jul 15, 2020, 10:33 Pushkar Deole 
> wrote:
> > >
> > > > Hi All,
> > > >
> > > > Any inputs as to how the kafka consumer and producer metrics can be
> > > hooked
> > > > up to a monitoring system such as prometheus ?
> > > >
> > > > On Tue, Jul 14, 2020 at 4:22 PM Pushkar Deole 
> > > > wrote:
> > > >
> > > > > i did find these metrics from confluent docs: however how can i
> > > > > get
> > > this
> > > > > metric ? is it available at kafka broker?
> > > > >
> > > > > Per-Topic Metrics
> > > > >
> > > > > MBean:
> > > > >
> > > >
> > > kafka.producer:type=producer-topic-metrics,client-id=([-.\w]+),topic=([-.\w]+)
> > > > > record-send-rate: The average number of records sent per second for a topic.
> > > > >
> > > > > On Tue, Jul 14, 2020 at 3:27 PM Pushkar Deole
> > > > > 
> > > > > wrote:
> > > > >
> > > > >> Hi All,
> > > > >>
> > > > >> Need some help on kafka metrics, i am interested in certain
> > > > >> metrics
> > > e.g.
> > > > >> i need to know the number of records published on a particular
> > > > >> topic
> > > and
> > > > >> number of records consumed from that topic by a specific consumer
> > > > group, i
> > > > >> would need a total of these 2 and also average per second for
> them.
> > > > >>
> > > > >> Are those metrics available on kafka brokers and if yes then
> > > > >> which are those metrics that would give me above counts?
> > > > >>
> > > > >
> > > >
> > >
> >
>


Re: Consumer Groups Describe is not working

2020-07-08 Thread Liam Clarke-Hutchinson
Hi Ann,

It's common practice in many Spark Streaming apps to store offsets external
to Kafka. Especially when checkpointing is enabled.

Are you sure that the app is committing offsets to Kafka?

Kind regards,

Liam Clarke

On Thu, 9 Jul. 2020, 8:00 am Ann Pricks,  wrote:

> Hi Ricardo,
>
> Thanks for your kind response.
>
> As per your suggestion, I have enabled trace and PFB the content of the
> log file.
>
> Log File Content:
>
> [2020-07-08 18:48:08,963] INFO Registered kafka:type=kafka.Log4jController
> MBean (kafka.utils.Log4jControllerRegistration$)
> [2020-07-08 18:48:09,244] TRACE Registered metric named MetricName
> [name=count, group=kafka-metrics-count, description=total number of
> registered metrics, tags={}] (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,289] INFO Successfully logged in.
> (org.apache.kafka.common.security.authenticator.AbstractLogin)
> [2020-07-08 18:48:09,290] TRACE LoginManager(serviceName=kafka,
> publicCredentials=[admin], refCount=1) acquired
> (org.apache.kafka.common.security.authenticator.LoginManager)
> [2020-07-08 18:48:09,292] DEBUG Updated cluster metadata version 1 to
> Cluster(id = null, nodes = [broker1:2345 (id: -1 rack: null), broker2:2345
> (id: -2 rack: null), broker3:2345 (id: -3 rack: null)], partitions = [],
> controller = null) (org.apache.kafka.clients.Metadata)
> [2020-07-08 18:48:09,303] DEBUG Added sensor with name connections-closed:
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,305] TRACE Registered metric named MetricName
> [name=connection-close-total, group=admin-metrics, description=The total
> number of connections closed, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] TRACE Registered metric named MetricName
> [name=connection-close-rate, group=admin-metrics, description=The number of
> connections closed per second, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] DEBUG Added sensor with name
> connections-created: (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] TRACE Registered metric named MetricName
> [name=connection-creation-total, group=admin-metrics, description=The total
> number of new connections established, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] TRACE Registered metric named MetricName
> [name=connection-creation-rate, group=admin-metrics, description=The number
> of new connections established per second, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] DEBUG Added sensor with name
> successful-authentication: (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] TRACE Registered metric named MetricName
> [name=successful-authentication-total, group=admin-metrics, description=The
> total number of connections with successful authentication, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] TRACE Registered metric named MetricName
> [name=successful-authentication-rate, group=admin-metrics, description=The
> number of connections with successful authentication per second, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] DEBUG Added sensor with name
> failed-authentication: (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,306] TRACE Registered metric named MetricName
> [name=failed-authentication-total, group=admin-metrics, description=The
> total number of connections with failed authentication, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,307] TRACE Registered metric named MetricName
> [name=failed-authentication-rate, group=admin-metrics, description=The
> number of connections with failed authentication per second, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,307] DEBUG Added sensor with name
> bytes-sent-received: (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,307] TRACE Registered metric named MetricName
> [name=network-io-total, group=admin-metrics, description=The total number
> of network operations (reads or writes) on all connections, tags={}]
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,307] TRACE Registered metric named MetricName
> [name=network-io-rate, group=admin-metrics, description=The number of
> network operations (reads or writes) on all connections per second,
> tags={}] (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,307] DEBUG Added sensor with name bytes-sent:
> (org.apache.kafka.common.metrics.Metrics)
> [2020-07-08 18:48:09,307] TRACE Registered metric named MetricName
> [name=outgoing-byte-total, group=admin-metrics, description=The total
> number of outgoing 

Re: NotEnoughReplicasException: The size of the current ISR Set(2) is insufficient to satisfy the min.isr requirement of 3

2020-07-07 Thread Liam Clarke-Hutchinson
Hi Nag,

ISR is the replicas that are in sync with the leader, and there's a
different ISR set for each partition of a given topic. If you use
`kafka/bin/kafka-topics --describe --topic <topic>` it'll show you the replicas
and ISR for each partition.

min.insync.replicas and replication factor are all about preventing data
loss. Generally I set min ISR to 2 for a topic with a replication factor of
3 so that one down or struggling broker doesn't prevent producers writing
to topics, but I still have a replica of the data in case the broker acting
as leader goes down - a new partition leader can only be elected from the
insync replicas.
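
For example, creating a topic with that shape via the AdminClient (bootstrap servers and topic name are placeholders; the same can be done with kafka-topics.sh using --config min.insync.replicas=2 at creation time):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithMinIsr {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // 4 partitions, replication factor 3, and a per-topic min ISR of 2 so that
            // a single down broker doesn't block acks=all producers.
            NewTopic topic = new NewTopic("topic-ack-all", 4, (short) 3)
                    .configs(Collections.singletonMap("min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}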

On Tue, Jul 7, 2020 at 7:39 PM Nag Y  wrote:

> I had the following setup Brokers : 3 - all are up and running with
> min.insync.replicas=3.
>
> I created a topic with the following configuration
>
> bin\windows\kafka-topics --zookeeper 127.0.0.1:2181 --topic topic-ack-all
> --create --partitions 4 --replication-factor 3
>
> I triggered the producer with "ack = all" and producer is able to send the
> message. However, the problem starts when i start the consumer
>
> bin\windows\kafka-console-consumer --bootstrap-server
> localhost:9094,localhost:9092 --topic topic-ack-all --from-beginning
>
> The error is
>
> NotEnoughReplicasException: The size of the current ISR Set(2) is
> insufficient to satisfy the min.isr requirement of 3
> NotEnoughReplicasException:The size of the current ISR Set(3) is
> insufficient to satisfy the min.isr requirement of 3 for partition __con
>
> I see two kinds of errors here. I went through the documentation and also
> have some understanding of "min.isr"; however, these error messages are not
> clear.
>
>1. What does it mean by current ISR set ? Is it different for each topic
>and what it signifies ?
>2. I guess min.isr is same as min.insync.replicas . I hope is should
>have value at least same as "replication factor" ?
>


Re: _consumer_offsets topic produce rate is dam high

2020-06-24 Thread Liam Clarke-Hutchinson
...the classpath is required to make the formatter available to the console
consumer...

On Thu, Jun 25, 2020 at 9:44 AM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> To investigate, you can read the messages on the topic to figure out how
> consumer groups are committing. They're stored in a binary format, but you
> can make them human readable using a formatter. Here's what I did last time
> to do this - the classpath is required to make the formatter available to
> the console producer, but you could do this in code with Kafka as a
> dependency:
>
> export CLASSPATH=/home/development/kafka_2.12-2.4.1/libs/* \
> && /home/development/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh \
> --bootstrap-server kafka01:9092 \
> --topic __consumer_offsets \
> --from-beginning \
> --formatter 
> "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" > 
> offsets.txt
>
> In case the email formatting garbles the above, here it is in a gist also 
> <https://gist.github.com/LiamClarkeFMG/4a0f038a8d1782b1348e34b3ec471c21>. You 
> can then parse and analyse the output data using your preferred scripting 
> language.
>
> Hope that helps,
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
>
>
>
> On Thu, Jun 25, 2020 at 3:25 AM Karolis Pocius
>  wrote:
>
>> Check if any of your consumers have auto commit turned off and instead
>> commit after processing each message.
>>
>> Also, even if all consumers are using auto commit, maybe some of them have
>> the interval set to something crazy low like 1 ms.
>>
>> On Sat, Jun 20, 2020 at 8:31 PM Ashutosh singh 
>> wrote:
>>
>> > Hi Guys,
>> >
>> > Hope you all are doing well.
>> >
>> > All of a sudden I see very high throughput for the _consumer_offsets topic.
>> > It is around 30-40K per second.  What could be the reason for such a high
>> > rate?  Do I need to be concerned about this?
>> >
>> > [image: image.png]
>> >
>> >
>> > This happens for an hour and after that it goes down.  Now it is
>> happening
>> > once or twice daily.
>> >
>> > I have 8 node cluster , 1000+ topics and 644 consumer groups.
>> > All nodes have almost equal number of lead partition across nodes and
>> > almost equal partition on all nodes.
>> >
>> > Kafka version : 2.1.1
>> >
>> > If you see above graph, there are no other topics where messages are
>> more
>> > than 300 Message/sec.  only _consumer_offsets  is having so high through
>> > put.
>> >
>> > I know this is an internal topic and stores metadata of topics and consumer
>> > information.  But I don't see anything abnormal in my cluster, so why is the
>> > _consumer_offset topic going crazy?  What is going on here?
>> >
>> > Any help will be appreciated.
>> >
>> > --
>> > Thanx & Regard
>> > Ashutosh Singh
>> > 08151945559
>> >
>> >
>>
>


Re: _consumer_offsets topic produce rate is dam high

2020-06-24 Thread Liam Clarke-Hutchinson
To investigate, you can read the messages on the topic to figure out how
consumer groups are committing. They're stored in a binary format, but you
can make them human readable using a formatter. Here's what I did last time
to do this - the classpath is required to make the formatter available to
the console producer, but you could do this in code with Kafka as a
dependency:

export CLASSPATH=/home/development/kafka_2.12-2.4.1/libs/* \
&& /home/development/kafka_2.12-2.4.1/bin/kafka-console-consumer.sh \
--bootstrap-server kafka01:9092 \
--topic __consumer_offsets \
--from-beginning \
--formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
> offsets.txt

In case the email formatting garbles the above, here it is in a gist
also <https://gist.github.com/LiamClarkeFMG/4a0f038a8d1782b1348e34b3ec471c21>.
You can then parse and analyse the output data using your preferred
scripting language.

Hope that helps,

Kind regards,

Liam Clarke-Hutchinson




On Thu, Jun 25, 2020 at 3:25 AM Karolis Pocius
 wrote:

> Check if any of your consumers have auto commit turned off and instead
> commit after processing each message.
>
> Also, even if all consumers are using auto commit, maybe some of them have
> the interval set to something crazy low like 1 ms.
>
> On Sat, Jun 20, 2020 at 8:31 PM Ashutosh singh  wrote:
>
> > Hi Guys,
> >
> > Hope you all are doing well.
> >
> > All of a sudden I see very high throughput for the _consumer_offsets topic.
> > It is around 30-40K per second.  What could be the reason for such a high
> > rate?  Do I need to be concerned about this?
> >
> > [image: image.png]
> >
> >
> > This happens for an hour and after that it goes down.  Now it is
> happening
> > once or twice daily.
> >
> > I have 8 node cluster , 1000+ topics and 644 consumer groups.
> > All nodes have almost equal number of lead partition across nodes and
> > almost equal partition on all nodes.
> >
> > Kafka version : 2.1.1
> >
> > If you see above graph, there are no other topics where messages are more
> > than 300 Message/sec.  only _consumer_offsets  is having so high through
> > put.
> >
> > I know this is an internal topic and stores metadata of topics and consumer
> > information.  But I don't see anything abnormal in my cluster, so why is the
> > _consumer_offset topic going crazy?  What is going on here?
> >
> > Any help will be appreciated.
> >
> > --
> > Thanx & Regard
> > Ashutosh Singh
> > 08151945559
> >
> >
>


Re: Kafka still writable when only one process available

2020-06-23 Thread Liam Clarke-Hutchinson
Hi Can,

acks is a producer setting only; setting it to all on the broker will have
no effect. The default acks for a producer is 1, which means so long as the
partition leader acknowledges the write, it's successful. You have three
replicas, and two downed brokers leave 1 replica (which becomes the leader if
not already, assuming it was in-sync prior), so the producer will receive
that 1 ack and consider the write completed.

If you set acks to all on the producer, then it will throw an exception
when attempting a write when there aren't enough in-sync replicas to
acknowledge the write.
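
A minimal producer sketch showing the client-side setting (the bootstrap address and topic are placeholders):

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksAllProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.8.233:9092"); // placeholder
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // producer setting, not a broker one
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
            System.out.println("write acknowledged by all in-sync replicas");
        } catch (InterruptedException | ExecutionException e) {
            // With min.insync.replicas=2 and two of the three brokers down, this is
            // where a NotEnoughReplicasException surfaces instead of a silent write.
            e.printStackTrace();
        }
    }
}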

Hope that makes sense?

Kind regards,

Liam Clarke-Hutchinson

On Wed, Jun 24, 2020 at 3:22 PM Can Zhang  wrote:

> Hello,
>
> I'm doing some tests about behaviors of Kafka under faulty circumstances.
>
> Here are my configs(one of total 3, comments removed):
>
> broker.id=0
> listeners=PLAINTEXT://:9092
> advertised.listeners=PLAINTEXT://10.0.8.233:9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/tmp/kafka-logs
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> default.replication.factor=3
> replication.factor=3
> acks=all
> offsets.topic.replication.factor=3
> transaction.state.log.replication.factor=3
> transaction.state.log.min.isr=2
> min.insync.replicas = 2
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> zookeeper.connect=localhost:2181
> zookeeper.connection.timeout.ms=18000
> group.initial.rebalance.delay.ms=3000
>
> I created a topic with 1 partition and 3 replicas, and produce/consume
> with those shell tools shipped with Kafka. And I find even if I kill 2
> processes of Kafka with `kill -9`, the topic is still writable. I
> believe this would cause potential data loss.
>
> I don't know if I misconfigured something, could someone review it for
> me? I'm testing with kafka_2.12-2.5.0
>
>
>
> Best,
> Can Zhang
>


Re: How to Change number of partitions without Rolling restart?

2020-06-21 Thread Liam Clarke-Hutchinson
Hi Sunil,

You'd want to do a bit of shell scripting for this.

for topic in $(./kafka-topics.sh --bootstrap-server your-kafka-here:9092 --list); do
  ./kafka-topics.sh --bootstrap-server your-kafka-here:9092 \
    --topic "$topic" --partitions X --alter
done

Etc. etc. And yep, to make sure further auto created topics have your
desired partitions, you will need to change that property on the brokers
and rolling restart.

Cheers,

Liam Clarke-Hutchinson

On Mon, Jun 22, 2020 at 4:16 PM sunil chaudhari 
wrote:

> Manoj,
> You mean I have to execute this command manually for all 350 topics which I
> already have?
> Is there any possibility I can use any wild cards?
>
>
> On Mon, 22 Jun 2020 at 9:28 AM,  wrote:
>
> > You can use below command to alter to partition
> >
> > ./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic my-topic
> > --partitions 6
> >
> >  Thanks
> > Manoj
> >
> >
> >
> > On 6/21/20, 7:38 PM, "sunil chaudhari" 
> > wrote:
> >
> > [External]
> >
> >
> > Hi,
> > I already have 350 topics created. Please guide me how can I do that
> > for
> > these many topics?
> > Also I want each new topic to be created with more number partitions
> > automatically than previous number 3, which I had set in properties.
> >
> > Regards,
> > Sunil.
> >
> > On Mon, 22 Jun 2020 at 6:31 AM, Liam Clarke-Hutchinson <
> > liam.cla...@adscale.co.nz> wrote:
> >
> > > Hi Sunil,
> > >
> > > The broker setting num.partitions only applies to automatically
> > created
> > > topics (if that is enabled) at the time of creation. To change
> > partitions
> > > for a topic you need to use kafka-topics.sh to do so for each
> topic.
> > >
> > > Kind regards,
> > >
> > > Liam Clarke-Hutchinson
> > >
> > > On Mon, Jun 22, 2020 at 3:16 AM sunil chaudhari <
> > > sunilmchaudhar...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > > I want to change number of partitions for all topics.
> > > > How can I change that? Is it server.properties which I need to
> > change?
> > > > Then, in that case I have to restart broker right?
> > > >
> > > > I checked from confluent control center, there is no option to
> > change
> > > > partitions.
> > > >
> > > > Please advise.
> > > >
> > > > Regards,
> > > > Sunil
> > > >
> > >
> >
> >
> > This e-mail and any files transmitted with it are for the sole use of the
> > intended recipient(s) and may contain confidential and privileged
> > information. If you are not the intended recipient(s), please reply to
> the
> > sender and destroy all copies of the original message. Any unauthorized
> > review, use, disclosure, dissemination, forwarding, printing or copying
> of
> > this email, and/or any action taken in reliance on the contents of this
> > e-mail is strictly prohibited and may be unlawful. Where permitted by
> > applicable law, this e-mail and other e-mail communications sent to and
> > from Cognizant e-mail addresses may be monitored.
> >
>


Re: How to Change number of partitions without Rolling restart?

2020-06-21 Thread Liam Clarke-Hutchinson
Hi Sunil,

The broker setting num.partitions only applies to automatically created
topics (if that is enabled) at the time of creation. To change partitions
for a topic you need to use kafka-topics.sh to do so for each topic.
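
If scripting kafka-topics.sh per topic is awkward, the AdminClient can do the same across all topics in one call - a sketch (bootstrap servers and the target count of 6 are placeholders):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class GrowAllTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, NewPartitions> increases = new HashMap<>();
            // listTopics() skips internal topics like __consumer_offsets by default.
            for (String topic : admin.listTopics().names().get()) {
                increases.put(topic, NewPartitions.increaseTo(6)); // placeholder target count
            }
            // Partition counts can only be increased; topics already at or above
            // the target will fail that part of the request.
            admin.createPartitions(increases).all().get();
        }
    }
}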

Kind regards,

Liam Clarke-Hutchinson

On Mon, Jun 22, 2020 at 3:16 AM sunil chaudhari 
wrote:

> Hi,
> I want to change number of partitions for all topics.
> How can I change that? Is it server.properties which I need to change?
> Then, in that case I have to restart broker right?
>
> I checked from confluent control center, there is no option to change
> partitions.
>
> Please advise.
>
> Regards,
> Sunil
>


Re: Highwater mark interpretation

2020-06-21 Thread Liam Clarke-Hutchinson
Hi Nag,

In my experience running Kafka in production for 6 years, so long as the
number of replicas (and the leader is one of those replicas) in the insync
replica set (ISR) is at least the min.insync.replicas setting, the
partition leader will accept writes, and customers can read those writes,
even if the topic is underreplicated.

Being able to do so is sort of one of the key features of Kafka.

An out of sync partition replica will catch up when it comes back online,
but until it's a member of the ISR it won't be eligible to become the
partition leader, or for rack aware consumption (a rack aware consumer in
Kafka 2.4+ (IIRC) can consume from a replica in the same rack instead of
having to consume from the leader, which can improve latency and in cloud
environments reduce data ingress/egress costs).
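
(A sketch of the client side of that, for completeness - the rack id is a placeholder and must match the broker.rack of the brokers in that rack/AZ, and the brokers also need a rack-aware replica selector configured:)

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RackAwareConsumer {
    public static KafkaConsumer<String, String> build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rack-aware-demo");          // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Must match broker.rack on the brokers in the same rack/AZ; the brokers also need
        // replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector.
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "us-east-1a");             // placeholder rack id
        return new KafkaConsumer<>(props);
    }
}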

But, I've never encountered a situation where an underreplicated partition
prevents consuming.

Cheers,

Liam Clarke-Hutchinson

On Sun, 21 Jun. 2020, 2:08 pm Nag Y,  wrote:

> Thanks D C. Thanks a lot. That is quite a detailed explanation.
> If I understand correctly (ignoring the case where producers
> create transactions) - since the replica is down and never comes back, the high
> watermark CANNOT advance and the consumer CANNOT read the messages which
> were sent after the replica went down, as the message is NOT committed - I hope
> this is correct?
>
> To address this situation, either we should make sure the replica is up or
> reduce the replication factor so that the message will be committed and
> consumer can start reading the messages ...
>
> Regards,
>  Nag
>
>
> On Sun, Jun 21, 2020 at 3:25 AM D C  wrote:
>
> > The short answer is : yes, a consumer can only consume messages up to the
> > High Watermark.
> >
> > The long answer is not exactly, for the following reasons:
> >
> > At the partition level you have 3 major offsets that are important to the
> > health of the partition and accessibility from the consumer pov:
> > LeO (log end offset) - which represents the highest offset in the highest
> > segment
> > High Watermark - which represents the latest offset that has been
> > replicated to all the followers
> > LSO (Last stable offset) - which is important when you use producers that
> > create transactions - which represents the the highest offset that has
> been
> > committed by a transaction and that is allowed to be read with isolation
> > level = read_commited.
> >
> > The LeO can only be higher or equal to the High Watermark (for obvious
> > reasons)
> > The High Watermark can only be higher or equal to the LSO (the messages
> up
> > to this point may have been committed to all the followers but the
> > transaction isn't yet finished)
> > And coming to your question, in case the transaction hasn't finished, the
> > LSO may be lower than the High Watermark so if your consumer is accessing
> > the data in Read_Committed, it won't be able to surpass the LSO.
> >
> > Cheers,
> > D
> >
> > On Sat, Jun 20, 2020 at 9:05 PM Nag Y 
> wrote:
> >
> > > As I understand it, the consumer can only read "committed" messages -
> > which
> > > I believe, if we look at internals of it, committed messages are
> nothing
> > > but messages which are upto the high watermark.
> > > *The high watermark is the offset of the last message that was
> > successfully
> > > copied to all of the log’s replicas. *
> > >
> > > *Having said that, if one of the replica is down, will high water mark
> > be*
> > > *advanced?*
> > >
> > > *If replica can't come forever, can we consider this message cant be
> > > consumed by the consumer since it is never committed *
> > >
> >
>


Stuck replicas - can't remove them

2020-06-14 Thread Liam Clarke-Hutchinson
We hit some memory issues with our brokers caused by large numbers of high
cardinality unique client ids connecting and disconnecting and leaving
behind MBeans (v0.11.0.2 and yep, upgrading is definitely high on the
agenda now) and after a rolling restart, we've got a weird situation.

For topic X, partition 10 shows as having 3 replicas on brokers [A, B, C],
with [B, C] as the ISR - but the thing is, this topic now has a replication
factor of 2, and it's only this one partition that has 3.

I've tried running reassign-partitions for that partition specifying that
its replicas are only B and C, but the reassignment request fails.

It's causing issues in that our alerting on under-replicated partitions
keeps picking up partition 10 as being underreplicated.

Any ideas on how to convince the cluster to discard the replica on broker A?

Kind regards,

Liam Clarke-Hutchinson


Re: Kafka - how to know whether it is broker property or topic property or producer property

2020-06-14 Thread Liam Clarke-Hutchinson
Hi Nag,


https://kafka.apache.org/documentation/#topicconfigs

> Configurations pertinent to topics have both a server default as well an
optional per-topic override. If no per-topic configuration is given the
server default is used. The override can be set at topic creation time by
giving one or more --config options.


Hope this helps :)

Cheers,

Liam Clarke-Hutchinson


On Sun, Jun 14, 2020 at 6:48 PM Nag Y  wrote:

> Is it documented somewhere whether the property is shared, or do we need to look in
> multiple places?
>
> Thanks
>
> On Sun, Jun 14, 2020 at 12:15 PM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Nag,
> >
> > If topic and brokers share the same property, then it is safe to assume
> > that the broker property is the default, which can be overridden by
> > explicit topic configs.
> >
> > Kind regards,
> >
> > Liam Clarke
> >
> > On Sun, Jun 14, 2020 at 6:42 PM Nag Y 
> wrote:
> >
> > > I am going through the documentation and oftentimes it is either not
> > > clear, or one needs to look in multiple places to see to which entity a
> > > particular property belongs and whether it is specific to that entity, etc.
> > >
> > > To give an example, consider *"min.insync.replicas"* - This is just for
> > an
> > > example. From the apache documentation, it is mentioned under
> > > https://kafka.apache.org/documentation/#brokerconfigs . From the
> > confluent
> > > documentation it is mentioned under
> > >
> > >
> >
> https://docs.confluent.io/current/installation/configuration/topic-configs.html
> > > .
> > > Later, I came to know that this property is available under both and
> > > follows inheritance based on where it is configured. This needed to
> look
> > > into multiple places to understand more about this property to see
> where
> > it
> > > belongs etc ..
> > >
> > > But, is not there a documentation about where each property belongs,
> and
> > > will it be inherited or not etc.
> > >
> > > I do not think this answer need not be complex like looking into source
> > > code, it should be simple enough - perhaps I might be missing
> something.
> > >
> > >
> > > Also posted here
> > >
> > >
> >
> https://stackoverflow.com/questions/62369238/kafka-how-to-know-whether-it-is-broker-property-or-topic-property-or-producer
> > >
> >
>


Re: Kafka - how to know whether it is broker property or topic property or producer property

2020-06-14 Thread Liam Clarke-Hutchinson
Hi Nag,

If topic and brokers share the same property, then it is safe to assume
that the broker property is the default, which can be overridden by
explicit topic configs.

Kind regards,

Liam Clarke

On Sun, Jun 14, 2020 at 6:42 PM Nag Y  wrote:

> I am going through the documentation and oftentimes it is either not
> clear, or one needs to look in multiple places to see to which entity a particular
> property belongs and whether it is specific to that entity, etc.
>
> To give an example, consider *"min.insync.replicas"* - This is just for an
> example. From the apache documentation, it is mentioned under
> https://kafka.apache.org/documentation/#brokerconfigs . From the confluent
> documentation it is mentioned under
>
> https://docs.confluent.io/current/installation/configuration/topic-configs.html
> .
> Later, I came to know that this property is available under both and
> follows inheritance based on where it is configured. This required looking
> into multiple places to understand more about this property and to see where it
> belongs, etc.
>
> But isn't there documentation about where each property belongs, and whether
> it will be inherited or not, etc.?
>
> I do not think this answer needs to be complex, like looking into source
> code; it should be simple enough - perhaps I might be missing something.
>
>
> Also posted here
>
> https://stackoverflow.com/questions/62369238/kafka-how-to-know-whether-it-is-broker-property-or-topic-property-or-producer
>


  1   2   3   >