Re: Flush Kafka topic

2017-08-23 Thread Hans Jespersen
In 0.11 and above, see the CLI command bin/kafka-delete-records.sh
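
For reference, later Kafka client versions also expose the same operation
through the Java AdminClient (deleteRecords). A minimal sketch, with the
bootstrap address, topic name, partition, and offset used purely as placeholders:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // Purge partition 0 of "my-topic" up to (but not including) offset 8500.
    // Repeat for every partition you want to truncate.
    TopicPartition tp = new TopicPartition("my-topic", 0);
    admin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(8500L)))
         .all()
         .get();
}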

-hans




> On Aug 23, 2017, at 7:28 PM, Rahul Singh  wrote:
> 
> Hello all,
> 
> I am unable to purge the topic data from Kafka. Is there any class to flush
> all topic data?
> 
> Thank you



Re: Pinning clients to specific brokers

2017-08-23 Thread Hans Jespersen
We (Confluent) run Kafka as a SaaS-based cloud offering, and we do not see
any reason for this feature, so I just don't understand the motivation for it.
Please explain.

-hans

-- 
/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */




> On Aug 23, 2017, at 12:42 AM, Mohit Chawla  
> wrote:
> 
> Hey Hans,
> 
> Thanks for your reply.
> 
> In a cloud environment this can be useful. Perhaps if partitioning and
> replicas are selected appropriately, this could be possible ?
> 
> Thanks,
> Mohit
> 
> On Tuesday, August 22, 2017, Hans Jespersen  wrote:
> 
>> Doing that doesn't really make sense in a Kafka cluster because the topic
>> partitions and their replicas are spread out across many brokers in the
>> cluster. That's what enables the parallel processing and fault tolerance
>> features of Kafka.
>> 
>> -hans
>> 
>>> On Aug 22, 2017, at 3:14 AM, Mohit Chawla > > wrote:
>>> 
>>> Hi folks,
>>> 
>>> Is it possible to pin kafka clients to use only specific brokers
>> throughout
>>> their lifetime and not just for the initial bootstrapping ?
>>> 
>>> Thanks,
>>> Mohit
>> 



Re: kafka in unrecoverable state

2017-08-23 Thread Murad Mamedov
Hi David,

Thanks for the reply. However, I don't have a problem with the number of replicas.
I have 3 brokers, and the topics are configured accordingly, especially
__consumer_offsets:

Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3
Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

Everything was working fine for months, until today.

Why would I want to change the replication factor? And to what value?
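
For reference, a minimal sketch of checking the per-partition ISR with the
Java AdminClient (0.11+), to see which replicas of __consumer_offsets have
fallen out of sync; the bootstrap address is a placeholder:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    TopicDescription desc = admin
        .describeTopics(Collections.singletonList("__consumer_offsets"))
        .all().get()
        .get("__consumer_offsets");

    // Any partition whose ISR is smaller than min.insync.replicas will
    // reject appends with NotEnoughReplicasException.
    desc.partitions().forEach(p ->
        System.out.printf("partition %d leader=%s replicas=%s isr=%s%n",
            p.partition(), p.leader(), p.replicas(), p.isr()));
}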

On Wed, Aug 23, 2017 at 11:19 PM, David Frederick  wrote:

> |> NotEnoughReplicasException: Number of  insync replicas for partition
> __consumer_offsets-17 is [1], below required minimum [2]
>
> Please refer to
> https://stackoverflow.com/questions/37960767/how-to-
> change-the-replicas-of-kafka-topic.
> Hope it helps!
>
>
> On Aug 23, 2017 5:17 AM, "Murad Mamedov"  wrote:
>
> > Hi,
> >
> > Did you manage to find the root cause of this issue?
> >
> > Same thing happened here.
> >
> > Thanks in advance
> >
> > On Tue, Jun 13, 2017 at 7:50 PM, Paul van der Linden 
> > wrote:
> >
> > > I managed to solve it by:
> > > - stopping and deleting all data on kafka & zookeeper
> > > - stopping all consumers and producers
> > > - starting kafka & zookeeper, waiting till they are up
> > > - start all consumers & producers,
> > >
> > > Is there a better way to do this, without data loss and halting
> > everything?
> > >
> > > On Tue, Jun 13, 2017 at 4:28 PM, Paul van der Linden <
> p...@sportr.co.uk>
> > > wrote:
> > >
> > > > A few lines of the logs:
> > > >
> > > > [2017-06-13 15:25:37,343] INFO [GroupCoordinator 0]: Stabilized group
> > > > summarizer generation 701 (kafka.coordinator.GroupCoordinator)
> > > > [2017-06-13 15:25:37,345] INFO [GroupCoordinator 0]: Assignment
> > received
> > > > from leader for group summarizer for generation 701
> (kafka.coordinator.
> > > > GroupCoordinator)
> > > > [2017-06-13 15:25:37,345] ERROR [Replica Manager on Broker 0]: Error
> > > > processing append operation on partition __consumer_offsets-17
> > > > (kafka.server.ReplicaManager)
> > > > org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
> > > > insync replicas for partition __consumer_offsets-17 is [1], below
> > > required
> > > > minimum [2]
> > > > [2017-06-13 15:25:37,345] INFO [GroupCoordinator 0]: Preparing to
> > > > restabilize group summarizer with old generation 701
> > (kafka.coordinator.
> > > > GroupCoordinator)
> > > >
> > > > This keeps happening, for all consumer offsets and all groups, etc
> > > >
> > > > On Tue, Jun 13, 2017 at 4:21 PM, Paul van der Linden <
> > p...@sportr.co.uk>
> > > > wrote:
> > > >
> > > >> Hi,
> > > >>
> > > >> I'm trying to find out how to at least get my kafka working again.
> > > >> Something went wrong and kafka has halted to a throughput of 0
> > > messages. It
> > > >> keeps looping on stablizing consumer groups, and erroring on an
> append
> > > >> operation to the offset paritions, plus Not enough replicas.
> > > >>
> > > >> The weird things is, that after not being able to work this out I
> want
> > > >> pretty brutal (luckily I can afford to loose more messages):
> > > >> - delete all kafka and zookeeper instances
> > > >> - updated kafka
> > > >> - cleared all disk
> > > >>
> > > >> Still kafka is in this unrecoverable error. Does anyone have any
> idea
> > > how
> > > >> to fix this?
> > > >>
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > Regards,
> > *Murad M*
> > *M (tr): +90 (533) 4874329*
> > *E: m...@muradm.net *
> >
>



-- 
Regards,
*Murad M*
*M (tr): +90 (533) 4874329*
*E: m...@muradm.net *


Re: kafka straming progress had be done a few minutes later

2017-08-23 Thread Guozhang Wang
Hello Jie,

Could you provide some more details in your "transformation" logic? For
example did you use any state stores? Note that if caching is enabled (by
default it is), then you may not see outputs until the cache is flushed
during commit.
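
As a rough illustration only (the application id and servers are placeholders),
both knobs can be set through StreamsConfig:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Commit (and flush the cache) more often than the 30 second default.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);

// Disable record caching entirely so every update is forwarded downstream.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);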

Guozhang


On Wed, Aug 23, 2017 at 12:31 PM, Matthias J. Sax 
wrote:

> Not sure what your question is...
>
> Maybe you refer to commit interval that is 30 seconds by default. It
> could be, that you don't see any writes to the output topic before that.
> But it's a wild guess.
>
> You can try to set a shorter commit interval via StreamsConfig.
>
>
> -Matthias
>
> On 8/22/17 8:09 PM, 杰 杨 wrote:
> > Hi all:
> > When I tried the Kafka Streams API, I used it to consume one topic, filter
> > the data, and transform it to another topic.
> > I found that the processing is only done a few minutes later. Why is that?
> >
> > Sent from Mail for Windows 10
> >
>
>


-- 
-- Guozhang


Re: kafka straming progress had be done a few minutes later

2017-08-23 Thread Matthias J. Sax
Not sure what your question is...

Maybe you are referring to the commit interval, which is 30 seconds by default.
It could be that you don't see any writes to the output topic before that,
but it's a wild guess.

You can try to set a shorter commit interval via StreamsConfig.


-Matthias

On 8/22/17 8:09 PM, 杰 杨 wrote:
> Hi all:
> When I tried the Kafka Streams API, I used it to consume one topic, filter the data,
> and transform it to another topic.
> I found that the processing is only done a few minutes later. Why is that?
> 
> Sent from Mail for Windows 10
> 





Re: kafka in unrecoverable state

2017-08-23 Thread David Frederick
|> NotEnoughReplicasException: Number of  insync replicas for partition
__consumer_offsets-17 is [1], below required minimum [2]

Please refer to
https://stackoverflow.com/questions/37960767/how-to-change-the-replicas-of-kafka-topic.
Hope it helps!


On Aug 23, 2017 5:17 AM, "Murad Mamedov"  wrote:

> Hi,
>
> Did you manage to find the root cause of this issue?
>
> Same thing happened here.
>
> Thanks in advance
>
> On Tue, Jun 13, 2017 at 7:50 PM, Paul van der Linden 
> wrote:
>
> > I managed to solve it by:
> > - stopping and deleting all data on kafka & zookeeper
> > - stopping all consumers and producers
> > - starting kafka & zookeeper, waiting till they are up
> > - start all consumers & producers,
> >
> > Is there a better way to do this, without data loss and halting
> everything?
> >
> > On Tue, Jun 13, 2017 at 4:28 PM, Paul van der Linden 
> > wrote:
> >
> > > A few lines of the logs:
> > >
> > > [2017-06-13 15:25:37,343] INFO [GroupCoordinator 0]: Stabilized group
> > > summarizer generation 701 (kafka.coordinator.GroupCoordinator)
> > > [2017-06-13 15:25:37,345] INFO [GroupCoordinator 0]: Assignment
> received
> > > from leader for group summarizer for generation 701 (kafka.coordinator.
> > > GroupCoordinator)
> > > [2017-06-13 15:25:37,345] ERROR [Replica Manager on Broker 0]: Error
> > > processing append operation on partition __consumer_offsets-17
> > > (kafka.server.ReplicaManager)
> > > org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
> > > insync replicas for partition __consumer_offsets-17 is [1], below
> > required
> > > minimum [2]
> > > [2017-06-13 15:25:37,345] INFO [GroupCoordinator 0]: Preparing to
> > > restabilize group summarizer with old generation 701
> (kafka.coordinator.
> > > GroupCoordinator)
> > >
> > > This keeps happening, for all consumer offsets and all groups, etc
> > >
> > > On Tue, Jun 13, 2017 at 4:21 PM, Paul van der Linden <
> p...@sportr.co.uk>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> I'm trying to find out how to at least get my kafka working again.
> > >> Something went wrong and kafka has halted to a throughput of 0
> > messages. It
> > >> keeps looping on stablizing consumer groups, and erroring on an append
> > >> operation to the offset paritions, plus Not enough replicas.
> > >>
> > >> The weird things is, that after not being able to work this out I want
> > >> pretty brutal (luckily I can afford to loose more messages):
> > >> - delete all kafka and zookeeper instances
> > >> - updated kafka
> > >> - cleared all disk
> > >>
> > >> Still kafka is in this unrecoverable error. Does anyone have any idea
> > how
> > >> to fix this?
> > >>
> > >
> > >
> >
>
>
>
> --
> Regards,
> *Murad M*
> *M (tr): +90 (533) 4874329*
> *E: m...@muradm.net *
>


Clients list

2017-08-23 Thread Emmett Butler
Hi Kafka users,

I maintain the PyKafka Python client library, and the PyKafka listing on the
clients page is outdated. I made a Confluence account, but I don't see a way
to edit the page. How can I get edit access to this page so I can keep
PyKafka's listing up to date?

-- 
Emmett Butler | Software Engineer



Re: Pinning clients to specific brokers

2017-08-23 Thread Stephen Durfey
Mohit,

Can you describe your use case around why you want this to happen?

Thanks


From: Joao Reis 
Sent: Wednesday, August 23, 2017 11:08:02 AM
To: users@kafka.apache.org
Subject: Re: Pinning clients to specific brokers

Hey Mohit,

I agree with Hans, and additionally you may have trouble when the cluster needs
to rebalance its partitions or when partition leadership moves to another node.

Also you may lose high availability and cluster resilience in case of
failover.

Cheers
João Reis
On 23 Aug 2017, at 08:42, Mohit Chawla 
> wrote:

Hey Hans,

Thanks for your reply.

In a cloud environment this can be useful. Perhaps if partitioning and
replicas are selected appropriately, this could be possible ?

Thanks,
Mohit

On Tuesday, August 22, 2017, Hans Jespersen 
> wrote:

Doing that doesn't really make sense in a Kafka cluster because the topic
partitions and their replicas are spread out across many brokers in the
cluster. That's what enables the parallel processing and fault tolerance
features of Kafka.

-hans

On Aug 22, 2017, at 3:14 AM, Mohit Chawla 

> wrote:

Hi folks,

Is it possible to pin kafka clients to use only specific brokers
throughout
their lifetime and not just for the initial bootstrapping ?

Thanks,
Mohit





Independent zookeep for kafka multi node

2017-08-23 Thread IT Consultant
Hi All

Can I have three independent ZooKeeper instances tagged to three Kafka brokers
without any clustering or quorum?

Would it be a good idea?


Re: Pinning clients to specific brokers

2017-08-23 Thread Joao Reis
Hey Mohit,

I agree with Hans, and additionally you may have trouble when the cluster needs
to rebalance its partitions or when partition leadership moves to another node.

Also you may lose high availability and cluster resilience in case of
failover.

Cheers
João Reis
On 23 Aug 2017, at 08:42, Mohit Chawla 
> wrote:

Hey Hans,

Thanks for your reply.

In a cloud environment this can be useful. Perhaps if partitioning and
replicas are selected appropriately, this could be possible ?

Thanks,
Mohit

On Tuesday, August 22, 2017, Hans Jespersen 
> wrote:

Doing that doesn't really make sense in a Kafka cluster because the topic
partitions and their replicas are spread out across many brokers in the
cluster. That's what enables the parallel processing and fault tolerance
features of Kafka.

-hans

On Aug 22, 2017, at 3:14 AM, Mohit Chawla 

> wrote:

Hi folks,

Is it possible to pin kafka clients to use only specific brokers
throughout
their lifetime and not just for the initial bootstrapping ?

Thanks,
Mohit





Re: Avro With Kafka

2017-08-23 Thread Kidong Lee
Classpath Schema Registry and Consul Schema Registry which I wrote can be
found here:
https://github.com/mykidong/kafka-etl-consumer/tree/master/src/main/java/kafka/etl/deserialize

Classpath Schema Registry can be used like this:

// topic and avro schema classpath properties.
// topic key must be in topic list.
Properties topicAndPathProps = new Properties();
topicAndPathProps.put("item-view-event", "/META-INF/avro/item-view-event.avsc");

AvroDeserializeService avroDeserializeService = new ClasspathAvroDeserializeService(topicAndPathProps);
Schema schema = avroDeserializeService.getSchema("item-view-event");


And Consul Schema Registry:
// topic and avro schema consul key path properties.
// topic key must be in topic list.
Properties topicAndPathProps = new Properties();
topicAndPathProps.put("item-view-event", "avro-schemas/item-view-event");

// consul agent host and port.
String agentHost = "localhost";
int agentPort = 8500;

AvroDeserializeService avroDeserializeService = new ConsulAvroDeserializeService(topicAndPathProps, agentHost, agentPort);
Schema schema = avroDeserializeService.getSchema("item-view-event");


- Kidong.


2017-08-18 13:56 GMT+09:00 Kidong Lee :

> You can send avro record to kafka and consume it without schema registry.
>
> In my approach, avro schema file avsc must be in the classpath on both
> producer and consumer side.
>
> On producer side, first write value avro serializer and set the properties
> of key.serializer and value.serializer to kafka producer configuration.
>
> For instance, the following class is avro serializer for value:
>
> import domain.Event;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericDatumWriter;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.io.DatumWriter;
> import org.apache.avro.io.Encoder;
> import org.apache.avro.io.EncoderFactory;
> import org.apache.kafka.common.serialization.Serializer;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.Map;
>
>
> /**
>  * Created by mykidong on 2016-05-17.
>  */
> public class KafkaAvroEventSerializer implements Serializer<Event> {
>
> private static Logger log = LoggerFactory.getLogger(KafkaAvroEventSerializer.class);
>
> private Schema schema;
>
>
> @Override
> public void configure(Map<String, ?> map, boolean b) {
>
> // get avro avsc schema path from kafka configuration.
>
> String avroSchemaPath = (String) map.get("event.avro.schema.path");
>
> Schema.Parser parser = new Schema.Parser();
> try {
>
> // construct avro schema instance from the classpath.
>
> schema = parser.parse(getClass().getResourceAsStream(avroSchemaPath));
> }catch (IOException e)
> {
> throw new RuntimeException(e);
> }
> }
>
> @Override
> public byte[] serialize(String s, Event event) {
> try {
> GenericRecord datum = new GenericData.Record(schema);
> datum.put("eventType", event.getEventType());
> datum.put("value", event.getValue());
>
> ByteArrayOutputStream out = new ByteArrayOutputStream();
> DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
> Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
> writer.write(datum, encoder);
> encoder.flush();
>
> byte[] avroBytes = out.toByteArray();
> out.close();
>
> return avroBytes;
> }
> catch (Exception e)
> {
> throw new RuntimeException(e);
> }
> }
>
> @Override
> public void close() {
>
> }
> }
>
>
> Kafka producer will send Event which should be replaced with your message:
>
> Properties kafkaProp = new Properties();
> ..
> kafkaProp.put("key.serializer", 
> "org.apache.kafka.common.serialization.IntegerSerializer");
>
> // the value avro serializer written in the above.
>
> kafkaProp.put("value.serializer", "your.package.KafkaAvroEventSerializer");
> ..
>
> producer = new KafkaProducer<>(kafkaProp);
>
> producer.send(new ProducerRecord<>(eventType, event));
>
>
> On consumer side, avro schema instance should be cached, because the messages 
> consumed from kafka must be deserialized, which costs some latency.
>
> Avro schema instance can be constructed from the classpath and mapped with 
> event type key like this:
>
> import api.dao.AvroSchemaRegistryDao;
> import org.apache.avro.Schema;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.springframework.beans.factory.InitializingBean;
>
> import java.util.Properties;
> import java.util.concurrent.ConcurrentHashMap;
> import java.util.concurrent.ConcurrentMap;
>
>
> public class MapAvroSchemaRegistryDao implements AvroSchemaRegistryDao, 
> 

Re: kafka in unrecoverable state

2017-08-23 Thread Murad Mamedov
Hi,

Did you manage to find the root cause of this issue?

Same thing happened here.

Thanks in advance

On Tue, Jun 13, 2017 at 7:50 PM, Paul van der Linden 
wrote:

> I managed to solve it by:
> - stopping and deleting all data on kafka & zookeeper
> - stopping all consumers and producers
> - starting kafka & zookeeper, waiting till they are up
> - start all consumers & producers,
>
> Is there a better way to do this, without data loss and halting everything?
>
> On Tue, Jun 13, 2017 at 4:28 PM, Paul van der Linden 
> wrote:
>
> > A few lines of the logs:
> >
> > [2017-06-13 15:25:37,343] INFO [GroupCoordinator 0]: Stabilized group
> > summarizer generation 701 (kafka.coordinator.GroupCoordinator)
> > [2017-06-13 15:25:37,345] INFO [GroupCoordinator 0]: Assignment received
> > from leader for group summarizer for generation 701 (kafka.coordinator.
> > GroupCoordinator)
> > [2017-06-13 15:25:37,345] ERROR [Replica Manager on Broker 0]: Error
> > processing append operation on partition __consumer_offsets-17
> > (kafka.server.ReplicaManager)
> > org.apache.kafka.common.errors.NotEnoughReplicasException: Number of
> > insync replicas for partition __consumer_offsets-17 is [1], below
> required
> > minimum [2]
> > [2017-06-13 15:25:37,345] INFO [GroupCoordinator 0]: Preparing to
> > restabilize group summarizer with old generation 701 (kafka.coordinator.
> > GroupCoordinator)
> >
> > This keeps happening, for all consumer offsets and all groups, etc
> >
> > On Tue, Jun 13, 2017 at 4:21 PM, Paul van der Linden 
> > wrote:
> >
> >> Hi,
> >>
> >> I'm trying to find out how to at least get my kafka working again.
> >> Something went wrong and kafka has halted to a throughput of 0
> messages. It
> >> keeps looping on stablizing consumer groups, and erroring on an append
> >> operation to the offset paritions, plus Not enough replicas.
> >>
> >> The weird things is, that after not being able to work this out I want
> >> pretty brutal (luckily I can afford to loose more messages):
> >> - delete all kafka and zookeeper instances
> >> - updated kafka
> >> - cleared all disk
> >>
> >> Still kafka is in this unrecoverable error. Does anyone have any idea
> how
> >> to fix this?
> >>
> >
> >
>



-- 
Regards,
*Murad M*
*M (tr): +90 (533) 4874329*
*E: m...@muradm.net *


Re: about Kafka behavior with distinct groups of consumers, but with the same group.id ?

2017-08-23 Thread Manikumar
Consumers with the same group.id are part of the same consumer group.
Topic partitions are load-balanced over the consumer instances based on
their topic subscriptions.
In this case the G1 and G2 consumers are part of the same group; T1 is balanced
over the G1 instances and T2 over the G2 instances.
This is a valid scenario.
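
A small sketch of that scenario (topic names, group id, and servers are
placeholders); both consumers share one group.id but subscribe to different
topics, so each is only assigned partitions of the topic it subscribed to:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "shared-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

// "G1" instance: subscribes only to T1, so only T1 partitions are assigned to it.
KafkaConsumer<String, String> g1 = new KafkaConsumer<>(props);
g1.subscribe(Collections.singletonList("T1"));

// "G2" instance: same group.id, but only T2 partitions can be assigned to it.
KafkaConsumer<String, String> g2 = new KafkaConsumer<>(props);
g2.subscribe(Collections.singletonList("T2"));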

On Wed, Aug 23, 2017 at 2:52 PM, Dominique De Vito 
wrote:

> Hi,
>
> I am wondering about Kafka behavior.
>
> So, let me explain the use case I face.
>
> I have 2 different groups of consumers:
>
> * group of consumers G1 to consume only msgs from topic T1
> * group of consumers G2 to consume only msgs from topic T2
>
> The question is... What happens if I set the same group.id for both
> groups?
>
> On one side, I say to Kafka: nodes in G1 and G2 are completely separated,
> as they read distinct topics.
>
> On the other side, I say to Kafka: nodes in G1 and G2 are somewhat similar,
> as they have the same group.id.
>
> * Is this use case authorized ?
> * if so, is Kafka behavior well defined for such a use case ? and what is
> the use case ?
> * even if authorized, I guess it's not the best pratice (to have distinct
> group of consumers with different topics but the same group.id), isn't it
> ?
>
> Many thanks.
>
> Regards,
> Dominique
>


about Kafka behavior with distinct groups of consumers, but with the same group.id ?

2017-08-23 Thread Dominique De Vito
Hi,

I am wondering about Kafka behavior.

So, let me explain the use case I face.

I have 2 different groups of consumers:

* group of consumers G1 to consume only msgs from topic T1
* group of consumers G2 to consume only msgs from topic T2

The question is... What happens if I set the same group.id for both groups?

On one side, I say to Kafka: nodes in G1 and G2 are completely separated,
as they read distinct topics.

On the other side, I say to Kafka: nodes in G1 and G2 are somewhat similar,
as they have the same group.id.

* Is this use case allowed?
* If so, is Kafka's behavior well defined for such a use case? And what is
the use case?
* Even if allowed, I guess it's not best practice (to have distinct
groups of consumers with different topics but the same group.id), is it?

Many thanks.

Regards,
Dominique


Re: Kafka streams application (v 0.10.0.1) stuck at close

2017-08-23 Thread Damian Guy
Hi,

If you can, then I'd recommend upgrading to a newer version. As you said,
many bugs have been fixed since 0.10.0.1.

On Wed, 23 Aug 2017 at 05:08 Balaprassanna Ilangovan <
balaprassanna1...@gmail.com> wrote:

> Hi,
>
> I have the following three question regarding Apache Kafka streams.
>
> 1. I am in a position to use v 0.10.0.1 of Apache Kafka though so many bugs
> related to streams are fixed in the later versions. My application consists
> of low level processors that run for more than an hour for larger files
> (video transcoding). So, we use a session timeout and request timeout of 2
> hrs. streams.close() is stuck for a long time even the processors are idle.
> Is there a reason? Is there a work around for this version?
>
>
There were some bugs to do with streams.close() in earlier versions that
may cause deadlocks. This may be the issue:
https://issues.apache.org/jira/browse/KAFKA-4366


> 2. Also, what does processorContext.commit() do exactly? Does it save the
> position of application in a topology or commit consumed message offset in
> the partition? Though commits are handled automatically by streams, should
> context.commit() be called at the end of each processor in a topology?
>
>
context.commit() is just telling the Streams library to commit that task the
next time it goes through the poll loop. You don't need to call it unless you
specifically want to commit after you have processed one or more records.
Otherwise this is handled automatically by the commit.interval.ms config.
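
For illustration only, a sketch of a low-level Processor (0.10.0.x Processor
API; the class name and the per-record commit policy are made up) that
requests a commit once it has finished a record:

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class TranscodeProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // ... long-running work for this record ...

        // Only a request: the actual commit happens the next time the stream
        // thread goes through its poll loop. Without this call, commits still
        // happen automatically every commit.interval.ms.
        context.commit();
    }

    @Override
    public void punctuate(long timestamp) {
        // not used in this sketch
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}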


> 3. In a topology, if two processor completes successfully and the
> application goes down during third processor execution, does it start from
> first when the application comes back?
>

Each task will start from the last committed position. So if that was all
in a single thread, then it will start from the beginning again.


> --
> With Regards,
> Bala Prassanna I,
> 8124831208
>


Python Kafka client that has support for SASL_SSL with SCRAM-SHA-256/512

2017-08-23 Thread Alexei Levashov
Hello,

Could someone point me in the direction of a Python Kafka client that has
support for SASL_SSL with SCRAM-SHA-256/512?
The reason for asking is that the edenhill/librdkafka lib seems to have
configuration properties (sasl.*) for SASL_SSL with SCRAM-SHA-256/512, but I
couldn't find in the KAFKA/Clients#Clients-Python list that any of these
clients provide access to these (or similar) properties. I must be missing
something obvious.
If this is the wrong list to ask, where else could I forward this question?

Any hints would be very helpful,
Thx,
-AL


Re: Pinning clients to specific brokers

2017-08-23 Thread Mohit Chawla
Hey Hans,

Thanks for your reply.

In a cloud environment this can be useful. Perhaps if partitioning and
replicas are selected appropriately, this could be possible?

Thanks,
Mohit

On Tuesday, August 22, 2017, Hans Jespersen  wrote:

> Doing that doesn't really make sense in a Kafka cluster because the topic
> partitions and their replicas are spread out across many brokers in the
> cluster. That's what enables the parallel processing and fault tolerance
> features of Kafka.
>
> -hans
>
> > On Aug 22, 2017, at 3:14 AM, Mohit Chawla  > wrote:
> >
> > Hi folks,
> >
> > Is it possible to pin kafka clients to use only specific brokers
> throughout
> > their lifetime and not just for the initial bootstrapping ?
> >
> > Thanks,
> > Mohit
>


Re: Consumer reconsuming all kafka messages

2017-08-23 Thread Elyahou Ittah
It uses the default one, which is uncommitted earliest.
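
For what it's worth, this is how the reset policy would be pinned explicitly
on a plain Java consumer (the group and topic names below are taken from the
log extract; everything else is a placeholder), so the behaviour after an
offset reset or expiry does not depend on defaults:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "storm_kafka_topology");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");

// If no valid committed offset exists for the group, start from the earliest
// available offset instead of jumping to the latest one.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("users"));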

On Wed, Aug 23, 2017 at 7:34 AM, Dan Markhasin  wrote:

> Is your storm consumer set to auto.offset.reset="earliest"?
>
> On 22 August 2017 at 10:05, Elyahou Ittah  wrote:
>
> > I checked the __consumer_offsets topic and here is an extraction from
> this
> > log for the same consumer group, a specific topic (users) and specific
> > partition (15):
> >
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8327,{
> > topic-partition=users-15,
> > offset=8327, numFails=0, thread='Thread-11-kafkaSpout-executor[4
> > 4]'}],CommitTime 1503230031557,ExpirationTime 1503316431557]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8330,{
> > topic-partition=users-15,
> > offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4
> > 4]'}],CommitTime 1503230332504,ExpirationTime 1503316732504]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[6512,{
> > topic-partition=users-15,
> > offset=6512, numFails=0, thread='Thread-11-kafkaSpout-executor[4
> > 4]'}],CommitTime 1503230748612,ExpirationTime 1503317148612]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8172,{
> > topic-partition=users-15,
> > offset=8172, numFails=0, thread='Thread-11-kafkaSpout-executor[4
> > 4]'}],CommitTime 1503230791209,ExpirationTime 1503317191209]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8330,{
> > topic-partition=users-15,
> > offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4
> > 4]'}],CommitTime 1503230821337,ExpirationTime 1503317221337]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8333,{
> > topic-partition=users-15,
> > offset=8333, numFails=0, thread='Thread-11-kafkaSpout-executor[4
> > 4]'}],CommitTime 1503231513311,ExpirationTime 1503317913311]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8338,{
> > topic-partition=users-15,
> > offset=8338, numFails=0, thread='Thread-11-kafkaSpout-executor[4
> > 4]'}],CommitTime 1503231603513,ExpirationTime 1503318003513]
> > [storm_kafka_topology,users,15]::[OffsetMetadata[8344,{
> > topic-partition=users-15,
> > offset=8344, numFails=0, thread='Thread-11-kafkaSpout-executor[4
> > 4]'}],CommitTime 1503231693829,ExpirationTime 1503318093829]
> >
> > we can see here that the consumer was at offset 8330 at Sunday, August
> 20,
> > 2017 11:53:51.557 AM and at offset 6512 somes minutes after (the kafka
> > restart occured at this time)
> >
> > What can explain the consumer group to rewind partition like this ?
> >
> > On Mon, Aug 21, 2017 at 11:10 AM, Elyahou Ittah 
> > wrote:
> >
> > > attached log file
> > >
> > > On Mon, Aug 21, 2017 at 11:06 AM, Elyahou Ittah 
> > > wrote:
> > >
> > >> I am consuming from kafka using KafkaSpout of Storm and also in ruby
> > >> using ruby-kafka gem (both use new consumer API).
> > >>
> > >> I noticed that after a rolling restart of the kafka cluster. The
> > >> kafkaSpout reconsumed all kafka messages ignoring the committed
> > offsets...
> > >>
> > >> What can cause this behavior ?
> > >>
> > >> Attached kafka logs at this time (storm consumers are storm_consumer_1
> > >> and storm_consumer_2 and ruby consumer is ruby_kafka_consumer)
> > >>
> > >> I see these many lines like these for storm consumer but not for ruby
> > >> consumer:
> > >>
> > >> [2017-08-20 12:03:54,270] INFO [GroupCoordinator 0]: Group
> > >> storm_consumer_2 with generation 52 is now empty
> (__consumer_offsets-48)
> > >> (kafka.coordinator.group.GroupCoordinator)
> > >> [2017-08-20 12:03:54,701] INFO [GroupCoordinator 0]: Group
> > >> storm_consumer_2 with generation 56 is now empty
> (__consumer_offsets-48)
> > >> (kafka.coordinator.group.GroupCoordinator)
> > >>
> > >
> > >
> >
>