Hi Shimi,

I’ve noticed with our benchmarks that on AWS environments with high network 
latency the network socket buffers often need adjusting. Any chance you could 
add the following to your streams configuration to change the default socket 
size bytes to a higher value (at least 1MB) and let us know?

private static final int SOCKET_SIZE_BYTES = 1 * 1024 * 1024; // at least 1MB
streamsProps.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, SOCKET_SIZE_BYTES);
streamsProps.put(ProducerConfig.SEND_BUFFER_CONFIG, SOCKET_SIZE_BYTES);

Thanks
Eno 

> On May 4, 2017, at 3:45 PM, Shimi Kiviti <shim...@gmail.com> wrote:
> 
> Thanks Eno,
> 
> We still see problems on our side.
> when we run kafka-streams 0.10.1.1 eventually the problem goes away but
> with 0.10.2.1 it is not.
> We see a lot of the rebalancing messages I wrote before
> 
> on at least 1 kafka-stream nodes we see disconnection messages like the
> following. These messages repeat all the time
> 
> 2017-05-04 14:25:56,063 [StreamThread-1] INFO
> o.a.k.c.c.i.AbstractCoordinator: Discovered coordinator
> ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack: null) for group sa.
> 2017-05-04 14:25:56,063 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> Initiating connection to node 2147483646 at ip-10-0-91-10.ec2.internal:9092.
> 2017-05-04 14:25:56,091 [StreamThread-1] INFO
> o.a.k.c.c.i.AbstractCoordinator: (Re-)joining group sa
> 2017-05-04 14:25:56,093 [StreamThread-1] DEBUG
> o.a.k.s.p.i.StreamPartitionAssignor: stream-thread [StreamThread-1] found
> [sa-events] topics possibly matching regex
> 2017-05-04 14:25:56,096 [StreamThread-1] DEBUG o.a.k.s.p.TopologyBuilder:
> stream-thread [StreamThread-1] updating builder with
> SubscriptionUpdates{updatedTopicSubscriptions=[sa-events]} topic(s) with po
> ssible matching regex subscription(s)
> 2017-05-04 14:25:56,096 [StreamThread-1] DEBUG
> o.a.k.c.c.i.AbstractCoordinator: Sending JoinGroup ((type:
> JoinGroupRequest, groupId=sa, sessionTimeout=10000,
> rebalanceTimeout=2147483647, memb
> erId=, protocolType=consumer,
> groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@2f894d9b))
> to coordinator ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack: null)
> 2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.n.Selector: Created
> socket with SO_RCVBUF = 1048576, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node
> 2147483646
> 2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> Completed connection to node 2147483646.  Fetching API versions.
> 2017-05-04 14:25:56,097 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> Initiating API versions fetch from node 2147483646.
> 2017-05-04 14:25:56,104 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> Recorded API versions for node 2147483646: (Produce(0): 0 to 2 [usable: 2],
> Fetch(1): 0 to 3 [usable: 3], Offsets(2): 0 to 1 [usable: 1],
> Metadata(3): 0 to 2 [usable: 2], LeaderAndIsr(4): 0 [usable: 0],
> StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 2 [usable: 2],
> ControlledShutdown(7): 1 [usable: 1], OffsetCommit(8): 0 to 2 [usable:
> 2], OffsetFetch(9): 0 to 1 [usable: 1], GroupCoordinator(10): 0 [usable:
> 0], JoinGroup(11): 0 to 1 [usable: 1], Heartbeat(12): 0 [usable: 0],
> LeaveGroup(13): 0 [usable: 0], SyncGroup(14): 0 [usable: 0], Desc
> ribeGroups(15): 0 [usable: 0], ListGroups(16): 0 [usable: 0],
> SaslHandshake(17): 0 [usable: 0], ApiVersions(18): 0 [usable: 0],
> CreateTopics(19): 0 [usable: 0], DeleteTopics(20): 0 [usable: 0])
> 2017-05-04 14:29:44,800 [kafka-producer-network-thread |
> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG
> o.a.k.c.NetworkClient: Node -2 disconnected.
> 2017-05-04 14:29:44,801 [kafka-producer-network-thread |
> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG
> o.a.k.c.NetworkClient: Sending metadata request (type=MetadataR
> equest, topics=) to node 1
> 2017-05-04 14:29:44,801 [kafka-producer-network-thread |
> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG
> o.a.k.c.NetworkClient: Node -1 disconnected.
> 2017-05-04 14:29:44,802 [kafka-producer-network-thread |
> sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-producer] DEBUG
> o.a.k.c.Metadata: Updated cluster metadata version 4 to Cluster
> (id = JsVqjH3tS4CIcqpd2jkogA, nodes = [ip-10-0-91-10.ec2.internal:9092 (id:
> 1 rack: null), ip-10-0-95-250.ec2.internal:9092 (id: 2 rack: null)],
> partitions = [])
> 2017-05-04 14:30:56,062 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> Sending metadata request (type=MetadataRequest, topics=<ALL>) to node 2
> 2017-05-04 14:30:56,073 [StreamThread-1] DEBUG o.a.k.c.Metadata: Updated
> cluster metadata version 7 to Cluster(id = JsVqjH3tS4CIcqpd2jkogA, nodes =
> [ip-10-0-95-250.ec2.internal:9092 (id: 2 rack: null), ip-10
> -0-91-10.ec2.internal:9092 (id: 1 rack: null)], partitions =
> [Partition(topic = sa-events, partition = 0, leader = 1, replicas = [1,2],
> isr = [2,1]), Partition(topic = sa-events, partition = 1, lea
> der = 2, replicas = [1,2], isr = [2,1]), Partition(topic = sa-events,
> partition = 2, leader = 1, replicas = [1,2], isr = [2,1])])
> 2017-05-04 14:31:06,085 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> Disconnecting from node 2147483646 due to request timeout.
> 2017-05-04 14:31:06,086 [StreamThread-1] DEBUG
> o.a.k.c.c.i.ConsumerNetworkClient: Cancelled JOIN_GROUP request
> {api_key=11,api_version=1,correlation_id=16,client_id=sa-5788b5a5-aadc-4276-916f
> -1640008c17da-StreamThread-1-consumer} with correlation id 16 due to node
> 2147483646 being disconnected
> 2017-05-04 14:31:06,086 [StreamThread-1] INFO
> o.a.k.c.c.i.AbstractCoordinator: Marking the coordinator
> ip-10-0-91-10.ec2.internal:9092 (id: 2147483646 rack: null) dead for group
> sa
> 2017-05-04 14:31:06,195 [StreamThread-1] DEBUG
> o.a.k.c.c.i.AbstractCoordinator: Sending GroupCoordinator request for group
> sa to broker ip-10-0-91-10.ec2.internal:9092 (id: 1 rack: null)
> 2017-05-04 14:31:06,196 [StreamThread-1] DEBUG o.a.k.c.NetworkClient:
> Sending metadata request (type=MetadataRequest, topics=<ALL>) to node 2
> 2017-05-04 14:31:06,200 [StreamThread-1] DEBUG
> o.a.k.c.c.i.AbstractCoordinator: Received GroupCoordinator response
> ClientResponse(receivedTimeMs=1493908266200, latencyMs=5,
> disconnected=false, requestHeader=
> {api_key=10,api_version=0,correlation_id=19,client_id=sa-5788b5a5-aadc-4276-916f-1640008c17da-StreamThread-1-consumer},
> responseBody={error_code=0,coordinator={node_id=1,host=ip-10-0-91-10.ec
> 2.internal,port=9092}}) for group sa
> 
> 
> On Mon, May 1, 2017 at 4:19 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> 
>> Hi Shimi,
>> 
>> 0.10.2.1 contains a number of fixes that should make the out of box
>> experience better, including resiliency under broker failures and better
>> exception handling. If you ever get back to it, and if the problem happens
>> again, please do send us the logs and we'll happily have a look.
>> 
>> Thanks
>> Eno
>>> On 1 May 2017, at 12:05, Shimi Kiviti <shim...@gmail.com> wrote:
>>> 
>>> Hi Eno,
>>> I am afraid I played too much with the configuration to make this
>>> productive investigation :(
>>> 
>>> This is a QA environment which includes 2 kafka instances and 3 zookeeper
>>> instances in AWS. There are only 3 partition for this topic.
>>> Kafka broker and kafka-stream are version 0.10.1.1
>>> Our kafka-stream app run on docker using kubernetes.
>>> I played around with with 1 to 3  kafka-stream processes, but I got the
>>> same results. It is too easy to scale with kubernetes :)
>>> Since there are only 3 partitions, I didn't start more then 3 instances.
>>> 
>>> I was too quick to upgraded only the kafka-stream app to 0.10.2.1 with
>> hope
>>> that it will solve the problem, It didn't.
>>> The log I sent before are from this version.
>>> 
>>> I did notice "unknown" offset for the main topic with kafka-stream
>> version
>>> 0.10.2.1
>>> $ ./bin/kafka-consumer-groups.sh   --bootstrap-server localhost:9092
>>> --describe --group sa
>>> GROUP                          TOPIC                          PARTITION
>>> CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
>>> sa             sa-events                 0          842199
>>> 842199          0
>>> sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/
>> 10.0.10.9
>>> sa             sa-events                 1          1078428
>>> 1078428         0
>>> sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/
>> 10.0.10.9
>>> sa             sa-events                 2          unknown
>>> 26093910        unknown
>>> sa-4557bf2d-ba79-42a6-aa05-5b4c9013c022-StreamThread-1-consumer_/
>> 10.0.10.9
>>> 
>>> After that I downgraded the kafka-stream app back to version 0.10.1.1
>>> After a LONG startup time (more than an hour) where the status of the
>> group
>>> was rebalancing, all the 3 processes started processing messages again.
>>> 
>>> This all thing started after we hit a bug in our code (NPE) that crashed
>>> the stream processing thread.
>>> So now after 4 days, everything is back to normal.
>>> This worries me since it can happen again
>>> 
>>> 
>>> On Mon, May 1, 2017 at 11:45 AM, Eno Thereska <eno.there...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Shimi,
>>>> 
>>>> Could you provide more info on your setup? How many kafka streams
>>>> processes do you have and from how many partitions are they consuming
>> from.
>>>> If you have more processes than partitions some of the processes will be
>>>> idle and won’t do anything.
>>>> 
>>>> Eno
>>>>> On Apr 30, 2017, at 5:58 PM, Shimi Kiviti <shim...@gmail.com> wrote:
>>>>> 
>>>>> Hi Everyone,
>>>>> 
>>>>> I have a problem and I hope one of you can help me figuring it out.
>>>>> One of our kafka-streams processes stopped processing messages
>>>>> 
>>>>> When I turn on debug log I see lots of these messages:
>>>>> 
>>>>> 2017-04-30 15:42:20,228 [StreamThread-1] DEBUG o.a.k.c.c.i.Fetcher:
>>>> Sending
>>>>> fetch for partitions [devlast-changelog-2] to broker ip-x-x-x-x
>>>>> .ec2.internal:9092 (id: 1 rack: null)
>>>>> 2017-04-30 15:42:20,696 [StreamThread-1] DEBUG o.a.k.c.c.i.Fetcher:
>>>>> Ignoring fetched records for devlast-changelog-2 at offset 2962649
>> since
>>>>> the current position is 2963379
>>>>> 
>>>>> After a LONG time, the only messages in the log are these:
>>>>> 
>>>>> 2017-04-30 16:46:33,324 [kafka-coordinator-heartbeat-thread | sa]
>> DEBUG
>>>>> o.a.k.c.c.i.AbstractCoordinator: Sending Heartbeat request for group
>> sa
>>>> to
>>>>> coordinator ip-x-x-x-x.ec2.internal:9092 (id: 2147483646 rack: null)
>>>>> 2017-04-30 16:46:33,425 [kafka-coordinator-heartbeat-thread | sa]
>> DEBUG
>>>>> o.a.k.c.c.i.AbstractCoordinator: Received successful Heartbeat
>> response
>>>> for
>>>>> group same
>>>>> 
>>>>> Any idea?
>>>>> 
>>>>> Thanks,
>>>>> Shimi
>>>> 
>>>> 
>> 
>> 

Reply via email to