Can I tell mirrormaker2 to start (re)consuming from a specific offset?

2024-09-12 Thread James Cheng
Hi everyone,

I am using MirrorMaker 2. From what I understand, MirrorMaker 2 does not store 
its offsets in the source cluster's __consumer_offsets topic. Rather, it stores 
its offsets in the destination cluster, in the topic named by 
offset.storage.topic

In the __consumer_offsets world, I could manually change offsets by doing this:
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group groupname \
  --reset-offsets --to-latest --execute

Is there any similar way to manipulate MirrorMaker 2, to tell it to start 
(re)consuming from a specific offset in a source topic?
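For reference, the classic consumer-group tool can also target an exact offset, not just --to-latest. Below is a sketch with placeholder values (cluster address, group, topic, partition, and offset are all hypothetical); whether MM2's Connect-managed offsets can be reset the same way is exactly the open question here, so this only illustrates the __consumer_offsets side of the comparison.

```shell
# Hypothetical values -- substitute your own cluster, group, topic, and offset.
BOOTSTRAP="kafka:9092"
GROUP="groupname"
TOPIC_PARTITION="topicname:0"   # --topic accepts topic:partition syntax
OFFSET=42

# Dry run: print the command instead of executing it.
# Remove 'echo' to run against a real cluster.
echo kafka-consumer-groups.sh \
  --bootstrap-server "$BOOTSTRAP" \
  --group "$GROUP" \
  --topic "$TOPIC_PARTITION" \
  --reset-offsets --to-offset "$OFFSET" \
  --execute
```

Running with --dry-run instead of --execute first is a safe way to preview the resulting offsets before committing them.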

Thanks,
-James



Re: [ANNOUNCE] Apache Kafka 3.2.0

2022-05-19 Thread James Cheng
Bruno,

Congrats on the release!

There is a small typo on the page.
> KIP-791 adds method recordMetada() to the StateStoreContext,

Should be
> KIP-791 adds method recordMetadata() to the StateStoreContext,

I know that the page has already been published, but should we fix that typo?

Thanks!
-James


> On May 17, 2022, at 9:01 AM, Bruno Cadonna  wrote:
> 
> The Apache Kafka community is pleased to announce the release for Apache 
> Kafka 3.2.0
> 
> * log4j 1.x is replaced with reload4j (KAFKA-9366)
> * StandardAuthorizer for KRaft (KIP-801)
> * Send a hint to the partition leader to recover the partition (KIP-704)
> * Top-level error code field in DescribeLogDirsResponse (KIP-784)
> * kafka-console-producer writes headers and null values (KIP-798 and KIP-810)
> * JoinGroupRequest and LeaveGroupRequest have a reason attached (KIP-800)
> * Static membership protocol lets the leader skip assignment (KIP-814)
> * Rack-aware standby task assignment in Kafka Streams (KIP-708)
> * Interactive Query v2 (KIP-796, KIP-805, and KIP-806)
> * Connect APIs list all connector plugins and retrieve their configuration 
> (KIP-769)
> * TimestampConverter SMT supports different unix time precisions (KIP-808)
> * Connect source tasks handle producer exceptions (KIP-779)
> 
> All of the changes in this release can be found in the release notes:
> https://www.apache.org/dist/kafka/3.2.0/RELEASE_NOTES.html
> 
> 
> You can download the source and binary release (Scala 2.12 and 2.13) from:
> https://kafka.apache.org/downloads#3.2.0
> 
> ---
> 
> 
> Apache Kafka is a distributed streaming platform with four core APIs:
> 
> 
> ** The Producer API allows an application to publish a stream of records to
> one or more Kafka topics.
> 
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
> 
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an
> output stream to one or more output topics, effectively transforming the
> input streams to output streams.
> 
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might
> capture every change to a table.
> 
> 
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
> 
> ** Building real-time streaming applications that transform or react
> to the streams of data.
> 
> 
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
> 
> A big thank you for the following 113 contributors to this release!
> 
> A. Sophie Blee-Goldman, Adam Kotwasinski, Aleksandr Sorokoumov, Alexandre 
> Garnier, Alok Nikhil, aSemy, Bounkong Khamphousone, bozhao12, Bruno Cadonna, 
> Chang, Chia-Ping Tsai, Chris Egerton, Colin P. Mccabe, Colin Patrick McCabe, 
> Cong Ding, David Arthur, David Jacot, David Mao, defhacks, dengziming, Ed B, 
> Edwin, florin-akermann, GauthamM-official, GuoPhilipse, Guozhang Wang, Hao 
> Li, Haoze Wu, Idan Kamara, Ismael Juma, Jason Gustafson, Jason Koch, Jeff 
> Kim, jiangyuan, Joel Hamill, John Roesler, Jonathan Albrecht, Jorge Esteban 
> Quilcate Otoya, Josep Prat, Joseph (Ting-Chou) Lin, José Armando García 
> Sancio, Jules Ivanic, Julien Chanaud, Justin Lee, Justine Olshan, Kamal 
> Chandraprakash, Kate Stanley, keashem, Kirk True, Knowles Atchison, Jr, 
> Konstantine Karantasis, Kowshik Prakasam, kurtostfeld, Kvicii, Lee Dongjin, 
> Levani Kokhreidze, lhunyady, Liam Clarke-Hutchinson, liym, loboya~, Lucas 
> Bradstreet, Ludovic DEHON, Luizfrf3, Luke Chen, Marc Löhe, Matthew Wong, 
> Matthias J. Sax, Michal T, Mickael Maison, Mike Lothian, mkandaswamy, Márton 
> Sigmond, Nick Telford, Niket, Okada Haruki, Paolo Patierno, Patrick Stuedi, 
> Philip Nee, Prateek Agarwal, prince-mahajan, Rajini Sivaram, Randall Hauch, 
> Richard, RivenSun, Rob Leland, Ron Dagostino, Sayantanu Dey, Stanislav 
> Vodetskyi, sunshujie1990, Tamara Skokova, Tim Patterson, Tolga H. Dur, Tom 
> Bentley, Tomonari Yamashita, vamossagar12, Vicky Papavasileiou, Victoria Xia, 
> Vijay Krishna, Vincent Jiang, Walker Carlson, wangyap, Wenhao Ji, Wenjun 
> Ruan, Xiaobing Fang, Xiaoyue Xue, xuexiaoyue, Yang Yu, yasar03, Yu, Zhang 
> Hongyi, zzccctv, 工业废水, 彭小漪
> 
> We welcome your help and feedback. For more in

Re: [ANNOUNCE] Apache Kafka 2.6.1

2021-01-11 Thread James Cheng
Thank you Mickael for running the release. Good job everyone!

-James

Sent from my iPhone

> On Jan 11, 2021, at 2:17 PM, Mickael Maison  wrote:
> 
> The Apache Kafka community is pleased to announce the release for
> Apache Kafka 2.6.1.
> 
> This is a bug fix release and it includes fixes and improvements from
> 41 JIRAs, including a few critical bugs.
> 
> All of the changes in this release can be found in the release notes:
> https://www.apache.org/dist/kafka/2.6.1/RELEASE_NOTES.html
> 
> 
> You can download the source and binary release (Scala 2.12 and Scala 2.13) 
> from:
> https://kafka.apache.org/downloads#2.6.1
> 
> 
> ---
> 
> 
> Apache Kafka is a distributed streaming platform with four core APIs:
> 
> ** The Producer API allows an application to publish a stream of records
> to one or more Kafka topics.
> 
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
> 
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an
> output stream to one or more output topics, effectively transforming
> the input streams to output streams.
> 
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might
> capture every change to a table.
> 
> 
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
> 
> ** Building real-time streaming applications that transform or react
> to the streams of data.
> 
> 
> Apache Kafka is in use at large and small companies worldwide,
> including Capital One, Goldman Sachs, ING, LinkedIn, Netflix,
> Pinterest, Rabobank, Target, The New York Times, Uber, Yelp, and
> Zalando, among others.
> 
> 
> A big thank you for the following 36 contributors to this release!
> 
> A. Sophie Blee-Goldman, John Roesler, Bruno Cadonna, Rajini Sivaram,
> Guozhang Wang, Matthias J. Sax, Chris Egerton, Mickael Maison, Randall
> Hauch, leah, Luke Chen, Jason Gustafson, Konstantine Karantasis,
> Michael Bingham, Lucas Bradstreet, Andrew Egelhofer, Micah Paul Ramos,
> Nikolay, Nitesh Mor, Alex Diachenko, xakassi, Shaik Zakir Hussain,
> Stanislav Kozlovski, Stanislav Vodetskyi, Thorsten Hake, Tom Bentley,
> Vikas Singh, feyman2016, high.lee, Dima Reznik, Colin Patrick McCabe,
> Edoardo Comar, Jim Galasyn, Chia-Ping Tsai, Justine Olshan, Levani
> Kokhreidze
> 
> 
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> https://kafka.apache.org/
> 
> Thank you!
> 
> Regards,
> Mickael


Re: [ANNOUNCE] New committer: David Jacot

2020-10-19 Thread James Cheng
Congratulations, David!

-James

> On Oct 16, 2020, at 9:01 AM, Gwen Shapira  wrote:
> 
> The PMC for Apache Kafka has invited David Jacot as a committer, and
> we are excited to say that he accepted!
> 
> David Jacot has been contributing to Apache Kafka since July 2015 (!)
> and has been very active since August 2019. He contributed several
> notable KIPs:
> 
> KIP-511: Collect and Expose Client Name and Version in Brokers
> KIP-559: Make the Kafka Protocol Friendlier with L7 Proxies
> KIP-570: Add leader epoch in StopReplicaRequest
> KIP-599: Throttle Create Topic, Create Partition and Delete Topic Operations
> KIP-496: Add an API for the deletion of consumer offsets
> 
> In addition, David Jacot reviewed many community contributions and
> showed great technical and architectural taste. Great reviews are hard
> and often thankless work - but this is what makes Kafka a great
> product and helps us grow our community.
> 
> Thanks for all the contributions, David! Looking forward to more
> collaboration in the Apache Kafka community.
> 
> -- 
> Gwen Shapira



Re: [ANNOUNCE] New committer: Chia-Ping Tsai

2020-10-19 Thread James Cheng
Congratulations Chia-Ping!

-James

> On Oct 19, 2020, at 10:24 AM, Guozhang Wang  wrote:
> 
> Hello all,
> 
> I'm happy to announce that Chia-Ping Tsai has accepted his invitation to
> become an Apache Kafka committer.
> 
> Chia-Ping has been contributing to Kafka since March 2018 and has made 74
> commits:
> 
> https://github.com/apache/kafka/commits?author=chia7712
> 
> He's also authored several major improvements, participated in the KIP
> discussion and PR reviews as well. His major feature development includes:
> 
> * KAFKA-9654: Epoch based ReplicaAlterLogDirsThread creation.
> * KAFKA-8334: Spiky offsetCommit latency due to lock contention.
> * KIP-331: Add default implementation to close() and configure() for serde
> * KIP-367: Introduce close(Duration) to Producer and AdminClients
> * KIP-338: Support to exclude the internal topics in kafka-topics.sh command
> 
> In addition, Chia-Ping has demonstrated his great diligence fixing test
> failures, his impressive engineering attitude and taste in fixing tricky
> bugs while keeping simple designs.
> 
> Please join me to congratulate Chia-Ping for all the contributions!
> 
> 
> -- Guozhang



Re: [ANNOUNCE] New committer: A. Sophie Blee-Goldman

2020-10-19 Thread James Cheng
Congratulations, Sophie!

-James

> On Oct 19, 2020, at 9:40 AM, Matthias J. Sax  wrote:
> 
> Hi all,
> 
> I am excited to announce that A. Sophie Blee-Goldman has accepted her
> invitation to become an Apache Kafka committer.
> 
> Sophie has been actively contributing to Kafka since Feb 2019 and has
> accumulated 140 commits. She authored 4 KIPs:
> 
> - KIP-453: Add close() method to RocksDBConfigSetter
> - KIP-445: In-memory Session Store
> - KIP-428: Add in-memory window store
> - KIP-613: Add end-to-end latency metrics to Streams
> 
> and helped to implement two critical KIPs, 429 (incremental rebalancing)
> and 441 (smooth auto-scaling; not just implementation but also design).
> 
> In addition, she participates in basically every Kafka Streams related
> KIP discussion, reviewed 142 PRs, and is active on the user mailing list.
> 
> Thanks for all the contributions, Sophie!
> 
> 
> Please join me to congratulate her!
> -Matthias
> 



Re: [ANNOUNCE] New Kafka PMC member: Matthias J. Sax

2019-04-19 Thread James Cheng
Congrats!!

-James

Sent from my iPhone

> On Apr 18, 2019, at 2:35 PM, Guozhang Wang  wrote:
> 
> Hello Everyone,
> 
> I'm glad to announce that Matthias J. Sax is now a member of Kafka PMC.
> 
> Matthias has been a committer since Jan. 2018, and since then he has
> continued to be active in the community and made significant contributions
> to the project.
> 
> 
> Congratulations to Matthias!
> 
> -- Guozhang


Re: [ANNOUNCE] New Committer: Randall Hauch

2019-02-14 Thread James Cheng
Congrats, Randall! Well deserved!

-James

Sent from my iPhone

> On Feb 14, 2019, at 6:16 PM, Guozhang Wang  wrote:
> 
> Hello all,
> 
> The PMC of Apache Kafka is happy to announce another new committer joining
> the project today: we have invited Randall Hauch as a project committer and
> he has accepted.
> 
> Randall has been participating in the Kafka community for the past 3 years,
> and is well known as the founder of the Debezium project, a popular project
> for database change-capture streams using Kafka (https://debezium.io). More
> recently he has become the main person keeping Kafka Connect moving
> forward, participated in nearly all KIP discussions and QAs on the mailing
> list. He's authored 6 KIPs, submitted 50 pull requests, conducted over
> a hundred reviews around Kafka Connect, and has also been evangelizing
> Kafka Connect at several Kafka Summit venues.
> 
> 
> Thank you very much for your contributions to the Connect community, Randall!
> And looking forward to many more :)
> 
> 
> Guozhang, on behalf of the Apache Kafka PMC


Re: [ANNOUNCE] New Committer: Vahid Hashemian

2019-01-15 Thread James Cheng
Congrats, Vahid!!

-James

> On Jan 15, 2019, at 2:44 PM, Jason Gustafson  wrote:
> 
> Hi All,
> 
> The PMC for Apache Kafka has invited Vahid Hashemian as a project committer
> and we are pleased to announce that he has accepted!
> 
> Vahid has made numerous contributions to the Kafka community over the past
> few years. He has authored 13 KIPs with core improvements to the consumer
> and the tooling around it. He has also contributed nearly 100 patches
> affecting all parts of the codebase. Additionally, Vahid puts a lot of
> effort into community engagement, helping others on the mail lists and
> sharing his experience at conferences and meetups.
> 
> We appreciate the contributions and we are looking forward to more.
> Congrats Vahid!
> 
> Jason, on behalf of the Apache Kafka PMC



Re: [ANNOUNCE] Apache Kafka 2.1.0

2018-11-21 Thread James Cheng
Thanks Dong for running the release, and congrats to everyone in the community!

-James

Sent from my iPhone

> On Nov 21, 2018, at 10:09 AM, Dong Lin  wrote:
> 
> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 2.1.0
> 
> 
> This is a major release and includes significant features from 28 KIPs. It
> contains fixes and improvements from 179 JIRAs, including a few critical
> bug fixes. Here is a summary of some notable changes:
> 
> ** Java 11 support
> ** Support for Zstandard, which achieves compression comparable to gzip
> with higher compression and especially decompression speeds (KIP-110)
> ** Avoid expiring committed offsets for active consumer group (KIP-211)
> ** Provide Intuitive User Timeouts in The Producer (KIP-91)
> ** Kafka's replication protocol now supports improved fencing of zombies.
> Previously, under certain rare conditions, if a broker became partitioned
> from Zookeeper but not the rest of the cluster, then the logs of replicated
> partitions could diverge and cause data loss in the worst case (KIP-320)
> ** Streams API improvements (KIP-319, KIP-321, KIP-330, KIP-353, KIP-356)
> ** Admin script and admin client API improvements to simplify admin
> operation (KIP-231, KIP-308, KIP-322, KIP-324, KIP-338, KIP-340)
> ** DNS handling improvements (KIP-235, KIP-302)
> 
> 
> All of the changes in this release can be found in the release notes:
> https://www.apache.org/dist/kafka/2.1.0/RELEASE_NOTES.html
> 
> 
> You can download the source and binary release (Scala 2.11 and Scala 2.12) from:
> https://kafka.apache.org/downloads#2.1.0
> 
> ---
> 
> 
> Apache Kafka is a distributed streaming platform with four core APIs:
> 
> 
> ** The Producer API allows an application to publish a stream of records to
> one or more Kafka topics.
> 
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
> 
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an
> output stream to one or more output topics, effectively transforming the
> input streams to output streams.
> 
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might
> capture every change to a table.
> 
> 
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
> 
> ** Building real-time streaming applications that transform or react
> to the streams of data.
> 
> 
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
> 
> A big thank you for the following 100 contributors to this release!
> 
> Ahmed Al Mehdi, Aleksei Izmalkin, Alex Dunayevsky, Amit Sela, Andras
> Katona, Andy Coates, Anna Povzner, Arjun Satish, Attila Sasvari, Aviem Zur,
> Bibin Sebastian, Bill Bejeck, Bob Barrett, Brandon Kirchner, Bridger
> Howell, Chia-Ping Tsai, Colin Hicks, Colin Patrick McCabe, Dhruvil Shah,
> Dong Lin, Edoardo Comar, Eugen Feller, Ewen Cheslack-Postava, Filipe
> Agapito, Flavien Raynaud, Gantigmaa Selenge, Gardner Vickers, Gitomain,
> Gunnar Morling, Guozhang Wang, hashangayasri, huxi, huxihx, Ismael Juma,
> Jagadesh Adireddi, Jason Gustafson, Jim Galasyn, Jimin Hsieh, Jimmy Casey,
> Joan Goyeau, John Roesler, Jon Lee, jonathanskrzypek, Jun Rao, Kamal
> Chandraprakash, Kevin Lafferty, Kevin Lu, Koen De Groote, Konstantine
> Karantasis, lambdaliu, Lee Dongjin, Lincong Li, Liquan Pei, lucapette,
> Lucas Wang, Maciej Bryński, Magesh Nandakumar, Manikumar Reddy, Manikumar
> Reddy O, Mario Molina, Marko Stanković, Matthias J. Sax, Matthias
> Wessendorf, Max Zheng, Mayank Tankhiwale, mgharat, Michal Dziemianko,
> Michał Borowiecki, Mickael Maison, Mutasem Aldmour, Nikolay, nixsticks,
> nprad, okumin, Radai Rosenblatt, radai-rosenblatt, Rajini Sivaram, Randall
> Hauch, Robert Yokota, Rohan, Ron Dagostino, Sam Lendle, Sandor Murakozi,
> Simon Clark, Stanislav Kozlovski, Stephane Maarek, Sébastien Launay, Sönke
> Liebau, Ted Yu, uncleGen, Vahid Hashemian, Viktor Somogyi, wangshao,
> xinzhg, Xiongqi Wesley Wu, Xiongqi Wu, ying-zheng, Yishun Guan, Yu Yang,
> Zhanxiang (Patrick) Huang
> 
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> https://kafka.apache.org/
> 
> Thank you!
> 
> Regards,
> Dong


Re: [ANNOUNCE] New committer: Colin McCabe

2018-10-02 Thread James Cheng
Congrats, Colin!

-James

> On Sep 25, 2018, at 1:39 AM, Ismael Juma  wrote:
> 
> Hi all,
> 
> The PMC for Apache Kafka has invited Colin McCabe as a committer and we are
> pleased to announce that he has accepted!
> 
> Colin has contributed 101 commits and 8 KIPs including significant
> improvements to replication, clients, code quality and testing. A few
> highlights were KIP-97 (Improved Clients Compatibility Policy), KIP-117
> (AdminClient), KIP-227 (Incremental FetchRequests to Increase Partition
> Scalability), the introduction of findBugs and adding Trogdor (fault
> injection and benchmarking tool).
> 
> In addition, Colin has reviewed 38 pull requests and participated in more
> than 50 KIP discussions.
> 
> Thank you for your contributions Colin! Looking forward to many more. :)
> 
> Ismael, for the Apache Kafka PMC



Re: [ANNOUNCE] New Kafka PMC member: Dong Lin

2018-08-21 Thread James Cheng
Congrats Dong!

-James

> On Aug 20, 2018, at 3:54 AM, Ismael Juma  wrote:
> 
> Hi everyone,
> 
> Dong Lin became a committer in March 2018. Since then, he has remained
> active in the community and contributed a number of patches, reviewed
> several pull requests and participated in numerous KIP discussions. I am
> happy to announce that Dong is now a member of the
> Apache Kafka PMC.
> 
> Congratulations, Dong! Looking forward to your future contributions.
> 
> Ismael, on behalf of the Apache Kafka PMC



Re: [ANNOUNCE] Apache Kafka 2.0.0 Released

2018-07-30 Thread James Cheng
Congrats and great job, everyone! Thanks Rajini for driving the release!

-James

Sent from my iPhone

> On Jul 30, 2018, at 3:25 AM, Rajini Sivaram  wrote:
> 
> The Apache Kafka community is pleased to announce the release for
> 
> Apache Kafka 2.0.0.
> 
> This is a major release and includes significant new features from
> 
> 40 KIPs. It contains fixes and improvements from 246 JIRAs, including
> 
> a few critical bugs. Here is a summary of some notable changes:
> 
> ** KIP-290 adds support for prefixed ACLs, simplifying access control
> management in large secure deployments. Bulk access to topics,
> consumer groups or transactional ids with a prefix can now be granted
> using a single rule. Access control for topic creation has also been
> improved to enable access to be granted to create specific topics or
> topics with a prefix.
> 
> ** KIP-255 adds a framework for authenticating to Kafka brokers using
> OAuth2 bearer tokens. The SASL/OAUTHBEARER implementation is
> customizable using callbacks for token retrieval and validation.
> 
> ** Host name verification is now enabled by default for SSL connections
> to ensure that the default SSL configuration is not susceptible to
> man-in-the-middle attacks. You can disable this verification for
> deployments where validation is performed using other mechanisms.
> 
> ** You can now dynamically update SSL trust stores without broker restart.
> You can also configure security for broker listeners in ZooKeeper before
> starting brokers, including SSL key store and trust store passwords and
> JAAS configuration for SASL. With this new feature, you can store sensitive
> password configs in encrypted form in ZooKeeper rather than in cleartext
> in the broker properties file.
> 
> ** The replication protocol has been improved to avoid log divergence
> between leader and follower during fast leader failover. We have also
> improved resilience of brokers by reducing the memory footprint of
> message down-conversions. By using message chunking, both memory
> usage and memory reference time have been reduced to avoid
> OutOfMemory errors in brokers.
> 
> ** Kafka clients are now notified of throttling before any throttling is
> applied
> when quotas are enabled. This enables clients to distinguish between
> network errors and large throttle times when quotas are exceeded.
> 
> ** We have added a configuration option for Kafka consumer to avoid
> indefinite blocking in the consumer.
> 
> ** We have dropped support for Java 7 and removed the previously
> deprecated Scala producer and consumer.
> 
> ** Kafka Connect includes a number of improvements and features.
> KIP-298 enables you to control how errors in connectors, transformations
> and converters are handled by enabling automatic retries and controlling the
> number of errors that are tolerated before the connector is stopped. More
> contextual information can be included in the logs to help diagnose problems
> and problematic messages consumed by sink connectors can be sent to a
> dead letter queue rather than forcing the connector to stop.
> 
> ** KIP-297 adds a new extension point to move secrets out of connector
> configurations and integrate with any external key management system.
> The placeholders in connector configurations are only resolved before
> sending the configuration to the connector, ensuring that secrets are stored
> and managed securely in your preferred key management system and
> not exposed over the REST APIs or in log files.
> 
> ** We have added a thin Scala wrapper API for our Kafka Streams DSL,
> which provides better type inference and better type safety during compile
> time. Scala users can have less boilerplate in their code, notably regarding
> Serdes with new implicit Serdes.
> 
> ** Message headers are now supported in the Kafka Streams Processor API,
> allowing users to add and manipulate headers read from the source topics
> and propagate them to the sink topics.
> 
> ** Windowed aggregations performance in Kafka Streams has been largely
> improved (sometimes by an order of magnitude) thanks to the new
> single-key-fetch API.
> 
> ** We have further improved unit testability of Kafka Streams with the
> kafka-streams-test-utils artifact.
> 
> 
> All of the changes in this release can be found in the release notes:
> 
> https://www.apache.org/dist/kafka/2.0.0/RELEASE_NOTES.html
> 
> You can download the source and binary release (Scala 2.11 and Scala 2.12)
> from:
> 
> https://kafka.apache.org/downloads#2.0.0
> 
> ---
> 
> Apache Kafka is a distributed streaming platform with four core APIs:
> 
> ** The Producer API allows an application to publish a stream of records to
> 
> one or more Kafka topics.
> 
> ** The Consumer API allows an application to subscribe to one or more
> 
> topic

Re: [ANNOUNCE] Apache Kafka 1.1.0 Released

2018-03-29 Thread James Cheng
Thanks Damian and Rajini for running the release! Congrats and good job 
everyone!

-James

Sent from my iPhone

> On Mar 29, 2018, at 2:27 AM, Rajini Sivaram  wrote:
> 
> The Apache Kafka community is pleased to announce the release for
> 
> Apache Kafka 1.1.0.
> 
> 
> Kafka 1.1.0 includes a number of significant new features.
> 
> Here is a summary of some notable changes:
> 
> 
> ** Kafka 1.1.0 includes significant improvements to the Kafka Controller
> 
>   that speed up controlled shutdown. ZooKeeper session expiration edge
> cases
> 
>   have also been fixed as part of this effort.
> 
> 
> ** Controller improvements also enable more partitions to be supported on a
> 
>   single cluster. KIP-227 introduced incremental fetch requests, providing
> 
>   more efficient replication when the number of partitions is large.
> 
> 
> ** KIP-113 added support for replica movement between log directories to
> 
>   enable data balancing with JBOD.
> 
> 
> ** Some of the broker configuration options like SSL keystores can now be
> 
>   updated dynamically without restarting the broker. See KIP-226 for
> details
> 
>   and the full list of dynamic configs.
> 
> 
> ** Delegation token based authentication (KIP-48) has been added to Kafka
> 
>   brokers to support large number of clients without overloading Kerberos
> 
>   KDCs or other authentication servers.
> 
> 
> ** Several new features have been added to Kafka Connect, including header
> 
>   support (KIP-145), SSL and Kafka cluster identifiers in the Connect REST
> 
>   interface (KIP-208 and KIP-238), validation of connector names (KIP-212)
> 
>   and support for topic regex in sink connectors (KIP-215). Additionally,
> 
>   the default maximum heap size for Connect workers was increased to 2GB.
> 
> 
> ** Several improvements have been added to the Kafka Streams API, including
> 
>   reducing repartition topic partitions footprint, customizable error
> 
>   handling for produce failures and enhanced resilience to broker
> 
>   unavailability.  See KIPs 205, 210, 220, 224 and 239 for details.
> 
> 
> All of the changes in this release can be found in the release notes:
> 
> 
> 
> https://dist.apache.org/repos/dist/release/kafka/1.1.0/RELEASE_NOTES.html
> 
> 
> 
> 
> You can download the source release from:
> 
> 
> 
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka-1.1.0-src.tgz
> 
> 
> 
> and binary releases from:
> 
> 
> 
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.11-1.1.0.tgz
> 
> (Scala 2.11)
> 
> 
> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.12-1.1.0.tgz
> 
> (Scala 2.12)
> 
> 
> --
> 
> 
> 
> Apache Kafka is a distributed streaming platform with four core APIs:
> 
> 
> 
> ** The Producer API allows an application to publish a stream of records to
> 
> one or more Kafka topics.
> 
> 
> 
> ** The Consumer API allows an application to subscribe to one or more
> 
> topics and process the stream of records produced to them.
> 
> 
> 
> ** The Streams API allows an application to act as a stream processor,
> 
> consuming an input stream from one or more topics and producing an output
> 
> stream to one or more output topics, effectively transforming the input
> 
> streams to output streams.
> 
> 
> 
> ** The Connector API allows building and running reusable producers or
> 
> consumers that connect Kafka topics to existing applications or data
> 
> systems. For example, a connector to a relational database might capture
> 
> every change to a table.
> 
> 
> 
> 
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data
> 
> between systems or applications.
> 
> 
> 
> ** Building real-time streaming applications that transform or react to the
> 
> streams of data.
> 
> 
> 
> 
> Apache Kafka is in use at large and small companies worldwide, including
> 
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> 
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
> 
> 
> 
> 
> A big thank you for the following 120 contributors to this release!
> 
> 
> Adem Efe Gencer, Alex Good, Andras Beni, Andy Bryant, Antony Stubbs,
> 
> Apurva Mehta, Arjun Satish, bartdevylder, Bill Bejeck, Charly Molter,
> 
> Chris Egerton, Clemens Valiente, cmolter, Colin P. Mccabe,
> 
> Colin Patrick McCabe, ConcurrencyPractitioner, Damian Guy, dan norwood,
> 
> Daniel Wojda, Derrick Or, Dmitry Minkovsky, Dong Lin, Edoardo Comar,
> 
> ekenny, Elyahou, Eugene Sevastyanov, Ewen Cheslack-Postava, Filipe Agapito,
> 
> fredfp, Gavrie Philipson, Gunnar Morling, Guozhang Wang, hmcl, Hugo Louro,
> 
> huxi, huxihx, Igor Kostiakov, Ismael Juma, Ivan Babrou, Jacek Laskowski,
> 
> Jakub Scholz, Jason Gustafson, Jeff Klukas, Jeff Widman, Jeremy
> Custenborder,
> 
> Jeyhun Karimov, Jiangjie (Becket) Qin, Jiangjie Qin, Jimin Hsie

Re: [ANNOUNCE] New Committer: Dong Lin

2018-03-28 Thread James Cheng
Congrats, Dong!

-James

> On Mar 28, 2018, at 10:58 AM, Becket Qin  wrote:
> 
> Hello everyone,
> 
> The PMC of Apache Kafka is pleased to announce that Dong Lin has accepted
> our invitation to be a new Kafka committer.
> 
> Dong started working on Kafka about four years ago, since which he has
> contributed numerous features and patches. His work on Kafka core has been
> consistent and important. Among his contributions, most noticeably, Dong
> developed JBOD (KIP-112, KIP-113) to handle disk failures and to reduce
> overall cost, and added the deleteDataBefore() API (KIP-107) to allow users to
> actively remove old messages. Dong has also been active in the community,
> participating in KIP discussions and doing code reviews.
> 
> Congratulations and looking forward to your future contribution, Dong!
> 
> Jiangjie (Becket) Qin, on behalf of Apache Kafka PMC



Re: [ANNOUNCE] Apache Kafka 1.0.1 Released

2018-03-06 Thread James Cheng
Congrats, everyone! Thanks for driving the release, Ewen!

-James

> On Mar 6, 2018, at 1:22 PM, Guozhang Wang  wrote:
> 
> Ewen, thanks for driving the release!!
> 
> 
> Guozhang
> 
> On Tue, Mar 6, 2018 at 1:14 PM, Ewen Cheslack-Postava  wrote:
> 
>> The Apache Kafka community is pleased to announce the release for Apache
>> Kafka
>> 1.0.1.
>> 
>> This is a bugfix release for the 1.0 branch that was first released with
>> 1.0.0 about 4 months ago. We've fixed 49 issues since that release. Most of
>> these are non-critical, but in aggregate these fixes will have significant
>> impact. A few of the more significant fixes include:
>> 
>> * KAFKA-6277: Make loadClass thread-safe for class loaders of Connect
>> plugins
>> * KAFKA-6185: Selector memory leak with high likelihood of OOM in case of
>> down conversion
>> * KAFKA-6269: KTable state restore fails after rebalance
>> * KAFKA-6190: GlobalKTable never finishes restoring when consuming
>> transactional messages
>> * KAFKA-6529: Stop file descriptor leak when client disconnects with
>> staged receives
>> * KAFKA-6238: Issues with protocol version when applying a rolling upgrade
>> to 1.0.0
>> 
>> 
>> All of the changes in this release can be found in the release notes:
>> 
>> 
>> https://dist.apache.org/repos/dist/release/kafka/1.0.1/RELEASE_NOTES.html
>> 
>> 
>> 
>> You can download the source release from:
>> 
>> 
>> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
>> kafka-1.0.1-src.tgz
>> 
>> 
>> and binary releases from:
>> 
>> 
>> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
>> kafka_2.11-1.0.1.tgz
>> (Scala 2.11)
>> 
>> https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.1/
>> kafka_2.12-1.0.1.tgz
>> (Scala 2.12)
>> 
>> 
>> ---
>> 
>> 
>> Apache Kafka is a distributed streaming platform with four core APIs:
>> 
>> 
>> ** The Producer API allows an application to publish a stream of records to
>> one or more Kafka topics.
>> 
>> 
>> ** The Consumer API allows an application to subscribe to one or more
>> topics and process the stream of records produced to them.
>> 
>> 
>> ** The Streams API allows an application to act as a stream processor,
>> consuming an input stream from one or more topics and producing an output
>> stream to one or more output topics, effectively transforming the input
>> streams to output streams.
>> 
>> 
>> ** The Connector API allows building and running reusable producers or
>> consumers that connect Kafka topics to existing applications or data
>> systems. For example, a connector to a relational database might capture
>> every change to a table.
>> 
>> 
>> 
>> With these APIs, Kafka can be used for two broad classes of application:
>> 
>> 
>> ** Building real-time streaming data pipelines that reliably get data
>> between systems or applications.
>> 
>> 
>> ** Building real-time streaming applications that transform or react to the
>> streams of data.
>> 
>> 
>> 
>> Apache Kafka is in use at large and small companies worldwide, including
>> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
>> Target, The New York Times, Uber, Yelp, and Zalando, among others.
>> 
>> 
>> 
>> A big thank you for the following 36 contributors to this release!
>> 
>> Alex Good, Andras Beni, Andy Bryant, Arjun Satish, Bill Bejeck, Colin P.
>> Mccabe, Colin Patrick McCabe, ConcurrencyPractitioner, Damian Guy, Daniel
>> Wojda, Dong Lin, Edoardo Comar, Ewen Cheslack-Postava, Filipe Agapito,
>> fredfp, Guozhang Wang, huxihx, Ismael Juma, Jason Gustafson, Jeremy
>> Custenborder, Jiangjie (Becket) Qin, Joel Hamill, Konstantine Karantasis,
>> lisa2lisa, Logan Buckley, Manjula K, Matthias J. Sax, Nick Chiu, parafiend,
>> Rajini Sivaram, Randall Hauch, Robert Yokota, Ron Dagostino, tedyu,
>> Yaswanth Kumar, Yu.
>> 
>> 
>> We welcome your help and feedback. For more information on how to
>> report problems,
>> and to get involved, visit the project website at http://kafka.apache.org/
>> 
>> 
>> Thank you!
>> Ewen
>> 
> 
> 
> 
> -- 
> -- Guozhang



Re: [ANNOUNCE] New Kafka PMC Member: Rajini Sivaram

2018-01-18 Thread James Cheng
Congrats Rajini!

-James

Sent from my iPhone

> On Jan 17, 2018, at 10:48 AM, Gwen Shapira  wrote:
> 
> Dear Kafka Developers, Users and Fans,
> 
> Rajini Sivaram became a committer in April 2017.  Since then, she remained
> active in the community and contributed major patches, reviews and KIP
> discussions. I am glad to announce that Rajini is now a member of the
> Apache Kafka PMC.
> 
> Congratulations, Rajini and looking forward to your future contributions.
> 
> Gwen, on behalf of Apache Kafka PMC


Re: [VOTE] KIP-247: Add public test utils for Kafka Streams

2018-01-18 Thread James Cheng
+1 (non-binding)

-James

Sent from my iPhone

> On Jan 17, 2018, at 6:09 PM, Matthias J. Sax  wrote:
> 
> Hi,
> 
> I would like to start the vote for KIP-247:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
> 
> 
> -Matthias
> 


Re: [ANNOUNCE] New committer: Matthias J. Sax

2018-01-12 Thread James Cheng
Congrats, Matthias!! Well deserved!

-James

> On Jan 12, 2018, at 2:59 PM, Guozhang Wang  wrote:
> 
> Hello everyone,
> 
> The PMC of Apache Kafka is pleased to announce Matthias J. Sax as our
> newest Kafka committer.
> 
> Matthias has made tremendous contributions to Kafka Streams API since early
> 2016. His footprint has been all over the place in Streams: in the past
> two years he has been the main driver on improving the join semantics
> inside Streams DSL, summarizing all their shortcomings and bridging the
> gaps; he has also been largely working on the exactly-once semantics of
> Streams by leveraging on the transaction messaging feature in 0.11.0. In
> addition, Matthias has been very active in community activity that goes
> beyond the mailing list: he's gotten close to 1000 upvotes and 100
> helpful flags on SO for answering almost all questions about Kafka Streams.
> 
> Thank you for your contribution and welcome to Apache Kafka, Matthias!
> 
> 
> 
> Guozhang, on behalf of the Apache Kafka PMC



Re: Insanely long recovery time with Kafka 0.11.0.2

2018-01-11 Thread James Cheng
We saw this as well, when updating from 0.10.1.1 to 0.11.0.1.

Have you restarted your brokers since then? Did it take 8h to start up again, 
or did it take its normal 45 minutes?

I don't think it's related to the crash/recovery. Rather, I think it's due to 
the upgrade from 0.10.1.1 to 0.11.0.1

I think 0.11.0.0 introduced some new files in the log directories (maybe the 
.snapshot files?). The first time 0.11.0.0 (or newer) started up, it scanned 
the entire .log files to create... something. It scanned *all* the segments, 
not just the most recent ones. I think that's why it took so long. I think 
normally log recovery only looks at the most recent segments.

We noticed this only on the FIRST boot when running 0.11+. From then on, 
startups were our normal length of time.

In your https://pastebin.com/tZqze4Ya, I see a line like:
[2018-01-05 13:43:34,776] INFO Completed load of log webapi-event-1 with 2 log 
segments, log start offset 1068104 and log end offset 1236587 in 9547 ms 
(kafka.log.Log)

That line says that that partition took 9547ms (9.5 seconds) to load/recover. 
We had large partitions that took 30 minutes to recover, on that first boot. 
When I used strace to see what I/O the broker was doing, it was reading ALL the 
segments for the partitions.

-James



> On Jan 11, 2018, at 10:56 AM, Vincent Rischmann  wrote:
> 
> If anyone else has any idea, I'd love to hear it.
> 
> Meanwhile, I'll resume upgrading my brokers and hope it doesn't crash and/or 
> take so much time for recovery.
> 
> On Sat, Jan 6, 2018, at 7:25 PM, Vincent Rischmann wrote:
>> Hi,
>> 
>> just to clarify: this is the cause of the crash 
>> https://pastebin.com/GuF60kvF in the broker logs, which is why I 
>> referenced https://issues.apache.org/jira/browse/KAFKA-4523
>> 
>> I had this crash some time ago and yesterday was in the process of 
>> upgrading my brokers to 0.11.0.2 in part to address this bug, 
>> unfortunately while stopping this particular broker it crashed.
>> 
>> What I don't understand is why the recovery time after upgrading was so 
>> high. A couple of months ago when a broker crashed due to this bug and
>> recovered it didn't take nearly as long. In fact, I never had a recovery 
>> that long on any broker, even when they suffered a kernel panic or power 
>> failure.
>> 
>> We have quite a bit of data, however as I said with 0.10.1.1 it "only" 
>> took around 45 minutes. The broker did do a lot of I/O while recovering 
>> (to the point where even just running ls was painfully slow) but afair 
>> it was the same every time a broker recovered. Volume of data hasn't 
>> changed much since the last crash with 0.10.1.1, in fact I removed a lot 
>> of data recently.
>> 
>> Our brokers are running with 3 SATA disks in raid 0 (using mdadm), while 
>> recovering yesterday atop reported around 200MB/s of read throughput. 
>> 
>> Here are some graphs from our monitoring:
>> 
>> - CPU usage https://vrischmann.me/files/fr3/cpu.png
>> - Disk IO https://vrischmann.me/files/fr3/disk_io.png and 
>> https://vrischmann.me/files/fr3/disk_usage.png
>> 
>> On Sat, Jan 6, 2018, at 4:53 PM, Ted Yu wrote:
>>> Ismael:
>>> We're on the same page.
>>> 
>>> 0.11.0.2 was released on 17 Nov 2017.
>>> 
>>> By 'recently' in my previous email I meant the change was newer.
>>> 
>>> Vincent:
>>> Did the machine your broker ran on experience power issue ?
>>> 
>>> Cheers
>>> 
>>> On Sat, Jan 6, 2018 at 7:36 AM, Ismael Juma  wrote:
>>> 
>>>> Hi Ted,
>>>> 
>>>> The change you mention is not part of 0.11.0.2.
>>>> 
>>>> Ismael
>>>> 
>>>> On Sat, Jan 6, 2018 at 3:31 PM, Ted Yu  wrote:
>>>> 
>>>>> bq. WARN Found a corrupted index file due to requirement failed: Corrupt
>>>>> index found, index file
>>>>> (/data/kafka/data-processed-15/54942918.index)
>>>>> 
>>>>> Can you search backward for 54942918.index in the log to see if
>>>>> we can find the cause for corruption ?
>>>>> 
>>>>> This part of code was recently changed by :
>>>>> 
>>>>> KAFKA-6324; Change LogSegment.delete to deleteIfExists and harden log
>>>>> recovery
>>>>> 
>>>>> Cheers
>>>>> 
>>>>> On Sat, Jan 6, 2018 at 7:18 AM, Vincent Rischmann 
>>>>> wrote:
>>>>> 
>>>>>> Here's an excerpt just after the broker started:
>>>>>> https://pastebin.com/tZqze4Ya
>>>>>> 
>>>>>> After more than 8 hours of recovery the broker finally started. I haven't
>>>>>> read through all 8 hours of log but the parts I looked at are like the
>>>>>> pastebin.
>>>>>> 
>>>>>> I'm not seeing much in the log cleaner logs either, they look normal. We
>>>>>> have a couple of compacted topics but seems only the consumer offsets is
>>>>>> ever compacted (the other topics don't have much traffic).
>>>>>> 
>>>>>> On Sat, Jan 6, 2018, at 12:02 AM, Brett Rann wrote:
>>>>>>> What do the broker logs say its doing during all that time?
>>>>>>> 
>>>>>>> There are some consumer offset / log cleaner bugs which caused us similarly
>>>>>>> log delays. that was easily visible by

Re: [ANNOUNCE] New committer: Onur Karaman

2017-11-06 Thread James Cheng
Congrats Onur! Well deserved!

-James

> On Nov 6, 2017, at 9:24 AM, Jun Rao  wrote:
> 
> Hi, everyone,
> 
> The PMC of Apache Kafka is pleased to announce a new Kafka committer Onur
> Karaman.
> 
> Onur's most significant work is the improvement of Kafka controller, which
> is the brain of a Kafka cluster. Over time, we have accumulated quite a few
> correctness and performance issues in the controller. There have been
> attempts to fix controller issues in isolation, which would make the code
> base more complicated without a clear path of solving all problems. Onur is
> the one who took a holistic approach, by first documenting all known
> issues, writing down a new design, coming up with a plan to deliver the
> changes in phases and executing on it. At this point, Onur has completed
> the two most important phases: making the controller single threaded and
> changing the controller to use the async ZK api. The former fixed multiple
> deadlocks and race conditions. The latter significantly improved the
> performance when there are many partitions. Experimental results show that
> Onur's work reduced the controlled shutdown time by a factor of 100 times
> and the controller failover time by a factor of 3 times.
> 
> Congratulations, Onur!
> 
> Thanks,
> 
> Jun (on behalf of the Apache Kafka PMC)



Re: [ANNOUNCE] Apache Kafka 1.0.0 Released

2017-11-01 Thread James Cheng
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data between
> systems or applications.
> 
> ** Building real-time streaming applications that transform or react
> to the streams
> of data.
> 
> 
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
> 
> 
> A big thank you for the following 108 contributors to this release!
> 
> Abhishek Mendhekar, Xi Hu, Andras Beni, Andrey Dyachkov, Andy Chambers,
> Apurva Mehta, Armin Braun, Attila Kreiner, Balint Molnar, Bart De Vylder,
> Ben Stopford, Bharat Viswanadham, Bill Bejeck, Boyang Chen, Bryan Baugher,
> Colin P. Mccabe, Koen De Groote, Dale Peakall, Damian Guy, Dana Powers,
> Dejan Stojadinović, Derrick Or, Dong Lin, Zhendong Liu, Dustin Cote,
> Edoardo Comar, Eno Thereska, Erik Kringen, Erkan Unal, Evgeny Veretennikov,
> Ewen Cheslack-Postava, Florian Hussonnois, Janek P, Gregor Uhlenheuer,
> Guozhang Wang, Gwen Shapira, Hamidreza Afzali, Hao Chen, Jiefang He, Holden
> Karau, Hooman Broujerdi, Hugo Louro, Ismael Juma, Jacek Laskowski, Jakub
> Scholz, James Cheng, James Chien, Jan Burkhardt, Jason Gustafson, Jeff
> Chao, Jeff Klukas, Jeff Widman, Jeremy Custenborder, Jeyhun Karimov,
> Jiangjie Qin, Joel Dice, Joel Hamill, Jorge Quilcate Otoya, Kamal C, Kelvin
> Rutt, Kevin Lu, Kevin Sweeney, Konstantine Karantasis, Perry Lee, Magnus
> Edenhill, Manikumar Reddy, Manikumar Reddy O, Manjula Kumar, Mariam John,
> Mario Molina, Matthias J. Sax, Max Zheng, Michael Andre Pearce, Michael
> André Pearce, Michael G. Noll, Michal Borowiecki, Mickael Maison, Nick
> Pillitteri, Oleg Prozorov, Onur Karaman, Paolo Patierno, Pranav Maniar,
> Qihuang Zheng, Radai Rosenblatt, Alex Radzish, Rajini Sivaram, Randall
> Hauch, Richard Yu, Robin Moffatt, Sean McCauliff, Sebastian Gavril, Siva
> Santhalingam, Soenke Liebau, Stephane Maarek, Stephane Roset, Ted Yu,
> Thibaud Chardonnens, Tom Bentley, Tommy Becker, Umesh Chaudhary, Vahid
> Hashemian, Vladimír Kleštinec, Xavier Léauté, Xianyang Liu, Xin Li, Linhua
> Xin
> 
> 
> We welcome your help and feedback. For more information on how to report
> problems, and to get involved, visit the project website at
> http://kafka.apache.org/
> 
> 
> 
> 
> Thanks,
> Guozhang Wang



How do I instantiate a metrics reporter in Kafka Streams, with custom config?

2017-11-01 Thread James Cheng
Hi, we have a KafkaStreams app. We specify a custom metric reporter by doing:

Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"kafka-broker1:9092");
config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, 
"com.mycompany.MetricReporter");
config.put("custom-key-for-metric-reporter", "value");

Previously, our metric reporter would get passed the properties object upon 
instantiation, and would retrieve its custom config from it. It appears that in 
recent releases, in order to apply my metric reporter to the consumer, I have 
to specify my config as "consumer.custom-key-for-metric-reporter", and for the 
producer, as "producer.custom-key-for-metric-reporter". If I don't prefix it 
with "consumer." or "producer.", the key appears to get stripped out of the 
properties object that is passed to my metric reporter when the 
consumer/producer gets initialized, and so my metric reporter can't get its 
config.

That means that if I have a metrics reporter and I want to collect producer and 
consumer metrics, as well as kafka-streams metrics, that I have to specify my 
custom config 3 times:
1) consumer.custom-key-for-metric-reporter
2) producer.custom-key-for-metric-reporter
3) custom-key-for-metric-reporter

Is that behavior by design, or is it a bug? What is the intended behavior for 
unrecognized keys in the properties object?

And actually, for the metrics.reporter class itself, am I supposed to specify 
it as:

metrics.reporter

or

metric.reporters
producer.metric.reporters
consumer.metric.reporters

Thanks,
-James
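For anyone hitting the same issue, the triple specification described above can be wrapped in a small helper. This is only a sketch of the workaround, not a Kafka API: the putForAllClients method and the custom key names are hypothetical, and the snippet just illustrates duplicating one custom key under the plain, "consumer."-prefixed, and "producer."-prefixed forms.

```java
import java.util.Properties;

public class ReporterConfig {

    // Hypothetical helper (not part of any Kafka API): copies one reporter
    // setting to the plain, "consumer."-prefixed, and "producer."-prefixed
    // keys, so that the reporter instances created by the streams layer, the
    // consumer, and the producer would all see it.
    static void putForAllClients(Properties props, String key, String value) {
        props.put(key, value);
        props.put("consumer." + key, value);
        props.put("producer." + key, value);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "kafka-broker1:9092");
        config.put("metric.reporters", "com.mycompany.MetricReporter");
        putForAllClients(config, "custom-key-for-metric-reporter", "value");
        System.out.println(config.getProperty("producer.custom-key-for-metric-reporter"));
    }
}
```

Whether the un-prefixed key really gets stripped before reaching the reporter is exactly the open question in this thread, so the duplication here is a defensive workaround rather than the documented behavior.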



Re: In which scenarios would "INVALID_REQUEST" be returned for "Offset Request"

2017-09-24 Thread James Cheng
Your client library might be sending a message that is too old or too new for 
your broker to understand.

What version is your Kafka client library, and what version is your broker?

-James

Sent from my iPhone

> On Sep 22, 2017, at 4:09 PM, Vignesh  wrote:
> 
> Hi,
> 
> In which scenarios would we get "INVALID_REQUEST" for a Version 1 "Offset
> Request"  (https://kafka.apache.org/protocol#The_Messages_Offsets)  ?
> 
> I searched for INVALID_REQUEST in https://github.com/apache/kafka and below
> is the only file that seems related.
> 
> https://github.com/apache/kafka/blob/96ba21e0dfb1a564d5349179d844f020abf1e08b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
> 
> Here, I see that invalid request is returned only on duplicate topic
> partition. Is that the only reason?
> 
> The description for the error is broader though.
> 
> "
> This most likely occurs because of a request being malformed by the client
> library or the message was sent to an incompatible broker. See the broker
> logs for more details.
> 
> "
> 
> Thanks,
> Vignesh.


Re: Metrics: committed offset, client version

2017-09-20 Thread James Cheng
KIP-188 is expected to be in the upcoming 1.0.0 release. It will add 
client-side JMX metrics that show the client version number.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks
 


-James



> On Sep 20, 2017, at 2:37 AM, Stas Chizhov  wrote:
> 
> Hi!
> 
> I am wondering if there are broker/client metrics for:
> - client version (to keep track of clients that needs an upgrade)
> - committed offsets (to detect situations when commits fail systematically
> with everything else being ok)
> 
> Thank you,
> Stanislav.



Re: Kafka Internals Video/Blog

2017-09-20 Thread James Cheng
This recent meetup had a presentation of the internals of the Kafka Controller. 

https://www.meetup.com/Stream-Processing-Meetup-LinkedIn/events/242656767/ 


The video is not yet available, but hopefully will be soon.

-James

> On Sep 20, 2017, at 9:49 AM, Raghav  wrote:
> 
> Hi
> 
> Just wondering if there is any video/blog that goes over Kafka Internal and
> under the hood design and implementation details. I am a newbie and I would
> like to dabble with the code and understand design of it. Just wondering if
> there is any video, blog etc that goes over it ?
> 
> Thanks.
> 
> -- 
> Raghav



Re: Improving Kafka State Store performance

2017-09-16 Thread James Cheng
In addition to the measurements that you are doing yourself, Kafka Streams also 
has its own metrics. They are exposed via JMX, if you have that enabled:

http://kafka.apache.org/documentation/#monitoring 


If you set metrics.recording.level="debug", you can see a bunch of metrics 
around the state stores. Stuff like put-latency-avg, for example.

See http://kafka.apache.org/documentation/#kafka_streams_store_monitoring 


-James
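As a concrete example, turning on the store-level metrics mentioned above is a single extra entry in the Streams configuration, shown here in properties-file form ("info" is the default level; depending on the version, the level name may need to be in upper case):

```properties
# Streams config -- enables the debug-level metrics (e.g. the per-store
# put-latency-avg) at the cost of some extra measurement overhead.
metrics.recording.level=DEBUG
```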

> On Sep 16, 2017, at 6:14 AM, dev loper  wrote:
> 
> Hi Kafka Streams Users,
> 
> I am trying to improve the performance of the Kafka Streams persistent
> state store. In our application we are using the Kafka Streams Processor
> API and a persistent state store. My application performs well when it
> starts up, but over a period of time the performance deteriorates. I
> am computing certain results in computeAnalytics method and this method is
> not taking time at all. This method is being called within both process and
> punctuate and I am storing the updated object back to store. Over the
> period of time its taking huge time for completing the punctuate process
> and I could see majority of the time is spent in storing the records and
> Iterating the records. The record count is just 2500 per partition. I am not
> sure where I am going wrong or how I can improve the performance.
> 
> Below is one such sample log record.
> 
> INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
> 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
> 
> Below I have given my pseudo code for my processor which exactly resembles
> the code which I am using in my application.
> 
> MyProcessor(){
> 
>  process(Key objectkey, Update eventupdate){
>   long timestamp=context.timestamp();
>   AnalyticeObj storeobj=store.get(objectkey);
> 
>   if( storeobj ===null)
> {
>  storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
> }
> else
>{
>   storeobj.update(eventupdate,timestamp)
>}
> storeobj=storeobj.computeAnalytics();
> 
>   store.put(objectkey,storeobj);
>  context.commit();
> }
> // Every 5 seconds
> punctuate(long timestamp)
> {
> long startTime = System.currentTimeMillis();
> long totalTimeTakenToProcessRecords=0;
> long totalTimeTakenToStoreRecords=0;
> long counter=0;
> KeyValueIterator iter=this.visitStore.all();
> while (iter.hasNext()) {
> KeyValue entry = iter.next();
> 
>if(AnalyticeObj.hasExpired(timestamp)
> store.remove(entry.key)
>  else
>  {
>long processStartTime=System.currentTimeMillis();
> AnalyticeObj storeobj= entry.value.computeAnalytics(timestamp);
> 
> totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords+(System.currentTimeMillis()-processStartTime);
> 
> long storeStartTime=System.currentTimeMillis();
>  store.put(entry.key,storeobj);
> 
> totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(System.currentTimeMillis()-storeStartTime);
>   }
>   counter++;
> }
> logger.info(" Time Metrics for punctuate  "
>" for TimeStamp :: " + "" + timestamp + " processed
> Records :: "
>+ counter +" totalTimeTakenToProcessRecords ::
> "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> "+totalTimeTakenToStoreRecords
>+"toal time Taken to retrieve Records :: "+
> (System.currentTimeMillis() -
> (startTime+totalTimeTakenToProcessRecords+totalTimeTakenToStoreRecords))+"
> Total time Taken :: " + (System.currentTimeMillis() - startTime));
> }
> }



Re: Consumer group metadata retention

2017-07-26 Thread James Cheng
The offsets.retention.minutes value (1440 = 24 hours = 1 day) is a broker level 
configuration, and can't be changed dynamically during runtime. You would have 
to modify the broker configurations, and restart the brokers.

-James
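In properties form, that broker-side setting looks like the following sketch; the 10080 value (7 days) is only an illustration, not a recommendation:

```properties
# server.properties on every broker -- a rolling restart is needed
# for the new retention to take effect.
offsets.retention.minutes=10080
```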

> On Jul 25, 2017, at 9:43 PM, Raghu Angadi  wrote:
> 
> I am writing an exactly-once Kafka sink for Apache Beam.
> In order to avoid duplicates due to retries, it stores a sequential id and
> producer signature in consumer group metadata, and commits it atomically
> with messages (using sendOffsetsToTransaction()).
> 
> I have a couple of clarification questions on partition metadata associated
> with a consumer group.
> 
>   - Looks like a partition number could be larger than number of
>   partitions for a topic. Is this formally supported (which is great!)?
>  - E.g. I was able to set and read partition metadata for partition
>  #20 for a topic with 4 partitions (though `console-consumer-group` didn't
>  quite work.).
>   - This is critical information for a sink. But looks like the metadata
>   gets purged in 24 hours (server config) if it is inactive. Is there a way
>   to set a longer TTL programatically?
> 
> Thanks.
> Raghu.



Re: Tuning up mirror maker for high thruput

2017-07-24 Thread James Cheng
Todd,

I have a question about the OS/broker tuning that you are talking about on the 
source cluster. Aside from mirrormaker (which you say should be running in the 
remote destination datacenter), presumably there will be other consumers in the 
source datacenter as well. How does the OS/broker tuning affect those consumers 
that are close to the source datacenter? Will they continue to function well?

-James

> On Jul 23, 2017, at 7:16 AM, Todd Palino  wrote:
> 
> One of the best pieces of advice I can offer is that you really need to run
> the mirror maker in the same physical/network location as the Kafka cluster
> you are producing to. Latency on the consumer side can be more easily
> absorbed than latency on the producer side, as to assure that we have
> proper message ordering and reliability, we need to restrict in flight
> batches to 1. So that means that our produce connection is contstrained to
> be very thin, and latency makes a huge impact. Meanwhile, on the consume
> side we’re fetching large batches of messages, many at a time, so
> round-trip latency has less of an impact. I really can’t stress this
> enough. We set up some mirror makers in the opposite configuration for
> security reasons, and it’s been a huge problem even with tuning.
> 
> In addition to this, you will want to assure that your OS (and then the
> mirror maker and broker) tuning is taking into account the latency. Here’s
> a good reference for the OS side (for Linux):
> http://linuxczar.net/blog/2016/09/18/bandwidth-delay-product/
> 
> Once you have the OS tuned, you’ll need to adjust the broker tuning on the
> clusters you are consuming from, since that is the high latency side. The
> configuration for that is socket.send.buffer.bytes, and it probably makes
> sense to set this to -1 (which means use the OS configuration). You can do
> the same with socket.receive.buffer.bytes, but it’s not as critical with
> this setup. On the mirror maker, the configuration is on the consumer side,
> and it’s called receive.buffer.bytes. Again, you can set this to -1 to use
> the OS configuration. Make sure to restart the applications after making
> all these changes, of course.
> 
> -Todd
> 
> 
> On Sat, Jul 22, 2017 at 1:27 AM, James Cheng  wrote:
> 
>> Becket Qin from LinkedIn spoke at a meetup about how to tune the Kafka
>> producer. One scenario that he described was tuning for situations where
>> you had high network latency. See slides at https://www.slideshare.net/
>> mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
>> and video at https://youtu.be/oQe7PpDDdzA
>> 
>> -James
>> 
>> Sent from my iPhone
>> 
>>> On Jul 21, 2017, at 9:25 AM, Sunil Parmar  wrote:
>>> 
>>> We're trying to set up mirror maker to mirror data from EU dc to US dc. The
>>> network delay is ~150 ms. In recent test; we realized that mirror maker is
>>> not keeping up with load and have a lag trending upward all the time.
>>> 
>>> What are configurations that can be tuned up to make it work for the higher
>>> throughput ?
>>> How to decide number of producer and consumer threads ? ( number of topic
>>> partitions / brokers ? )
>>> 
>>> 
>>> *Environment* ( both source and destination cluster )
>>> 
>>> Kafka version 0.9 ( Cloudera 0.9.0.0+kafka2.0.0+188 )
>>> 
>>> queue.size = 1
>>> queue.byte.size = 100MB
>>> 
>>> 2 brokers on source, 3 brokers on destination
>>> 
>>> 
>>> *Mirror maker configs :*
>>> 
>>> Producer properties :
>>> request.timeout.ms=12
>>> block.on.buffer.full=TRUE
>>> retries=20
>>> acks=all
>>> 
>>> 
>>> Consumer properties:
>>> request.timeout.ms=12
>>> auto.offset.reset=latest
>>> enable.auto.commit=false
>>> 
>>> We've configured 4 producer and consumer threads.
>>> There is no security set up as of now so it's all PLAINTEXT.
>>> 
>>> We have 4 topics are white listed to sync from EU to US. Only one of them
>>> is high throughput. We also have a message handler to strip off some
>>> sensitive information from EU to US but it only works on a low thru put
>>> topic; the message handler still try to process the other topics but let it
>>> pass thru.
>>> 
>>> Thanks,
>>> Sunil Parmar
>> 
> 
> 
> 
> -- 
> *Todd Palino*
> Senior Staff Engineer, Site Reliability
> Data Infrastructure Streaming
> 
> 
> 
> linkedin.com/in/toddpalino



Re: Tuning up mirror maker for high thruput

2017-07-22 Thread James Cheng
Becket Qin from LinkedIn spoke at a meetup about how to tune the Kafka 
producer. One scenario that he described was tuning for situations where you 
had high network latency. See slides at 
https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
 and video at https://youtu.be/oQe7PpDDdzA

-James

Sent from my iPhone

> On Jul 21, 2017, at 9:25 AM, Sunil Parmar  wrote:
> 
> We're trying to set up mirror maker to mirror data from EU dc to US dc. The
> network delay is ~150 ms. In recent test; we realized that mirror maker is
> not keeping up with load and have a lag trending upward all the time.
> 
> What are configurations that can be tuned up to make it work for the higher
> throughput ?
> How to decide number of producer and consumer threads ? ( number of topic
> partitions / brokers ? )
> 
> 
> *Environment* ( both source and destination cluster )
> 
> Kafka version 0.9 ( Cloudera 0.9.0.0+kafka2.0.0+188 )
> 
> queue.size = 1
> queue.byte.size = 100MB
> 
> 2 brokers on source, 3 brokers on destination
> 
> 
> *Mirror maker configs :*
> 
> Producer properties :
> request.timeout.ms=12
> block.on.buffer.full=TRUE
> retries=20
> acks=all
> 
> 
> Consumer properties:
> request.timeout.ms=12
> auto.offset.reset=latest
> enable.auto.commit=false
> 
> We've configured 4 producer and consumer threads.
> There is no security set up as of now so it's all PLAINTEXT.
> 
> We have 4 topics are white listed to sync from EU to US. Only one of them
> is high throughput. We also have a message handler to strip off some
> sensitive information from EU to US but it only works on a low thru put
> topic; the message handler still try to process the other topics but let it
> pass thru.
> 
> Thanks,
> Sunil Parmar


Re: Consumer offsets partitions size much bigger than others

2017-07-18 Thread James Cheng
It's possible that the log-cleaning thread has crashed. That is the thread that 
implements log compaction.

Look in the log-cleaner.log file in your Kafka broker's application log 
directory (where the broker writes its log4j logs) to see if there is any 
indication that it has crashed (error messages, stack traces, etc).

What version of kafka are you using? 0.10 and prior had some bugs in the 
log-cleaner thread that might sometimes cause it to crash. Those were fixed in 
later versions, but it's always possible there might still be more bugs there.

I notice that your __consumer_offsets topic only has replication-factor=1. How 
many brokers are in your cluster? You should increase the replication factor to 
3. 

Older versions of kafka would try to auto-create the __consumer_offsets topic 
with replication-factor 3 but if there were fewer than 3 brokers in the 
cluster, then they would simply use the number of brokers in the cluster. What 
that means is that if your cluster only had 1 broker running at the time the 
topic was auto-created, that it would be created with replication-factor 1. 
This has been fixed in later brokers, so that it will always create topics with 
the specified number of replicas or will throw loud errors in the event you 
don't have enough brokers.

-James

> On Jul 18, 2017, at 8:44 AM, Luciano Afranllie  
> wrote:
> 
> Hi
> 
> One of our Kafka brokers was running out of disk space and when we checked
> the file size in the kafka log dir we observed the following
> 
> $ du -h . --max-depth=2 | grep '__consumer_offsets'
> 4.0K    ./kafka-logs/__consumer_offsets-16
> 4.0K    ./kafka-logs/__consumer_offsets-40
> 35G     ./kafka-logs/__consumer_offsets-44
> 4.0K    ./kafka-logs/__consumer_offsets-8
> 4.0K    ./kafka-logs/__consumer_offsets-38
> 4.0K    ./kafka-logs/__consumer_offsets-20
> 4.0K    ./kafka-logs/__consumer_offsets-34
> 4.0K    ./kafka-logs/__consumer_offsets-18
> 4.0K    ./kafka-logs/__consumer_offsets-32
> 251G    ./kafka-logs/__consumer_offsets-14
> 4.0K    ./kafka-logs/__consumer_offsets-4
> 4.0K    ./kafka-logs/__consumer_offsets-26
> 4.0K    ./kafka-logs/__consumer_offsets-12
> 4.0K    ./kafka-logs/__consumer_offsets-30
> 4.0K    ./kafka-logs/__consumer_offsets-6
> 4.0K    ./kafka-logs/__consumer_offsets-2
> 4.0K    ./kafka-logs/__consumer_offsets-24
> 4.0K    ./kafka-logs/__consumer_offsets-36
> 4.0K    ./kafka-logs/__consumer_offsets-46
> 4.0K    ./kafka-logs/__consumer_offsets-42
> 4.0K    ./kafka-logs/__consumer_offsets-22
> 4.0K    ./kafka-logs/__consumer_offsets-0
> 4.0K    ./kafka-logs/__consumer_offsets-28
> 4.0K    ./kafka-logs/__consumer_offsets-10
> 4.0K    ./kafka-logs/__consumer_offsets-48
> 
> As you can see, two of the log files (partition 44 and 14) have a huge
> size. Do you have a hint to understand what could be happening here? Maybe
> for some reason these partitions are not being compacted?
> 
> By the way, this is the description of the __consumer_offsets topic.
> 
> # ./bin/kafka-topics.sh --describe --zookeeper x.x.x.x:2181 --topic
> __consumer_offsets
> Topic:__consumer_offsets   PartitionCount:50   ReplicationFactor:1
> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=uncompressed
>    Topic: __consumer_offsets   Partition: 0    Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 1    Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 2    Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 3    Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 4    Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 5    Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 6    Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 7    Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 8    Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 9    Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 10   Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 11   Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 12   Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 13   Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 14   Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 15   Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 16   Leader: 1   Replicas: 1   Isr: 1
>    Topic: __consumer_offsets   Partition: 17   Leader: 2   Replicas: 2   Isr: 2
>    Topic: __consumer_offsets   Partition: 18   Leader: 1   Replicas: 1   Isr: 1
>Topic: __consumer_offsets   Partition: 19   Leader: 

Re: Mirroring multiple clusters into one

2017-07-06 Thread James Cheng
Answers inline below.

-James

Sent from my iPhone

> On Jul 7, 2017, at 1:18 AM, Vahid S Hashemian  
> wrote:
> 
> James,
> 
> Thanks for sharing your thoughts and experience.
> Could you please also confirm whether
> - you do any encryption for the mirrored data?
Not at the Kafka level. The data goes over a VPN.

> - you have a many-to-one mirroring similar to what I described?
> 

Yes, we mirror multiple source clusters to a single target cluster. We have a 
topic naming convention where our topics are prefixed with their cluster name, 
so as long as we follow that convention, each source topic gets mirrored to a 
unique target topic. That is, we try not to have multiple mirrormakers writing 
to a single target topic. 

Our topic names in the target cluster get prefixed with the string "mirror." 
And then we never mirror topics that start with "mirror." This prevents us from 
creating mirroring loops.

> Thanks.
> --Vahid
> 
> 
> 
> From:   James Cheng 
> To: users@kafka.apache.org
> Cc: dev 
> Date:   07/06/2017 12:37 PM
> Subject:Re: Mirroring multiple clusters into one
> 
> 
> 
> I'm not sure what the "official" recommendation is. At TiVo, we *do* run 
> all our mirrormakers near the target cluster. It works fine for us, but 
> we're still fairly inexperienced, so I'm not sure how strong of a data 
> point we should be.
> 
> I think the thought process is, if you are mirroring from a source cluster 
> to a target cluster where there is a WAN between the two, then whichever 
> request goes across the WAN has a higher chance of intermittent failure 
> than the one over the LAN. That means that if mirrormaker is near the 
> source cluster, the produce request over the WAN to the target cluster may 
> fail. If the mirrormaker is near the target cluster, then the fetch 
> request over the WAN to the source cluster may fail.
> 
> Failed fetch requests don't have much impact on data replication, it just 
> delays it. Whereas a failure during a produce request may introduce 
> duplicates.
> 
> Becket Qin from LinkedIn did a presentation on tuning producer performance 
> at a meetup last year, and I remember he specifically talked about 
> producing over a WAN as one of the cases where you have to tune settings. 
> Maybe that presentation will give more ideas about what to look at. 
> https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600
> 
> 
> -James
> 
> Sent from my iPhone
> 
>> On Jul 6, 2017, at 1:00 AM, Vahid S Hashemian 
>  wrote:
>> 
>> The literature suggests running the MM on the target cluster when 
> possible 
>> (with the exception of when encryption is required for transferred 
> data).
>> I am wondering if this is still the recommended approach when mirroring 
>> from multiple clusters to a single cluster (i.e. multiple MM instances).
>> Is there anything in particular (metric, specification, etc.) to 
> consider 
>> before making a decision?
>> 
>> Thanks.
>> --Vahid
>> 
>> 
> 
> 
> 
> 


Re: Mirroring multiple clusters into one

2017-07-06 Thread James Cheng
I'm not sure what the "official" recommendation is. At TiVo, we *do* run all 
our mirrormakers near the target cluster. It works fine for us, but we're still 
fairly inexperienced, so I'm not sure how strong of a data point we should be.

I think the thought process is, if you are mirroring from a source cluster to a 
target cluster where there is a WAN between the two, then whichever request 
goes across the WAN has a higher chance of intermittent failure than the one 
over the LAN. That means that if mirrormaker is near the source cluster, the 
produce request over the WAN to the target cluster may fail. If the mirrormaker 
is near the target cluster, then the fetch request over the WAN to the source 
cluster may fail.

Failed fetch requests don't have much impact on data replication, it just 
delays it. Whereas a failure during a produce request may introduce duplicates.

Becket Qin from LinkedIn did a presentation on tuning producer performance at a 
meetup last year, and I remember he specifically talked about producing over a 
WAN as one of the cases where you have to tune settings. Maybe that 
presentation will give more ideas about what to look at. 
https://www.slideshare.net/mobile/JiangjieQin/producer-performance-tuning-for-apache-kafka-63147600

-James

Sent from my iPhone

> On Jul 6, 2017, at 1:00 AM, Vahid S Hashemian  
> wrote:
> 
> The literature suggests running the MM on the target cluster when possible 
> (with the exception of when encryption is required for transferred data).
> I am wondering if this is still the recommended approach when mirroring 
> from multiple clusters to a single cluster (i.e. multiple MM instances).
> Is there anything in particular (metric, specification, etc.) to consider 
> before making a decision?
> 
> Thanks.
> --Vahid
> 
> 


Re: mirroring Kafka while preserving the order

2017-06-29 Thread James Cheng
MirrorMaker acts as a consumer+producer. So it will consume from the source 
topic and produce to the destination topic. That means that the destination 
partition is chosen using the same technique as the normal producer:

* if the source record has a key, the key will be hashed and the hash will be 
used to choose a partition. If the source partition was chosen using some 
different hashing algorithm or a custom partitioner, then you may end up 
writing to a different destination partition.
* if the source record does NOT have a key, then the destination partition will 
be randomly chosen.
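
To make those two cases concrete, here is a toy sketch of that producer-side choice. This is an illustration only — the real Java producer's default partitioner uses murmur2 where this uses CRC32, so actual partition numbers will differ:

```python
import random
import zlib

def choose_partition(key, num_partitions):
    """Keyed records hash deterministically to a partition; keyless records are
    spread across partitions (shown here as random). Illustrative only: the
    Java client uses murmur2, not CRC32."""
    if key is None:
        return random.randrange(num_partitions)
    return (zlib.crc32(key) & 0x7FFFFFFF) % num_partitions

# The same key always lands on the same partition for a fixed partition count...
assert choose_partition(b"user-42", 8) == choose_partition(b"user-42", 8)
# ...but a different partition count or hash function can move it, which is why
# mirroring between clusters does not automatically preserve partitions.
assert 0 <= choose_partition(None, 8) < 8
```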

MirrorMaker supports custom message handlers. You can use those to map the 
source partition to the destination partition, which will allow you to avoid 
the above two problems.  Here's an example of how to do it. 
https://github.com/gwenshap/kafka-examples/tree/master/MirrorMakerHandler

-James

Sent from my iPhone

> On Jun 29, 2017, at 9:57 AM, Tom Bentley  wrote:
> 
> I believe so. You need to be careful that the mirror maker producer doesn't
> reorder messages; in particular if retries > 0 then
> max.in.flight.requests.per.connection must be 1. If
> retries=0 then it doesn't matter what max.in.flight.requests.per.connection
> is.
> 
> 
> 
>> On 29 June 2017 at 05:52, Sunil Parmar  wrote:
>> 
>> Is it possible to configure mirror maker using message handler to preserve
>> the order of messages in each topic partition. In the particular use case
>> we're looking at both source and destination topics have same number of
>> partitions.
>> 
>> Sunil
>> 


Re: Slow Consumer Group Startup

2017-06-13 Thread James Cheng
Bryan,

This sounds related to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
 and https://issues.apache.org/jira/browse/KAFKA-4925.

-James

> On Jun 13, 2017, at 7:02 AM, Bryan Baugher  wrote:
> 
> The topics already exist prior to starting any of the consumers
> 
> On Mon, Jun 12, 2017 at 9:35 PM J Pai  wrote:
> 
>> When are the topics on which these consumer groups consume, created?
>> 
>> -Jaikiran
>> On 13-Jun-2017, at 3:18 AM, Bryan Baugher  wrote:
>> 
>> Hi everyone,
>> 
>> We are currently experiencing slow startup times for our consumer groups
>> (16-32 processes for a hundred or more partitions) in the range of minutes
>> (3-15 minutes), where little to no messages are consumed before suddenly
>> everything just starts working at full speed.
>> 
>> I'm currently using Kafka 0.9.0.1 but we are in the middle of upgrading to
>> Kafka 0.10.2.1. We also using the newer kafka consumer API and group
>> management on a simple Apache Storm topology. We don't make use of Storm's
>> kafka spout but instead wrote a simple one ourselves.
>> 
>> Using the kafka AdminClient I was able to poll for consumer group summary
>> information. What I've found is that the group seems to sit
>> in PreparingRebalance state for minutes before finally becoming Stable
>> which then everything starts processing quickly. I've also enabled debug
>> logging around the consumer's coordinator classes but didn't see anything
>> to indicate the issue.
>> 
>> I'm hoping that just upgrading to 0.10 or tweaking how we use our consumer
>> in Apache Storm is the problem but are there any pointers on things I
>> should look at or try?
>> 
>> Bryan
>> 
>> 



Re: [ANNOUNCE] New committer: Damian Guy

2017-06-09 Thread James Cheng
Congrats Damian!

-James

> On Jun 9, 2017, at 1:34 PM, Guozhang Wang  wrote:
> 
> Hello all,
> 
> 
> The PMC of Apache Kafka is pleased to announce that we have invited Damian
> Guy as a committer to the project.
> 
> Damian has made tremendous contributions to Kafka. He has not only
> contributed a lot into the Streams api, but have also been involved in many
> other areas like the producer and consumer clients, broker-side
> coordinators (group coordinator and the ongoing transaction coordinator).
> He has contributed more than 100 patches so far, and have been driving on 6
> KIP contributions.
> 
> More importantly, Damian has been a very prolific reviewer on open PRs and
> has been actively participating on community activities such as email lists
> and slack overflow questions. Through his code contributions and reviews,
> Damian has demonstrated good judgement on system design and code qualities,
> especially on thorough unit test coverages. We believe he will make a great
> addition to the committers of the community.
> 
> 
> Thank you for your contributions, Damian!
> 
> 
> -- Guozhang, on behalf of the Apache Kafka PMC



Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2017-04-26 Thread James Cheng
Ramya, Todd, Jiefu, David,

Sorry to drag up an ancient thread. I was looking for something in my email 
archives, and ran across this, and I might have solved part of these mysteries.

I ran across this post that talked about seeing weirdly large allocations when 
incorrect requests are accidentally sent to a port expecting a binary protocol. 
https://rachelbythebay.com/w/2016/02/21/malloc/

I took those finding and applied them to the weird big numbers you were seeing.

Ramya, Jiefu, about your allocation of 1347375956:
1347375956 converted to hex is 504F5354
504F5354 converted to ascii is the letters "POST"
So, someone sent a POST request to your Kafka broker by accident!

David, about your allocation of 1550939497:
1550939497 converted to hex is 5C717569
5C717569 converted to ascii is "\qui"
Maybe that's the beginning of the word "\quit"? Is there some protocol that 
uses the word "\quit"? Like IRC or SMTP or IMAP or something? I'm not sure.
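
The conversions are easy to double-check, e.g. in Python:

```python
def size_to_ascii(n):
    """Interpret a bogus 4-byte "request size" as big-endian ASCII."""
    return n.to_bytes(4, "big").decode("ascii")

print(size_to_ascii(1347375956))  # -> POST  (an HTTP request hit the broker port)
print(size_to_ascii(1550939497))  # -> \qui  (the start of some text command?)
```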

Anyway, thought you might find that interesting!

-James




> On Dec 12, 2016, at 9:39 AM, Todd Palino  wrote:
> 
> Are you actually getting requests that are 1.3 GB in size, or is something
> else happening, like someone trying to make HTTP requests against the Kafka
> broker port?
> 
> -Todd
> 
> 
> On Mon, Dec 12, 2016 at 4:19 AM, Ramya Ramamurthy <
> ramyaramamur...@teledna.com> wrote:
> 
>> We have got exactly the same problem.
>> Invalid receive (size = 1347375956 larger than 104857600).
>> 
>> When trying to increase the size, Java Out of Memory Exception.
>> Did you find a work around for the same ??
>> 
>> Thanks.
>> 
> 
> 

On Tue, Jul 14, 2015 at 11:18 AM, JIEFU GONG  wrote:

> @Gwen
> I am having a very very similar issue where I am attempting to send a
> rather small message and it's blowing up on me (my specific error is:
> Invalid receive (size = 1347375956 larger than 104857600)). I tried to
> change the relevant settings but it seems that this particular request is
> of 1340 MB (and David's will be 1500 MB) and attempting to change the
> setting will give you another error saying there is not enough memory in
> the java heap. Any insight here?



> 
> -- 
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
> 
> 
> 
> linkedin.com/in/toddpalino



Re: ISR churn

2017-03-22 Thread James Cheng
Marcos, Radu, 

Are either of you seeing messages saying "Cached zkVersion [...] not equal to 
that in zookeeper"? If so, you may be hitting 
https://issues.apache.org/jira/browse/KAFKA-3042 


Not sure if that helps you, since I haven't been able to isolate a cause. I 
think it's related to flaky connections to the zookeeper cluster.

-James


> On Mar 22, 2017, at 11:46 AM, Marcos Juarez  wrote:
> 
> We're seeing the same exact pattern of ISR shrinking/resizing, mostly on 
> partitions with the largest volume, with thousands of messages per second.  
> It happens at least a hundred times a day in our production cluster. We do 
> have hundreds of topics in our cluster, most of them with 20 or more 
> partitions, but most of them see only a few hundred messages per minute.  
> 
> We're running Kafka 0.10.0.1, and we thought upgrading to the 0.10.1.1 
> version would fix the issue, but we've already deployed that version to our 
> staging cluster, and we're seeing the same problem.  We still haven't tried 
> out the latest 0.10.2.0 version, but I don't see any evidence pointing to a 
> fix for that.
> 
> This ticket seems to have some similar details, but it doesn't seem like 
> there has been follow-up, and there's no target release for fixing:
> 
> https://issues.apache.org/jira/browse/KAFKA-4674 
> 
> 
> Jun Ma, what exactly did you do to failover the controller to a new broker? 
> If that works for you, I'd like to try it in our staging clusters.
> 
> Thanks,
> 
> Marcos Juarez
> 
> 
> 
> 
> 
> On Wed, Mar 22, 2017 at 11:55 AM, Jun MA  wrote:
> I have similar issue with our cluster. We don’t know the root cause but we 
> have some interesting observation.
> 
> 1. We do see correlation between ISR churn and fetcher connection 
> close/create.
> 
> 
> 2. We’ve tried to add a broker which doesn’t have any partitions on it 
> dedicate to the controller (rolling restart existing brokers and failover the 
> controller to the newly added broker), and that indeed eliminate the random 
> ISR churn. We have a cluster of 6 brokers (besides the dedicated controller) 
> and each one has about 300 partitions on it. I suspect that kafka broker 
> cannot handle running controller + 300 partitions.
> 
> Anyway that’s so far what I got, I’d also like to know how to debug this.
> We’re running kafka 0.9.0.1 with heap size 8G.
> 
> Thanks,
> Jun
> 
>> On Mar 22, 2017, at 7:06 AM, Manikumar  wrote:
>> 
>> Any erros related to zookeeper seesion timeout? We can also check for
>> excesssive GC.
>> Some times this may due to forming multiple controllers due to soft
>> failures.
>> You can check ActiveControllerCount on brokers, only one broker in the
>> cluster should have 1.
>> Also check for network issues/partitions
>> 
>> On Wed, Mar 22, 2017 at 7:21 PM, Radu Radutiu  wrote:
>> 
>>> Hello,
>>> 
>>> Does anyone know how I can debug high ISR churn on the kafka leader on a
>>> cluster without traffic? I have 2 topics on a 4 node cluster  (replica 4
>>> and replica 3) and both show constant changes of the number of insync
>>> replicas:
>>> 
>>> [2017-03-22 15:30:10,945] INFO Partition [__consumer_offsets,0] on broker
>>> 2: Expanding ISR for partition __consumer_offsets-0 from 2,4 to 2,4,5
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:31:41,193] INFO Partition [__consumer_offsets,0] on broker
>>> 2: Shrinking ISR for partition [__consumer_offsets,0] from 2,4,5 to 2,5
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:31:41,195] INFO Partition [__consumer_offsets,0] on broker
>>> 2: Expanding ISR for partition __consumer_offsets-0 from 2,5 to 2,5,4
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:35:03,443] INFO Partition [__consumer_offsets,0] on broker
>>> 2: Shrinking ISR for partition [__consumer_offsets,0] from 2,5,4 to 2,5
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:35:03,445] INFO Partition [__consumer_offsets,0] on broker
>>> 2: Expanding ISR for partition __consumer_offsets-0 from 2,5 to 2,5,4
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:37:01,443] INFO Partition [__consumer_offsets,0] on broker
>>> 2: Shrinking ISR for partition [__consumer_offsets,0] from 2,5,4 to 2,4
>>> (kafka.cluster.Partition)
>>> [2017-03-22 15:37:01,445] INFO Partition [__consumer_offsets,0] on broker
>>> 2: Expanding ISR for partition __consumer_offsets-0 from 2,4 to 2,4,5
>>> (kafka.cluster.Partition)
>>> 
>>> and
>>> 
>>> [2017-03-22 15:09:52,646] INFO Partition [topic1,0] on broker 5: Shrinking
>>> ISR for partition [topic1,0] from 5,2,4 to 5,4 (kafka.cluster.Partition)
>>> [2017-03-22 15:09:52,648] INFO Partition [topic1,0] on broker 5: Expanding
>>> ISR for partition topic1-0 from 5,4 to 5,4,2 (kafka.cluster.Partition)
>>> [2017-03-22 15:24:05,646] INFO Partition [topic1,0] on broker 5: Shrinking
>>> ISR for partition [topic1,0] from

Re: Offset commit request failing

2017-03-17 Thread James Cheng
I think it's due to the high number of partitions and the high number of 
consumers in the group. The group coordination info that tracks the assignments 
is actually stored as a single message in the __consumer_offsets topic. So with 
so many partitions and consumers, that message gets too big to fit in the topic.

There is a long thread here that discusses it. I don't remember what specific 
actions came out of that discussion. 
http://search-hadoop.com/m/Kafka/uyzND1yd26N1rFtRd1?subj=+DISCUSS+scalability+limits+in+the+coordinator
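
A back-of-the-envelope sketch of why a 4000-member group can exceed the default 1 MB message limit. All the per-member sizes here are assumptions for illustration, not Kafka's actual encoding:

```python
# Assumption-laden sketch: the group coordinator writes the whole group's
# membership (member ids, client hosts, subscription/assignment blobs) as
# ONE record in __consumer_offsets, so the record grows with member count.
ASSUMED_BYTES_PER_MEMBER = 300         # member id + host + metadata, assumed
DEFAULT_MAX_MESSAGE_BYTES = 1_000_012  # broker default message.max.bytes

def group_record_estimate(num_members, bytes_per_member=ASSUMED_BYTES_PER_MEMBER):
    return num_members * bytes_per_member

# Under these assumptions, 4000 members blow past the default limit.
print(group_record_estimate(4000) > DEFAULT_MAX_MESSAGE_BYTES)
```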

-James

Sent from my iPhone

> On Mar 15, 2017, at 9:40 AM, Robert Quinlivan  wrote:
> 
> I should also mention that this error was seen on broker version 0.10.1.1.
> I found that this condition sounds somewhat similar to KAFKA-4362
> , but that issue was
> submitted in 0.10.1.1 so they appear to be different issues.
> 
> On Wed, Mar 15, 2017 at 11:11 AM, Robert Quinlivan 
> wrote:
> 
>> Good morning,
>> 
>> I'm hoping for some help understanding the expected behavior for an offset
>> commit request and why this request might fail on the broker.
>> 
>> *Context:*
>> 
>> For context, my configuration looks like this:
>> 
>>   - Three brokers
>>   - Consumer offsets topic replication factor set to 3
>>   - Auto commit enabled
>>   - The user application topic, which I will call "my_topic", has a
>>   replication factor of 3 as well and 800 partitions
>>   - 4000 consumers attached in consumer group "my_group"
>> 
>> 
>> *Issue:*
>> 
>> When I attach the consumers, the coordinator logs the following error
>> message repeatedly for each generation:
>> 
>> ERROR [Group Metadata Manager on Broker 0]: Appending metadata message for
>> group my_group generation 2066 failed due to org.apache.kafka.common.
>> errors.RecordTooLargeException, returning UNKNOWN error code to the
>> client (kafka.coordinator.GroupMetadataManager)
>> 
>> *Observed behavior:*
>> 
>> The consumer group does not stay connected long enough to consume
>> messages. It is effectively stuck in a rebalance loop and the "my_topic"
>> data has become unavailable.
>> 
>> 
>> *Investigation:*
>> 
>> Following the Group Metadata Manager code, it looks like the broker is
>> writing to a cache after it writes an Offset Commit Request to the log
>> file. If this cache write fails, the broker then logs this error and
>> returns an error code in the response. In this case, the error from the
>> cache is MESSAGE_TOO_LARGE, which is logged as a RecordTooLargeException.
>> However, the broker then sets the error code to UNKNOWN on the Offset
>> Commit Response.
>> 
>> It seems that the issue is the size of the metadata in the Offset Commit
>> Request. I have the following questions:
>> 
>>   1. What is the size limit for this request? Are we exceeding the size
>>   which is causing this request to fail?
>>   2. If this is an issue with metadata size, what would cause abnormally
>>   large metadata?
>>   3. How is this cache used within the broker?
>> 
>> 
>> Thanks in advance for any insights you can provide.
>> 
>> Regards,
>> Robert Quinlivan
>> Software Engineer, Signal
>> 
> 
> 
> 
> -- 
> Robert Quinlivan
> Software Engineer, Signal


Re: How does offsets.retention.minutes work

2017-03-16 Thread James Cheng
Yes, that is correct. I filed a JIRA about that issue here:

https://issues.apache.org/jira/browse/KAFKA-4682

-James

> On Mar 15, 2017, at 8:51 PM, tao xiao  wrote:
> 
> Hi team,
> 
> I know that Kafka deletes offset for a consumer group after a period of
> time (configured by offsets.retention.minutes) if the consumer group is
> inactive for this amount of time. I want to understand the definition of
> "inactive". I came across this post[1] and it suggests that no offset
> committed for this amount of time is considered inactive. Does it mean that
> even I have a consumer group that connects to Kafka all the time but no
> offset is committed for that amount of time, the offset will still be
> deleted?
> 
> 
> 
> 
> [1]
> http://stackoverflow.com/questions/39131465/how-does-an-offset-expire-for-an-apache-kafka-consumer-group



Re: Clarification on min.insync.replicas​

2017-03-07 Thread James Cheng

> On Mar 7, 2017, at 12:18 PM, James Cheng  wrote:
> 
> 
>> On Mar 7, 2017, at 7:44 AM, Shrikant Patel  wrote:
>> 
>> Thanks for clarification. I am seeing strange behavior in that case,
>> 
>> When I set min.insync.replicas=2 in my server.properties (restart the 
>> server) and set the acks=all on producer, I am still able to publish to 
>> topic even when only leader is up (none of followers are alive). With this 
>> configuration I was hoping to see NotEnoughReplicasException.
>> 
>> When I set min.insync.replicas=2 specifically on my topic and set the 
>> acks=all on producer, I get error NotEnoughReplicasException when less than 
>> 2 replicas are live.
>> 
>> Because of this behavior I thought min.insync.replicas=2 in my 
>> server.properties does not work and we needed it at explicit topic level.
>> 
>> This looks like bug?? Anyone facing this issue.
>> 
> 
> Did you try min.insync.replicas=2 in server.properties and acks=all on 
> producer? min.insync.replicas only applies when acks=all.
> 

Oops, sorry, I didn't read properly. I see that you already tried to do that.

When the broker starts up, it prints out a message to the logs showing all the 
configuration settings. I often use that to double check that I specified my 
settings properly in server.properties.

-James

> -James
> 
>> 
>> -Original Message-
>> From: Todd Palino [mailto:tpal...@gmail.com]
>> Sent: Monday, March 06, 2017 6:48 PM
>> To: users@kafka.apache.org
>> Subject: Re: Clarification on min.insync.replicas​
>> 
>> Default broker configurations do not show in the topic overrides (which is 
>> what you are showing with the topics tool). It is more accurate to say that 
>> the min.insync.replicas setting in your server.properties file is what will 
>> apply to every topic (regardless of when it is created), if there exists no 
>> topic override for that configuration.
>> 
>> -Todd
>> 
>> 
>> On Mon, Mar 6, 2017 at 4:38 PM, Shrikant Patel  wrote:
>> 
>>> Hi All,
>>> 
>>> Need details about min.insync.replicas​ in the server.properties.
>>> 
>>> I thought once I add this to server.properties, all subsequent topic
>>> create should have this as default value.
>>> 
>>> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topic
>>> s.bat --zookeeper localhost:2181/chroot/cluster1 --create --topic test
>>> --partition 3 --replication-factor 3 Created topic "test".
>>> 
>>> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topic
>>> s.bat --zookeeper localhost:2181/chroot/cluster1 --describe --topic
>>> test
>>> Topic:test  PartitionCount:3ReplicationFactor:3 Configs:
>>> 
>>> No min.insync.replicas is set on the topic.
>>> 
>>> Why do I have to explicitly provide this configuration when creating a topic?
>>> So what's the purpose of this in server.properties??
>>> 
>>> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topic
>>> s.bat --zookeeper localhost:2181/chroot/cluster1 --create --topic test
>>> --partition 3 --replication-factor 3 --config min.insync.replicas=3
>>> Created topic "test".
>>> 
>>> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topic
>>> s.bat --zookeeper localhost:2181/chroot/cluster1 --describe --topic
>>> test
>>> Topic:test  PartitionCount:3ReplicationFactor:3
>>> Configs:min.insync.replicas=3
>>>   Topic: test Partition: 0Leader: 1   Replicas: 1,2,0
>>> Isr: 1,2,0
>>>   Topic: test Partition: 1Leader: 2   Replicas: 2,0,1
>>> Isr: 2,0,1
>>>   Topic: test Partition: 2Leader: 0   Replicas: 0,1,2
>>> Isr: 0,1,2
>>> 
>>> Thanks
>>> Shri
>>> 
>>> 
>>> 
>>> This e-mail and its contents (to include attachments) are the property
>>> of National Health Systems, Inc., its subsidiaries and affiliates,
>>> including but not limited to Rx.com Community Healthcare Network, Inc.
>>> and its subsidiaries, and may contain confidential and proprietary or
>>> privileged information. If you are not the intended recipient of this
>>> e-mail, you are hereby notified that any unauthorized disclosure,
>>> copying, or distribution of this e-mail or of its attachments, or the
>>> taking of any unauthorized action based on information contained herein i

Re: Clarification on min.insync.replicas​

2017-03-07 Thread James Cheng

> On Mar 7, 2017, at 7:44 AM, Shrikant Patel  wrote:
> 
> Thanks for clarification. I am seeing strange behavior in that case,
> 
> When I set min.insync.replicas=2 in my server.properties (restart the server) 
> and set the acks=all on producer, I am still able to publish to topic even 
> when only leader is up (none of followers are alive). With this configuration 
> I was hoping to see NotEnoughReplicasException.
> 
> When I set min.insync.replicas=2 specifically on my topic and set the 
> acks=all on producer, I get error NotEnoughReplicasException when less than 2 
> replicas are live.
> 
> Because of this behavior I thought min.insync.replicas=2 in my 
> server.properties does not work and we needed it at explicit topic level.
> 
> This looks like bug?? Anyone facing this issue.
> 

Did you try min.insync.replicas=2 in server.properties and acks=all on 
producer? min.insync.replicas only applies when acks=all.

-James

> 
> -Original Message-
> From: Todd Palino [mailto:tpal...@gmail.com]
> Sent: Monday, March 06, 2017 6:48 PM
> To: users@kafka.apache.org
> Subject: Re: Clarification on min.insync.replicas​
> 
> Default broker configurations do not show in the topic overrides (which is 
> what you are showing with the topics tool). It is more accurate to say that 
> the min.insync.replicas setting in your server.properties file is what will 
> apply to every topic (regardless of when it is created), if there exists no 
> topic override for that configuration.
> 
> -Todd
> 
> 
> On Mon, Mar 6, 2017 at 4:38 PM, Shrikant Patel  wrote:
> 
>> Hi All,
>> 
>> Need details about min.insync.replicas​ in the server.properties.
>> 
>> I thought once I add this to server.properties, all subsequent topic
>> create should have this as default value.
>> 
>> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topic
>> s.bat --zookeeper localhost:2181/chroot/cluster1 --create --topic test
>> --partition 3 --replication-factor 3 Created topic "test".
>> 
>> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topic
>> s.bat --zookeeper localhost:2181/chroot/cluster1 --describe --topic
>> test
>> Topic:test  PartitionCount:3ReplicationFactor:3 Configs:
>> 
>> No min.insync.replicas is set on the topic.
>> 
>> Why do I have to explicitly provide this configuration when creating a topic?
>> So what's the purpose of this in server.properties??
>> 
>> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topic
>> s.bat --zookeeper localhost:2181/chroot/cluster1 --create --topic test
>> --partition 3 --replication-factor 3 --config min.insync.replicas=3
>> Created topic "test".
>> 
>> C:\JAVA_INSTALLATION\kafka\kafka_2.11-0.10.1.1>bin\windows\kafka-topic
>> s.bat --zookeeper localhost:2181/chroot/cluster1 --describe --topic
>> test
>> Topic:test  PartitionCount:3ReplicationFactor:3
>> Configs:min.insync.replicas=3
>>Topic: test Partition: 0Leader: 1   Replicas: 1,2,0
>> Isr: 1,2,0
>>Topic: test Partition: 1Leader: 2   Replicas: 2,0,1
>> Isr: 2,0,1
>>Topic: test Partition: 2Leader: 0   Replicas: 0,1,2
>> Isr: 0,1,2
>> 
>> Thanks
>> Shri
>> 
>> 
>> 
>> This e-mail and its contents (to include attachments) are the property
>> of National Health Systems, Inc., its subsidiaries and affiliates,
>> including but not limited to Rx.com Community Healthcare Network, Inc.
>> and its subsidiaries, and may contain confidential and proprietary or
>> privileged information. If you are not the intended recipient of this
>> e-mail, you are hereby notified that any unauthorized disclosure,
>> copying, or distribution of this e-mail or of its attachments, or the
>> taking of any unauthorized action based on information contained herein is 
>> strictly prohibited.
>> Unauthorized use of information contained herein may subject you to
>> civil and criminal prosecution and penalties. If you are not the
>> intended recipient, please immediately notify the sender by telephone
>> at
>> 800-433-5719 or return e-mail and permanently delete the original e-mail.
>> 
> 
> 
> 
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
> 
> 
> 
> linkedin.com/in/toddpalino

Re: Question about messages in __consumer_offsets topic

2017-02-23 Thread James Cheng
Yup, this got fixed in 0.10.2

https://issues.apache.org/jira/browse/KAFKA-2000 


-James
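For anyone who wants to inspect these entries themselves: the formatter mentioned later in this thread can be plugged into the console consumer. A rough sketch for a 0.9/0.10-era cluster (the formatter's package moved in later releases, so the class name below is an assumption tied to that era):

```shell
# Dump the internal offsets topic; tombstones appear as
# "[group,topic,partition]::NULL" in the decoded output.
kafka-console-consumer.sh --zookeeper localhost:2181 \
  --topic __consumer_offsets --from-beginning \
  --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
```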

> On Feb 23, 2017, at 11:10 AM, Jeff Widman  wrote:
> 
> The topic deletion only triggers tombstone on brokers >= 0.10.2, correct? I
> thought there was an outstanding bug report for this in lower versions...
> 
> On Wed, Feb 22, 2017 at 6:17 PM, Hans Jespersen  wrote:
> 
>> The __consumer_offsets topic should also get a tombstone message as soon as
>> a topic is deleted.
>> 
>> -hans
>> 
>> /**
>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> * h...@confluent.io (650)924-2670
>> */
>> 
>> On Wed, Feb 22, 2017 at 5:59 PM, Jun MA  wrote:
>> 
>>> Hi Todd,
>>> 
>>> Thank you so much for your reply. I assume that the broker will produce
>>> the tombstone to __consumer_offsets topic when the offset expires? I’m
>>> curious how broker notices the offset expires? Does it store the offset
>>> message in memory and periodically check if someone expires?
>>> 
>>> Thanks,
>>> Jun
>>> 
 On Feb 22, 2017, at 4:37 PM, Todd Palino  wrote:
 
 __consumer_offsets is a log-compacted topic, and a NULL body indicates
>> a
 delete tombstone. So it means to delete the entry that matches the key
 (group, topic, partition tuple).
 
 -Todd
 
 
 
 On Wed, Feb 22, 2017 at 3:50 PM, Jun MA 
>> wrote:
 
> Hi guys,
> 
> I’m trying to consume from __consumer_offsets topic to get exact
>>> committed
> offset of each consumer. Normally I can see messages like:
> 
> [eds-els-recopp-jenkins-01-5651,eds-incre-staging-1,0]::[
> OffsetMetadata[29791925,NO_METADATA],CommitTime
> 1487090167367,ExpirationTime 1487176567367],
> 
> which make sense to me. But sometimes I see messages like:
> 
> [eds-elssearchindex-curiosity-stg-10892,eds-incre-v2-
>>> staging-els,0]::NULL.
> 
> Can someone explains what is NULL means here and why a NULL value get
> published to __consumer_offsets?
> 
> We’re running kafka 0.9.0.1 and we use org.apache.kafka.common.
> serialization.ByteArrayDeserializer and GroupMetadataManager.
>>> OffsetsMessageFormatter
> to parse the message.
> 
> Thanks,
> Jun
 
 
 
 
 --
 *Todd Palino*
 Staff Site Reliability Engineer
 Data Infrastructure Streaming
 
 
 
 linkedin.com/in/toddpalino
>>> 
>>> 
>> 



Re: [ANNOUNCE] Apache Kafka 0.10.2.0 Released

2017-02-22 Thread James Cheng
Woohoo! Thanks for running the release, Ewen!

-James

> On Feb 22, 2017, at 12:33 AM, Ewen Cheslack-Postava  wrote:
> 
> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 0.10.2.0. This is a feature release which includes the completion
> of 15 KIPs, over 200 bug fixes and improvements, and more than 500 pull
> requests merged.
> 
> All of the changes in this release can be found in the release notes:
> https://archive.apache.org/dist/kafka/0.10.2.0/RELEASE_NOTES.html
> 
> Apache Kafka is a distributed streaming platform with four core
> APIs:
> 
> ** The Producer API allows an application to publish a stream of records to
> one or more Kafka topics.
> 
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
> 
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an
> output
> stream to one or more output topics, effectively transforming the input
> streams to output streams.
> 
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might capture
> every change to a table.
> 
> 
> With these APIs, Kafka can be used for two broad classes of application:
> 
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
> 
> ** Building real-time streaming applications that transform or react to
> the
> streams of data.
> 
> 
> You can download the source release from
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka-0.10.2.0-src.tgz
> 
> and binary releases from
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
> (experimental 2.12 artifact)
> 
> Thanks to the 101 contributors on this release!
> 
> Akash Sethi, Alex Loddengaard, Alexey Ozeritsky, amethystic, Andrea
> Cosentino, Andrew Olson, Andrew Stevenson, Anton Karamanov, Antony
> Stubbs, Apurva Mehta, Arun Mahadevan, Ashish Singh, Balint Molnar, Ben
> Stopford, Bernard Leach, Bill Bejeck, Colin P. Mccabe, Damian Guy, Dan
> Norwood, Dana Powers, dasl, Derrick Or, Dong Lin, Dustin Cote, Edoardo
> Comar, Edward Ribeiro, Elias Levy, Emanuele Cesena, Eno Thereska, Ewen
> Cheslack-Postava, Flavio Junqueira, fpj, Geoff Anderson, Guozhang Wang,
> Gwen Shapira, Hikiko Murakami, Himani Arora, himani1, Hojjat Jafarpour,
> huxi, Ishita Mandhan, Ismael Juma, Jakub Dziworski, Jan Lukavsky, Jason
> Gustafson, Jay Kreps, Jeff Widman, Jeyhun Karimov, Jiangjie Qin, Joel
> Koshy, Jon Freedman, Joshi, Jozef Koval, Json Tu, Jun He, Jun Rao,
> Kamal, Kamal C, Kamil Szymanski, Kim Christensen, Kiran Pillarisetty,
> Konstantine Karantasis, Lihua Xin, LoneRifle, Magnus Edenhill, Magnus
> Reftel, Manikumar Reddy O, Mark Rose, Mathieu Fenniak, Matthias J. Sax,
> Mayuresh Gharat, MayureshGharat, Michael Schiff, Mickael Maison,
> MURAKAMI Masahiko, Nikki Thean, Olivier Girardot, pengwei-li, pilo,
> Prabhat Kashyap, Qian Zheng, Radai Rosenblatt, radai-rosenblatt, Raghav
> Kumar Gautam, Rajini Sivaram, Rekha Joshi, rnpridgeon, Ryan Pridgeon,
> Sandesh K, Scott Ferguson, Shikhar Bhushan, steve, Stig Rohde Døssing,
> Sumant Tambe, Sumit Arrawatia, Theo, Tim Carey-Smith, Tu Yang, Vahid
> Hashemian, wangzzu, Will Marshall, Xavier Léauté, Xavier Léauté, Xi Hu,
> Yang Wei, yaojuncn, Yuto Kawamura
> 
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> http://kafka.apache.org/
> 
> Thanks,
> Ewen



Re: KIP-122: Add a tool to Reset Consumer Group Offsets

2017-02-16 Thread James Cheng
Yeah, that's a good point.

Some of the operations might make sense on multiple partitions at once. Moving 
to a timestamp might apply to all partitions, moving backwards and forwards by 
N offsets might apply to all partitions.

However, moving to a specific offset ("set to offset 43") would most likely 
only make sense for one partition at a time. It might make sense to *require* 
topic and partition when moving to a specific offset.

-James
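For readers landing here later: the tool discussed below eventually shipped (in Kafka 0.11) with flags close to what is proposed. A hedged sketch of resetting a single partition to a specific offset, using the shipped syntax rather than the draft syntax in this thread:

```shell
# Dry run first (prints the plan without applying it) ...
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group cg1 --topic t1:0 --reset-offsets --to-offset 43

# ... then apply. The group must have no active members.
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group cg1 --topic t1:0 --reset-offsets --to-offset 43 --execute
```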

> On Feb 8, 2017, at 3:36 PM, Gwen Shapira  wrote:
> 
> Just to clarify, we'll need to allow specifying topic and partition. I
> don't think we want this on ALL partitions at once.
> 
> On Wed, Feb 8, 2017 at 3:35 PM, Gwen Shapira  wrote:
>> That's what I'd like to see. For example, suppose a Connect task fails
>> because it can't deserialize an event from a partition. Stop
>> connector, move offset forward, start connector. Boom!
>> 
>> 
>> On Wed, Feb 8, 2017 at 3:22 PM, Matthias J. Sax  
>> wrote:
>>> I am not sure about --reset-plus and --reset-minus
>>> 
>>> Would this skip n messages forward/backward for each partitions?
>>> 
>>> 
>>> -Matthias
>>> 
>>> On 2/8/17 2:23 PM, Jorge Esteban Quilcate Otoya wrote:
 Great. I think I got the idea. What about these options:
 
 Scenarios:
 
 1. Current status
 
 ´kafka-consumer-groups.sh --reset-offset --group cg1´
 
 2. To Datetime
 
 ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-datetime
 2017-01-01T00:00:00.000´
 
 3. To Period
 
 ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-period P2D´
 
 4. To Earliest
 
 ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-earliest´
 
 5. To Latest
 
 ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to-latest´
 
 6. Minus 'n' offsets
 
 ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-minus n´
 
 7. Plus 'n' offsets
 
 ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-plus n´
 
 8. To specific offset
 
 ´kafka-consumer-groups.sh --reset-offset --group cg1 --reset-to x´
 
 Scopes:
 
 a. All topics used by Consumer Group
 
 Don't specify --topics
 
 b. Specific List of Topics
 
 Add list of values in --topics t1,t2,tn
 
 c. One Topic, all Partitions
 
 Add one topic and no partitions values: --topic t1
 
 d. One Topic, List of Partitions
 
 Add one topic and partitions values: --topic t1 --partitions 0,1,2
 
 About Reset Plan (JSON file):
 
 I think it is still valid to have the option to persist reset configuration as
 a file, but I agree to give the option to run the tool without going down
 to the JSON file.
 
 Execution options:
 
 1. Without execution argument (No args):
 
 Print out results (reset plan)
 
 2. With --execute argument:
 
 Run reset process
 
 3. With --output argument:
 
 Save result in a JSON format.
 
 4. Only with --execute option and --reset-file (path to JSON)
 
 Reset based on file
 
 5. Only with --verify option and --reset-file (path to JSON)
 
 Verify file values with current offsets
 
 I think we can remove --generate-and-execute because it is a bit clumsy.
 
 With these options we will be able to execute with manual JSON 
 configuration.
 
 
 El mié., 8 feb. 2017 a las 22:43, Ben Stopford ()
 escribió:
 
> Yes - using a tool like this to skip a set of consumer groups over a
> corrupt/bad message is definitely appealing.
> 
> B
> 
> On Wed, Feb 8, 2017 at 9:37 PM Gwen Shapira  wrote:
> 
>> I like the --reset-to-earliest and --reset-to-latest. In general,
>> since the JSON route is the most challenging for users, we want to
>> provide a lot of ways to do useful things without going there.
>> 
>> Two things that can help:
>> 
>> 1. A lot of times, users want to skip few messages that cause issues
>> and continue. maybe just specifying the topic, partition and delta
>> will be better than having to find the offset and write a JSON and
>> validate the JSON etc.
>> 
>> 2. Thinking if there are other common use-cases that we can make easy
>> rather than just one generic but not very usable method.
>> 
>> Gwen
>> 
>> On Wed, Feb 8, 2017 at 3:25 AM, Jorge Esteban Quilcate Otoya
>>  wrote:
>>> Thanks for the feedback!
>>> 
>>> @Onur, @Gwen:
>>> 
>>> Agree. Actually at the first draft I considered to have it inside
>>> ´kafka-consumer-groups.sh´, but I decide to propose it as a standalone
>> tool
>>> to describe it clearly and focus it on reset functionality.
>>> 
>>> But now that you mentioned, it does make sense to have it in
>>> ´kafka-consumer-groups.sh´. How would be a consistent way 

Is anyone running Kafka on CoreOS?

2017-02-10 Thread James Cheng
Hi,

(This question is kinda Kafka related, but mostly CoreOS related, so sorry if 
this is the wrong place to ask this.)

Is anyone running Kafka on CoreOS?

We run Kafka in docker containers on CoreOS. CoreOS has an OS-update policy 
where they will automatically install new OS updates, during which they will 
reboot (which turns off all containers, and then starts them back up again).

We are trying to make sure that:
1) Upon OS shutdown, that Kafka does a controlled shutdown
2) Upon OS startup on a node, that we do not reboot a second node until Kafka 
is fully started up and in all ISRs on the first node.

Is anyone else doing this and if so, can you share some ideas/approaches?

We have an ugly solution to #1, which is to hook into the systemd dependency 
chain to allow shutdown. But we don't have a great way to do #2.

Thanks,
-James
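One hedged sketch for #2, assuming the reboot coordinator can run Kafka's CLI tools against the cluster: gate the next node's reboot on there being no under-replicated partitions.

```shell
# Block until the restarted broker has rejoined all ISRs.
# zk:2181 is a placeholder for this cluster's ZooKeeper ensemble.
while [ -n "$(kafka-topics.sh --zookeeper zk:2181 --describe \
              --under-replicated-partitions)" ]; do
  echo "waiting for all partitions to be fully replicated..."
  sleep 10
done
```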



Re: Kafka docs for current trunk

2017-02-01 Thread James Cheng
+1

In particular, this would help when there is a doc change submitted to trunk 
which is applicable to the currently released version. It would help the change 
get out there faster.

-James

> On Feb 1, 2017, at 9:03 AM, Guozhang Wang  wrote:
> 
> +1
> 
> Guozhang
> 
> 
> On Wed, Feb 1, 2017 at 1:30 AM, Damian Guy  wrote:
> 
>> +1
>> 
>> On Wed, 1 Feb 2017 at 07:48 Michael Noll  wrote:
>> 
>>> Thanks for bringing this up, Matthias.
>>> 
>>> +1
>>> 
>>> On Wed, Feb 1, 2017 at 8:15 AM, Gwen Shapira  wrote:
>>> 
 +1
 
 On Tue, Jan 31, 2017 at 5:57 PM, Matthias J. Sax <
>> matth...@confluent.io>
 wrote:
> Hi,
> 
> I want to collect feedback about the idea to publish docs for current
> trunk version of Apache Kafka.
> 
> Currently, docs are only published for official release. Other
>> projects
> also have docs for current SNAPSHOT version. So the question rises,
>> if
> this would be helpful for Kafka community, too.
> 
> The idea would be, to update SNAPSHOT docs (web page and JavaDocs)
>> on a
> daily basis based on trunk (of course, fully automated).
> 
> 
> Looking forward to your feedback.
> 
> 
> -Matthias
> 
> 
 
 
>>> 
>> 
> 
> 
> 
> -- 
> -- Guozhang



Re: [ANNOUNCE] New committer: Grant Henke

2017-01-11 Thread James Cheng
Congrats, Grant!!

-James

> On Jan 11, 2017, at 11:51 AM, Gwen Shapira  wrote:
> 
> The PMC for Apache Kafka has invited Grant Henke to join as a
> committer and we are pleased to announce that he has accepted!
> 
> Grant contributed 88 patches, 90 code reviews, countless great
> comments on discussions, a much-needed cleanup to our protocol and the
> on-going and critical work on the Admin protocol. Throughout this, he
> displayed great technical judgment, high-quality work and willingness
> to contribute where needed to make Apache Kafka awesome.
> 
> Thank you for your contributions, Grant :)
> 
> -- 
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog



Re: Under-replicated Partitions while rolling Kafka nodes in AWS

2017-01-05 Thread James Cheng

> On Jan 5, 2017, at 7:55 AM, Jack Lund  wrote:
> 
> Hello, all.
> 
> We're running multiple Kafka clusters in AWS, and thus multiple Zookeeper
> clusters as well. When we roll out changes to our zookeeper nodes (which
> involves changes to the AMI, which means terminating the zookeeper instance
> and bringing up a new one in its place) we have to restart our Kafka
> brokers one at a time so they can pick up the new zookeeper IP address.
> 

FYI, zookeeper 3.4.8 fixes the issue where you have to restart zookeeper nodes 
when their DNS mapping changes. I'm not sure how it affects restarting kafka 
though, when the zookeeper DNS changes.

https://zookeeper.apache.org/doc/r3.4.8/releasenotes.html 

https://issues.apache.org/jira/browse/ZOOKEEPER-1506 


> What we've noticed is that, as the brokers are restarted, we get alerts for
> under-replicated partitions, which seems strange since it seems like the
> shutdown process should take care of moving any replicas and the leadership
> election process.
> 

During a controlled shutdown, you are right that *leadership* is moved from one 
broker to another. But the replica list does not change. A topic assigned to 
brokers 1 2 3 for example will only live on 1 2 3. If broker 1 is the leader 
for the topic, then during controlled shutdown of 1, leadership may move to 2 
or 3. But a broker 4 would never automatically take over as replica for the 
topic.

You can build such functionality yourself, if you wanted. You could, for 
example, move the topic to 2 3 4 before shutting down 1, and then move it back 
to 1 2 3 once 1 is back up. But that's a bunch of work you've have to do 
yourself.

-James
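For concreteness, the "move the topic to 2 3 4 before shutting down 1" idea can be sketched with the stock reassignment tool (the topic name and broker IDs are illustrative):

```shell
# Describe the desired replica placement, then hand it to the tool.
cat > reassign.json <<'EOF'
{"version":1,"partitions":[{"topic":"test","partition":0,"replicas":[2,3,4]}]}
EOF
kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json --execute

# Re-run with --verify to confirm the move finished before proceeding.
kafka-reassign-partitions.sh --zookeeper localhost:2181 \
  --reassignment-json-file reassign.json --verify
```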

> This is causing us some pain because it means that we get pages whenever we
> roll out changes to Zookeeper.
> 
> Does anybody have any ideas why this would be happening, and how we can
> avoid it?
> 
> Thanks.
> 
> -Jack Lund
> Braintree Payments



Re: Lost message with Kafka configuration

2017-01-05 Thread James Cheng

> On Jan 5, 2017, at 8:23 AM, Hoang Bao Thien  wrote:
> 
> Yes, the problem is from producer configuration. And James Cheng has told
> me how to fix it.
> However I still get another problem with a large file:
> 
> org.apache.kafka.common.errors.TimeoutException: Batch containing 36
> record(s) expired due to timeout while requesting metadata from brokers for
> MyTopic-0
> 

kafka-console-producer.sh defaults to retries=0. If there is a timeout, as that 
error indicates, I think it drops the messages it was trying to send.

As a test, try setting retries to something high, by doing "--producer-property 
retries="

See the description of "retries" at 
http://kafka.apache.org/documentation/#producerconfigs.

-James
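Putting the two suggestions in this thread together, the original test command would look roughly like this (the retry count is an arbitrary placeholder):

```shell
cat test.csv | kafka-console-producer.sh --broker-list localhost:9092 \
  --topic MyTopic \
  --request-required-acks all \
  --producer-property retries=100 > /dev/null
```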


> Best regards,
> 
> On Thu, Jan 5, 2017 at 10:23 AM, Protoss Hu 
> wrote:
> 
>> You mean the messages were lost on the way to broker before the broker
>> actually received?
>> 
>> Protoss Hu
>> Blog: http://hbprotoss.github.io/
>> Weibo: http://weibo.com/hbprotoss
>> 
On Jan 5, 2017, at 4:53 PM +0800, James Cheng wrote:
>>> kafka-console-producer.sh defaults to acks=0, which means that the
>> producer essentially throws messages at the broker and doesn't wait/retry
>> to make sure they are properly received.
>>> 
>>> In the kafka-console-producer.sh usage text:
>>> --request-required-acks <request required acks>  The required acks of the producer requests (default: 0)
>>> 
>>> Try re-running your test with "--request-required-acks -1" or
>> "--request-required-acks all" (They are equivalent) This will tell the
>> broker to wait for messages to be fully saved to all replicas before
>> returning an acknowledgement to the producer. You can read more about acks
>> in the producer configuration section of the kafka docs
>> (http://kafka.apache.org/documentation/#producerconfigs)
>>> 
>>> -James
>>> 
>>>> On Jan 4, 2017, at 1:25 AM, Hoang Bao Thien 
>> wrote:
>>>> 
>>>> Hi all,
>>>> 
>>>> I have a problem with losing messages from Kafka.
>>>> The situation is as follows: I put a csv file with 286701 rows (size =
>>>> 110MB) into Kafka producer with command:
>>>> $ cat test.csv | kafka-console-producer.sh --broker-list localhost:9092
>>>> --topic MyTopic > /dev/null
>>>> 
>>>> and then count the number of lines from the Kafka consumer
>>>> (kafka-console-consumer.sh --zookeeper localhost:2181 --topic MyTopic
>>>> --from-beginning)
>>>> However, I only get about 260K-270K, and this number of received
>> messages
>>>> changes for each test.
>>>> 
>>>> My configuration in the "config/server.properties" has some minor
>> change
>>>> compared to the original file:
>>>> 
>>>> log.retention.check.interval.hours=24
>>>> log.retention.hours=168
>>>> delete.topic.enable = true
>>>> 
>>>> The remaining configurations are the same as default value.
>>>> 
>>>> Could you please explain why the messages were lost in Kafka? And how
>> to
>>>> fix this problem please?
>>>> 
>>>> Thanks a lot.
>>>> 
>>>> Best regards
>>>> ,
>>>> Alex
>>> 
>> 



Re: Is this a bug or just unintuitive behavior?

2017-01-05 Thread James Cheng

> On Jan 5, 2017, at 12:57 PM, Jeff Widman  wrote:
> 
> Thanks James and Hans.
> 
> Will this also happen when we expand the number of partitions in a topic?
> 
> That also will trigger a rebalance, the consumer won't subscribe to the
> partition until the rebalance finishes, etc.
> 
> So it'd seem that any messages published to the new partition in between
> the partition creation and the rebalance finishing won't be consumed by any
> consumers that have offset=latest
> 

It hadn't occurred to me until you mentioned it, but yes, I think it'd also 
happen in those cases.

In the kafka consumer javadocs, they provide a list of things that would cause 
a rebalance:
http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.Collection,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)

"As part of group management, the consumer will keep track of the list of 
consumers that belong to a particular group and will trigger a rebalance 
operation if one of the following events trigger -

Number of partitions change for any of the subscribed list of topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to an existing consumer group via the join API
"

I'm guessing that this would affect any of those scenarios.

-James
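On the "tune the metadata refresh interval" point quoted below: for the new consumer that knob is metadata.max.age.ms. A hedged consumer-config sketch that narrows (but does not close) the window:

```properties
# Refresh metadata more often so newly created topics/partitions are
# noticed sooner (the default is 300000 ms, i.e. 5 minutes).
metadata.max.age.ms=5000
# "earliest" is the only setting that also picks up messages produced
# between topic creation and the first assignment.
auto.offset.reset=earliest
```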


> 
> 
> 
> On Thu, Jan 5, 2017 at 12:40 AM, James Cheng  wrote:
> 
>> Jeff,
>> 
>> Your analysis is correct. I would say that it is known but unintuitive
>> behavior.
>> 
>> As an example of a problem caused by this behavior, it's possible for
>> mirrormaker to miss messages on newly created topics, even though it was
>> subscribed to them before topics were created.
>> 
>> See the following JIRAs:
>> https://issues.apache.org/jira/browse/KAFKA-3848
>> https://issues.apache.org/jira/browse/KAFKA-3370
>> 
>> -James
>> 
>>> On Jan 4, 2017, at 4:37 PM, h...@confluent.io wrote:
>>> 
>>> This sounds exactly as I would expect things to behave. If you consume
>> from the beginning I would think you would get all the messages but not if
>> you consume from the latest offset. You can separately tune the metadata
>> refresh interval if you want to miss fewer messages but that still won't
>> get you all messages from the beginning if you don't explicitly consume
>> from the beginning.
>>> 
>>> Sent from my iPhone
>>> 
>>>> On Jan 4, 2017, at 6:53 PM, Jeff Widman  wrote:
>>>> 
>>>> I'm seeing consumers miss messages when they subscribe before the topic
>> is
>>>> actually created.
>>>> 
>>>> Scenario:
>>>> 1) kafka 0.10.1.1 cluster with no topics, but supports topic
>>>> auto-creation as soon as a message is published to the topic
>>>> 2) consumer subscribes using topic string or a regex pattern. Currently
>> no
>>>> topics match. Consumer offset is "latest"
>>>> 3) producer publishes to a topic that matches the string or regex
>> pattern.
>>>> 4) broker immediately creates a topic, writes the message, and also
>>>> notifies the consumer group that a rebalance needs to happen to assign
>> the
>>>> topic_partition to one of the consumers..
>>>> 5) rebalance is fairly quick, maybe a second or so
>>>> 6) a consumer is assigned to the newly-created topic_partition
>>>> 
>>>> At this point, we've got a consumer steadily polling the recently
>> created
>>>> topic_partition. However, the consumer.poll() never returns any messages
>>>> published between topic creation and when the consumer was assigned to
>> the
>>>> topic_partition. I'm guessing this may be because when the consumer is
>>>> assigned to the topic_partition it doesn't find any, so it uses the
>> latest
>>>> offset, which happens to be after the messages that were published to
>>>> create the topic.
>>>> 
>>>> This is surprising because the consumer technically was subscribed to
>> the
>>>> topic before the messages were produced, so you'd think the consumer
>> would
>>>> receive these messages.
>>>> 
>>>> Is this known behavior? A bug in Kafka broker? Or a bug in my client
>>>> library?
>> 
>> 



Re: Lost message with Kafka configuration

2017-01-05 Thread James Cheng
kafka-console-producer.sh defaults to acks=0, which means that the producer 
essentially throws messages at the broker and doesn't wait/retry to make sure 
they are properly received.

In the kafka-console-producer.sh usage text:
--request-required-acks <request required acks>  The required acks of the producer requests (default: 0)

Try re-running your test with "--request-required-acks -1" or 
"--request-required-acks all" (they are equivalent). This will tell the broker 
to wait for messages to be fully saved to all replicas before returning an 
acknowledgement to the producer. You can read more about acks in the producer 
configuration section of the kafka docs 
(http://kafka.apache.org/documentation/#producerconfigs)

-James

> On Jan 4, 2017, at 1:25 AM, Hoang Bao Thien  wrote:
> 
> Hi all,
> 
> I have a problem with losing messages from Kafka.
> The situation is as follows: I put a csv file with 286701 rows (size =
> 110MB)  into Kafka producer with command:
> $ cat test.csv | kafka-console-producer.sh --broker-list localhost:9092
> --topic MyTopic > /dev/null
> 
> and then count the number of lines from the Kafka consumer
> (kafka-console-consumer.sh --zookeeper localhost:2181 --topic MyTopic
> --from-beginning)
> However, I only get about 260K-270K, and this number of received messages
> changes for each test.
> 
> My configuration in the "config/server.properties" has some minor change
> compared to the original file:
> 
> log.retention.check.interval.hours=24
> log.retention.hours=168
> delete.topic.enable = true
> 
> The remaining configurations are the same as default value.
> 
> Could you please explain why the messages were lost in Kafka? And how to
> fix this problem please?
> 
> Thanks a lot.
> 
> Best regards
> ,
> Alex



Re: Is this a bug or just unintuitive behavior?

2017-01-05 Thread James Cheng
Jeff,

Your analysis is correct. I would say that it is known but unintuitive behavior.

As an example of a problem caused by this behavior, it's possible for 
mirrormaker to miss messages on newly created topics, even though it was 
subscribed to them before topics were created.

See the following JIRAs:
https://issues.apache.org/jira/browse/KAFKA-3848 

https://issues.apache.org/jira/browse/KAFKA-3370 


-James

> On Jan 4, 2017, at 4:37 PM, h...@confluent.io wrote:
> 
> This sounds exactly as I would expect things to behave. If you consume from 
> the beginning I would think you would get all the messages but not if you 
> consume from the latest offset. You can separately tune the metadata refresh 
> interval if you want to miss fewer messages but that still won't get you all 
> messages from the beginning if you don't explicitly consume from the 
> beginning.
> 
> Sent from my iPhone
> 
>> On Jan 4, 2017, at 6:53 PM, Jeff Widman  wrote:
>> 
>> I'm seeing consumers miss messages when they subscribe before the topic is
>> actually created.
>> 
>> Scenario:
>> 1) kafka 0.10.1.1 cluster with no topics, but supports topic
>> auto-creation as soon as a message is published to the topic
>> 2) consumer subscribes using topic string or a regex pattern. Currently no
>> topics match. Consumer offset is "latest"
>> 3) producer publishes to a topic that matches the string or regex pattern.
>> 4) broker immediately creates a topic, writes the message, and also
>> notifies the consumer group that a rebalance needs to happen to assign the
>> topic_partition to one of the consumers..
>> 5) rebalance is fairly quick, maybe a second or so
>> 6) a consumer is assigned to the newly-created topic_partition
>> 
>> At this point, we've got a consumer steadily polling the recently created
>> topic_partition. However, the consumer.poll() never returns any messages
>> published between topic creation and when the consumer was assigned to the
>> topic_partition. I'm guessing this may be because when the consumer is
>> assigned to the topic_partition it doesn't find any, so it uses the latest
>> offset, which happens to be after the messages that were published to
>> create the topic.
>> 
>> This is surprising because the consumer technically was subscribed to the
>> topic before the messages were produced, so you'd think the consumer would
>> receive these messages.
>> 
>> Is this known behavior? A bug in Kafka broker? Or a bug in my client
>> library?



Why does consumer.subscribe(Pattern) require a ConsumerRebalanceListener?

2017-01-03 Thread James Cheng
Hi,

I was looking at the docs for the consumer, and noticed that when calling 
subscribe() with a regex Pattern, you are required to pass in a 
ConsumerRebalanceListener. On the other hand, when you use a fixed set of topic 
names (Collection<String>), the ConsumerRebalanceListener is optional (that is, 
there is a subscribe(Collection<String>) overload that does not require a 
ConsumerRebalanceListener).

http://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#subscribe(java.util.regex.Pattern,%20org.apache.kafka.clients.consumer.ConsumerRebalanceListener)
 


Why does the regex one require a rebalance listener, whereas the fixed-topic 
one does not? Is it to force the user to think through what happens as new 
topic/partitions appear and disappear?

-James



When using mirrormaker, how are people creating topics?

2016-12-05 Thread James Cheng
Hi,

We are using mirrormaker to mirror topics from one cluster to another, and I 
wanted to get some advice from the community on how people are doing mirroring. 
In particular, how are people dealing with topic creation?

Do you turn on auto-topic creation in your destination clusters 
(auto.create.topics.enable=true)?

If not, do you manually create the individual destination topics?

If so, how does that work with mirroring based on a whitelist (regex)?

The way we are doing it right now is, we have our regex in a file somewhere. 
The regex is used in 2 ways:
1) Passed to mirrormaker, to do the mirroring.
2) Passed to a program which looks up all the topics on the source cluster, 
finds the ones that match the regex, and then creates them on the destination 
cluster. (We have auto-topic creation turned off 
auto.create.topics.enable=false)

One downside of that approach is that there is a potential race: if the regex 
changes, then mirrormaker (in #1) might start trying to produce to a new 
destination topic before the topic has been created (by #2).

Some other hand-wavy ideas that came to mind might be:
* handling topic creation in a MirrorMakerMessageHandler
* handling topic creation in an interceptor

Anyway, was hoping to get some thoughts from people who are already doing this.

Thanks!
-James
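For concreteness, step #2 above can be sketched as a small sync script (the regex, ZooKeeper addresses, and partition/replication counts are illustrative; a real version would copy each source topic's partition count and configs):

```shell
REGEX='^mirrored\..*'   # same whitelist that is passed to mirrormaker

kafka-topics.sh --zookeeper src-zk:2181 --list \
  | grep -E "$REGEX" \
  | while read -r topic; do
      # "|| true" keeps the sync idempotent when the topic already exists
      kafka-topics.sh --zookeeper dst-zk:2181 --create --topic "$topic" \
        --partitions 8 --replication-factor 3 || true
    done
```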



Re: [ANNOUNCE] New committer: Jiangjie (Becket) Qin

2016-10-31 Thread James Cheng
Congrats, Becket!

-James

> On Oct 31, 2016, at 10:35 AM, Joel Koshy  wrote:
> 
> The PMC for Apache Kafka has invited Jiangjie (Becket) Qin to join as a
> committer and we are pleased to announce that he has accepted!
> 
> Becket has made significant contributions to Kafka over the last two years.
> He has been deeply involved in a broad range of KIP discussions and has
> contributed several major features to the project. He recently completed
> the implementation of a series of improvements (KIP-31, KIP-32, KIP-33) to
> Kafka’s message format that address a number of long-standing issues such
> as avoiding server-side re-compression, better accuracy for time-based log
> retention, log roll and time-based indexing of messages.
> 
> Congratulations Becket! Thank you for your many contributions. We are
> excited to have you on board as a committer and look forward to your
> continued participation!
> 
> Joel



Re: Read all record from a Topic.

2016-07-13 Thread James Cheng
Jean-Baptiste,

I wrote a blog post recently on this exact subject.

https://logallthethings.com/2016/06/28/how-to-read-to-the-end-of-a-kafka-topic/

Let me know if you find it useful.

-James

Sent from my iPhone
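A command-line analogue of the seek-then-position trick quoted below: GetOffsetShell with --time -1 prints each partition's current end offset, which a consumer can then read up to (the broker address and topic are placeholders):

```shell
# Prints "topic:partition:endOffset" for every partition; a restore
# pass is complete once the consumer's position reaches these offsets.
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic my-state-topic --time -1
```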

> On Jul 13, 2016, at 7:16 AM, g...@netcourrier.com wrote:
> 
> Hi,
> 
> 
> I'm using a compacted Kafka Topic to save the state of my application. When 
> the application crashes/restarts I can restore its state by reading the Kafka 
> topic.
> 
> 
> 
> However I need to read it completely, especially up to most recent record, to 
> be sure to restore all data.
> 
> 
> 
> Is there a standard way to to that? I've checked the Kafka streams code and 
> found that the class ProcessorStateManager seems to be doing something 
> similar.
> 
> 
> 
> It first gets the last offset by doing:
> 
> // calculate the end offset of the partition // TODO: this is a bit hacky to 
> first seek then position to get the end offset
> 
> restoreConsumer.seekToEnd(singleton(storePartition));
> 
> long endOffset = restoreConsumer.position(storePartition); 
> 
> 
> 
> Then it polls the records until reaching the endoffset (there is also an 
> other limit but I think it is related to an other use case).
> 
> 
> 
> I guess it works, but the TODO message makes me wonder if it is a good 
> solution and if it will continue to work in future releases.
> 
> 
> 
> Thanks for your help,
> 
> 
> 
> Jean-Baptiste
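The quoted ProcessorStateManager approach boils down to: snapshot the end
offset first, then poll until the position reaches it. Below is a broker-free
simulation of that loop, with a plain Python list standing in for the
partition and a fixed batch size standing in for poll() (illustrative only,
not the Kafka Streams implementation):

```python
def read_to_end(log, position=0, max_poll=2):
    """Consume records from `log` starting at `position` until the end
    offset captured up front is reached (mirrors seekToEnd()/position())."""
    end_offset = len(log)  # snapshot, like position() right after seekToEnd()
    consumed = []
    while position < end_offset:
        batch = log[position:position + max_poll]  # stand-in for poll()
        consumed.extend(batch)
        position += len(batch)
    return consumed

log = ["state-a", "state-b", "state-c"]
print(read_to_end(log))  # ['state-a', 'state-b', 'state-c']
```

Note the snapshot semantics: records appended after the end offset was
captured are deliberately not read, which is exactly the "restore up to a
known point" behavior the state-restore use case wants.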
> 
> 
> 
> 
> 


Re: kafka + autoscaling groups fuckery

2016-07-03 Thread James Cheng
Charity,

I'm not sure about the specific problem you are having, but about Kafka on AWS, 
Netflix did a talk at a meetup about their Kafka installation on AWS. There 
might be some useful information in there. There is a video stream as well as 
slides, and maybe you can get in touch with the speakers. Look in the comment 
section for links to the slides and video. 

Kafka at Netflix
http://www.meetup.com//http-kafka-apache-org/events/220355031/?showDescription=true

There's also a talk about running Kafka on Mesos, which might be relevant.

Kafka on Mesos
http://www.meetup.com//http-kafka-apache-org/events/222537743/?showDescription=true

-James

Sent from my iPhone

> On Jul 2, 2016, at 5:15 PM, Charity Majors  wrote:
> 
> Gwen, thanks for the response.
> 
> 1.1 Your life may be a bit simpler if you have a way of starting a new
> 
>> broker with the same ID as the old one - this means it will
>> automatically pick up the old replicas and you won't need to
>> rebalance. Makes life slightly easier in some cases.
> 
> Yeah, this is definitely doable, I just don't *want* to do it.  I really
> want all of these to share the same code path: 1) rolling all nodes in an
> ASG to pick up a new AMI, 2) hardware failure / unintentional node
> termination, 3) resizing the ASG and rebalancing the data across nodes.
> 
> Everything but the first one means generating new node IDs, so I would
> rather just do that across the board.  It's the solution that really fits
> the ASG model best, so I'm reluctant to give up on it.
> 
> 
>> 1.2 Careful not too rebalance too many partitions at once - you only
>> have so much bandwidth and currently Kafka will not throttle
>> rebalancing traffic.
> 
> Nod, got it.  This is def something I plan to work on hardening once I have
> the basic nut of things working (or if I've had to give up on it and accept
> a lesser solution).
> 
> 
>> 2. I think your rebalance script is not rebalancing the offsets topic?
>> It still has a replica on broker 1002. You have two good replicas, so
>> you are no where near disaster, but make sure you get this working
>> too.
> 
> Yes, this is another problem I am working on in parallel.  The Shopify
> sarama library  uses the
> __consumer_offsets topic, but it does *not* let you rebalance or resize the
> topic when consumers connect, disconnect, or restart.
> 
> "Note that Sarama's Consumer implementation does not currently support
> automatic consumer-group rebalancing and offset tracking"
> 
> I'm working on trying to get the sarama-cluster to do something here.  I
> think these problems are likely related, I'm not sure wtf you are
> *supposed* to do to rebalance this god damn topic.  It also seems like we
> aren't using a consumer group which sarama-cluster depends on to rebalance
> a topic.  I'm still pretty confused by the 0.9 "consumer group" stuff.
> 
> Seriously considering downgrading to the latest 0.8 release, because
> there's a massive gap in documentation for the new stuff in 0.9 (like
> consumer groups) and we don't really need any of the new features.
> 
> A common work-around is to configure the consumer to handle "offset
>> out of range" exception by jumping to the last offset available in the
>> log. This is the behavior of the Java client, and it would have saved
>> your consumer here. Go client looks very low level, so I don't know
>> how easy it is to do that.
> 
> Erf, this seems like it would almost guarantee data loss.  :(  Will check
> it out tho.
> 
> If I were you, I'd retest your ASG scripts without the auto leader
>> election - since your own scripts can / should handle that.
> 
> Okay, this is straightforward enough.  Will try it.  And will keep tryingn
> to figure out how to balance the __consumer_offsets topic, since I
> increasingly think that's the key to this giant mess.
> 
> If anyone has any advice there, massively appreciated.
> 
> Thanks,
> 
> charity.


Re: Halting because log truncation is not allowed for topic __consumer_offsets

2016-06-26 Thread James Cheng
Peter, can you add some of your observations to those JIRAs? You seem to have a 
good understanding of the problem. Maybe there is something that can be 
improved in the codebase to prevent this from happening, or reduce the impact 
of it.

Wanny, you might want to add a "me too" to the JIRAs as well.

FYI, I was the original filer of KAFKA-3410. My repro only talks about the case 
where a broker was completely wiped, but we've seen it happen during normal 
operation as well.

-James

Sent from my iPhone

> On Jun 26, 2016, at 1:41 PM, Peter Davis  wrote:
> 
> We have seen this several times and it's quite frustrating.  It seems to 
> happen due to the fact that the leader for a partition writes to followers 
> ahead of committing itself, especially for a topic like __consumer_offsets 
> that is written with acks=all.  If a brief network interruption occurs (as 
> seems to happen quite regularly for us in a virtualized environment), for a 
> low-to-medium-throughput topic like __consumer_offsets, the follower may 
> recover "too quickly" -- it has more messages than the leader because it was 
> written ahead, but recovers before enough more messages are written to the 
> leader and remaining ISR such that the leader gains a higher high water mark. 
>  So the replica halts due to a supposed unclean leader election.  Usually, 
> just waiting a minute before restarting the halted broker solves the issue: 
> more messages have been written, the leader has a higher HW, and the replica 
> will happily truncate itself and recover.  At least, that is my theory -- 
> it's been a journey understanding Kafka's details well enough!
> 
> This happens with 0.10.0 and occurs even with min.insync.replicas=2 (majority 
> of 3 replicas).  In fact the problem can be amplified by setting min.isr: if 
> fewer than minimum replicas are available, then it can be impossible to write 
> more messages to the leader as above, so the only way to recover is to delete 
> data files from the halted follower.  Similar for very low-throughput topics. 
>  At the same time, without min.insync.replicas enforcing a quorum, the risk 
> of a true unclean leader election or data loss is increased -- a double edged 
> sword!
> 
> It seems related to https://issues.apache.org/jira/browse/KAFKA-3410 or 
> https://issues.apache.org/jira/browse/KAFKA-3861 but happens even under 
> normal conditions (no data loss required!)
> 
> Anyone else have suggestions?  **Brokers halting due to a simple network 
> hiccup is, shall we say, not good!**
> 
> -Peter
> 
> 
>> On Jun 25, 2016, at 12:28 PM, Morellato, Wanny  
>> wrote:
>> 
>> Hi all,
>> 
>> My kafka brokers (0.9.0.1) are refusing to restart and they return the 
>> following error
>> 
>> Halting because log truncation is not allowed for topic __consumer_offsets, 
>> Current leader 11's latest offset 79445540 is less than replica 13's latest 
>> offset 79445565 (kafka.server.ReplicaFetcherThread)
>> 
>> Deleting the topic __consumer_offsets from those servers seems to fix the 
>> problem…
>> 
>> From what I understand this should result in some duplicate delivery…
>> If that is the case, is it possible to configure kafka in a way that it will 
>> automatically recover from this type of failure?
>> 
>> Thanks
>> 
>> Wanny
>> 
>> 
>> 
>> 
>> This e-mail message is authorized for use by the intended recipient only and 
>> may contain information that is privileged and confidential. If you received 
>> this message in error, please call us immediately at (425) 590-5000 and ask 
>> to speak to the message sender. Please do not copy, disseminate, or retain 
>> this message unless you are the intended recipient. In addition, to ensure 
>> the security of your data, please do not send any unencrypted credit card or 
>> personally identifiable information to this email address. Thank you.
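The halting condition described in this thread can be stated compactly: a
follower whose log is ahead of the new leader's must truncate, and with
unclean leader election disabled it halts instead. A toy model of that
decision (illustrative only, not the broker's actual ReplicaFetcherThread
logic):

```python
def follower_action(leader_end_offset, replica_end_offset,
                    unclean_election_enabled):
    if replica_end_offset <= leader_end_offset:
        return "fetch"     # follower is behind or caught up: keep replicating
    if unclean_election_enabled:
        return "truncate"  # drop the extra messages and follow the leader
    return "halt"          # the FATAL case quoted in the logs above

# The offsets from Wanny's error message:
print(follower_action(79445540, 79445565, False))  # 'halt'
```

This also explains Peter's "wait a minute" workaround: once enough new writes
raise the leader's end offset past the replica's, the first branch applies and
the restarted follower fetches normally.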


Re: 10MB message

2016-06-15 Thread James Cheng
Igor,

This article talks about what to think about if putting large messages into 
Kafka: http://ingest.tips/2015/01/21/handling-large-messages-kafka/

The summary is that Kafka is not optimized for handling large messages, but if 
you really want to, it's possible to do it.

That website is having issues right now, so it may take a (long) while to load. 
I've notified the website owner to let her know.

-James

> On Jun 14, 2016, at 1:33 AM, R Krishna  wrote:
> 
> There are options to compress on the wire and in the topic.
> 
> On Tue, May 31, 2016 at 8:35 AM, Igor Kravzov 
> wrote:
> 
>> In our system some data can be as big as 10MB.
>> Is it OK to send 10 MB message through Kafka?  What configuration
>> parameters should I check/set?
>> It is going to be one topic with one consumer - Apache NiFi GetKafka
>> processor.
>> Is one partition enough?
>> 
> 
> 
> 
> -- 
> Radha Krishna, Proddaturi
> 253-234-5657
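Not part of the original thread, but for reference: the settings that
typically gate a large message in brokers and clients of this era. Treat the
names as a starting point to verify against your version's configuration
documentation; the values are illustrative (10 MB plus a little overhead):

```properties
# Broker: largest message the broker will accept
message.max.bytes=10485880
# Broker: replicas must be able to fetch the largest message
replica.fetch.max.bytes=10485880
# Old consumer: maximum bytes fetched per partition per request
# (the new consumer's equivalent knob is max.partition.fetch.bytes)
fetch.message.max.bytes=10485880
```

All three must be at least as large as the biggest message, or replication
and consumption can silently stall on the oversized record.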



Re: Will segments on no-traffic topics get deleted/compacted?

2016-05-24 Thread James Cheng
I ran some tests, and also looked through the code. log.roll.ms isn't 
sufficient.

Segments appear to get rolled only if

log.roll.ms or segment.ms has elapsed, or
log.segment.bytes has been exceeded.

But those rules are only evaluated when a new record is appended to the log. So 
it appears that for inactive topics, segments will never roll.

See 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L383
and
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L617-L634

Tom, does that sound right?

-James


> On May 20, 2016, at 6:05 AM, Tom Crayford  wrote:
> 
> Hi there,
> 
> The missing piece is the config `log.roll.hours` or it's alternative `
> log.roll.ms`. Log segments are by default rolled once a week, regardless of
> activity, but you can tune that down as you like.
> 
> Thanks
> 
> Tom Crayford
> Heroku Kafka
> 
> On Fri, May 20, 2016 at 12:49 AM, James Cheng  wrote:
> 
>> Time-based log retention only happens on old log segments. And log
>> compaction only happens on old segments as well.
>> 
>> Currently, I believe segments only roll whenever a new record is written
>> to the log. That is, during the write of the new record is when the current
>> segment is evaluated to see if it should be rolled. Is that true?
>> 
>> That means that if there is *no* traffic on a topic, that the messages on
>> disk may persist past the log retention time, or past the log compaction
>> time. Is that the case? If so, is there any way to trigger rolling of a
>> segment without active traffic on the topic?
>> 
>> Thanks!
>> -James
>> 
>> 
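The roll rules above can be written as a small predicate. The point of the
thread is that this predicate is only ever evaluated on append, so an idle
topic is simply never asked (a simplified model, not Kafka's actual Log.scala):

```python
def should_roll(segment_size, segment_age_ms, segment_bytes, roll_ms):
    """Roll check Kafka performs -- but only when a record is appended."""
    return segment_size >= segment_bytes or segment_age_ms >= roll_ms

# A tiny segment that is 10 days old against a 7-day roll.ms:
# the predicate would say "roll", but with no appends it never runs.
print(should_roll(segment_size=1_000,
                  segment_age_ms=10 * 86_400_000,
                  segment_bytes=1_073_741_824,
                  roll_ms=7 * 86_400_000))  # True
```

Since retention and compaction both act only on already-rolled (old) segments,
a True that is never evaluated means the data simply stays put.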



Will segments on no-traffic topics get deleted/compacted?

2016-05-19 Thread James Cheng
Time-based log retention only happens on old log segments. And log compaction 
only happens on old segments as well.

Currently, I believe segments only roll whenever a new record is written to the 
log. That is, during the write of the new record is when the current segment is 
evaluated to see if it should be rolled. Is that true?

That means that if there is *no* traffic on a topic, that the messages on disk 
may persist past the log retention time, or past the log compaction time. Is 
that the case? If so, is there any way to trigger rolling of a segment without 
active traffic on the topic?

Thanks!
-James



Do consumer offsets stored in zookeeper ever get cleaned up?

2016-05-19 Thread James Cheng
I know that when offsets get stored in Kafka, they get cleaned up based on the 
offsets.retention.minutes config setting. This happens when using the new 
consumer, or when using the old consumer but offsets.storage=kafka.

If using the old consumer where offsets are stored in Zookeeper, do old offsets 
ever get removed?

Thanks,
-James



Re: unknown (kafka) offsets after restart

2016-05-06 Thread James Cheng
Is the log compaction thread correctly working? The offsets are stored in a log 
compacted topic, and we have seen issues where the log cleaner thread dies and 
therefore the offsets topic just grows forever, which means it will take a long 
time to read in the topic.

You can look in the log-cleaner.log debug log file to see if there are any error 
messages there.

-James


> On May 6, 2016, at 6:28 AM, Jörg Wagner  wrote:
> 
> After a bit more looking into this we found out that the Offsetmanager is 
> single threaded and due to our setup (few, powerful servers: rather bad for 
> kafka I know..) it seems we are limiting ourselves severely by using kafka 
> offsets.
> 
> Any more insight is still welcome of course.
> 
> 
>  Forwarded Message 
> Subject:  unknown (kafka) offsets after restart
> Date: Fri, 6 May 2016 14:12:24 +0200
> From: Jörg Wagner 
> Reply-To: users@kafka.apache.org
> To:   users@kafka.apache.org
> 
> 
> 
> We're using Kafka 0.8.2 and are puzzled by the offset behaviour when
> they are stored in kafka topics.
> 
> Upon restart of the Kafka cluster (e.g. due to reconfiguration) it can
> happen that the offsets are unknown and therefore stop consumers from
> consuming without knowing their offset.
> 
> kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group group
> --zookeeper localhost:2181
> Could not fetch offset for [topic,19] due to
> kafka.common.OffsetsLoadInProgressException.
> 
> Group   Topic  Pid Offset
> logSize Lag Owner
> group   topic0   unknown 326606  unknown none
> 
> I currently have no other solution to this than to wait.. but it takes a
> very long time (hours.. the servers are hopelessly underutilized!), I
> would be grateful for any advice.
> 
> Thanks
> Jörg
> 
> 
> 



Re: Consumers disappearing from __consumer_offsets

2016-04-11 Thread James Cheng
This may be related to offsets.retention.minutes.

offsets.retention.minutes
Log retention window in minutes for offsets topic

It defaults to 1440 minutes = 24 hours.

-James

> On Apr 11, 2016, at 1:36 PM, Morellato, Wanny  
> wrote:
>
> Hi,
>
> I am trying to figure out why some of my consumers disappears from the list 
> of active consumers…
> This is happening in my QA environment where sometimes no messages get 
> published over the weekend.
>
> I am wondering if it is related to the default 24 hours 
> log.cleaner.delete.retention.ms
> If that is the case… what would be the best way to increase that value just 
> for the __consumer_offsets topic?
>
>
> Thanks
>
> Wanny
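A toy illustration of the retention window James points to: a committed offset
older than offsets.retention.minutes becomes eligible for removal, which
matches the "consumers vanish after an idle weekend" symptom (simplified;
later broker versions also take consumer-group state into account):

```python
def offset_expired(last_commit_ts_min, now_min, retention_minutes=1440):
    """True once a committed offset is older than the retention window.
    1440 minutes (24h) is the default offsets.retention.minutes."""
    return now_min - last_commit_ts_min > retention_minutes

# Last commit Friday 18:00, checked Monday 09:00 (~63 hours later):
print(offset_expired(0, 63 * 60))  # True: past the 24h default window
```

Raising offsets.retention.minutes past the longest expected idle period (e.g.
above 72h to survive a weekend) avoids the disappearance without touching
log.cleaner.delete.retention.ms.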




This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by 
email. Binding agreements with TiVo Inc. may only be made by a signed written 
agreement.


Re: Reg. Partition Rebalancing

2016-03-29 Thread James Cheng

> On Mar 29, 2016, at 10:33 AM, Todd Palino  wrote:
>
> There’s two things that people usually mean when they talk about
> rebalancing.
>
> One is leader reelection, or preferred replica election, which is sometimes
> confusingly referred to as “leader rebalance”. This is when we ask the
> controller in the cluster to pick the preferred replica for all partitions
> and change which broker is the leader (as appropriate). This is very useful
> when you have to take a broker offline for maintenance, as you can quickly
> make it take over leadership for partitions once it is back online. The
> controller picks the preferred leader pretty simply: the replica list is an
> array, and the controller picks the first broker listed in the array that
> is currently in sync as preferred. This means that the PLE is
> deterministic, and will always give you the same partition leadership
> (assuming the replicas are in sync).
>
> There is an admin CLI command to trigger the preferred replica election
> manually. There is also a broker configuration
> “auto.leader.rebalance.enable” which you can set to have the broker
> automatically perform the PLE when needed. DO NOT USE THIS OPTION. There
> are serious performance issues when doing so, especially on larger
> clusters. It needs some development work that has not been fully identified
> yet.
>

Todd,

What do you mean specifically by "serious performance issues"? I know that if 
you enable "auto.leader.rebalance.enable", then the broker(s) will do the 
reassignment whenever they want, at a time you can't predict. And the 
reassignment can move a lot of data around the cluster, and therefore it is 
undesirable to move so much data around at unpredictable times.

Is that the main performance issue you were talking about, or was there 
something else?

-James


> The other thing we mean by rebalance is partition rebalancing, or changing
> which brokers are replicas for a given partition to spread out the
> partitions according to some algorithm. This is something that you want to
> do when you add a broker to a cluster (or remove it), to redistribute load
> within the cluster. It’s also useful periodically to make sure you have a
> good spread of load, especially as topics change patterns (like ramping new
> features).
>
> While there are admin tools to perform partition reassignments, the brokers
> are not yet that great about rebalancing partitions. There is also
> currently no automated way of doing this, which is OK because it involves
> moving a lot of data around. Internally at LinkedIn we have some scripts we
> use for more intelligently balancing partitions to assure even balances
> based on a number of criteria. I’m hoping to have more to say about this
> later this week.
>
> -Todd
>
>
> On Tue, Mar 29, 2016 at 7:27 AM, Srikanth Chandika 
> wrote:
>
>> Hi,
>>
>> I am new to kafka I am testing all the options in kafka.
>> I am confused about the re-balancing?
>> How and where to configure the re-balancing option?
>>
>> Regards,
>> Srikanth
>>
>
>
>
> --
> *—-*
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
>
>
>
> linkedin.com/in/toddpalino






Re: kafka 0.9.0.1: FATAL exception on startup

2016-03-22 Thread James Cheng
Hi, we ran into this problem too. The only way we were able to bypass this was 
by stopping Kafka and deleting the log directory of the affected partition. 
Which means, we lost data for that partition on this broker.

-James

> On Mar 8, 2016, at 1:07 AM, Anatoly Deyneka  wrote:
>
> Hi,
>
> I need your advice how to start server in the next situation:
> It fails on startup with FATAL error:
> [2016-03-07 16:30:53,495] FATAL Fatal error during KafkaServerStartable 
> startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
> kafka.common.InvalidOffsetException: Attempt to append an offset (211046544) 
> to position 40048 no larger than the last offset appended (211046546) to 
> xyz/000210467262.index.
>at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>at kafka.log.LogSegment.recover(LogSegment.scala:188)
>at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:778)
>at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:777)
>at kafka.log.Log.loadSegments(Log.scala:160)
>at kafka.log.Log.<init>(Log.scala:90)
>at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>at java.lang.Thread.run(Thread.java:744)
>
> http://stackoverflow.com/questions/35849673/kafka-0-9-0-1-fails-with-fatal-exception-on-startup
>
> Thanks,
> Anatoly
> 
>
> The information contained in this email is strictly confidential and for the 
> use of the addressee only, unless otherwise indicated. If you are not the 
> intended recipient, please do not read, copy, use or disclose to others this 
> message or any attachment. Please also notify the sender by replying to this 
> email or by telephone (+44 (0)20 7896 0011) and then delete the email and any 
> copies of it. Opinions, conclusions (etc) that do not relate to the official 
> business of this company shall be understood as neither given nor endorsed by 
> it. IG Group Holdings plc is a company registered in England and Wales under 
> number 04677092. VAT registration number 761 2978 07. Registered Office: 
> Cannon Bridge House, 25 Dowgate Hill, London EC4R 2YA. Listed on the London 
> Stock Exchange. Its subsidiaries IG Markets Limited and IG Index Limited are 
> authorised and regulated by the Financial Conduct Authority (IG Markets 
> Limited FCA registration number 195355 and IG Index Limited FCA registration 
> number 114059).-






Re: Multi-threaded consumer?

2016-03-22 Thread James Cheng
Here's a good introductory blog post on the 0.9.0 consumer:

http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client

It shows the basics of using the consumer, as well as a section where they 
launch 3 threads, each with one consumer, to consume a single topic.

-James

> On Mar 22, 2016, at 5:21 PM, BYEONG-GI KIM  wrote:
>
> Hello.
>
> I'd like to know how to implement a multi-threaded consumer, which retrieve
> message(s) from a topic per thread.
>
> I read the Kafka Consumer 0.9.0.1 API document from
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html,
> and I copied and pasted the example source code from here. And I tried to
> manage threads via ThreadPoolTaskExecutor, but exception occurrd while
> executing multi-thread.
>
> The API document mentioned that multi-threaded access should be properly
> synchronized, but I think the example code seems missing something for
> that. My understanding of Kafka is probably bad so that I guess I may be
> using the API and functions of kafka wrongly...
>
>
> Could you give me a good sample code how to develop the multi-threaded
> consumer or any advice? Do I need to use commitSync() method somewhere to
> solve the problem?
>
> Thanks in advance!
>
> Best regards
>
> bgkim
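The pattern the linked tutorial demonstrates is one consumer instance per
thread, since KafkaConsumer is not safe for concurrent use. Here is a
broker-free sketch of that shape, with a hypothetical FakeConsumer standing in
for a real KafkaConsumer (each thread owns its own instance; nothing is
shared between threads except the output queue):

```python
import queue
import threading

class FakeConsumer:
    """Stand-in for a KafkaConsumer: each thread must own its own instance."""
    def __init__(self, records):
        self._records = list(records)

    def poll(self):
        # Returns one record, or None when the assigned data is exhausted.
        return self._records.pop(0) if self._records else None

def run_consumer(consumer, out):
    while True:
        rec = consumer.poll()
        if rec is None:
            break
        out.put(rec)

out = queue.Queue()  # thread-safe sink for consumed records
threads = [
    threading.Thread(target=run_consumer,
                     args=(FakeConsumer([f"p{i}-m{j}" for j in range(2)]), out))
    for i in range(3)  # 3 threads, each constructing its OWN consumer
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(out.queue))
# ['p0-m0', 'p0-m1', 'p1-m0', 'p1-m1', 'p2-m0', 'p2-m1']
```

The commitSync() question is orthogonal: with one consumer per thread there is
no cross-thread synchronization to do, because no consumer object is ever
touched by two threads.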






What happens if controlled shutdown can't complete within controlled.shutdown.max.retries attempts?

2016-03-20 Thread James Cheng
The broker has the following parameters related to controlled shutdown:

controlled.shutdown.enable: Enable controlled shutdown of the server.
(boolean, default: true, importance: medium)

controlled.shutdown.max.retries: Controlled shutdown can fail for multiple
reasons. This determines the number of retries when such failure happens.
(int, default: 3, importance: medium)

controlled.shutdown.retry.backoff.ms: Before each retry, the system needs time
to recover from the state that caused the previous failure (controller
failover, replica lag, etc.). This config determines the amount of time to
wait before retrying. (long, default: 5000, importance: medium)


If the broker attempts controlled shutdown and then fails, and this happens 
controlled.shutdown.max.retries number of times, what happens? Does it go back 
into active service and continue participating in the cluster, as if controlled 
shutdown never happened? Or does it do an uncontrolled shutdown?

Thanks,
-James
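For reference, the three settings above as they might appear in
server.properties. This just collects the knobs with the defaults quoted in
the table; it does not answer the behavioral question posed above:

```properties
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
```

Worst case with these defaults, a broker spends roughly
max.retries x retry.backoff.ms (about 15 seconds) attempting controlled
shutdown before giving up.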






Re: Questions about unclean leader election and "Halting because log truncation is not allowed"

2016-03-15 Thread James Cheng
Anthony,

I filed https://issues.apache.org/jira/browse/KAFKA-3410 to track this.

-James

> On Feb 25, 2016, at 2:16 PM, Anthony Sparks  
> wrote:
>
> Hello James,
>
> We received this exact same error this past Tuesday (we are on 0.8.2).  To
> answer at least one of your bullet points -- this is a valid scenario. We
> had the same questions, I'm starting to think this is a bug -- thank you
> for the reproducing steps!
>
> I looked over the Release Notes to see if maybe there were some fixes in
> newer versions -- this bug fix looked the most related:
> https://issues.apache.org/jira/browse/KAFKA-2143
>
> Thank you,
>
> Tony
>
> On Thu, Feb 25, 2016 at 3:46 PM, James Cheng  wrote:
>
>> Hi,
>>
>> I ran into a scenario where one of my brokers would continually shut down,
>> with the error message:
>> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting
>> because log truncation is not allowed for topic test, Current leader 1's
>> latest offset 0 is less than replica 2's latest offset 151
>> (kafka.server.ReplicaFetcherThread)
>>
>> I managed to reproduce it with the following scenario:
>> 1. Start broker1, with unclean.leader.election.enable=false
>> 2. Start broker2, with unclean.leader.election.enable=false
>>
>> 3. Create topic, single partition, with replication-factor 2.
>> 4. Write data to the topic.
>>
>> 5. At this point, both brokers are in the ISR. Broker1 is the partition
>> leader.
>>
>> 6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2
>> gets dropped out of ISR. Broker1 is still the leader. I can still write
>> data to the partition.
>>
>> 7. Shutdown Broker1. Hard or controlled, doesn't matter.
>>
>> 8. rm -rf the log directory of broker1. (This simulates a disk replacement
>> or full hardware replacement)
>>
>> 9. Resume broker2. It attempts to connect to broker1, but doesn't succeed
>> because broker1 is down. At this point, the partition is offline. Can't
>> write to it.
>>
>> 10. Resume broker1. Broker1 resumes leadership of the topic. Broker2
>> attempts to join ISR, and immediately halts with the error message:
>> [2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting
>> because log truncation is not allowed for topic test, Current leader 1's
>> latest offset 0 is less than replica 2's latest offset 151
>> (kafka.server.ReplicaFetcherThread)
>>
>> I am able to recover by setting unclean.leader.election.enable=true on my
>> brokers.
>>
>> I'm trying to understand a couple things:
>> * Is my scenario a valid supported one, or is it along the lines of "don't
>> ever do that"?
>> * In step 10, why is broker1 allowed to resume leadership even though it
>> has no data?
>> * In step 10, why is it necessary to stop the entire broker due to one
>> partition that is in this state? Wouldn't it be possible for the broker to
>> continue to serve traffic for all the other topics, and just mark this one
>> as unavailable?
>> * Would it make sense to allow an operator to manually specify which
>> broker they want to become the new master? This would give me more control
>> over how much data loss I am willing to handle. In this case, I would want
>> broker2 to become the new master. Or, is that possible and I just don't
>> know how to do it?
>> * Would it be possible to make unclean.leader.election.enable to be a
>> per-topic configuration? This would let me control how much data loss I am
>> willing to handle.
>>
>> Btw, the comment in the source code for that error message indicates:
>>
>> https://github.com/apache/kafka/blob/01aeea7c7bca34f1edce40116b7721335938b13b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L164-L166
>>
>>  // Prior to truncating the follower's log, ensure that doing so is
>> not disallowed by the configuration for unclean leader election.
>>  // This situation could only happen if the unclean election
>> configuration for a topic changes while a replica is down. Otherwise,
>>  // we should never encounter this situation since a non-ISR leader
>> cannot be elected if disallowed by the broker configuration.
>>
>> But I don't believe that happened. I never changed the configuration. But
>> I did venture into "unclean leader election" territory, so I'm not sure if
>> the comment still applies.
>>
>> Thanks,
>> -James
>>
>>
>>
>> 
>>
>>

Re: Uneven GC behavior between nodes

2016-03-05 Thread James Cheng
Your partitions are balanced, but is your data being evenly written across all 
the partitions? How are you producing data? Are you producing them with keys? 
Is it possible that the majority of the messages being written to just a few 
partitions, and so the brokers for those partitions are seeing more load than 
the others?
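
To make the keyed-partitioning point concrete, here is a minimal sketch (Python, with CRC32 standing in for the murmur2 hash the Java client actually uses) of how one hot key concentrates write load on a single partition, and therefore on that partition's leader broker:

```python
from collections import Counter
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Simplified stand-in for Kafka's default partitioner: hash the key
    # (the Java client uses murmur2; CRC32 here) modulo the partition count.
    return zlib.crc32(key) % num_partitions

# 90% of messages carry one hot key, so one partition absorbs most writes
# and its leader broker works much harder than its peers.
keys = [b"hot-key"] * 900 + [b"key-%d" % i for i in range(100)]
load = Counter(partition_for(k, 6) for k in keys)
hottest_partition, hottest_count = load.most_common(1)[0]
```

If your GC graphs differ per broker, comparing per-partition message rates this way is a quick first check.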

-James

> On Mar 4, 2016, at 9:15 AM, Cees de Groot  wrote:
>
> We're seeing something funny in one of our production clusters that I
> cannot explain away. Everything works fine, but as we're ramping up on
> Kafka, I really want to get at the root cause before we push a ton of
> traffic through it :)
>
> We have 6 nodes over three DCs in the cluster. Currently it's running a
> light load of two topics, one with small (KB) messages, one with variable
> sized (KB-MB) messages, both with 64 partitions and 3 replicas. All topics,
> including __consumer_offsets, have been rebalanced with a script we wrote
> to make sure that the replicas are spread out over the three datacenters
> and that leadership is evenly balanced, so we can continue to operate if we
> lose one DC. Producers use Consul to find an initial broker (round-robin
> through the local DC), Consumers use the 0.9.0.1 client.
>
> The funny thing is that in each DC, one broker graphs "normal" JVM heap
> behavior - a sawtooth of the expected garbage creation/collection cycle.
> The other one essentially stays flat. The flat-lining brokers also show
> less incoming traffic when graphing the OS' received bytes. Everything else
> - incoming, outgoing messages, etcetera, shows up as essentially the same
> on the graphs.
>
> I've been digging around for a bit, but can't find anything obvious that
> would cause the differences in memory pressure. Assuming that Kafka brokers
> pre-allocate buffers, I'd expect not too much garbage being generated. Is
> the flatline the expected behavior and the sawtooth the unexpected one?
> What could cause the difference?
>
> Thanks for any pointers :-)
>
>
> --
>
> *Cees de Groot*
> PRINCIPAL SOFTWARE ENGINEER
> pagerduty.com
> c...@pagerduty.com 
> +1(416)435-4085
>
> 




This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by 
email. Binding agreements with TiVo Inc. may only be made by a signed written 
agreement.


Re: Writing a Producer from Scratch

2016-03-03 Thread James Cheng
Stephen,

There is a mailing list for kafka client developers that you may find useful: 
https://groups.google.com/forum/#!forum/kafka-clients

The d...@kafka.apache.org mailing list might also 
be a good resource: http://kafka.apache.org/contact.html

Lastly, do you have any way to do HTTP calls on your platform? There exist some 
REST servers that you speak HTTP to and then they will produce to Kafka on your 
behalf. Here is one: http://docs.confluent.io/2.0.1/kafka-rest/docs/index.html

-James

On Mar 3, 2016, at 2:47 AM, Hopson, Stephen <stephen.hop...@gb.unisys.com> wrote:

Hi,
Not sure if this is the right forum for this question, but if it not I’m sure 
someone will direct me to the proper one.
Also, I am new to Kafka (but not new to computers).

I want to write a kafka producer client for a Unisys OS 2200 mainframe. I need 
to write it in C, and since I have no access to Windows / Unix / Linux 
libraries, I have to develop the interface at the lowest level.

So far, I have downloaded a kafka server with associated zookeeper (kafka 
_2.10-0.8.2.2). Note I have downloaded the Windows version and have it running 
on my laptop, successfully tested on the same laptop with the provided provider 
and consumer clients.

I have developed code to open a TCP session to the kafka server which appears 
to work and I have attempted to send a metadata request which does not appear 
to work. When I say it does not appear to work, I mean that I send the message 
and then I sit on a retrieve, which eventually times out ( I do seem to get one 
character in the receive buffer of 0235 octal). The message format I am using 
is the one described by the excellent document by Jay Kreps / Gwen Shapira at 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
However, it is not clear which Kafka versions these message formats apply to.

Can anybody offer me any advice or suggestions as to how to progress?

PS is the CRC mandatory in the Producer messages?
Many thanks in advance.

Stephen Hopson | Infrastructure Architect | Enterprise Solutions
Unisys | +44 (0)1908 805010 | +44 (0)7557 303321 | 
stephen.hop...@gb.unisys.com



THIS COMMUNICATION MAY CONTAIN CONFIDENTIAL AND/OR OTHERWISE PROPRIETARY 
MATERIAL and is for use only by the intended recipient. If you received this in 
error, please contact the sender and delete the e-mail and its attachments from 
all devices.






Re: About the number of partitions

2016-03-02 Thread James Cheng
Kim,

Here's a good blog post from Confluent with advice on how to choose the number 
of partitions.

http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
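
The rough sizing rule from that post can be sketched as follows (the throughput numbers here are made-up placeholders; measure your own):

```python
import math

def min_partitions(target_throughput, per_producer_throughput, per_consumer_throughput):
    # Rule of thumb from the linked post: pick at least max(t/p, t/c)
    # partitions so that neither the producers nor the consumers become
    # the bottleneck for the target throughput t.
    return max(math.ceil(target_throughput / per_producer_throughput),
               math.ceil(target_throughput / per_consumer_throughput))

# e.g. target 100 MB/s, each producer sustains 10 MB/s, each consumer 5 MB/s
needed = min_partitions(100, 10, 5)
```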

-James


> On Mar 1, 2016, at 4:11 PM, BYEONG-GI KIM  wrote:
>
> Hello.
>
> I have questions about how many partitions are optimal while using kafka.
> As far as I know, even if there are multiple consumers that belong to a
> consumer group, say *group_A*, only one consumer can receive a kafka
> message produced by a producer if there is a partition. So, as a result,
> multiple partitions are required in order to distribute the message to all
> the consumers in group_A if I want the consumers to get the message.
>
> Is it right?
>
> I'm considering developing several kafka consumer applications, e.g.,
> message saver, message analyzer, etc., so a message from a producer must be
> consumed by those kinds of consumers.
>
> Any advice and help would be really appreciated.
>
> Thanks in advance!
>
> Best regards
>
> Kim
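
As a toy model of the group semantics Kim describes (the group names and the round-robin split are illustrative; real assignment strategies differ in detail): within one group each partition is owned by exactly one consumer, while separate groups each receive every message.

```python
def assign(partitions, consumers):
    # Round-robin split within a single consumer group: every partition is
    # owned by exactly one consumer of that group, so each message is
    # processed once per group.
    return {c: partitions[i::len(consumers)] for i, c in enumerate(consumers)}

partitions = [0, 1, 2, 3]
savers = assign(partitions, ["saver-1", "saver-2"])   # group "message-saver"
analyzers = assign(partitions, ["analyzer-1"])        # group "message-analyzer"
```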






Unavailable partitions (Leader: -1 and ISR is empty) and we can't figure out how to get them back online

2016-03-01 Thread James Cheng
Hi,

We have 44 partitions in our cluster that are unavailable. kafka-topics.sh is 
reporting them with Leader: -1, and with no brokers in the ISR. Zookeeper says 
that broker 5 should be the partition leader for this topic partition. These 
are topics with replication-factor 1. Most of the topics have little to no data 
in them, so they are low-traffic topics. We currently cannot produce to them. 
And I can't find anything in the logs that seems to explain why the broker is 
not taking the partitions back online. Can anyone help?

Relevant log lines are attached below.

Questions:
* What does Leader: -1 mean?
* Why doesn't the broker take the partition back online?
* Is there more debugging/logging that I can turn on?
* unclean.leader.election.enable=false right now, although during a previous 
boot of the broker, we set it to true to get some partitions back online. These 
ones never came back online.

Thanks,
-James

in zookeeper
---
$ get /brokers/topics/the.topic.name
{"version":1,"partitions":{"0":[5]}}
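
The ZooKeeper value above maps each partition to its assigned replica list; a quick sketch of spotting the fragile (replication-factor 1) partitions from it:

```python
import json

# With a single replica there is no failover candidate: if broker 5 cannot
# resume leadership, the partition stays offline (Leader: -1, empty ISR).
state = json.loads('{"version":1,"partitions":{"0":[5]}}')
fragile = [p for p, replicas in state["partitions"].items() if len(replicas) < 2]
```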

server.log

[2016-03-01 06:29:13,869] WARN Found an corrupted index file, 
/TivoData/kafka/the.topic.name-0/.index, deleting and 
rebuilding index... (kafka.log.Log)
[2016-03-01 06:29:13,870] INFO Recovering unflushed segment 0 in log 
the.topic.name-0. (kafka.log.Log)
[2016-03-01 06:29:13,870] INFO Completed load of log the.topic.name-0 with log 
end offset 0 (kafka.log.Log)


state-change.log
-
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,498] TRACE Broker 5 cached 
leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 0 (state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,695] TRACE Broker 5 
received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 correlation id 1 from controller 2 epoch 20 for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:20,957] TRACE Broker 5 
handling LeaderAndIsr request correlationId 1 from controller 2 epoch 20 
starting the become-follower transition for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:23,531] ERROR Broker 5 
received LeaderAndIsrRequest with correlation id 1 from controller 2 epoch 20 
for partition [the.topic.name,0] but cannot become follower since the new 
leader -1 is unavailable. (state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:30,075] TRACE Broker 5 
completed LeaderAndIsr request correlationId 1 from controller 2 epoch 20 for 
the become-follower transition for partition [the.topic.name,0] 
(state.change.logger)
state-change.log.2016-03-01-06:[2016-03-01 06:34:30,458] TRACE Broker 5 cached 
leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 2 (state.change.logger)

state-change.log on the controller:
[2016-03-01 06:34:15,077] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 2 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,145] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 5 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,200] TRACE Broker 2 cached leader info 
(LeaderAndIsrInfo:(Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20),ReplicationFactor:1),AllReplicas:5)
 for partition [the.topic.name,0] in response to UpdateMetadata request sent by 
controller 2 epoch 20 with correlation id 9144 (state.change.logger)
[2016-03-01 06:34:15,276] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 4 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,418] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 1 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,484] TRACE Controller 2 epoch 20 sending UpdateMetadata 
request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to broker 3 for 
partition the.topic.name-0 (state.change.logger)
[2016-03-01 06:34:15,585] TRACE Controller 2 epoch 20 changed state of replica 
5 for partition [the.topic.name,0] from OfflineReplica to OnlineReplica 
(state.change.logger)
[2016-03-01 06:34:15,606] TRACE Controller 2 epoch 20 sending become-follower 
LeaderAndIsr request (Leader:-1,ISR:,LeaderEpoch:56,ControllerEpoch:20) to 
broker 5 for partition [the.topic.name,0] (state.change.logger)
[2016-03-01 06

Re: Kafka Rest Proxy

2016-03-01 Thread James Cheng
Jan,

I don't use the rest proxy, but Confluent has a mailing list where you can 
probably get more info:

Here's the direct link: 
https://groups.google.com/forum/#!forum/confluent-platform

And it is linked off of here: http://www.confluent.io/developer#documentation

-James

> On Mar 1, 2016, at 3:25 AM, Jan Omar  wrote:
>
> Hey guys,
>
> Is someone using the kafka rest proxy from confluent?
>
> We have an issue, that all messages for a certain topic end up in the same 
> partition. Has anyone faced this issue before? We're not using a custom   
> partitioner class, so it's using the default partitioner. We're sending 
> messages without a specific partition and without a key, like this:
>
> curl -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data 
> '{"records":[{"value":{"foo":"bar"}}]}' "http://x.x.x.x:8082/topics/testme";
>
> and yet for some reason every message ends up in partition 11...
>
> It's a test topic with 30 partitions on 3 brokers and our rest (producer) 
> config is very simple:
>
> id=1
> zookeeper.connect=zookeeper-four.acc:2181...etc
>
> Any help would be appreciated.
>
> Thanks!
>
>
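
One toy model that would reproduce Jan's symptom (purely illustrative — not the proxy's actual code): if a partitioner picks a random partition for null-key records once and then caches it, every record lands in the same partition instead of being spread round-robin.

```python
import random

class StickyNullKeyPartitioner:
    # Hypothetical partitioner that chooses a random partition for null-key
    # records once and reuses it, rather than re-choosing per record.
    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._cached = None

    def partition(self, key):
        if key is not None:
            return hash(key) % self.num_partitions
        if self._cached is None:
            self._cached = random.randrange(self.num_partitions)
        return self._cached

p = StickyNullKeyPartitioner(30)
chosen = {p.partition(None) for _ in range(1000)}  # every record, one partition
```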






Questions about unclean leader election and "Halting because log truncation is not allowed"

2016-02-25 Thread James Cheng
Hi,

I ran into a scenario where one of my brokers would continually shutdown, with 
the error message:
[2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log 
truncation is not allowed for topic test, Current leader 1's latest offset 0 is 
less than replica 2's latest offset 151 (kafka.server.ReplicaFetcherThread)

I managed to reproduce it with the following scenario:
1. Start broker1, with unclean.leader.election.enable=false
2. Start broker2, with unclean.leader.election.enable=false

3. Create topic, single partition, with replication-factor 2.
4. Write data to the topic.

5. At this point, both brokers are in the ISR. Broker1 is the partition leader.

6. Ctrl-Z on broker2. (Simulates a GC pause or a slow network) Broker2 gets 
dropped out of ISR. Broker1 is still the leader. I can still write data to the 
partition.

7. Shutdown Broker1. Hard or controlled, doesn't matter.

8. rm -rf the log directory of broker1. (This simulates a disk replacement or 
full hardware replacement)

9. Resume broker2. It attempts to connect to broker1, but doesn't succeed 
because broker1 is down. At this point, the partition is offline. Can't write 
to it.

10. Resume broker1. Broker1 resumes leadership of the topic. Broker2 attempts 
to join ISR, and immediately halts with the error message:
[2016-02-25 00:29:39,236] FATAL [ReplicaFetcherThread-0-1], Halting because log 
truncation is not allowed for topic test, Current leader 1's latest offset 0 is 
less than replica 2's latest offset 151 (kafka.server.ReplicaFetcherThread)

I am able to recover by setting unclean.leader.election.enable=true on my 
brokers.

I'm trying to understand a couple things:
* Is my scenario a valid supported one, or is it along the lines of "don't ever 
do that"?
* In step 10, why is broker1 allowed to resume leadership even though it has no 
data?
* In step 10, why is it necessary to stop the entire broker due to one 
partition that is in this state? Wouldn't it be possible for the broker to 
continue to serve traffic for all the other topics, and just mark this one as 
unavailable?
* Would it make sense to allow an operator to manually specify which broker 
they want to become the new master? This would give me more control over how 
much data loss I am willing to handle. In this case, I would want broker2 to 
become the new master. Or, is that possible and I just don't know how to do it?
* Would it be possible to make unclean.leader.election.enable to be a per-topic 
configuration? This would let me control how much data loss I am willing to 
handle.
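(Side note: later Kafka releases did make unclean.leader.election.enable overridable per topic. A hedged sketch — the topic name and ZooKeeper address are placeholders — of keeping the safe broker-wide default while opting one topic in:)

```properties
# server.properties: keep the safe cluster-wide default
unclean.leader.election.enable=false

# Per-topic override via the topic-config tool (names are placeholders):
#   bin/kafka-configs.sh --zookeeper zk:2181 --alter --entity-type topics \
#     --entity-name test --add-config unclean.leader.election.enable=true
```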

Btw, the comment in the source code for that error message indicates:
https://github.com/apache/kafka/blob/01aeea7c7bca34f1edce40116b7721335938b13b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala#L164-L166

  // Prior to truncating the follower's log, ensure that doing so is not 
disallowed by the configuration for unclean leader election.
  // This situation could only happen if the unclean election configuration 
for a topic changes while a replica is down. Otherwise,
  // we should never encounter this situation since a non-ISR leader cannot 
be elected if disallowed by the broker configuration.

But I don't believe that happened. I never changed the configuration. But I did 
venture into "unclean leader election" territory, so I'm not sure if the 
comment still applies.

Thanks,
-James







Re: Discrepancy between JMX OfflinePartitionCount and kafka-topics.sh?

2016-02-09 Thread James Cheng
I ran into kind of a similar discrepancy, but about UnderReplicatedPartitions.

kafka-topics.sh and zookeeper were saying that we had underreplicated 
partitions.

But JMX said that there were none. I took one of the partitions that ZK was 
saying was under-replicated and I ran DumpLogSegments on the leader broker and 
the broker that was out of the ISR, and it showed that the partition was fully 
replicated to the follower.

So JMX+DumpLogSegments disagreed with kafka-topics.sh+zookeeper.

So that is the opposite scenario of yours.

Mine was on 0.9.0.

-James


> On Feb 9, 2016, at 12:02 PM, Eric Ogren  wrote:
>
> Hello -
>
> I am seeing that the node in our Kafka cluster currently elected as
> controller is reporting 1 offline partition via JMX
> (kafka.controller:type=KafkaController,name=OfflinePartitionsCount).
> However, when I use kaka-topics to find the offline partition
> (bin/kafka-topics.sh
> --zookeeper localhost:2181 --unavailable-partitions --describe), nothing is
> dumped out. We also haven't been seeing any errors around unavailable
> partitions in any of our producers.
>
> Is there a good way to introspect the controller's state and figure out
> what it thinks the offline partition is?
>
> This is for Kafka 0.8.2.1.
>
> thanks
> Eric






Re: Detecting broker version programmatically

2016-02-04 Thread James Cheng

> On Feb 4, 2016, at 8:28 PM, Manikumar Reddy  wrote:
>
> Currently it is available through JMX Mbean. It is not available on wire
> protocol/requests.
>

The name of the JMX Mbean is kafka.server:type=app-info,id=4

Not sure what the id=4 means.

-James

> Pending JIRAs related to this:
> https://issues.apache.org/jira/browse/KAFKA-2061
>
> On Fri, Feb 5, 2016 at 4:31 AM,  wrote:
>
>> Is there a way to detect the broker version (even at a high level 0.8 vs
>> 0.9) using the kafka-clients Java library?
>>
>> --
>> Best regards,
>> Marko
>> www.kafkatool.com
>>
>>






Re: at-least-once delivery

2016-01-30 Thread James Cheng

> On Jan 30, 2016, at 4:21 AM, Franco Giacosa  wrote:
>
> Sorry, this solved my questions: "Setting a value greater than zero will
> cause the client to resend any record whose send fails with a potentially
> transient error. Note that this retry is no different than if the client
> resent the record upon receiving the error. Allowing retries will
> potentially change the ordering of records because if two records are sent
> to a single partition, and the first fails and is retried but the second
> succeeds, then the second record may appear first."
>

Franco,

Also, you can avoid the message reordering issue in that description by setting 
max.in.flight.requests.per.connection to 1.

This slide deck has good guidelines on the types of things you are talking 
about:
http://www.slideshare.net/JiangjieQin/no-data-loss-pipeline-with-apache-kafka-49753844
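
The usual no-data-loss settings from that deck, as a sketch (property names as in the 0.9-era Java producer; tune the values to your own durability/latency trade-off):

```properties
# Producer side: wait for all in-sync replicas, retry transient failures,
# and allow only one in-flight request so retries cannot reorder records.
acks=all
retries=2147483647
max.in.flight.requests.per.connection=1

# Broker/topic side: require at least 2 in-sync replicas and forbid
# unclean leader election.
min.insync.replicas=2
unclean.leader.election.enable=false
```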

-James

> 2016-01-30 13:18 GMT+01:00 Franco Giacosa :
>
>> Hi,
>>
>> The at-least-once delivery is generated in part by the network fails and
>> the retries (that may generate duplicates) right?
>>
>> In the event of a duplicated (there was an error but the first message
>> landed ok on the partition P1) the producer will recalculate the partition
>> on the retry? is this done automatically?
>>
>> If in the retry the partition doesn't change and there is only 1 Producer,
>> will the duplicated be written next to the original? I mean if I poll()
>> they will come one after the other?
>>
>>
>>






Re: MongoDB Kafka Connect driver

2016-01-29 Thread James Cheng
Not sure if this will help anything, but just throwing it out there.

The Maxwell and mypipe projects both do CDC from MySQL and support 
bootstrapping. The way they do it is kind of "eventually consistent".

1) At time T1, record coordinates of the end of the binlog as of T1.
2) At time T2, do a full dump of the database into Kafka.
3) Connect back to the binlog in the coordinates recorded in step #1, and emit 
all those records into Kafka.

As Jay mentioned, MySQL supports full row images. At the start of step #3, the 
kafka topic contains all rows as of time T2. It is possible that during step 
#3, that you will emit rows that changed between T1 and T2. From the point of 
view of the consumer of the kafka topic, they would see rows that went "back in 
time". However, as step #3 progresses, and the consumer keeps reading, those 
rows would eventually converge down to their final state.

Maxwell: https://github.com/zendesk/maxwell
mypipe: https://github.com/mardambey/mypipe

Does that idea help in any way? Btw, a reason it is done this way is that it is 
"difficult" to do #1 and #2 above in a coordinated way without locking the 
database or without adding additional outside dependencies (LVM snapshots, 
being a specific one).
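
The three-step bootstrap above can be simulated in a few lines (a toy key-value "database" and "binlog"; the point is that materializing the topic converges to the source's final state even though some rows are emitted twice):

```python
db = {"a": 1, "b": 2}
binlog = []

def write(key, value):
    db[key] = value
    binlog.append((key, value))

t1 = len(binlog)                 # step 1: note the binlog position at T1
write("a", 10)                   # a change that lands between T1 and the dump
snapshot = dict(db)              # step 2: full dump of the database at T2
write("b", 20)                   # changes after the dump flow through replay too

# step 3: topic = the dump followed by the binlog replayed from T1.
# ("a", 10) appears twice -- once in the dump, once in the replay -- but a
# consumer folding the topic into a table still converges to the final state.
topic = list(snapshot.items()) + binlog[t1:]
materialized = dict(topic)
```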

Btw, I glanced at some docs about the Mongodb oplog. It seems that each oplog 
contains
1) A way to identify the document that the change applies to.
2) A series of mongodb commands (set, unset) to alter the document in #1 to 
become the new document.

Thoughts:
For #1, does it identify a particular "version" of a document? (I don't know 
much about mongodb). If so, you might be able to use it to determine if the 
change should even be attempted to be applied to the object.
For #2, doesn't that mean you'll need "understand" mongodb's syntax and 
commands? Although maybe it is simply sets/unsets/deletes, in which case it's 
maybe pretty simple.
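
For thought #2, if the commands really are just sets/unsets/deletes, "understanding" them can be as small as this sketch (using the MongoDB $set/$unset operator shape, simplified — no nested dotted-path fields):

```python
def apply_op(doc, op):
    # Apply a MongoDB-style oplog delta: "$set" assigns fields,
    # "$unset" removes them. The document itself is identified elsewhere
    # in the oplog entry (thought #1 above).
    for field, value in op.get("$set", {}).items():
        doc[field] = value
    for field in op.get("$unset", {}):
        doc.pop(field, None)
    return doc

doc = {"_id": 1, "name": "sunny", "tmp": True}
apply_op(doc, {"$set": {"name": "Sunny"}, "$unset": {"tmp": 1}})
```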

-James

> On Jan 29, 2016, at 9:39 AM, Jay Kreps  wrote:
>
> Also, most database provide a "full logging" option that let's you capture
> the whole row in the log (I know Oracle and MySQL have this) but it sounds
> like Mongo doesn't yet. That would be the ideal solution.
>
> -Jay
>
> On Fri, Jan 29, 2016 at 9:38 AM, Jay Kreps  wrote:
>
>> Ah, agreed. This approach is actually quite common in change capture,
>> though. For many use cases getting the final value is actually preferable
>> to getting intermediates. The exception is usually if you want to do
>> analytics on something like number of changes.
>>
>> On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava 
>> wrote:
>>
>>> Jay,
>>>
>>> You can query after the fact, but you're not necessarily going to get the
>>> same value back. There could easily be dozens of changes to the document
>>> in
>>> the oplog so the delta you see may not even make sense given the current
>>> state of the document. Even if you can apply it the delta, you'd still be
>>> seeing data that is newer than the update. You can of course take this
>>> shortcut, but it won't give correct results. And if the data has been
>>> deleted since then, you won't even be able to write the full record... As
>>> far as I know, the way the op log is exposed won't let you do something
>>> like pin a query to the state of the db at a specific point in the op log
>>> and you may be reading from the beginning of the op log, so I don't think
>>> there's a way to get correct results by just querying the DB for the full
>>> documents.
>>>
>>> Strictly speaking you don't need to get all the data in memory, you just
>>> need a record of the current set of values somewhere. This is what I was
>>> describing following those two options -- if you do an initial dump to
>>> Kafka, you could track only offsets in memory and read back full values as
>>> needed to apply deltas, but this of course requires random reads into your
>>> Kafka topic (but may perform fine in practice depending on the workload).
>>>
>>> -Ewen
>>>
>>> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps  wrote:
>>>
 Hey Ewen, how come you need to get it all in memory for approach (1)? I
 guess the obvious thing to do would just be to query for the record
 after-image when you get the diff--e.g. just read a batch of changes and
 multi-get the final values. I don't know how bad the overhead of this
>>> would
 be...batching might reduce it a fair amount. The guarantees for this are
 slightly different than the pure oplog too (you get the current value
>>> not
 every necessarily every intermediate) but that should be okay for most
 uses.

 -Jay

 On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
>>> e...@confluent.io>
 wrote:

> Sunny,
>
> As I said on Twitter, I'm stoked to hear you're working on a Mongo
> connector! It struck me as a pretty natural source to tackle since it
 does
> such a nice job of cleanly exposing the op log.
>
> Regarding the problem of only getting deltas, unfortunate

Re: Accumulating data in Kafka Connect source tasks

2016-01-29 Thread James Cheng

> On Jan 29, 2016, at 7:06 AM, Randall Hauch  wrote:
>
> On January 28, 2016 at 7:07:02 PM, Ewen Cheslack-Postava (e...@confluent.io) 
> wrote:
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
> I agree that ideally connectors would be stateless, or at least have no need 
> for maintaining state across restarts. Unfortunately, that’s not always 
> possible.
>
> Reading the log but using the current schema does pose a problem if/when the 
> schema has evolved since the point in the log that we’re currently reading. 
> This is far more of an issue if you’re playing catch up and there’s been 
> non-compatible schema changes.
>
> Case in point: when MySQL inserts/updates/removes a row from a table, it 
> writes an event in the log that includes (a) a table identifier and (b) the 
> row values in column-order. There is no other information. Column renames 
> might be okay, but adding or removing columns will likely result in 
> mismatching the row values to the appropriate columns.
>
> Luckily, MySQL includes the DDL statements in the log, so my connector parses 
> these as part of processing and builds up the schema state as it goes along. 
> This works beautifully, with the only issue being how to persist and recover 
> this after restarts.
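
The bookkeeping described here can be sketched as follows (names and shapes are illustrative, not the connector's actual code): DDL events update an in-memory schema map, and row events — which carry values in column order only — are decoded against the schema current at that point in the log.

```python
schemas = {}

def on_ddl(table, columns):
    # A DDL event (CREATE/ALTER TABLE) replaces the known column list.
    schemas[table] = columns

def on_row(table, values):
    # A row event has no column names; zip the ordered values against the
    # schema as it existed when this event was written.
    return dict(zip(schemas[table], values))

on_ddl("customers", ["id", "name"])
row1 = on_row("customers", [1, "alice"])
on_ddl("customers", ["id", "name", "email"])   # ALTER TABLE ... ADD COLUMN
row2 = on_row("customers", [2, "bob", "b@x"])
```

Losing `schemas` on restart is exactly the persistence problem discussed below.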
>
>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
> Hmm, it sounds like hammering accumulated state into the offsets could be 
> pretty problematic and potentially risky, especially if the state has very 
> different size and frequency characteristics than the offsets.
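For concreteness, the "schema in the offsets" workaround Ewen describes can be sketched in a few lines. This is a toy, language-agnostic model (the field names `position` and `schema` are illustrative, not part of the Connect API): the task embeds its current schema object in every offset map it emits, and a restarted task recovers the schema from the last committed offset.

```python
import copy

class SchemaTrackingTask:
    """Toy model of a source task that piggybacks schema state on its offsets."""

    def __init__(self, committed_offset=None):
        # On restart, the framework hands back the last committed offset map;
        # the schema rides along as just another offset field.
        offset = committed_offset or {"position": 0, "schema": {"columns": ["id"]}}
        self.position = offset["position"]
        self.schema = offset["schema"]

    def apply_ddl(self, new_columns):
        # Rare schema change: build a new schema object. Between changes the
        # same object is reused, so per-record overhead is one reference.
        self.schema = {"columns": list(new_columns)}

    def next_offset(self):
        self.position += 1
        # The same schema object is embedded until the next DDL statement.
        return {"position": self.position, "schema": self.schema}

# A task commits an offset, "crashes", and a new task recovers the schema.
task = SchemaTrackingTask()
task.apply_ddl(["id", "name"])
last_committed = copy.deepcopy(task.next_offset())

restarted = SchemaTrackingTask(committed_offset=last_committed)
print(restarted.schema["columns"])  # ['id', 'name']
```

The deep copy stands in for serialization through the offsets topic; the recovered task resumes with both its position and its schema intact.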
>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
> Having a way to get the worker config would be awesome, and IMO it a nice 
> minimalistic approach. If you think this is a good idea, I can log a JIRA and 
> take it to the dev list. I’m willing to work on it, too.
>
> I’m starting to think that storing state on a separate dedicated topic is the 
> best option, at least for me. First, connector tasks can easily record their 
> state by simply adding more Sou

Re: Accumulating data in Kafka Connect source tasks

2016-01-28 Thread James Cheng

> On Jan 28, 2016, at 5:06 PM, Ewen Cheslack-Postava  wrote:
>
> Randall,
>
> Great question. Ideally you wouldn't need this type of state since it
> should really be available in the source system. In your case, it might
> actually make sense to be able to grab that information from the DB itself,
> although that will also have issues if, for example, there have been
> multiple schema changes and you can no longer get a previous schema from
> the current state of the tables.
>
> The offset storage is probably pretty close to what you're looking for,
> although we obviously structure that very narrowly. Adding in some sort of
> other state store is an interesting idea, though I'd be curious how many
> other systems encounter similar challenges. I think one way to do this
> without huge changes and in a way that properly handles offset commits
> would be to expose a small API for setting local state and have Connect
> store that state right in the same topic (and message) as offsets. To
> handle offset commits and reviving tasks that hit a fault, we would just
> grab the current state as part of the process of committing offsets. Then
> offsets would just be a special case of that more general state.
>
> However, I'm also wary of doing something like this. Right now every worker
> has to consume the entire offsets topic. This works fine because offsets,
> while capable of being pretty complex, are generally pretty small such that
> there's no concern having to tail it on all workers (and no concern for the
> load on brokers leading those partitions). Once you provide a generic state
> storage mechanism without clear constraints on how it should be used,
> someone will come along and abuse it. Also, with offsets it is very clear
> (to the connector developer) which task should write to which keys (where
> the key here is the partition of the source partitioned stream). With
> support for arbitrary state, ownership of different subsets of the key
> space is very unclear. I think you may not have that particular problem
> because you probably only have 1 partition anyway since you are reading a
> transaction log.
>
> In any case, you're right that this doesn't exist today. There is one very
> hacky way to get around this, which is to store that schema information in
> your "offsets". This may not be *too* bad -- it'll increase the size of
> offset data, but probably doesn't affect much else. The data size may not
> be that bad as long as offsets aren't committed too frequently. In terms of
> performance, I'm assuming these schema changes are relatively rare, and you
> can just include the same schema object in every offset you create during
> the periods between schema changes so you (and the GC) are probably only
> doing a negligible amount of extra work.
>
> Re: creating a consumer, Connect doesn't provide any utilities to do that
> since the goal is to handle everything Kafka-related for the connector
> developer so they can just focus on getting the data from the other system!
> We could consider exposing some of the worker config though, which I
> imagine is all you really need -- it'd just be convenient to have the
> connection info for the Kafka brokers.
>
> Finally, I'd love to know which DB you're reading the transaction log from
> and if you're planning on open sourcing the connector:)
>

+1! I'd like to know what DB you're working on too!

-James

> -Ewen
>
> On Thu, Jan 28, 2016 at 6:12 AM, Randall Hauch  wrote:
>
>> Rather than leave this thread so open ended, perhaps I can narrow down to
>> what I think is the best approach. These accumulations are really just
>> additional information from the source that don’t get written to the normal
>> topics. Instead, each change to the accumulated state can be emitted as
>> source records on a dedicated topic. That is very straightforward with the
>> existing Kafka Connect.
>>
>> The challenge I’m struggling with is how a task can/should, upon startup,
>> *consume* that stream to rebuild its state. I can set up my own Kafka
>> consumer for that topic, but IIUC now my connector config has to include
>> much of the same information included in the Kafka Connect workers
>> configuration.
>>
>> Am I just missing how a connector can see the worker configuration
>> properties? Or is there a way that Kafka Connect can help me create a Kafka
>> consumer?
>>
>> Best regards,
>>
>> Randall Hauch
>>
>> On January 28, 2016 at 12:11:07 AM, Randall Hauch (rha...@gmail.com)
>> wrote:
>> I’m creating a custom Kafka Connect source connector, and I’m running into
>> a situation for which Kafka Connect doesn’t seem to provide a solution out
>> of the box. I thought I’d first post to the users list in case I’m just
>> missing a feature that’s already there.
>>
>> My connector’s SourceTask implementation is reading a relational database
>> transaction log. That log contains schema changes and row changes, and the
>> row changes include a reference to the table and the row val

Re: Offset storage issue with kafka(0.8.2.1)

2016-01-27 Thread James Cheng

> On Jan 27, 2016, at 8:25 PM, Sivananda Reddys Thummala Abbigari wrote:
>
> Hi,
>
> # *Kafka Version*: 0.8.2.1
>
> # *My consumer.properties has the following properties*:
>exclude.internal.topics=false
>offsets.storage=kafka
>dual.commit.enabled=false
>
> # With the above configuration the offsets should be stored in kafka
> instead of zookeeper but I see that offsets are stored in
> zookeeper(verified using "get path" command).
>
> # I can see that "__consumer_offsets" topic is created but when I run
> "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
> __consumer_offsets --from-beginning" nothing is returned which means the
> offset data is not stored in the "__consumer_offsets" topic
>
> I want to store the offsets only in kafka. Could you please let me know if
> I am missing anything?.
>
> Thank you,
> Siva.

Siva,
I think you need to pass exclude.internal.topics=false to 
kafka-console-consumer.sh. Also, the messages in the __consumer_offsets topic 
are stored in a binary (I think) encoded format, so you'll need to specify a 
message formatter.

From slide 32 of this deck http://www.slideshare.net/jjkoshy/offset-management-in-kafka,

./bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper 
localhost:2181 --formatter "kafka.server.OffsetManager$OffsetMessageFormatter" 
--consumer.config consumer.properties

And make sure your consumer.properties has exclude.internal.topics=false.
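For the curious, the keys in __consumer_offsets are length-prefixed big-endian structs, which is why a formatter is needed. Here is a rough Python sketch of the v0/v1 key layout (version, group, topic, partition) as I understand it — treat the exact layout as an assumption and verify against your broker version:

```python
import struct

def encode_offset_key(group, topic, partition, version=1):
    """Encode a __consumer_offsets key (v0/v1: big-endian, length-prefixed strings)."""
    def s(x):
        b = x.encode("utf-8")
        return struct.pack(">h", len(b)) + b
    return struct.pack(">h", version) + s(group) + s(topic) + struct.pack(">i", partition)

def decode_offset_key(buf):
    """Decode (version, group, topic, partition) from a raw key."""
    (version,) = struct.unpack_from(">h", buf, 0)
    pos = 2
    def read_str(p):
        (n,) = struct.unpack_from(">h", buf, p)
        return buf[p + 2:p + 2 + n].decode("utf-8"), p + 2 + n
    group, pos = read_str(pos)
    topic, pos = read_str(pos)
    (partition,) = struct.unpack_from(">i", buf, pos)
    return version, group, topic, partition

key = encode_offset_key("groupname", "mytopic", 3)
print(decode_offset_key(key))  # (1, 'groupname', 'mytopic', 3)
```

This is essentially what the OffsetMessageFormatter does for you, which is why raw console output looks like binary garbage without it.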

-James




This email and any attachments may contain confidential and privileged material 
for the sole use of the intended recipient. Any review, copying, or 
distribution of this email (or any attachments) by others is prohibited. If you 
are not the intended recipient, please contact the sender immediately and 
permanently delete this email and any attachments. No employee or agent of TiVo 
Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by 
email. Binding agreements with TiVo Inc. may only be made by a signed written 
agreement.


Re: Kafka + ZooKeeper on the same hardware?

2016-01-18 Thread James Cheng

> On Jan 18, 2016, at 12:21 PM, Dick Davies  wrote:
>
> Started an Ansible playbook using the Confluent platform RPM distro,
> and it seems that co-locates zookeepers
> on the brokers.
>
> So I'm assuming it's fine (at least on 0.9.x for the reasons Todd mentioned).
>
> Does anyone know if the Confluent distro is supposed to be production-ready?
> I switched our test setup over to the Confluent distro to get
> RPM-based installs etc and got a 3-node
> cluster up in HA, but there were a few things I had to roll myself
> (init scripts etc) which I'd have expected
> in a full release.
>
> (sorry if this is OT, I don't know of a specific Confluent mailing list).
>

Confluent-specific mailing list is linked off of here:
http://www.confluent.io/developer#documentation

-James

>
>
> On 14 January 2016 at 18:31, Todd Palino  wrote:
>> I’d say it depends on load and usage. It can definitely be done, and we’ve
>> done it here in places, though we don’t anymore. Part of the luxury of
>> being able to get the hardware we want. In general, it’s probably easier to
>> do with 0.9 and Kafka-committed offsets, because the consumers don’t need
>> to talk to ZK as much. It’s probably even even easier with the new
>> consumer, but I can’t speak to that at all.
>>
>> One of the gotchas is that ZK really should have its transaction log on an
>> isolate device so that sequential writes do not require seeks. This could
>> be a separate disk or an SSD drive. An example of a really bad place to put
>> it would be on the same device as your Kafka log segments :) Depending on
>> your load, it may not be critical to use a separate device.
>>
>> As Gwen noted, it all comes down to load. Your availability will be fine,
>> you just need to figure out if the services can share the load.
>>
>> -Todd
>>
>>
>> On Thu, Jan 14, 2016 at 9:25 AM, Gwen Shapira  wrote:
>>
>>> It depends on load :)
>>> As long as there is no contention, you are fine.
>>>
>>> On Thu, Jan 14, 2016 at 6:06 AM, Erik Forsberg  wrote:
>>>
 Hi!

 Pondering how to configure Kafka clusters and avoid having too many
 machines to manage.. Would it be recommended to run say a 3 node kafka
 cluster where you also run your 3 node zookeeper cluster on the same
 machines?

 I guess the answer is that "it depends on load", but would be interested
 in any opinions on this anyway.

 Thanks!
 \EF

>>>
>>
>>
>>
>> --
>> *—-*
>> *Todd Palino*
>> Staff Site Reliability Engineer
>> Data Infrastructure Streaming
>>
>>
>>
>> linkedin.com/in/toddpalino






Re: how to reset kafka offset in zookeeper

2015-12-19 Thread James Cheng
This page describes what Kafka stores in Zookeeper:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

It looks like the info for a particular consumer groupId is stored at:

/consumers/<groupId>/

According to 
https://community.cloudera.com/t5/Cloudera-Labs/Kafka-Parcels/td-p/20392, 
Cloudera's default config is to put a zookeeper chroot of "/kafka"

So you might have to look at 

/kafka/consumers/<groupId>/

-James
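The chroot detail above is what usually trips people up. As a quick illustration (plain string handling; the group and chroot names are just examples):

```python
def consumer_group_znode(group_id, chroot=""):
    """Path of an old-style consumer group's znode, honoring a ZK chroot."""
    chroot = "/" + chroot.strip("/") if chroot.strip("/") else ""
    return f"{chroot}/consumers/{group_id}"

print(consumer_group_znode("my-group"))            # /consumers/my-group
print(consumer_group_znode("my-group", "/kafka"))  # /kafka/consumers/my-group
```

So on a Cloudera-style install with a /kafka chroot, looking under a bare /consumers path finds nothing, even though the group exists.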


> On Dec 19, 2015, at 3:14 PM, Todd Palino  wrote:
> 
> There’s no simple command. You’ll need to use either zookeeper-shell.sh or
> zkCli.sh or something similar that lets you explore and edit Zookeeper and
> do a recursive delete on the group name in the consumers tree. I’m not sure
> how Cloudera’s interface differs, however, or if they provide a separate
> tool for deleting a consumer group.
> 
> -Todd
> 
> 
> On Sat, Dec 19, 2015 at 11:34 AM, Akhilesh Pathodia <
> pathodia.akhil...@gmail.com> wrote:
> 
>> What is the command to delete a group from zookeeper? I don't find a
>> /consumer/ directory. I am using Cloudera; is there any place on Cloudera
>> Manager where I can delete the group?
>> 
>> Thanks
>> 
>> On Sat, Dec 19, 2015 at 11:47 PM, Todd Palino  wrote:
>> 
>>> If what you want to do is reset to smallest, all you need to do is stop
>> the
>>> consumer, delete the group from Zookeeper, and restart the consumer. It
>>> will automatically create the group again.
>>> 
>>> You only need to export the offsets first if you later need to reset to
>>> where you were in the partitions.
>>> 
>>> -Todd
>>> 
>>> On Saturday, December 19, 2015, Akhilesh Pathodia <
>>> pathodia.akhil...@gmail.com> wrote:
>>> 
 What is the process for deleting the consumer group from zookeeper?
>>> Should
 I export offset, delete and then import?
 
 Thanks,
 Akhilesh
 
 On Fri, Dec 18, 2015 at 11:32 PM, Todd Palino wrote:
 
> Yes, that’s right. It’s just work for no real gain :)
> 
> -Todd
> 
> On Fri, Dec 18, 2015 at 9:38 AM, Marko Bonaći <marko.bon...@sematext.com> wrote:
> 
>> Hmm, I guess you're right Tod :)
>> Just to confirm, you meant that, while you're changing the exported
 file
> it
>> might happen that one of the segment files becomes eligible for
>>> cleanup
> by
>> retention, which would then make the imported offsets out of range?
>> 
>> Marko Bonaći
>> Monitoring | Alerting | Anomaly Detection | Centralized Log
>>> Management
>> Solr & Elasticsearch Support
>> Sematext  | Contact
>> 
>> 
>> On Fri, Dec 18, 2015 at 6:29 PM, Todd Palino wrote:
>> 
>>> That works if you want to set to an arbitrary offset, Marko.
>>> However
 in
>> the
>>> case the OP described, wanting to reset to smallest, it is better
>>> to
> just
>>> delete the consumer group and start the consumer with
 auto.offset.reset
>> set
>>> to smallest. The reason is that while you can pull the current
 smallest
>>> offsets from the brokers and set them in Zookeeper for the
>>> consumer,
 by
>> the
>>> time you do that the smallest offset is likely no longer valid.
>>> This
>> means
>>> you’re going to resort to the offset reset logic anyways.
>>> 
>>> -Todd
>>> 
>>> 
>>> On Fri, Dec 18, 2015 at 7:10 AM, Marko Bonaći <marko.bon...@sematext.com> wrote:
>>> 
 You can also do this:
 1. stop consumers
 2. export offsets from ZK
 3. make changes to the exported file
 4. import offsets to ZK
 5. start consumers
 
 e.g.
 bin/kafka-run-class.sh kafka.tools.ExportZkOffsets --group
 group-name
 --output-file /tmp/zk-offsets --zkconnect localhost:2181
 bin/kafka-run-class.sh kafka.tools.ImportZkOffsets --input-file
 /tmp/zk-offsets --zkconnect localhost:2181
 
 Marko Bonaći
 Monitoring | Alerting | Anomaly Detection | Centralized Log
> Management
 Solr & Elasticsearch Support
 Sematext  | Contact
 
 
 On Fri, Dec 18, 2015 at 4:06 PM, Jens Rantil <jens.ran...@tink.se> wrote:
 
> Hi,
> 
> I noticed that a consumer in the new consumer API supports
 setting
>> the
> offset for a partition to beginning. I assume doing so also
>>> would
>>> update
> the offset in Zookeeper eventually.
> 
> Cheers,
> Jens
> 
 On Friday, December 18, 2015, Akhilesh Pathodia <pathodia.akhil...@gmail.com> wrote:
> 
>> Hi,
>> 
>> I want to reset the kafka offset in zookeeper so that the
> consumer
>>> will
>> start reading messages

Re: kafka-connect-jdbc: ids, timestamps, and transactions

2015-12-18 Thread James Cheng
Mark, what database are you using?

If you are using MySQL...



There is a not-yet-finished Kafka MySQL Connector at 
https://github.com/wushujames/kafka-mysql-connector. It tails the MySQL binlog, 
and so will handle the situation you describe.

But, as I mentioned, I haven't finished it yet.

If you are using MySQL and don't specifically need/want Kafka Connect, then 
there are a bunch of other options. There is a list of them at 
https://github.com/wushujames/mysql-cdc-projects. But, I'd recommend using the 
Kafka Connect framework, since it was built for this exact purpose.



-James

> On Dec 18, 2015, at 12:08 PM, Mark Drago  wrote:
>
> Ewen,
>
> Thanks for the reply.  We'll proceed while keeping all of your points in
> mind.  I looked around for a more focused forum for the jdbc connector
> before posting here but didn't come across the confluent-platform group.
> I'll direct any more questions about the jdbc connector there.  I'll also
> close the github issue with a link to this thread.
>
> Thanks again,
> Mark.
>
> On Wed, Dec 16, 2015 at 9:51 PM Ewen Cheslack-Postava 
> wrote:
>
>> Mark,
>>
>> There are definitely limitations to using JDBC for change data capture.
>> Using a database-specific implementation, especially if you can read
>> directly off the database's log, will be able to handle more situations
>> like this. Cases like the one you describe are difficult to address
>> efficiently working only with simple queries.
>>
>> The JDBC connector offers a few different modes for handling incremental
>> queries. One of them uses both a timestamp and a unique ID, which will be
>> more robust to issues like these. However, even with both, you can still
>> come up with variants that can cause issues like the one you describe. You
>> also have the option of using a custom query which might help if you can do
>> something smarter by making assumptions about your table, but for now
>> that's pretty limited for constructing incremental queries since the
>> connector doesn't provide a way to track offset columns with custom
>> queries. I'd like to improve the support for this in the future, but at
>> some point it starts making sense to look at database-specific connectors.
>>
>> (By the way, this gets even messier once you start thinking about the
>> variety of different isolation levels people may be using...)
>>
>> -Ewen
>>
>> P.S. Where to ask these questions is a bit confusing since Connect is part
>> of Kafka. In general, for specific connectors I'd suggest asking on the
>> corresponding mailing list for the project, which in the case of the JDBC
>> connector would be the Confluent Platform mailing list here:
>> https://groups.google.com/forum/#!forum/confluent-platform
>>
>> On Wed, Dec 16, 2015 at 5:27 AM, Mark Drago  wrote:
>>
>>> I had asked this in a github issue but I'm reposting here to try and get
>> an
>>> answer from a wider audience.
>>>
>>> Has any thought gone into how kafka-connect-jdbc will be impacted by SQL
>>> transactions committing IDs and timestamps out-of-order?  Let me give an
>>> example with two connections.
>>>
>>> 1: begin transaction
>>> 1: insert (get id 1)
>>> 2: begin transaction
>>> 2: insert (get id 2)
>>> 2: commit (recording id 2)
>>> kafka-connect-jdbc runs and thinks it has handled everything through id 2
>>> 1: commit (recording id 1)
>>>
>>> This would result in kafka-connect-jdbc missing id 1. The same thing
>> could
>>> happen with timestamps. I've read through some of the kafka-connect-jdbc
>>> code and I think it may be susceptible to this problem, but I haven't run
>>> it or verified that it would be an issue. Has this come up before? Are
>>> there plans to deal with this situation?
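The interleaving described above is easy to reproduce in miniature: if the poller records "max committed id seen" as its offset, a transaction that commits later with a smaller id is skipped forever. A toy simulation of that race (not the connector's actual code):

```python
def poll_new_rows(committed_ids, last_offset):
    """Incremental-id polling: fetch ids > last_offset, advance to the max seen."""
    new = sorted(i for i in committed_ids if i > last_offset)
    return new, (max(new) if new else last_offset)

committed = set()
offset = 0

# Connection 1 inserts id 1 but has not committed; connection 2 commits id 2.
committed.add(2)
seen, offset = poll_new_rows(committed, offset)   # sees [2], offset advances to 2

# Connection 1 now commits id 1 -- too late: 1 < offset, so it is never fetched.
committed.add(1)
late, offset = poll_new_rows(committed, offset)
print(seen, late)  # [2] [] -- row 1 is silently missed
```

The same simulation works with timestamps in place of ids, which is why log-based capture (reading committed changes in commit order) sidesteps the problem entirely.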
>>>
>>> Obviously something like bottled-water for postgresql would handle this
>>> nicely as it would get the changes once they're committed.
>>>
>>>
>>> Thanks for any insight,
>>>
>>> Mark.
>>>
>>>
>>> Original github issue:
>>> https://github.com/confluentinc/kafka-connect-jdbc/issues/27
>>>
>>
>>
>>
>> --
>> Thanks,
>> Ewen
>>






0.8.2 high level consumer with one-time-use group.id's?

2015-12-15 Thread James Cheng
When using the 0.8.2 high level consumer, what is the impact of creating many 
one-time use groupIds and checkpointing offsets using those?

I have a use case where upon every boot, I want to consume an entire topic from 
the very beginning, all partitions. We are using the high level consumer for 
convenience in handling leader discovery and rebalancing, but we do not need 
need consumer groups functionality.

We do not need checkpointing of offsets to allow continuing after a restart of 
our application, since we want to re-consume the stream upon restarts. However, 
it appears that if you do *not* checkpoint, then when there is an intermittent 
disconnect, the consumer will restart at the beginning of the topic. I haven't 
yet traced down why this happens.

We were thinking of simply creating a new consumer group id upon every reboot, 
but this seems messy, leaving around a lot of unused consumer group ids. A 
couple questions:

1) What resources does a groupId use, when it is active (a consumer using it) 
and when it is inactive (no consumers using it)?

The only resources I can identify are:
* kafka/zookeeper using it for group membership (only when the group is active)
* disk storage for most recent offset in zookeeper (only the most recent is 
stored per partition)
* disk storage for all offsets in kafka (all checkpoints stored, but there is 
log compaction)
* in-memory storage for most recent offset in kafka, for lookups.

2) Are old non-active groupIds ever deleted?

Thanks,
-James
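For what it's worth, the "new group id per boot" idea sketches down to very little code. The helper below is hypothetical (not a Kafka API); the point is only that a UUID suffix guarantees each process start gets a group with no committed offsets, so consumption starts from the beginning:

```python
import uuid

def one_time_group_id(app_name):
    # A fresh group id per process start: offsets committed under it are
    # never reused, so every boot re-consumes the topic from the start.
    return f"{app_name}-boot-{uuid.uuid4()}"

a = one_time_group_id("replayer")
b = one_time_group_id("replayer")
print(a != b)  # each boot gets its own group
```

The messiness the question raises is exactly that nothing here cleans up the old ids afterward.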






Re: Where is replication factor stored?

2015-10-16 Thread James Cheng

> On Oct 16, 2015, at 1:19 PM, Guozhang Wang  wrote:
>
> Replication factor is stored as topic configs that are introduced since
> 0.8.1, you can find it in the wiki you mentioned.
>

Ah, I didn't notice the /config section.

But it still doesn't show the replication factor.

[zk: localhost:2181(CONNECTED) 3] get /config/topics/__consumer_offsets
{"version":1,"config":{"segment.bytes":"104857600","cleanup.policy":"compact"}}
cZxid = 0xc017a
ctime = Wed Aug 05 22:48:12 UTC 2015
mZxid = 0xc017a
mtime = Wed Aug 05 22:48:12 UTC 2015
pZxid = 0xc017a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 79
numChildren = 0

I tried that for a number of different topics, and none of them have it.

-James
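As the output above suggests, /config/topics only holds overridden configs. If I read the tooling right, kafka-topics --describe derives the replication factor from the partition assignment stored under /brokers/topics/<topic>, where each partition maps to its replica list, so the factor is just that list's length. A sketch against an example payload (shape per the ZK data-structures wiki page; treat it as an assumption for your version):

```python
import json

# Example payload of /brokers/topics/<topic>:
assignment = json.loads('{"version":1,"partitions":{"0":[1,2,3],"1":[2,3,4]}}')

# Replication factor is simply the length of each partition's replica list.
factors = {p: len(replicas) for p, replicas in assignment["partitions"].items()}
print(factors)  # {'0': 3, '1': 3}

replication_factor = len(next(iter(assignment["partitions"].values())))
print(replication_factor)  # 3
```

This also explains why no single "replication.factor" key exists anywhere: it is implied, per partition, by the assignment itself.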


> Guozhang
>
> On Fri, Oct 16, 2015 at 12:33 PM, James Cheng  wrote:
>
>> Hi,
>>
>> Where is the replication factor for a topic stored? It isn't listed at
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper.
>> But the kafka-topics --describe command returns something. Where is it
>> finding that?
>>
>> Thanks,
>> -James
>>
>>
>> 
>>
>>
>
>
>
> --
> -- Guozhang






Where is replication factor stored?

2015-10-16 Thread James Cheng
Hi,

Where is the replication factor for a topic stored? It isn't listed at 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper.
 But the kafka-topics --describe command returns something. Where is it finding 
that?

Thanks,
-James






Re: New consumer client compatible with old broker

2015-10-15 Thread James Cheng

> On Oct 15, 2015, at 11:29 AM, tao xiao  wrote:
>
> Hi team,
>
> Does new consumer client (the one in trunk) work with 0.8.2.x broker? I am
> planning to use the new consumer in our development but don't want to
> upgrade the broker to the latest. is it possible to do that?

Tao,

I recently tried using the console-consumer from trunk to read from a 0.8.2 
broker. It did not work. I tried the console-consumer both in normal mode 
(which uses old consumer) as well as --new-consumer mode. Neither of them 
worked.

https://twitter.com/lorax_james/status/652607463057829888

-James






Re: Dealing with large messages

2015-10-05 Thread James Cheng
Here’s an article that Gwen wrote earlier this year on handling large messages 
in Kafka.

http://ingest.tips/2015/01/21/handling-large-messages-kafka/

-James

> On Oct 5, 2015, at 11:20 AM, Pradeep Gollakota  wrote:
>
> Fellow Kafkaers,
>
> We have a pretty heavyweight legacy event logging system for batch
> processing. We're now sending the events into Kafka now for realtime
> analytics. But we have some pretty large messages (> 40 MB).
>
> I'm wondering if any of you have use cases where you have to send large
> messages to Kafka and how you're dealing with them.
>
> Thanks,
> Pradeep






Re: custom message handlers?

2015-09-28 Thread James Cheng

> On Sep 28, 2015, at 12:47 PM, Doug Tomm  wrote:
>
> hello,
>
> i've noticed the addition of the custom message handler feature in the latest 
> code; a very useful feature.  in what release will it be available, and when 
> might that be?  at present i am building kafka from source to get this 
> feature.
>

Doug, are you asking about custom message handling in Mirror Maker?

-James

> many thanks,
> doug
>






Re: Log Cleaner Thread Stops

2015-09-24 Thread James Cheng

> On Sep 24, 2015, at 8:11 PM, Todd Palino  wrote:
>
> Well, in general you can't currently use compressed messages in any topic
> that has compaction turned on regardless of whether or not you are using
> Kafka-committed offsets. The log compaction thread will die either way.
> There's only one compaction thread for the broker that runs on all topics
> that use compaction.
>
> Jason, to address your question, it's probably wise to wait for now.
> Zookeeper offsets work, so unless it's broke, don't fix it for now. We're
> using Kafka-committed offsets at LinkedIn for our mirror makers and our
> auditor application (both of which are considered infrastructure
> applications for Kafka), but we're not encouraging other internal users to
> switch over just yet.
>

Burrow depends on kafka-committed offsets, doesn’t it? I guess that means Burrow 
is only being used to monitor your mirror makers and auditor application, then?

-James

> -Todd
>
>
> On Wed, Sep 23, 2015 at 3:21 PM, James Cheng  wrote:
>
>>
>> On Sep 18, 2015, at 10:25 AM, Todd Palino  wrote:
>>
>>> I think the last major issue with log compaction (that it couldn't handle
>>> compressed messages) was committed as part of
>>> https://issues.apache.org/jira/browse/KAFKA-1374 in August, but I'm not
>>> certain what version this will end up in. It may be part of 0.8.2.2.
>>>
>>> Regardless, you'll probably be OK now. We've found that once we clean
>> this
>>> issue up once it doesn't appear to recur. As long as you're not writing
>> in
>>> compressed messages to a log compacted topic (and that won't happen with
>>> __consumer_offsets, as it's managed by the brokers themselves - it would
>>> only be if you were using other log compacted topics), you're likely in
>> the
>>> clear now.
>>>
>>
>> Todd,
>>
>> If I understand your description of the problem, you are saying that
>> enabling log compaction on a topic with compressed messages can (will?)
>> cause the log cleaner to crash when it encounters those compressed
>> messages. And the death of the cleaner thread will prevent log compaction
>> from running on other topics, even ones that don't have compressed messages.
>>
>> That means if we have a cluster where we want to use log compaction on
>> *any* topic, we need to either:
>> 1) apply https://issues.apache.org/jira/browse/KAFKA-1374 (or upgrade to
>> some version it is applied)
>> OR
>> 2) make sure that we don't use compressed messages in *any* topic that has
>> log compaction turned on.
>>
>> And, more specifically, if we want to make use of __consumer_offsets, then
>> we cannot use compressed messages in any topic that has compaction turned
>> on.
>>
>> Is that right?
>> -James
>>
>>> -Todd
>>>
>>>
>>> On Fri, Sep 18, 2015 at 9:54 AM, John Holland <
>>> john.holl...@objectpartners.com> wrote:
>>>
>>>> Thanks!
>>>>
>>>> I did what you suggested and it worked except it was necessary for me to
>>>> remove the cleaner-offset-checkpoint file from the data directory and
>>>> restart the servers.  The log indicates all is well.
>>>>
>>>> Do you know what version the fix to this will be in? I'm not looking
>>>> forward to dealing with this on a recurring basis.
>>>>
>>>> -John
>>>>
>>>> On Fri, Sep 18, 2015 at 8:48 AM Todd Palino  wrote:
>>>>
>>>>> Yes, this is a known concern, and it should be fixed with recent
>> commits.
>>>>> In the meantime, you'll have to do a little manual cleanup.
>>>>>
>>>>> The problem you're running into is a corrupt message in the offsets
>>>> topic.
>>>>> We've seen this a lot. What you need to do is set the topic
>> configuration
>>>>> to remove the cleanup.policy config, and set retention.ms and
>> segment.ms
>>>>> to
>>>>> something reasonably low. I suggest using a value of 3 or 4 times your
>>>>> commit interval for consumers. Then wait until the log segments are
>>>> reaped
>>>>> (wait twice as long as the retention.ms you chose, to be safe). Once
>>>> this
>>>>> is done, you can set the topic configuration back the way it was
>> (remove
>>>>> segment.ms and retention.ms configs, and set cleanup.policy=compact).
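Todd's recovery procedure quoted above can be sketched as a pair of kafka-topics.sh invocations. The helpers below only assemble the argument lists (nothing is executed), and the flag names assume the 0.8.2-era tool (`--alter`, `--config`, `--deleteConfig`) — verify them against your Kafka version before running anything:

```python
# Sketch of Todd's recovery steps as kafka-topics.sh argument lists.
# Assumption: 0.8.2-era flags (--alter/--config/--deleteConfig); check
# your version. These helpers build commands only - run them yourself.

TOPIC = "__consumer_offsets"

def disable_compaction_cmd(zookeeper, retention_ms, segment_ms):
    """Step 1: drop cleanup.policy and shrink retention so the corrupt
    segments get reaped. Todd suggests 3-4x your consumer commit interval."""
    return [
        "kafka-topics.sh", "--zookeeper", zookeeper, "--alter",
        "--topic", TOPIC,
        "--deleteConfig", "cleanup.policy",
        "--config", "retention.ms=%d" % retention_ms,
        "--config", "segment.ms=%d" % segment_ms,
    ]

def restore_compaction_cmd(zookeeper):
    """Step 2: after waiting ~2x retention.ms, put compaction back and
    remove the temporary retention settings. Then rolling-bounce brokers."""
    return [
        "kafka-topics.sh", "--zookeeper", zookeeper, "--alter",
        "--topic", TOPIC,
        "--deleteConfig", "retention.ms",
        "--deleteConfig", "segment.ms",
        "--config", "cleanup.policy=compact",
    ]

# Example: 60s commit interval -> 4x = 240000 ms retention
cmd = disable_compaction_cmd("zk:2181", retention_ms=240000, segment_ms=240000)
```

As Todd notes, this deletes old offsets, so any consumer that is offline for the duration loses its committed position.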

Re: Log Cleaner Thread Stops

2015-09-23 Thread James Cheng

On Sep 18, 2015, at 10:25 AM, Todd Palino  wrote:

> I think the last major issue with log compaction (that it couldn't handle
> compressed messages) was committed as part of
> https://issues.apache.org/jira/browse/KAFKA-1374 in August, but I'm not
> certain what version this will end up in. It may be part of 0.8.2.2.
> 
> Regardless, you'll probably be OK now. We've found that once we clean this
> issue up once it doesn't appear to recur. As long as you're not writing in
> compressed messages to a log compacted topic (and that won't happen with
> __consumer_offsets, as it's managed by the brokers themselves - it would
> only be if you were using other log compacted topics), you're likely in the
> clear now.
> 

Todd,

If I understand your description of the problem, you are saying that enabling 
log compaction on a topic with compressed messages can (will?) cause the log 
cleaner to crash when it encounters those compressed messages. And the death of 
the cleaner thread will prevent log compaction from running on other topics, 
even ones that don't have compressed messages.

That means if we have a cluster where we want to use log compaction on *any* 
topic, we need to either:
1) apply https://issues.apache.org/jira/browse/KAFKA-1374 (or upgrade to some 
version it is applied)
OR
2) make sure that we don't use compressed messages in *any* topic that has log 
compaction turned on.

And, more specifically, if we want to make use of __consumer_offsets, then we 
cannot use compressed messages in any topic that has compaction turned on.

Is that right?
-James

> -Todd
> 
> 
> On Fri, Sep 18, 2015 at 9:54 AM, John Holland <
> john.holl...@objectpartners.com> wrote:
> 
>> Thanks!
>> 
>> I did what you suggested and it worked except it was necessary for me to
>> remove the cleaner-offset-checkpoint file from the data directory and
>> restart the servers.  The log indicates all is well.
>> 
>> Do you know what version the fix to this will be in? I'm not looking
>> forward to dealing with this on a recurring basis.
>> 
>> -John
>> 
>> On Fri, Sep 18, 2015 at 8:48 AM Todd Palino  wrote:
>> 
>>> Yes, this is a known concern, and it should be fixed with recent commits.
>>> In the meantime, you'll have to do a little manual cleanup.
>>> 
>>> The problem you're running into is a corrupt message in the offsets
>> topic.
>>> We've seen this a lot. What you need to do is set the topic configuration
>>> to remove the cleanup.policy config, and set retention.ms and segment.ms
>>> to
>>> something reasonably low. I suggest using a value of 3 or 4 times your
>>> commit interval for consumers. Then wait until the log segments are
>> reaped
>>> (wait twice as long as the retention.ms you chose, to be safe). Once
>> this
>>> is done, you can set the topic configuration back the way it was (remove
>>> segment.ms and retention.ms configs, and set cleanup.policy=compact).
>>> Lastly, you'll need to do a rolling bounce of the cluster to restart the
>>> brokers (which restarts the log cleaner threads). Technically, you only
>>> need to restart brokers where the threads have died, but it's easier to
>>> just restart all of them.
>>> 
>>> Keep in mind that when you do this, you are deleting old offsets. If your
>>> consumers are all live and healthy, this shouldn't be a problem because
>>> they will just continue to commit their offsets properly. But if you have
>>> an offline consumer, you'll lose the committed offsets by doing this.
>>> 
>>> -Todd
>>> 
>>> 
>>> On Fri, Sep 18, 2015 at 5:31 AM, John Holland <
>>> john.holl...@objectpartners.com> wrote:
>>> 
 I've been experiencing this issue across several of our environments
>> ever
 since we enabled the log cleaner for the __consumer_offsets topic.
 
 We are on version 0.8.2.1 of kafka, using the new producer.  All of our
 consumers are set to commit to kafka only.
 
 Below is the stack trace in the log I've encountered across several
 different clusters.  A simple restart of kafka will allow compaction to
 continue on all of the other partitions but the incorrect one will
>> always
 fail.
 
 Here are the values for it from the kafka-topics --describe command:
 
 Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3
 Configs:segment.bytes=104857600,cleanup.policy=compact
 
 Are there any recommendations on how to prevent this and the best way
>> to
 recover from this exception?  This is causing disk space to fill up
>>> quickly
 on the node.
 
 I did see an open issue that seems very similar to this
 https://issues.apache.org/jira/browse/KAFKA-1641 but this is the
 __consumer_offsets topic which I have not had any part in setting up
>> nor
 producing to.
 
 [2015-09-18 02:57:25,520] INFO Cleaner 0: Beginning cleaning of log
 __consumer_offsets-17. (kafka.log.LogCleaner)
 [2015-09-18 02:57:25,520] INFO Cleaner 0: Building offset map for
 __consumer_offsets-17

Documentation typo for offsets.topic.replication.factor ?

2015-08-05 Thread James Cheng
Hi,

My kafka cluster has a __consumer_offsets topic with 50 partitions (the default 
for offsets.topic.num.partitions) but with a replication factor of just 1 (the 
default for offsets.topic.replication.factor should be 3).

From the docs http://kafka.apache.org/documentation.html:

offsets.topic.replication.factor   3   The replication factor for the 
offset commit topic. A higher setting (e.g., three or four) is recommended in 
order to ensure higher availability. If the offsets topic is created when fewer 
brokers than the replication factor then the offsets topic will be created with 
fewer replicas.


I'm guessing there's a typo there? I'm guessing it should be:

If the offsets topic is created when fewer brokers than the replication factor 
[are active], then the offsets topic will be created with fewer replicas.

Or something along those lines?

Thanks,
-James




Re: Checkpointing with custom metadata

2015-08-03 Thread James Cheng
Nice new email address, Gwen. :)

On Aug 3, 2015, at 3:17 PM, Gwen Shapira  wrote:

> You are correct. You can see that ZookeeperConsumerConnector is hardcoded
> with null metadata.
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L310
> 
> More interesting, it looks like the Metadata is not exposed in the new
> KafkaConsumer either.
> 
> Mind sharing what did you plan to use it for? this will help us figure out
> how to expose it :)
> 

I'm juggling around a couple designs on what I'm trying to do, so it may turn 
out that I actually don't need it.

My generic answer is, I have some application state related to the processing 
of a topic, and I wanted to store it somewhere persistent that will survive 
across process death and disk failure. So storing it with the offset seemed 
like a nice solution. I could alternatively store it in a standalone kafka 
topic instead.

The more detailed answer: I'm doing a time-based merge sort of 3 topics (A, B, 
and C) and outputting the results into a new output topic (let's call it 
"sorted-topic"). Except that during the initial creation of "sorted-topic", I 
want all of topic A to be output to sorted-topic first, followed by an 
on-going merge sort of topics B, C, and any updates to A that come along.

I was trying to handle the situation of what happens if I crash when initially 
copying topic A into sorted-topic. And I was thinking that I could save some 
metadata in my topic A checkpoint that says "still doing initial copy of A". So 
that way, when I start up next time, I would know to continue copying topic A 
to the output. Once I have finished copying all of A to sorted-topic, that I 
would store "finished doing initial copy of A" into my checkpoint, and that 
upon restart, I would check that and know to start doing the merge sort of A B 
C.

I have a couple other designs that seem cleaner, tho, so I might not actually 
need it.

-James

> Gwen
> 
> 
> On Mon, Aug 3, 2015 at 1:52 PM, James Cheng  wrote:
> 
>> According to
>> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
>> we can store custom metadata with our checkpoints. It looks like the high
>> level consumer does not support committing offsets with metadata, and that
>> in order to checkpoint with custom metadata, we have to issue the
>> OffsetCommitRequest ourselves. Is that correct?
>> 
>> Thanks,
>> -James
>> 
>> 
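For reference, the per-partition payload of an OffsetCommitRequest pairs the offset with an opaque metadata string, which is what James wants to piggyback on. Below is a minimal sketch of the bookkeeping in plain Python — no Kafka client involved, just the fields from the protocol wiki linked above and the hypothetical "initial copy" marker from James's design:

```python
# Sketch of the state James describes: an offset checkpoint that carries
# an opaque metadata string, as in the OffsetCommitRequest protocol.
# Pure bookkeeping - a real implementation would send this over the wire
# (or, in the later new consumer, via OffsetAndMetadata).

def make_checkpoint(topic, partition, offset, metadata=""):
    # metadata is an opaque string the broker stores alongside the offset
    return {"topic": topic, "partition": partition,
            "offset": offset, "metadata": metadata}

def resume_phase(checkpoint):
    # On restart, the stored metadata tells us which phase to resume.
    if checkpoint["metadata"] == "initial-copy-in-progress":
        return "continue-initial-copy"
    return "merge-sort"

# During the initial copy of topic A into sorted-topic:
ckpt = make_checkpoint("A", 0, 12345, metadata="initial-copy-in-progress")
```

The marker strings here ("initial-copy-in-progress", etc.) are invented for illustration; any opaque string up to the broker's metadata size limit would do.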



Checkpointing with custom metadata

2015-08-03 Thread James Cheng
According to 
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
 we can store custom metadata with our checkpoints. It looks like the high 
level consumer does not support committing offsets with metadata, and that in 
order to checkpoint with custom metadata, we have to issue the 
OffsetCommitRequest ourselves. Is that correct?

Thanks,
-James



Re: New consumer - poll/seek javadoc confusing, need clarification

2015-07-21 Thread James Cheng

> On Jul 21, 2015, at 9:15 AM, Ewen Cheslack-Postava  wrote:
> 
> On Tue, Jul 21, 2015 at 2:38 AM, Stevo Slavić  wrote:
> 
>> Hello Apache Kafka community,
>> 
>> I find new consumer poll/seek javadoc a bit confusing. Just by reading docs
>> I'm not sure what the outcome will be, what is expected in following
>> scenario:
>> 
>> - kafkaConsumer is instantiated with auto-commit off
>> - kafkaConsumer.subscribe(someTopic)
>> - kafkaConsumer.position is called for every TopicPartition HLC is actively
>> subscribed on
>> 
>> and then when doing multiple poll calls in succession (without calling
>> commit), does seek have to be called in between poll calls to position HLC
>> to skip what was read in previous poll, or does HLC keep that state
>> (position after poll) in memory, so that next poll (without seek in between
>> two poll calls) will continue from where last poll stopped?
>> 
> 
> The position is tracked in-memory within the consumer, so as long as there
> isn't a consumer rebalance, consumption will just proceed with subsequent
> messages (i.e. the behavior I think most people would find intuitive).
> However, if a rebalance occurs (another consumer instance joins the group
> or some leave), then a partition may be assigned to an different consumer
> instance that has no idea about the current position and will restart based
> on the offset reset setting (because attempting to fetch the committed
> offset will fail since no offsets have been committed).
> 

Ewen,

What happens if there is a broker failure and a new broker becomes the 
partition leader? Does the high level consumer start listening to the new 
partition leader at the in-memory position, or does it restart based on saved 
offsets?

Thanks,
-James

> -Ewen
> 
> 
>> Could be it's just me not understanding this from javadoc. If not, maybe
>> javadoc can be improved to make this (even) more obvious.
>> 
>> Kind regards,
>> Stevo Slavic.
>> 
> 
> 
> 
> -- 
> Thanks,
> Ewen
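Ewen's answer — the position is in-memory and carries across successive polls with no seek needed, but a rebalance hands the partition to an owner with no in-memory state, which falls back to the committed offset or the reset policy — can be sketched as a toy model (purely illustrative, not the real client):

```python
# Toy model of new-consumer position tracking, per Ewen's description.
# Not the real client: just the semantics of in-memory position vs.
# committed offset vs. auto.offset.reset fallback.

class ToyConsumerPosition:
    def __init__(self, reset_position=0):
        self.position = None        # in-memory only, lost on rebalance
        self.committed = None       # set only by an explicit commit
        self.reset_position = reset_position  # auto.offset.reset stand-in

    def poll(self, batch_size):
        if self.position is None:   # first poll: resolve a start point
            self.position = (self.committed
                             if self.committed is not None
                             else self.reset_position)
        start = self.position
        self.position += batch_size  # later polls continue in memory
        return start

    def rebalance(self):
        # the new owner has no in-memory position; it must re-resolve
        self.position = None

c = ToyConsumerPosition()
first = c.poll(10)    # starts at reset position 0
second = c.poll(10)   # continues at 10 - no seek between polls
c.rebalance()
third = c.poll(10)    # nothing was committed, so back to 0
```

This also illustrates Stevo's original question: between two plain polls, no seek is needed; the gap only appears across a rebalance with auto-commit off.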



Consuming from Kafka but don't need to save offsets

2015-07-20 Thread James Cheng
Hi,

I have a web service that serves up some data that it obtains from a kafka 
topic. When the process starts up, it wants to load the entire kafka topic into 
memory, and serve the data up from an in-memory hashtable. The data in the 
topic has primary keys and is log compacted, and so the total dataset will be 
small enough to fit in memory. My web service will only start serving up data 
when the entire topic is loaded. (And for that, 
https://issues.apache.org/jira/browse/KAFKA-1977 would be super useful).

I am only storing this data in memory. In the event of process death or 
restart, my in-memory state is gone, and so I will always want to rebuild it by 
again consuming the topic from the earliest offset. I will never need to 
checkpoint my offsets.

Also, I will have N instances of this application, each one needing to consume 
the entire topic. This is how I plan to do horizontal scaling of my web service.

I would like to use the high level consumer, so that I don't need to manually 
discover which broker is the leader, and so that I don't have to handle leader 
rebalancing.
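The load-everything-into-a-hashtable pattern above amounts to replaying a compacted topic where the latest value per key wins and a null value (tombstone) deletes the key. A sketch over an in-memory record list (illustrative only; a real loader would consume from the earliest offset until caught up):

```python
# Sketch of rebuilding the in-memory table from a log-compacted topic:
# replay records in offset order; the last value per key wins, and a
# null value (tombstone) deletes the key.

def build_table(records):
    table = {}
    for key, value in records:    # records in offset order
        if value is None:         # tombstone: key was deleted
            table.pop(key, None)
        else:
            table[key] = value
    return table

records = [("a", 1), ("b", 2), ("a", 3), ("b", None)]
table = build_table(records)      # {"a": 3}
```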

A couple questions:
1) Does this use case make sense? Is this pattern used by anyone else? I like 
it because it makes my web service completely stateless.
2) In order to make each instance consume all partitions of the topic, I need 
each consumer group id to be unique to that process. So I was thinking of just 
using a UUID or something similar. What is the "cost" of creating a new 
consumer group id? If I am creating a new one every time I start my 
application, would I be cluttering up zookeeper or the __consumer_offsets 
topic? Note there will only every be N instances of my application running. 
Since I never will need to checkpoint my offsets, does that affect my question 
about "cluttering up" zookeeper/kafka? Are old consumer groups ever cleaned up 
out of zookeeper or the __consumer_offsets topic?
3) Are the stored offsets used for any other reason, aside from at startup of a 
new consumer? Are offsets used after rebalancing when partition leaders change 
due to broker failure? I know that offsets can be used for Burrow-like 
monitoring.
4) Since I don't need to support checkpointing, another option is to use the 
SimpleConsumer. The sample code at 
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example 
looks fairly comprehensive. It handles discovery of the partition leader, and 
handles leader rebalancing. Are there any other situations that I should be 
aware of before relying on that sample code?
5) Will any of this change when the new consumer comes out? Will the 
SimpleConsumer still exist when the new consumer comes out?
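For question 2, a throwaway group id per process instance is cheap to generate; a sketch (the prefix and naming scheme are arbitrary, and whether old group entries are ever cleaned up is version-dependent, as asked above):

```python
import uuid

# One consumer group per process instance, so every instance is assigned
# all partitions of the topic. The "webservice" prefix is an invented
# example; the UUID is what makes the group id unique per process.
def fresh_group_id(app_name="webservice"):
    return "%s-%s" % (app_name, uuid.uuid4())

group_id = fresh_group_id()
# e.g. pass this as the consumer's group.id property; with offset
# commits disabled, these groups should leave little state behind.
```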

Thanks,
-James



Re: New producer in production

2015-07-17 Thread James Cheng
Be aware that the old producer and new producer have different partitioning 
algorithms:
http://mail-archives.apache.org/mod_mbox/kafka-users/201504.mbox/%3C4A6E5F2C-232E-4288-8F9F-3F9D9AE05718%40tivo.com%3E

This could bite you if you rely on key-based partitioning and are switching 
from the old producer to the new producer.

Siva, this is less important for you, since you are starting to use kafka for 
the first time. And this is not important at all if you provide your own 
partitioning algorithm.

-James
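The incompatibility is that the two producers hash keys differently, so the same key can land on a different partition after the switch. A toy illustration with two stand-in hash functions — neither is the real algorithm (the old Scala producer used the key's hashCode, the new producer uses murmur2); crc32/adler32 here just demonstrate the effect:

```python
import zlib

NUM_PARTITIONS = 8

# Stand-ins only: crc32 and adler32 are NOT the real Kafka partitioners.
# They merely show that two different hash functions disagree on where
# a given key lands, which is exactly the migration hazard.
def old_style_partition(key):
    return zlib.crc32(key) % NUM_PARTITIONS

def new_style_partition(key):
    return zlib.adler32(key) % NUM_PARTITIONS

key = b"user-42"
moved = old_style_partition(key) != new_style_partition(key)
# For any key where "moved" is True, records change partitions after
# switching producers, breaking per-key ordering assumptions downstream.
```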

On Jul 17, 2015, at 1:23 PM, Sivananda Reddy  wrote:

> Hi Jay,
> 
> I found it here:
> http://kafka.apache.org/documentation.html#newproducerconfigs, the same
> link is reported by James.
> 
> @Joel: Thanks a lot for the info, I will use new producer
> 
> Regards,
> Siva.
> 
> On Fri, Jul 17, 2015 at 12:02 PM, James Cheng  wrote:
> 
>> http://kafka.apache.org/documentation.html, Section 3.4.
>> 
>>> 3.4 New Producer Configs
>>> 
>>> We are working on a replacement for our existing producer. The code is
>> available in trunk now and can be considered beta quality. Below is the
>> configuration for the new producer.
>> 
>> Sivananda might have seen it elsewhere, but this is where I found it.
>> 
>> -James
>> 
>> On Jul 17, 2015, at 9:49 AM, Jay Kreps  wrote:
>> 
>>> Hey Sivananda,
>>> 
>>> That's actually no longer true and likely a documentation bug. Where did
>>> you see that?
>>> 
>>> -Jay
>>> 
>>> On Fri, Jul 17, 2015 at 9:35 AM, Sivananda Reddy <
>> sivananda2...@gmail.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Kafka documentation says that the new producer is in beta state; how safe
>>>> is it to use the new producer in production? This is the first time I am
>>>> using Kafka for my application messaging needs. Please let me know.
>>>> 
>>>> Thank you,
>>>> Siva.
>>>> 
>> 
>> 


