Re: Consumer Pause & Scheduled Resume

2018-10-24 Thread Matthias J. Sax
There is no issue if you call `poll()` while all partitions are paused. In
fact, if you want to make sure that the consumer does not fall out of the
consumer group, you must keep calling `poll()` at regular intervals so that
you do not hit the `max.poll.interval.ms` timeout.
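
A minimal sketch of that pattern (field names and the timeout are just
placeholders; `poll(Duration)` is the 2.0+ overload, older clients use
`poll(long)`):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class PausedPollLoop {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    void run(KafkaConsumer<String, String> consumer) {
        while (!closed.get()) {
            // keep calling poll() even while every partition is paused;
            // otherwise max.poll.interval.ms is exceeded and the consumer
            // is considered failed and drops out of the group
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            if (!records.isEmpty()) {
                records.forEach(r -> System.out.println(r.value())); // placeholder processing
            }
            // while paused, poll() simply returns empty batches for the paused partitions
        }
    }
}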


-Matthias

On 10/24/18 10:25 AM, pradeep s wrote:
> Pause and resume is required since I am running a pod in Kubernetes and I
> am not shutting down the app.
> 
> On Tue, Oct 23, 2018 at 10:33 PM pradeep s 
> wrote:
> 
>> Hi,
>> I have a requirement to have Kafka streaming start at a scheduled time and
>> then pause the stream when the consumer poll returns empty fetches for 3 or
>> more polls.
>>
>> I am starting a consumer poll loop during application startup using a
>> single-threaded executor and then pausing the consumer when the poll is
>> returning empty for 3 polls.
>>
>> When the schedule kicks in, I am calling *consumer.resume()*.
>>
>> Is this approach correct?
>> Will it cause any issue if the consumer calls poll() on a paused consumer?
>>
>> Skeleton Code
>> 
>>
>> public class OfferItemImageConsumer implements Runnable {
>>
>>     @Override
>>     public void run() {
>>         try {
>>             do {
>>                 // keep polling even while paused so the consumer stays in the group
>>                 ConsumerRecords records =
>>                         kafkaConsumer.poll(kafkaConfig.getPollTimeoutMs());
>>                 writeAndPauseEmptyFetch(records);
>>                 processRecords(records);
>>             } while (!consumerLoopClosed.get());
>>         } catch (RuntimeException ex) {
>>             handleConsumerLoopException(ex);
>>         } finally {
>>             kafkaConsumer.close();
>>         }
>>     }
>>
>>     private void writeAndPauseEmptyFetch(ConsumerRecords records) {
>>         if (records.isEmpty()) {
>>             emptyFetchCount++;
>>         } else {
>>             // reset so only consecutive empty fetches trigger the pause
>>             emptyFetchCount = 0;
>>         }
>>         if (emptyFetchCount > EMPTY_FETCH_THRESHOLD && !consumerPaused) {
>>             writeImageData();
>>             emptyFetchCount = 0;
>>             kafkaConsumer.pause(kafkaConsumer.assignment());
>>             consumerPaused = true;
>>         }
>>     }
>> }
>>
>> =
>>
>> public class ItemImageStreamScheduler {
>>
>>     private static final int TERMINATION_TIMEOUT = 10;
>>
>>     private final ExecutorService executorService =
>>             Executors.newSingleThreadExecutor();
>>
>>     private final OfferItemImageConsumer offerItemImageConsumer;
>>     private final ItemImageStreamConfig itemImageStreamConfig;
>>     private final KafkaConsumer kafkaConsumer;
>>
>>     @EventListener(ApplicationReadyEvent.class)
>>     void startStreaming() {
>>         executorService.submit(offerItemImageConsumer);
>>     }
>>
>>     // @Scheduled needs a cron/fixedRate/fixedDelay attribute for the schedule.
>>     // Note: KafkaConsumer is not thread-safe, so calling resume() from the
>>     // scheduler thread while the poll loop runs on the executor thread needs
>>     // coordination (see the sketch below).
>>     @Scheduled
>>     void resumeStreaming() {
>>         kafkaConsumer.resume(kafkaConsumer.assignment());
>>     }
>> }
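
One way to keep every call on the consumer on the polling thread is to let
the scheduler only set a flag that the poll loop checks. A rough sketch (the
resumeRequested flag and where it is checked are illustrative):

    // import java.util.concurrent.atomic.AtomicBoolean;
    private final AtomicBoolean resumeRequested = new AtomicBoolean(false);

    // called from the @Scheduled method instead of touching kafkaConsumer directly
    void requestResume() {
        resumeRequested.set(true);
    }

    // checked inside the poll loop, on the same thread that calls poll()
    if (resumeRequested.compareAndSet(true, false) && consumerPaused) {
        kafkaConsumer.resume(kafkaConsumer.assignment());
        consumerPaused = false;
    }
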
>>
>> Thanks
>>
>> Pradeep
>>
>>
> 






Re: [VOTE] 2.1.0 RC0

2018-10-24 Thread Andras Beni
+1 (non-binding)

Verified signatures and checksums of the release artifacts.
Performed quickstart steps on the RC artifacts (both Scala 2.11 and 2.12) and
on one built from the 2.1.0-rc0 tag.

Andras

On Wed, Oct 24, 2018 at 10:17 AM Dong Lin  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for feature release of Apache Kafka 2.1.0.
>
> This is a major version release of Apache Kafka. It includes 28 new KIPs and
> critical bug fixes. Please see the Kafka 2.1.0 release plan for more
> details:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=91554044
>
> Here are a few notable highlights:
>
> - Java 11 support
> - Support for Zstandard, which achieves compression comparable to gzip with
> higher compression and especially decompression speeds (KIP-110)
> - Avoid expiring committed offsets for active consumer group (KIP-211)
> - Provide Intuitive User Timeouts in The Producer (KIP-91)
> - Kafka's replication protocol now supports improved fencing of zombies.
> Previously, under certain rare conditions, if a broker became partitioned
> from Zookeeper but not the rest of the cluster, then the logs of replicated
> partitions could diverge and cause data loss in the worst case (KIP-320)
> - Streams API improvements (KIP-319, KIP-321, KIP-330, KIP-353, KIP-356)
> - Admin script and admin client API improvements to simplify admin
> operation (KIP-231, KIP-308, KIP-322, KIP-324, KIP-338, KIP-340)
> - DNS handling improvements (KIP-235, KIP-302)
>
> Release notes for the 2.1.0 release:
> http://home.apache.org/~lindong/kafka-2.1.0-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote ***
>
> * Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~lindong/kafka-2.1.0-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~lindong/kafka-2.1.0-rc0/javadoc/
>
> * Tag to be voted upon (off 2.1 branch) is the 2.1.0-rc0 tag:
> https://github.com/apache/kafka/tree/2.1.0-rc0
>
> * Documentation:
> http://kafka.apache.org/21/documentation.html
>
> * Protocol:
> http://kafka.apache.org/21/protocol.html
>
> * Successful Jenkins builds for the 2.1 branch:
> Unit/integration tests: https://builds.apache.org/job/kafka-2.1-jdk8/38/
>
> Please test and verify the release artifacts and submit a vote for this RC,
> or report any issues so we can fix them and get a new RC out ASAP. Although
> this release vote requires PMC votes to pass, testing, votes, and bug
> reports are valuable and appreciated from everyone.
>
> Cheers,
> Dong
>


R: When Kafka stores group information in zookeeper?

2018-10-24 Thread Spadoni Marco
Hi friends,
however I suggest you be aware of the contents of the mail from Manikumar
dated October 4 @ 16:44 (see the mailing list archive):
"Deprecated scala consumers/producers are removed from 2.0 release. Old 
consumer related options are removed from /kafka-consumer-groups.sh tool." ...
Best,
Marco

-----Original Message-----
From: 赖剑清 [mailto:laijianq...@tp-link.com.cn]
Sent: Tuesday, October 23, 2018 05:55
To: users@kafka.apache.org
Subject: RE: When Kafka stores group information in zookeeper?

Yeah, it works.
It all depends on which address the tool is given: a ZooKeeper address drives
the old consumer, and the Kafka brokers' (bootstrap-server) address the new
consumer.

Thank you very much!
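
For reference, a minimal sketch of a new-style Java consumer, whose group
membership and committed offsets are handled by the brokers rather than by
ZooKeeper (addresses, group and topic names are only placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // brokers, not ZooKeeper: the group coordinator lives in the cluster
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            records.forEach(r -> System.out.println(r.value()));
        }
    }
}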

>-Original Message-
>From: Peter Bukowinski [mailto:pmb...@gmail.com]
>Sent: Tuesday, October 23, 2018 11:02 AM
>To: users@kafka.apache.org
>Subject: Re: When Kafka stores group information in zookeeper?
>
>It all depends on which type of consumer you are using. If you use an
>old
>(original) consumer, you must specify one or more zookeepers since
>group management info is stored in zookeeper. If you use a new
>consumer, group management is handled by the kafka cluster itself so
>you must specify one or more brokers in the bootstrap-server list.
>Kafka has supported both original and new consumer styles since 0.9.
>
>In summary, kafka stores consumer group info in zookeeper only if you
>are using the old consumer style. It is a consumer-specific setting
>entirely independent of topic configuration.
>
>-- Peter
>
>> On Oct 22, 2018, at 7:49 PM, 赖剑清  wrote:
>>
>> Hi, Kafka users:
>>
>> I tried to get information about topic consumer groups using
>> kafka-consumer-groups.sh, and I found that the commands below return
>> different info:
>> ./kafka-consumer-groups.sh --list --zookeeper localhost:2181
>> ./kafka-consumer-groups.sh --list --new-consumer --bootstrap-server
>> localhost:9092
>>
>> I suppose the first command gets its data from ZooKeeper while the second
>> one gets it from the coordinator, and my question is:
>> When does Kafka store group information in ZooKeeper? When in the coordinator?
>> Is there any parameter I can specify while creating a topic or starting a new
>> consumer group to make sure this information is stored in exactly the
>> destination I want?
>>
>> Version of the broker is 0.9.0.1 and the client is 0.9.0.1 in Java.





Transactional Id authorization failed

2018-10-24 Thread Vadim Nikitin

Hello All.

I'm trying to set up a Kafka cluster with transactions and
authorization enabled.

I use "spring-kafka" integration. Here is my code in Java:

    @Bean
    public DefaultKafkaProducerFactory createDefaultKafkaProducerFactory(
            KafkaProperties kafkaProperties,
            @Value("${my.kafka.producer.transaction-prefix}") String transactionIdPrefix) {
        Map producerProperties = kafkaProperties.buildProducerProperties();

        producerProperties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        producerProperties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 3);
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "all");
        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);

        DefaultKafkaProducerFactory producerFactory =
                new DefaultKafkaProducerFactory(producerProperties);
        producerFactory.setTransactionIdPrefix(transactionIdPrefix);
        return producerFactory;
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager(
            DefaultKafkaProducerFactory producerFactory) {
        KafkaTransactionManager ktm = new KafkaTransactionManager(producerFactory);
        ktm.setTransactionSynchronization(
                AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }
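
For context, this is roughly how such a factory ends up being used with a
transactional KafkaTemplate (a sketch only; topic and payload are
placeholders):

    // assumes the producerFactory bean defined above (transactionIdPrefix set)
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.executeInTransaction(ops -> {
        // the broker checks authorization for the transactional id when the
        // transaction is initialized, before any records are sent
        ops.send("demo-topic", "key", "value");
        return true;
    });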

Everything works perfectly when my Kafka server does not have authorization
enabled. But if I enable SASL_SSL PLAIN authorization, then I get an error
at the transaction initialization step.


spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${my.kafka.user}" password="${my.kafka.password}";


Caused by: org.springframework.transaction.CannotCreateTransactionException: Could not create Kafka transaction;
nested exception is org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed
    at org.springframework.kafka.transaction.KafkaTransactionManager.doBegin(KafkaTransactionManager.java:140)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:377)
    at org.springframework.data.transaction.MultiTransactionStatus.registerTransactionManager(MultiTransactionStatus.java:68)
    at org.springframework.data.transaction.ChainedTransactionManager.getTransaction(ChainedTransactionManager.java:101)
    ... 96 common frames omitted
Caused by: org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed



As I understand it, I must set "TransactionIdPrefix" on the producer to
authorize my transactions on the server.
But I cannot find any information about how to set up the same property
on the server side. For now the server doesn't know anything about my value
of "TransactionIdPrefix", and this is the reason for the error.


Could you tell me what I should do to avoid this error? Which parameter
should I set *on the server side*?



Vadim.
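
In case it helps anyone hitting the same error: with an authorizer enabled,
the server side is usually handled with ACLs on the TransactionalId resource
matching the producer's transactional.id, not with a broker config. A rough
sketch using the Java AdminClient (principal, prefix and bootstrap address
are placeholders; the same can be done with the kafka-acls.sh tool):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantTransactionalIdAcl {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9093"); // plus the admin's SASL_SSL settings

        try (AdminClient admin = AdminClient.create(props)) {
            // Allow the producer's principal to use any transactional id
            // starting with the configured prefix (Write implies Describe).
            AclBinding writeTxnId = new AclBinding(
                    new ResourcePattern(ResourceType.TRANSACTIONAL_ID,
                            "my-tx-prefix", PatternType.PREFIXED),
                    new AccessControlEntry("User:myuser", "*",
                            AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singletonList(writeTxnId)).all().get();
        }
    }
}

Note that with enable.idempotence=true the same principal may also need
IdempotentWrite permission on the Cluster resource.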

