Hi,
I have three servers:
blade1 (192.168.112.31),
blade2 (192.168.112.32) and
blade3 (192.168.112.33).
On each of servers kafka_2.11-1.0.0 is installed.
On blade3 (192.168.112.33:2181) zookeeper is installed as well.
I have created a topic repl3part5 with the following line:
bin/kafka-topics.sh --zookeeper 192.168.112.33:2181 --create
--replication-factor 3 --partitions 5 --topic repl3part5
When I describe the topic, it looks like this:
[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5
--zookeeper 192.168.112.33:2181
Topic:repl3part5 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: repl3part5 Partition: 0 Leader: 2 Replicas:
2,3,1 Isr: 2,3,1
Topic: repl3part5 Partition: 1 Leader: 3 Replicas:
3,1,2 Isr: 3,1,2
Topic: repl3part5 Partition: 2 Leader: 1 Replicas:
1,2,3 Isr: 1,2,3
Topic: repl3part5 Partition: 3 Leader: 2 Replicas:
2,1,3 Isr: 2,1,3
Topic: repl3part5 Partition: 4 Leader: 3 Replicas:
3,2,1 Isr: 3,2,1
I have a producer for this topic:
bin/kafka-console-producer.sh --broker-list
192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic
repl3part5
and single consumer:
bin/kafka-console-consumer.sh --bootstrap-server
192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic
repl3part5 --consumer-property group.id=zoran_1
Every message that is sent by producer gets collected by consumer. So
far - so good.
Now I would like to test fail over of the kafka servers. If I put down
blade 3 kafka service, I get consumer warnings but all produced messages
are still consumed.
[2018-01-30 14:30:01,203] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 3 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,299] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 3 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,475] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 3 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
Now I have started up kafka service on blade 3 and I have put down kafka
service on blade 2 server.
Consumer now showed one warning but all produced messages are still
consumed.
[2018-01-30 14:31:38,164] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 2 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
Now I have started up kafka service on blade 2 and I have put down kafka
service on blade 1 server.
Consumer now shows warnings about node 1/2147483646, but also
Asynchronous auto-commit of offsets ... failed: Offset commit failed
with a retriable exception. You should retry committing offsets. The
underlying error was: null.
[2018-01-30 14:33:16,393] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 1 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,469] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 2147483646 could not be established.
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,557] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 1 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,986] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 2147483646 could not be established.
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,991] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 1 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,493] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 2147483646 could not be established.
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,495] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 1 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,002] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 2147483646 could not be established.
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,003] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Asynchronous auto-commit of offsets
{repl3part5-4=OffsetAndMetadata{offset=18, metadata=''},
repl3part5-3=OffsetAndMetadata{offset=20, metadata=''},
repl3part5-2=OffsetAndMetadata{offset=19, metadata=''},
repl3part5-1=OffsetAndMetadata{offset=20, metadata=''},
repl3part5-0=OffsetAndMetadata{offset=20, metadata=''}} failed: Offset
commit failed with a retriable exception. You should retry committing
offsets. The underlying error was: null
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:33:18,611] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 1 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,932] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 2147483646 could not be established.
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,933] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Asynchronous auto-commit of offsets
{repl3part5-4=OffsetAndMetadata{offset=18, metadata=''},
repl3part5-3=OffsetAndMetadata{offset=20, metadata=''},
repl3part5-2=OffsetAndMetadata{offset=19, metadata=''},
repl3part5-1=OffsetAndMetadata{offset=20, metadata=''},
repl3part5-0=OffsetAndMetadata{offset=20, metadata=''}} failed: Offset
commit failed with a retriable exception. You should retry committing
offsets. The underlying error was: null
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:33:19,977] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 2147483646 could not be established.
Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:19,978] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Asynchronous auto-commit of offsets
{repl3part5-4=OffsetAndMetadata{offset=18, metadata=''},
repl3part5-3=OffsetAndMetadata{offset=20, metadata=''},
repl3part5-2=OffsetAndMetadata{offset=19, metadata=''},
repl3part5-1=OffsetAndMetadata{offset=20, metadata=''},
repl3part5-0=OffsetAndMetadata{offset=20, metadata=''}} failed: Offset
commit failed with a retriable exception. You should retry committing
offsets. The underlying error was: null
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:33:19,979] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Connection to node 1 could not be established. Broker
may not be available. (org.apache.kafka.clients.NetworkClient)
I tried to solve the problem by adding a
offsets.topic.replication.factor=2 (or 3) on all three server.properties
file (one of them is attached), but with no success.
My idea was that topic __consumer_offset wasn't replicated throughout
the cluster, but looks like it is not the case here.
While blade 1 kafka service was down topic describe showed the following:
[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5
--zookeeper 192.168.112.33:2181
Topic:repl3part5 PartitionCount:5 ReplicationFactor:3 Configs:
Topic: repl3part5 Partition: 0 Leader: 3 Replicas:
2,3,1 Isr: 3
Topic: repl3part5 Partition: 1 Leader: 3 Replicas:
3,1,2 Isr: 3
Topic: repl3part5 Partition: 2 Leader: 3 Replicas:
1,2,3 Isr: 3
Topic: repl3part5 Partition: 3 Leader: 3 Replicas:
2,1,3 Isr: 3
Topic: repl3part5 Partition: 4 Leader: 3 Replicas:
3,2,1 Isr: 3
Producer now shows the following warning, it still puts messages on the
topic but messages are just raising lag count on partitions:
[2018-01-30 14:37:21,816] WARN [Producer clientId=console-producer]
Connection to node 1 could not be established. Broker may not be
available. (org.apache.kafka.clients.NetworkClient)
I noticed that while kafka service on blade1 is alive, I can put down/up
blade 2 and 3 in any combination and consumer will always be able to
consume messages.
If kafka service on blade 1 is down, than even if kafka services on
blade 2 and blade 3 are up and running, consumer cannot consume messages.
After bringing kafka service up on blade 1, all messages that producer
has sent while kafka service on blade 1 was down are replayed and than
the following is showed in consumer terminal:
[2018-01-30 14:44:30,817] ERROR [Consumer clientId=consumer-1,
groupId=zoran_1] Offset commit failed on partition repl3part5-4 at
offset 20: This is not the correct coordinator.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:44:30,817] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Asynchronous auto-commit of offsets
{repl3part5-4=OffsetAndMetadata{offset=20, metadata=''},
repl3part5-3=OffsetAndMetadata{offset=22, metadata=''},
repl3part5-2=OffsetAndMetadata{offset=20, metadata=''},
repl3part5-1=OffsetAndMetadata{offset=22, metadata=''},
repl3part5-0=OffsetAndMetadata{offset=22, metadata=''}} failed: Offset
commit failed with a retriable exception. You should retry committing
offsets. The underlying error was: This is not the correct coordinator.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:44:31,202] ERROR [Consumer clientId=consumer-1,
groupId=zoran_1] Offset commit failed on partition repl3part5-4 at
offset 22: This is not the correct coordinator.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:44:31,202] WARN [Consumer clientId=consumer-1,
groupId=zoran_1] Asynchronous auto-commit of offsets
{repl3part5-4=OffsetAndMetadata{offset=22, metadata=''},
repl3part5-3=OffsetAndMetadata{offset=24, metadata=''},
repl3part5-2=OffsetAndMetadata{offset=22, metadata=''},
repl3part5-1=OffsetAndMetadata{offset=24, metadata=''},
repl3part5-0=OffsetAndMetadata{offset=24, metadata=''}} failed: Offset
commit failed with a retriable exception. You should retry committing
offsets. The underlying error was: This is not the correct coordinator.
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
From now on everything works with no problems or warnings and the
system is fully functional.
Can someone explain to me why kafka server on blade 1 is so important,
and what are my options in order to be able to stop any of the two
servers (including kafka server on blade 1) and be able to consume
messages with no delay?
This thing drives me crazy. :)
Can you please help?
Regards.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings
#############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
#dodao zoran
#listeners=PLAINTEXT://192.168.107.54:9092
# Hostname and port the broker will advertise to producers and consumers. If
not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the
value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# dodao zoran
advertised.listeners=PLAINTEXT://192.168.107.54:9092
# Maps listener names to security protocols, the default is for them to be the
same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the
network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may
include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection
against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at
startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs
located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings
#############################
# The replication factor for the group metadata internal topics
"__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is
recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only
fsync() to sync
# the OS cache lazily. The following configurations control the flush of data
to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the
flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a
small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data
after a period of time or
# every N messages (or both). This can be done globally and overridden on a
per-topic basis.
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy
can
# be set to delete segments after a period of time, or after a given size has
accumulated.
# A segment will be deleted whenever *either* of these criteria are met.
Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log
unless the remaining
# segments drop below log.retention.bytes. Functions independently of
log.retention.hours.
#log.retention.bytes=1073741824
# dodao zoran (bilo zakomentirano)
log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log
segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted
according
# to the retention policies
#log.retention.check.interval.ms=300000
# zakomentirao iznad i dodao zoran
log.retention.check.interval.ms=3000
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
############################# Group Coordinator Settings
#############################
# The following configuration specifies the time, in milliseconds, that the
GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of
group.initial.rebalance.delay.ms as new members join the group, up to a maximum
of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience
for development and testing.
# However, in production environments the default value of 3 seconds is more
suitable as this will help to avoid unnecessary, and potentially expensive,
rebalances during application startup.
group.initial.rebalance.delay.ms=0