Kafka message replay/ consumer offset corruption

2018-03-06 Thread Zhang, Chenyuan (WorldQuant)
Hi all,

I'm experiencing a message replay problem in Kafka, which I suspect is caused
by corrupted consumer offsets, which in turn are caused by corrupted group
metadata.

Background:

* Kafka cluster of 3 brokers with version 0.11.0.0.

* Zookeeper cluster of 3 nodes with version 3.4.8.

* Group xxx consumes only one topic, which has 1 partition and a
replication factor of 3.

The issue occurred when one of the brokers (broker 0) disconnected from
ZooKeeper, which triggered the group coordinator to migrate from broker 0 to
broker 1. During the migration, broker 1 loaded group xxx's metadata 9 times
and ended up with an old version of the group metadata instead of the latest,
because of this line:
val currentGroup = groupMetadataCache.putIfNotExists(group.groupId, group)
(https://github.com/apache/kafka/blob/1cabef0d3dc7a3c245f260b8d34a60d7d044bb9c/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala).
This caused consumer offset corruption and message replay.

Looking at the Kafka source code,
groupCoordinator.handleGroupImmigration(partition.partitionId) should be
executed once per partition, which means group xxx's metadata should be loaded
only once during a group coordinator migration. But according to the Kafka
server logs, this group's metadata was loaded 9 times.

Kafka logs:
[Broker 0] [2018-02-02 09:51:29,599] INFO [GroupCoordinator 0]: Stabilized group xxx generation 352992 (__consumer_offsets-1) (kafka.coordinator.group.GroupCoordinator)
[Broker 0] [2018-02-02 09:51:29,667] INFO zookeeper state changed (Disconnected)
[Broker 1] [2018-02-02 09:51:30,000] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 338355 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,117] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 340494 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,248] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 342313 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,366] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 344311 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,506] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 346157 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,615] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 348051 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,735] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 350699 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,835] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 352762 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:30,849] INFO [GroupCoordinator 1]: Loading group metadata for xxx with generation 352992 (kafka.coordinator.group.GroupCoordinator)
[Broker 1] [2018-02-02 09:51:31,526] INFO [GroupCoordinator 1]: Preparing to rebalance group xxx with old generation 338355 (__consumer_offsets-1)
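
The log sequence above is consistent with put-if-absent semantics: the first generation loaded (338355) is the one the coordinator later reports, even though newer generations were loaded afterwards. The following is only a minimal model of that behavior, assuming putIfNotExists acts like java.util.concurrent.ConcurrentHashMap.putIfAbsent. The class and method names are hypothetical and this is not the broker's actual code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GroupCacheSketch {
    // Hypothetical cache: group id -> generation of the metadata kept for it.
    private final Map<String, Integer> cache = new ConcurrentHashMap<>();

    // Put-if-absent semantics: the first value stored for a key wins,
    // and later loads for the same key are ignored.
    int load(String groupId, int generation) {
        Integer existing = cache.putIfAbsent(groupId, generation);
        return existing != null ? existing : generation;
    }

    // Replays a sequence of loads and returns the generation left in the cache.
    static int generationAfterLoads(String groupId, int... generations) {
        GroupCacheSketch sketch = new GroupCacheSketch();
        int cached = -1;
        for (int generation : generations) {
            cached = sketch.load(groupId, generation);
        }
        return cached;
    }

    public static void main(String[] args) {
        // First, one intermediate, and last generation from the broker 1 log.
        int cached = generationAfterLoads("xxx", 338355, 346157, 352992);
        System.out.println(cached); // prints 338355, the oldest generation
    }
}
```

Under this model, whichever copy of the group metadata is loaded first stays in the cache, which would explain the rebalance starting from generation 338355.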

Do you know why there are so many "Loading group metadata" actions on broker 1
for group xxx? Could this be caused by a configuration or operational issue?
How can I prevent it from happening again?

Any help is appreciated.

Thanks,
Chenyuan





Re: [ANNOUNCE] Apache Kafka 1.0.1 Released

2018-03-06 Thread James Cheng
Congrats, everyone! Thanks for driving the release, Ewen!

-James




KafkaConsumer (0.11) assign then poll - doesn't return results for all assigned TopicPartitions

2018-03-06 Thread Yi Yin
I want to manually fetch messages from all partitions of a topic. I'm doing
this by:

1. Create a list of TopicPartition - one for each partition of my topic
2. Create a KafkaConsumer, and call .assign(myTopicPartitionsList)
3. For each TopicPartition, seek to the offset I want to read


But when I call consumer.poll(timeOut), I only get messages from one of my
partitions. I checked the consumer configs and don't see anything that
would limit the fetching to some of the partitions.

Does anyone have any insights on how I can fetch messages for all the
TopicPartition assigned to the consumer?

I posted the relevant code snippet and consumer config below.

Thanks!




relevant code:

List<TopicPartition> topicPartitions = new ArrayList<>();

for (int partitionNumber = 0; partitionNumber < NumPartitionsForTopic;
     partitionNumber++) {
  topicPartitions.add(new TopicPartition("my_topic", partitionNumber));
}

...

try {
  // key and value types follow the ByteArrayDeserializer settings in the config below
  KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
  consumer.assign(topicPartitions);

  // seek to a specific offset for each partition of the topic
  for (TopicPartition topicPartition : topicPartitions) {
    consumer.seek(topicPartition,
        offsetForPartition[topicPartition.partition()]);
  }
  ConsumerRecords<byte[], byte[]> records = consumer.poll(1);
  // ISSUE: records only contains messages from 1 of the partitions
} catch (...) {
  // handle exceptions
} finally {
  // close consumer
}


My KafkaConsumer config:
INFO: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = none
bootstrap.servers = [brokerhostname1.com:9092, brokerhostname2.com:9092, brokerhostname3.com:9092]
check.crcs = true
client.id = my_client_id
connections.max.idle.ms = 54
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = my_group_id_123
heartbeat.interval.ms = 3000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 3145728
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
send.buffer.bytes = 131072
session.timeout.ms = 1
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer


Re: [ANNOUNCE] Apache Kafka 1.0.1 Released

2018-03-06 Thread Guozhang Wang
Ewen, thanks for driving the release!!


Guozhang




-- 
-- Guozhang


Re: [VOTE] 1.1.0 RC1

2018-03-06 Thread Ted Yu
+1

Checked signature
Ran test suite - apart from flaky testMetricsLeak, other tests passed.

On Tue, Mar 6, 2018 at 2:45 AM, Damian Guy  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second candidate for release of Apache Kafka 1.1.0.
>
> This is a minor version release of Apache Kafka. It includes 29 new KIPs.
> Please see the release plan for more details:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71764913
>
> A few highlights:
>
> * Significant Controller improvements (much faster and session expiration
> edge cases fixed)
> * Data balancing across log directories (JBOD)
> * More efficient replication when the number of partitions is large
> * Dynamic Broker Configs
> * Delegation tokens (KIP-48)
> * Kafka Streams API improvements (KIP-205 / 210 / 220 / 224 / 239)
>
> Release notes for the 1.1.0 release:
> http://home.apache.org/~damianguy/kafka-1.1.0-rc1/RELEASE_NOTES.html
>
> *** Please download, test and vote by Friday, March 9th, 5pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~damianguy/kafka-1.1.0-rc1/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~damianguy/kafka-1.1.0-rc1/javadoc/
>
> * Tag to be voted upon (off 1.1 branch) is the 1.1.0 tag:
> https://github.com/apache/kafka/tree/1.1.0-rc1
>
>
> * Documentation:
> http://kafka.apache.org/11/documentation.html
>
> * Protocol:
> http://kafka.apache.org/11/protocol.html
>
> * Successful Jenkins builds for the 1.1 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-1.1-jdk7/68
> System tests: https://jenkins.confluent.io/job/system-test-kafka/job/1.1/30/
>
>
> Thanks,
> Damian Guy
>


[ANNOUNCE] Apache Kafka 1.0.1 Released

2018-03-06 Thread Ewen Cheslack-Postava
The Apache Kafka community is pleased to announce the release for Apache Kafka
1.0.1.

This is a bugfix release for the 1.0 branch that was first released with 1.0.0 
about 4 months ago. We've fixed 49 issues since that release. Most of these are 
non-critical, but in aggregate these fixes will have significant impact. A few 
of the more significant fixes include:

* KAFKA-6277: Make loadClass thread-safe for class loaders of Connect plugins
* KAFKA-6185: Selector memory leak with high likelihood of OOM in case of down 
conversion
* KAFKA-6269: KTable state restore fails after rebalance
* KAFKA-6190: GlobalKTable never finishes restoring when consuming 
transactional messages
* KAFKA-6529: Stop file descriptor leak when client disconnects with staged 
receives
* KAFKA-6238: Issues with protocol version when applying a rolling upgrade to 
1.0.0


All of the changes in this release can be found in the release notes:


https://dist.apache.org/repos/dist/release/kafka/1.0.1/RELEASE_NOTES.html



You can download the source release from:


https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/kafka-1.0.1-src.tgz


and binary releases from:


https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/kafka_2.11-1.0.1.tgz
(Scala 2.11)

https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/kafka_2.12-1.0.1.tgz
(Scala 2.12)

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.


** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.


** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an output
stream to one or more output topics, effectively transforming the input
streams to output streams.


** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might capture
every change to a table.



With these APIs, Kafka can be used for two broad classes of application:


** Building real-time streaming data pipelines that reliably get data
between systems or applications.


** Building real-time streaming applications that transform or react to the
streams of data.



Apache Kafka is in use at large and small companies worldwide, including 
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank, 
Target, The New York Times, Uber, Yelp, and Zalando, among others.



A big thank you to the following 36 contributors to this release!

Alex Good, Andras Beni, Andy Bryant, Arjun Satish, Bill Bejeck, Colin P. 
Mccabe, Colin Patrick McCabe, ConcurrencyPractitioner, Damian Guy, Daniel 
Wojda, Dong Lin, Edoardo Comar, Ewen Cheslack-Postava, Filipe Agapito, fredfp, 
Guozhang Wang, huxihx, Ismael Juma, Jason Gustafson, Jeremy Custenborder, 
Jiangjie (Becket) Qin, Joel Hamill, Konstantine Karantasis, lisa2lisa, Logan 
Buckley, Manjula K, Matthias J. Sax, Nick Chiu, parafiend, Rajini Sivaram, 
Randall Hauch, Robert Yokota, Ron Dagostino, tedyu, Yaswanth Kumar, Yu.


We welcome your help and feedback. For more information on how to report
problems, and to get involved, visit the project website at
http://kafka.apache.org/


Thank you!
Ewen


Re: when use kafka streams to(topic) method sometime throw error?

2018-03-06 Thread Sharat Joshi
unsubscribe

On Mon, Mar 5, 2018 at 7:23 PM, 杰 杨  wrote:

>
> hi:
> I ran into a problem today.
> When I use Kafka Streams to consume one topic, apply mapValues(), and write
> to another topic, it sometimes throws an error.
> This is the code sample:
> new StreamsBuilder().stream(xxxtopic, Consumed.with(Serdes.String(),
> Serdes.String())).mapValues(method).to(newTopic);
> Sometimes it works well, but sometimes it throws this error:
>
> to topic newTopic due to org.apache.kafka.common.errors.TimeoutException:
> Expiring 6 record(s) for newTopic-2: 30030 ms has passed since last attempt
> plus backoff time
>
> 
> funk...@live.com
>


Re: when use kafka streams to(topic) method sometime throw error?

2018-03-06 Thread Matthias J. Sax
Yes.

It's a known bug. You can read details here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer

You can avoid it by increasing the `request.timeout.ms` parameter for
the producer.
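
As a rough illustration of that suggestion, the sketch below sets the producer's `request.timeout.ms` through the Streams properties. It assumes the "producer." config prefix that Kafka Streams uses for its embedded producer (the same key that StreamsConfig.producerPrefix("request.timeout.ms") produces). The application id, bootstrap address, and the 60000 ms value are placeholders, not recommendations.

```java
import java.util.Properties;

public class StreamsProducerTimeout {
    // Builds Streams properties with a larger producer request timeout.
    static Properties streamsProperties() {
        Properties props = new Properties();
        props.put("application.id", "my-streams-app");     // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        // The "producer." prefix scopes the setting to the producer that
        // Kafka Streams creates internally; 60000 ms is an illustrative value.
        props.put("producer.request.timeout.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties();
        System.out.println(props.getProperty("producer.request.timeout.ms"));
    }
}
```

These properties would then be passed to the KafkaStreams constructor together with the topology.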


-Matthias

On 3/5/18 9:46 PM, 杰 杨 wrote:
> It seems I don't configure ProducerConfig in my Streams application.
> Should I configure that?
> 
> 
> 
> 
> funk...@live.com
> 
> From: funk...@live.com
> Date: 2018-03-06 11:23
> To: users
> Subject: when use kafka streams to(topic) method sometime throw error?
> 
> hi:
> I ran into a problem today.
> When I use Kafka Streams to consume one topic, apply mapValues(), and write
> to another topic, it sometimes throws an error.
> This is the code sample:
> new StreamsBuilder().stream(xxxtopic, Consumed.with(Serdes.String(),
> Serdes.String())).mapValues(method).to(newTopic);
> Sometimes it works well, but sometimes it throws this error:
> 
> to topic newTopic due to org.apache.kafka.common.errors.TimeoutException: 
> Expiring 6 record(s) for newTopic-2: 30030 ms has passed since last attempt 
> plus backoff time
> 
> 
> funk...@live.com
> 





Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-06 Thread Guozhang Wang
Thanks for creating the JIRA ticket.

The Streams library follows the "event-time" concept by default with the
metadata timestamp extractor, expecting that the timestamp set in this field
reflects "when the event happens in real-time":

https://kafka.apache.org/10/documentation/streams/core-concepts#streams_time

Following that expectation, the timestamp Streams uses for windowed
aggregation results is the window start time, indicating "events that
happened during this window in real-time resulted in this aggregated value".
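
To make the window start time concrete: for a tumbling window (size equal to advance), the start is the event timestamp rounded down to a multiple of the window size. The sketch below shows only that arithmetic with a hypothetical helper name. It is not the Streams implementation, and it assumes non-negative epoch-millisecond timestamps.

```java
public class TumblingWindowStart {
    // Rounds a timestamp down to the start of its tumbling window.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long oneHourMs = 3_600_000L;
        // An event 2h05m after the epoch falls into the window starting at 2h.
        System.out.println(windowStart(7_500_000L, oneHourMs)); // prints 7200000
    }
}
```

Every record aggregated into that one-hour window therefore produces a result stamped with the same start value, regardless of when the result is written.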


Guozhang



Re: kafka-streams, late data, tumbling windows aggregations and event time

2018-03-06 Thread Dmitriy Vsekhvalnov
Guozhang,

here we go with ticket: https://issues.apache.org/jira/browse/KAFKA-6614

I'd also like to continue the discussion a little further about timestamps.
I was testing with the broker configured with "CreateTime" and have a question
about sink topic timestamps. Back to the example:

KTable summaries = in
   .groupBy((key, value) -> ..)
   .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1l)))
   .count();

summaries.toStream().to(sink);

Each record written to the sink gets its timestamp set to the grouping window
start time, which quite often will be in the past.

What is the logic behind that? Honestly, I expected sink messages to get the
"now" timestamp.


On Mon, Mar 5, 2018 at 11:48 PM, Guozhang Wang  wrote:

> Sounds great! :)
>
> On Mon, Mar 5, 2018 at 12:28 PM, Dmitriy Vsekhvalnov
> <dvsekhval...@gmail.com> wrote:
>
> > Thanks, that's an option, i'll take a look at configuration.
> >
> > But yeah, i was thinking same, if streams relies on the fact that
> > internal topics should use 'CreateTime' configuration, then it is
> > streams library responsibility to configure it.
> >
> > I can open a Jira ticket :)
> >
> > On Mon, Mar 5, 2018 at 11:18 PM, Guozhang Wang  wrote:
> >
> > > Hello Dmitriy,
> > >
> > > In your case, you can override this config to CreateTime only for the
> > > internal topics created by Streams, this is documented in
> > >
> > > https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/StreamsConfig.html#TOPIC_PREFIX
> > >
> > >
> > > We are also discussing to always override the
> > > log.message.timestamp.type config for internal topics to CreateTime,
> > > I vaguely remember there is a JIRA open for it in case you are
> > > interested in contributing to Streams library.
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Mar 5, 2018 at 10:50 AM, Dmitriy Vsekhvalnov
> > > <dvsekhval...@gmail.com> wrote:
> > >
> > > > Which effectively means given scenario is not working with
> > > > LogAppendTime, correct? Because all internal re-partition topics
> > > > will always contain "now" instead of real timestamp from original
> > > > payload message?
> > > >
> > > > Is kafka-streams designed to work with LogAppendTime at all? It
> > > > seems a lot of stuff will NOT work correctly using
> > > > built-in ExtractRecordMetadataTimestamp ?
> > > >
> > > > On Mon, Mar 5, 2018 at 9:30 PM, Guozhang Wang  wrote:
> > > >
> > > > > If broker configures log.message.timestamp.type=LogAppendTime
> > > > > universally, it will ignore whatever timestamp set in the message
> > > > > metadata and override it with the append time. So when the
> > > > > messages are fetched by downstream processors which always use
> > > > > the metadata timestamp extractor, it will get the append
> > > > > timestamp set by brokers.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Mar 5, 2018 at 9:53 AM, Dmitriy Vsekhvalnov
> > > > > <dvsekhval...@gmail.com> wrote:
> > > > >
> > > > > > Hi Guozhang,
> > > > > >
> > > > > > interesting, will same logic applies (internal topic rewrite)
> > > > > > for brokers configured with:
> > > > > >   log.message.timestamp.type=LogAppendTime
> > > > > >
> > > > > > ?
> > > > > >
> > > > > > On Mon, Mar 5, 2018 at 8:33 PM, Guozhang Wang
> > > > > > <wangg...@gmail.com> wrote:
> > > > > >
> > > > > > > Hello Dmitriy,
> > > > > > >
> > > > > > > What you have observed is by design, and it maybe a bit
> > > > > > > confusing at first place. Let me explain:
> > > > > > >
> > > > > > > When you do a group-by aggregation like the above case,
> > > > > > > during the "groupBy((key, value) -> ..)" stage Streams
> > > > > > > library will do a re-partitioning by sending the original
> > > > > > > data stream into an internal repartition topic based on the
> > > > > > > aggregation key defined in the "groupBy" function and fetch
> > > > > > > from that topic again. This is similar to a shuffle phase in
> > > > > > > distributed computing frameworks to make sure the down stream
> > > > > > > aggregations can be done in parallel. When the "groupBy"
> > > > > > > operator sends the messages to this repartition topic, it
> > > > > > > will set in the record metadata the extracted timestamp from
> > > > > > > the payload, and hence for the downstream aggregation
> > > > > > > operator to read from this repartition topic, it is OK to
> > > > > > > always use the ExtractRecordMetadataTimestamp to extract that
> > > > > > > timestamp and use the extracted value to determine which
> > > > > > > window this record should fall into.
> > > > > > >
> > > > > > > More details can be found in this JIRA:
> > > > > > >
> > > > > > > https://issues.apache.org/jira/browse/KAFKA-4785
> > > > > > >
> > > > > > >
> > > > > > > So the record 

Producing more number of Records than expected

2018-03-06 Thread pravin kumar
I have run the wikifeed example. I have three topics:
wikifeedInputtopicDemo2 - 10 partitions
wikifeedOutputtopicDemo2 - 10 partitions
sumoutputeventopicDemo2 - 5 partitions

I have produced 10 records, but the input topic
(wikifeedInputtopicDemo2) receives more than 10 records.
Can someone explain how this happens?

[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1
wikifeedInputtopicDemo2:8:13400
wikifeedInputtopicDemo2:2:13401
wikifeedInputtopicDemo2:5:13400
wikifeedInputtopicDemo2:4:13400
wikifeedInputtopicDemo2:7:13399
wikifeedInputtopicDemo2:1:13399
wikifeedInputtopicDemo2:9:13400
wikifeedInputtopicDemo2:3:13400
wikifeedInputtopicDemo2:6:13400
wikifeedInputtopicDemo2:0:13400
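
A note on reading this output: `--time -1` asks GetOffsetShell for the latest offset of each partition, that is, the position of the next record to be written. The numbers therefore reflect everything appended to the topic so far, not the records of a single producer run. The sketch below is one possible way to turn two such snapshots into a record count, by parsing the `topic:partition:offset` lines and summing the per-partition differences. The class and method names are hypothetical, and the earliest offsets (what `--time -2` would report) are assumed to be 0 because the post does not show them.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: compares two GetOffsetShell snapshots.
final class OffsetCounts {

    // Parses lines of the form "topic:partition:offset" into partition -> offset.
    static Map<Integer, Long> parse(List<String> lines) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // skip blank lines in pasted output
            }
            // Split from the right so the topic name itself is never inspected.
            int last = trimmed.lastIndexOf(':');
            int middle = trimmed.lastIndexOf(':', last - 1);
            int partition = Integer.parseInt(trimmed.substring(middle + 1, last));
            long offset = Long.parseLong(trimmed.substring(last + 1));
            offsets.put(partition, offset);
        }
        return offsets;
    }

    // Sums (upper - lower) over all partitions in "upper".
    // A partition missing from "lower" is treated as starting at offset 0.
    static long sumOfDifferences(Map<Integer, Long> lower, Map<Integer, Long> upper) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : upper.entrySet()) {
            total += entry.getValue() - lower.getOrDefault(entry.getKey(), 0L);
        }
        return total;
    }

    public static void main(String[] args) {
        // Latest offsets exactly as posted above.
        Map<Integer, Long> latest = parse(Arrays.asList(
                "wikifeedInputtopicDemo2:8:13400", "wikifeedInputtopicDemo2:2:13401",
                "wikifeedInputtopicDemo2:5:13400", "wikifeedInputtopicDemo2:4:13400",
                "wikifeedInputtopicDemo2:7:13399", "wikifeedInputtopicDemo2:1:13399",
                "wikifeedInputtopicDemo2:9:13400", "wikifeedInputtopicDemo2:3:13400",
                "wikifeedInputtopicDemo2:6:13400", "wikifeedInputtopicDemo2:0:13400"));
        // Assumption: earliest offsets are all 0 (not shown in the post).
        Map<Integer, Long> earliest = new HashMap<>();
        System.out.println(sumOfDifferences(earliest, latest)); // prints 133999
    }
}
```

To count what one run of the driver adds, the same `sumOfDifferences` call can be given a `--time -1` snapshot taken before the run as `lower` and one taken after it as `upper`.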

Here is my processor topology code:

public static KafkaStreams getWikifeed() {

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, WIKIFEED_LAMBDA);
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
            Serdes.String().getClass().getName());

    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WikifeedSerde.class);
    properties.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
    //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsBuilder builder = new StreamsBuilder();
    // Typed stream: without type parameters, value.isNew() would not compile.
    KStream<String, Wikifeed> inputStream = builder.stream(WIKIFEED_INPUT);
    // Re-key each new entry by user name, then count entries per user.
    KTable<String, Long> kTable = inputStream
            .filter((key, value) -> value.isNew())
            .map((key, value) -> KeyValue.pair(value.getName(), value))
            .groupByKey()
            .count(Materialized.as(COUNT_STORE));
    kTable.toStream().to(WIKIFEED_OUTPUT,
            Produced.with(Serdes.String(), Serdes.Long()));
    KafkaStreams streams = new KafkaStreams(builder.build(), properties);

    return streams;
}


My driver code is in the attachment file.
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

    final static String BOOTSTRAP_SERVERS="localhost:9092";
    final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda1";

    public static void main(String[] args) {
        ProducerInput();
        ConsumerOutput();
    }

    public static void ProducerInput(){
        String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam",
                "lauren", "joseph"};

        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass());

        KafkaProducer producer=new KafkaProducer(properties);
        Random random=new Random();
        // random.nextInt(10) returns a value from 0 to 9, so each run sends
        // between 0 and 9 records rather than exactly 10.
        IntStream.range(0,random.nextInt(10))
                .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content"))
                .forEach(record -> producer.send(new ProducerRecord(WikifeedLambdaexample.WIKIFEED_INPUT,null,record)));
        producer.flush();
    }

    public static void ConsumerOutput() {

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");

        KafkaConsumer consumer = new KafkaConsumer(properties, new StringDeserializer(), new LongDeserializer());
        consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));

        while (true) {
            consumer.poll(100)


[VOTE] 1.1.0 RC1

2018-03-06 Thread Damian Guy
Hello Kafka users, developers and client-developers,

This is the second candidate for release of Apache Kafka 1.1.0.

This is a minor version release of Apache Kafka. It includes 29 new KIPs.
Please see the release plan for more details:

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71764913

A few highlights:

* Significant Controller improvements (much faster and session expiration
edge cases fixed)
* Data balancing across log directories (JBOD)
* More efficient replication when the number of partitions is large
* Dynamic Broker Configs
* Delegation tokens (KIP-48)
* Kafka Streams API improvements (KIP-205 / 210 / 220 / 224 / 239)

Release notes for the 1.1.0 release:
http://home.apache.org/~damianguy/kafka-1.1.0-rc1/RELEASE_NOTES.html

*** Please download, test and vote by Friday, March 9th, 5pm PT

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~damianguy/kafka-1.1.0-rc1/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~damianguy/kafka-1.1.0-rc1/javadoc/

* Tag to be voted upon (off 1.1 branch) is the 1.1.0 tag:
https://github.com/apache/kafka/tree/1.1.0-rc1


* Documentation:
http://kafka.apache.org/11/documentation.html

* Protocol:
http://kafka.apache.org/11/protocol.html

* Successful Jenkins builds for the 1.1 branch:
Unit/integration tests: https://builds.apache.org/job/kafka-1.1-jdk7/68
System tests: https://jenkins.confluent.io/job/system-test-kafka/job/1.1/30/

Thanks,
Damian Guy


Re: committing offset metadata in kafka streams

2018-03-06 Thread Stas Chizhov
Thank you, Matthias!

We currently do use kafka consumer and store event time highwatermarks as
offset metadata. This is used during backup procedure, which is to create a
snapshot of the target storage with all events up to certain timestamp and
no other.
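
For readers unfamiliar with this pattern, here is a minimal sketch of how an event-time high watermark could be carried as offset metadata. The `event-time-hwm=` format and the `WatermarkMetadata` class are hypothetical, since the thread does not say how the value is actually encoded. The Kafka client calls appear only in comments so that the sketch runs without the client library on the classpath; it applies to a plain `KafkaConsumer`, not to Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical helper: tracks the highest event timestamp per partition and
// converts it to and from the string stored as offset metadata.
final class WatermarkMetadata {

    // Assumed metadata format; not taken from the thread.
    private static final String PREFIX = "event-time-hwm=";

    // Highest event timestamp (ms) seen so far, keyed by partition number.
    private final Map<Integer, Long> highWatermarks = new HashMap<>();

    // Call once per consumed record with the record's event time.
    void observe(int partition, long eventTimeMs) {
        highWatermarks.merge(partition, eventTimeMs, Long::max);
    }

    // Metadata string to commit for a partition ("" if nothing was observed).
    // With a plain consumer this would be passed along with the offset, e.g.
    //   offsets.put(tp, new OffsetAndMetadata(nextOffset, metadataFor(p)));
    //   consumer.commitSync(offsets);
    String metadataFor(int partition) {
        Long hwm = highWatermarks.get(partition);
        return hwm == null ? "" : PREFIX + hwm;
    }

    // Reads the watermark back, e.g. from consumer.committed(tp).metadata().
    static OptionalLong parse(String metadata) {
        if (metadata == null || !metadata.startsWith(PREFIX)) {
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(metadata.substring(PREFIX.length())));
        } catch (NumberFormatException e) {
            return OptionalLong.empty(); // malformed value, treat as absent
        }
    }

    public static void main(String[] args) {
        WatermarkMetadata tracker = new WatermarkMetadata();
        tracker.observe(0, 1520300000000L);
        tracker.observe(0, 1520299999000L); // older event, watermark unchanged
        System.out.println(tracker.metadataFor(0)); // prints event-time-hwm=1520300000000
    }
}
```

A backup job could then read the committed metadata for every partition and use the parsed values to decide up to which timestamp the snapshot is complete.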

As for the API - I guess being able to provide partition-to-metadata map in
the context commit method would do it (to be called from within punctuate
method). BTW as far as I understand if Processor API is used flushing
producers and committing offsets is correlated and both output topic state
and committed offsets do correspond to a state at the moment of some
punctuation. Meaning that if I do have a deterministic processing topology
my output topic is going to be deterministic as well (modulo duplicates of
course).  Am I correct here?

Best regards,
Stanislav.


2018-03-05 2:31 GMT+01:00 Matthias J. Sax :

> You are correct. This is not possible atm.
>
> Note, that commits happen "under the hood" and users cannot commit
> explicitly. Users can only "request" a commit -- this implies that
> Kafka Streams will commit as soon as possible -- but when
> `context#commit()` returns, the commit is not done yet (it only sets a
> flag).
>
> What is your use case for this? How would you want to use this from an
> API point of view?
>
> Feel free to open a feature request JIRA -- we don't have any plans to
> add this atm -- it's the first time anybody asks for this feature. If
> there is a JIRA, maybe somebody picks it up :)
>
>
> -Matthias
>
> On 3/3/18 6:51 AM, Stas Chizhov wrote:
> > Hi,
> >
> > There seems to be no way to commit custom metadata along with offsets
> from
> > within Kafka Streams.
> > Are there any plans to expose this functionality or have I missed
> something?
> >
> > Best regards,
> > Stanislav.
> >
>
>


Kafka cluster loses messages after zookeeper restart

2018-03-06 Thread Булат Юсупов
Hi,

I'm starting a cluster of kafka brokers using Docker (5 brokers for
example, one broker per container). Kafka version 2.12-0.11.0.0, Zookeeper
3.4.10.

*The scenario:*

   - Starting the 1st broker with the config below

*zoo.cfg*

tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=100
server.1=0.0.0.0:2888:3888

*server.properties*

broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker1_IP:broker1_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=100
group.initial.rebalance.delay.ms=0

*producer.properties*

bootstrap.servers=localhost:9092
compression.type=none

*consumer.properties*

zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=100
group.id=test-consumer-group


   - Zookeeper is started in standalone mode, then Kafka is started

   - Creating topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic my-test-topic1

   - Sending message

echo "test_kafka1" | /opt/kafka/bin/kafka-console-producer.sh --broker-list
localhost:9092 --topic my-test-topic1

   - Checking message

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--from-beginning --topic my-test-topic1 --max-messages 1

*Message is received*

   - Describe the topic

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic my-test-topic1

Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: my-test-topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

   - Starting the remaining 4 brokers

*zoo.cfg* on each broker from 1st to 5th (only the position of
0.0.0.0:2888:3888 differs)

tickTime=2000
initLimit=10
syncLimit=5

dataDir=/opt/zookeeper/data

clientPort=2181
maxClientCnxns=10
minSessionTimeout=4000
maxSessionTimeout=100
server.1=0.0.0.0:2888:3888
server.2=broker2_IP:broker2_2888:broker2_3888
server.3=broker3_IP:broker3_2888:broker3_3888
server.4=broker4_IP:broker4_2888:broker4_3888
server.5=broker5_IP:broker5_2888:broker5_3888

*server.properties* on each broker from 1st to 5th (each broker.id is unique,
broker_IP:broker_PORT differs for each broker)

broker.id=N
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://broker_IP:broker_PORT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=100
group.initial.rebalance.delay.ms=0

*producer.properties* on each broker from 1st to 5th

bootstrap.servers=localhost:9092
compression.type=none

*consumer.properties* on each broker from 1st to 5th

zookeeper.connect=127.0.0.1:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms=100
group.id=test-consumer-group


   - Restarting zookeeper on each broker so that the new zoo.cfg takes effect

   - Zookeepers gather into a cluster

   - Topic moved to broker 5

/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181
--topic my-test-topic1

Topic:my-test-topic1 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: my-test-topic1 Partition: 0 Leader: 5 Replicas: 5 Isr: 5

Is it normal behavior? Or should it stay on broker 1?

   - Checking message on each broker

/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--from-beginning --topic my-test-topic1 --max-messages 1

*Message is lost* (the message is not lost when the topic stays on broker 1, so
the problem is intermittent)

Could you help me with this problem?

TIA


-- 


*Булат Юсупов* / Software Engineer / Development Department

biusu...@avtodoria.ru / +7 917 889 99 32
skype: usbulat

ООО "Автодория"
+7 843 524 74 12
Kazan, "IT Park" High-Tech Technopark, Peterburgskaya 52,
office 303
www.avtodoria.ru

*Innovations save lives!*