Sorry, I attached the wrong server.properties file. The right one is attached now.

Regards.


On 01/30/2018 02:59 PM, Zoran wrote:
Hi,

I have three servers:

blade1 (192.168.112.31),
blade2 (192.168.112.32) and
blade3 (192.168.112.33).

On each of the servers, kafka_2.11-1.0.0 is installed.
On blade3 (192.168.112.33:2181), ZooKeeper is installed as well.

I have created a topic repl3part5 with the following line:

bin/kafka-topics.sh --zookeeper 192.168.112.33:2181 --create --replication-factor 3 --partitions 5 --topic repl3part5

When I describe the topic, it looks like this:

[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5 --zookeeper 192.168.112.33:2181

Topic:repl3part5    PartitionCount:5    ReplicationFactor:3 Configs:
    Topic: repl3part5    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
    Topic: repl3part5    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
    Topic: repl3part5    Partition: 2    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
    Topic: repl3part5    Partition: 3    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
    Topic: repl3part5    Partition: 4    Leader: 3    Replicas: 3,2,1    Isr: 3,2,1

I have a producer for this topic:

bin/kafka-console-producer.sh --broker-list 192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic repl3part5

and a single consumer:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.112.31:9092,192.168.112.32:9092,192.168.112.33:9092 --topic repl3part5  --consumer-property group.id=zoran_1

Every message sent by the producer is collected by the consumer. So far, so good.

Now I would like to test failover of the Kafka servers. If I take down the Kafka service on blade3, I get consumer warnings, but all produced messages are still consumed.

[2018-01-30 14:30:01,203] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,299] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:30:01,475] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
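
For reference, this is roughly how I take a broker down and bring it back up on the corresponding blade (I am using the stock scripts from the kafka directory here; stopping the service through systemd behaves the same way):

bin/kafka-server-stop.sh
bin/kafka-server-start.sh -daemon config/server.properties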

Next I started the Kafka service on blade3 back up and took down the Kafka service on blade2. The consumer now showed one warning, but all produced messages were still consumed.

[2018-01-30 14:31:38,164] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Then I started the Kafka service on blade2 back up and took down the Kafka service on blade1.

The consumer now shows warnings about node 1 and node 2147483646, but also: Asynchronous auto-commit of offsets ... failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: null.

[2018-01-30 14:33:16,393] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,469] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,557] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,986] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:16,991] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,493] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:17,495] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,002] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,003] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Asynchronous auto-commit of offsets {repl3part5-4=OffsetAndMetadata{offset=18, metadata=''}, repl3part5-3=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-2=OffsetAndMetadata{offset=19, metadata=''}, repl3part5-1=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-0=OffsetAndMetadata{offset=20, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: null (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:33:18,611] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,932] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:18,933] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Asynchronous auto-commit of offsets {repl3part5-4=OffsetAndMetadata{offset=18, metadata=''}, repl3part5-3=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-2=OffsetAndMetadata{offset=19, metadata=''}, repl3part5-1=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-0=OffsetAndMetadata{offset=20, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: null (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:33:19,977] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 2147483646 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-01-30 14:33:19,978] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Asynchronous auto-commit of offsets {repl3part5-4=OffsetAndMetadata{offset=18, metadata=''}, repl3part5-3=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-2=OffsetAndMetadata{offset=19, metadata=''}, repl3part5-1=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-0=OffsetAndMetadata{offset=20, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: null (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:33:19,979] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

I tried to solve the problem by adding offsets.topic.replication.factor=2 (or 3) to all three server.properties files (one of them is attached), but with no success. My idea was that the __consumer_offsets topic wasn't replicated throughout the cluster, but it looks like that is not the case here.
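
For reference, the internal offsets topic can be described like any other topic; the Isr column there shows whether its partitions really have more than one replica:

bin/kafka-topics.sh --describe --topic __consumer_offsets --zookeeper 192.168.112.33:2181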

While the blade1 Kafka service was down, describing the topic showed the following:

[root@blade1 kafka]# bin/kafka-topics.sh --describe --topic repl3part5 --zookeeper 192.168.112.33:2181

Topic:repl3part5    PartitionCount:5    ReplicationFactor:3 Configs:
    Topic: repl3part5    Partition: 0    Leader: 3    Replicas: 2,3,1    Isr: 3
    Topic: repl3part5    Partition: 1    Leader: 3    Replicas: 3,1,2    Isr: 3
    Topic: repl3part5    Partition: 2    Leader: 3    Replicas: 1,2,3    Isr: 3
    Topic: repl3part5    Partition: 3    Leader: 3    Replicas: 2,1,3    Isr: 3
    Topic: repl3part5    Partition: 4    Leader: 3    Replicas: 3,2,1    Isr: 3

The producer now shows the following warning; it still puts messages on the topic, but the messages just increase the lag count on the partitions:

[2018-01-30 14:37:21,816] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
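
For reference, this is how I check the per-partition lag for the group (I am assuming the stock consumer-groups tool here, pointed at one of the brokers that is still up):

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.112.32:9092 --describe --group zoran_1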

I noticed that while the Kafka service on blade1 is alive, I can take blade2 and blade3 down/up in any combination and the consumer is always able to consume messages. If the Kafka service on blade1 is down, then even if the Kafka services on blade2 and blade3 are up and running, the consumer cannot consume messages.

After bringing the Kafka service back up on blade1, all messages that the producer sent while the blade1 Kafka service was down are replayed, and then the following is shown in the consumer terminal:

[2018-01-30 14:44:30,817] ERROR [Consumer clientId=consumer-1, groupId=zoran_1] Offset commit failed on partition repl3part5-4 at offset 20: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:44:30,817] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Asynchronous auto-commit of offsets {repl3part5-4=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-3=OffsetAndMetadata{offset=22, metadata=''}, repl3part5-2=OffsetAndMetadata{offset=20, metadata=''}, repl3part5-1=OffsetAndMetadata{offset=22, metadata=''}, repl3part5-0=OffsetAndMetadata{offset=22, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:44:31,202] ERROR [Consumer clientId=consumer-1, groupId=zoran_1] Offset commit failed on partition repl3part5-4 at offset 22: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-01-30 14:44:31,202] WARN [Consumer clientId=consumer-1, groupId=zoran_1] Asynchronous auto-commit of offsets {repl3part5-4=OffsetAndMetadata{offset=22, metadata=''}, repl3part5-3=OffsetAndMetadata{offset=24, metadata=''}, repl3part5-2=OffsetAndMetadata{offset=22, metadata=''}, repl3part5-1=OffsetAndMetadata{offset=24, metadata=''}, repl3part5-0=OffsetAndMetadata{offset=24, metadata=''}} failed: Offset commit failed with a retriable exception. You should retry committing offsets. The underlying error was: This is not the correct coordinator. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

From then on, everything works with no problems or warnings and the system is fully functional.

Can someone explain to me why the Kafka server on blade1 is so important, and what my options are for being able to stop any one of the servers (including the Kafka server on blade1) and still consume messages with no delay?
This thing drives me crazy. :)

Can you please help?

Regards.

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability, such as 3.
#offsets.topic.replication.factor=1

# zoran
offsets.topic.replication.factor = 3

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
#zookeeper.connect=localhost:2181

zookeeper.connect=192.168.112.33:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0


# added by Zoran
delete.topic.enable=true
auto.create.topics.enable=false

offsets.topic.replication.factor=2
