Re: Newbie Question

2020-03-28 Thread Hans Jespersen
I can tell from the terminology you use that you are familiar with traditional 
message queue products. Kafka is very different. That's what makes it so 
interesting and revolutionary, in my opinion.

Clients do not connect to topics, because Kafka is a distributed, clustered 
system where topics are sharded into pieces called partitions, and those topic 
partitions are spread out across all the Kafka brokers in the cluster (and also 
replicated several more times across the cluster for fault tolerance). When a 
client logically connects to a topic, it's actually making many connections to 
many nodes in the Kafka cluster, which enables both parallel processing and 
fault tolerance.

Also, when a client consumes a message, the message is not removed from a queue; 
it remains in Kafka for many days (sometimes months or years). It is not “taken 
off the queue”; it is rather “copied from the commit log”. It can be consumed 
again and again if needed, because it is an immutable record of an event that 
happened.
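
As a concrete illustration, here is a minimal sketch using the Java consumer (topic name, group id, and broker address are hypothetical) that shows both points: the subscription gets assigned several partitions that may be led by different brokers, and already-read records can be read again because they are still in the log.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ReReadExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092"); // hypothetical; the client discovers the rest of the cluster
    props.put("group.id", "re-read-demo");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("example-topic")); // one logical "connection" to the topic
      consumer.poll(Duration.ofSeconds(5));                           // join the group and get partitions assigned
      // the assignment usually spans several partitions, each possibly led by a different broker
      System.out.println("assigned partitions: " + consumer.assignment());

      // nothing was "taken off a queue": rewind and read the same records again
      consumer.seekToBeginning(consumer.assignment());
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
      System.out.println("re-read " + records.count() + " records from the commit log");
    }
  }
}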

Now, getting back to your question of how to see where messages get consumed 
(copied): the reality is that they go to many places and can be consumed many 
times. This makes tracing and tracking message delivery more difficult, but not 
impossible. There are many tools, both open source and commercial, that can track 
data from producer to Kafka (with replication) to multiple consumers. They 
typically involve taking telemetry from both clients (producers and consumers) 
and brokers (all of them, as they act as a cluster) and aggregating all the data 
to see the full flow of messages in the system. That's why the logs may seem 
overwhelming and you need to look at the logs of all the brokers (and perhaps all 
the clients as well) to get the full picture.

-hans 

> On Mar 28, 2020, at 4:50 PM, Colin Ross  wrote:
> 
> Hi All - just started to use Kafka. Just one thing driving me nuts. I want
> to get logs of each time a publisher or subscriber connects. I am trying to
> just get the IP that they connected from and the topic to which they
> connected. I have managed to do this through enabling debug in the
> kafka-authorizer, however, the number of logs are overwhelming as is the
> update rate (looks like 2 per second per client).
> 
> What I am actually trying to achieve is to understand where messages go, so
> I would be more than happy to just see notifications when messages are
> actually sent and actually taken off the queue.
> 
> Is there a more efficient way of achieving my goal than turning on debug?
> 
> Cheers
> Rossi


Re: Kafka with RAID 5 on a busy cluster

2020-03-28 Thread Hans Jespersen
RAID 5 is typically slower because Kafka is a very write-heavy load, and that 
creates a bottleneck because writes to any disk require parity writes on the 
other disks.

-hans

> On Mar 28, 2020, at 2:55 PM, Vishal Santoshi  
> wrote:
> 
> Any one?  We are doing a series of tests to be confident, but if there is some
> data that folks who have had RAID 5 on Kafka can share, please do.
> 
> Regards.
> 
>> On Mon, Mar 23, 2020 at 11:29 PM Vishal Santoshi 
>> wrote:
>> 
>> << In RAID 5 one can loose more than only one disk RAID here will be data
>> corruption.
>>>> In RAID 5 if one looses more than only one disk RAID there will be data
>> corruption.
>> 
>> On Mon, Mar 23, 2020 at 11:27 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>> 
>>> One obvious issue is disk failure tolerance. As in, if RF=3 on normal
>>> JBOD, disk failure tolerance is 2. In RAID 5, if one loses more than only
>>> one disk there will be data corruption, effectively making the broker
>>> unusable, thus reducing our drive failure tolerance to 2 drives ON 2
>>> different brokers, with the added caveat that we lose the whole broker as
>>> well?
>>> 
>>> 
>>> On Mon, Mar 23, 2020 at 10:42 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>> 
>>>> We have a pretty busy Kafka cluster with SSD and plain JBOD. We are
>>>> planning or thinking of using RAID 5 (hardware RAID or 6-drive SSD
>>>> brokers) instead of JBOD for various reasons. Has someone used RAID 5 (we
>>>> know that there is a write overhead for parity bits on blocks and for recreating a
>>>> dead drive) and can share their experience with it? Confluent advises
>>>> against it, but there is obvious ease one gets with RAID (RAID 10 is too
>>>> expensive space-wise). Any advice/comments etc. will be highly
>>>> appreciated.
>>>> 
>>>> Regards.
>>>> 
>>>> 


Re: Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread Hans Jespersen
Very good description with pictures in the book Kafka: The Definitive Guide

https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html

-hans

> On Mar 26, 2020, at 12:00 PM, sunil chaudhari  
> wrote:
> 
> Again
> A consumer can have one or more consumer thread.
> The analogy of 12 partitions and 4 consumer is true when each consumer has
> 3 consumer threads.
> Please don’t skip the important factor “consumer thread” in this matter.
> 
> If you run each consumer with threads then you may need max 3 consumers for
> that consumer group.
> 
> If you have 12 partitions and you run 4 consumers with 4 consumer threads
> then 4 threads will be idle at any time T1.
> 
> I hope this is clear.
> 
> Thanks,
> Sunil.
> 
> On Thu, 26 Mar 2020 at 7:52 PM, Hans Jespersen  wrote:
> 
>>> As per my understanding, in Apache Kafka a single consumer from a
>> consumer
>>> group can consume messages from one partition only.
>> 
>> Not correct. A single consumer from a consumer group can consume from many
>> partitions. For example if you had a topic with 12 partitions and 4
>> consumers in a consumer group, each consumer in the group would consume
>> from 3 partitions.
>> 
>> -hans


Re: Reg : Slowness in Kafka

2020-03-26 Thread Hans Jespersen
Yes it should be going much faster than that. Something is wrong in your setup.

-hans

> On Mar 26, 2020, at 5:58 PM, Vidhya Sakar  wrote:
> 
> Hi Team,
> 
> The Kafka consumer is reading only 8 records per second. We have implemented
> Apache Kafka and Confluent Connect S3. The Confluent Connect S3 connector collects
> the records and pushes them to an S3 bucket.
> In this process, we are seeing some slowness: on average only 8
> records are being processed per second. I am planning to get higher
> throughput, so that when there is a higher data load, it can
> process more records.
> 
> Thanks in advance.


Re: Scaling Apache Kafka Producers & Consumers

2020-03-26 Thread Hans Jespersen
> As per my understanding, in Apache Kafka a single consumer from a consumer
> group can consume messages from one partition only.

Not correct. A single consumer from a consumer group can consume from many 
partitions. For example if you had a topic with 12 partitions and 4 consumers 
in a consumer group, each consumer in the group would consume from 3 partitions.
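
Here is a minimal sketch of one such group member (topic, group, and broker names are hypothetical); running four copies of this program against a 12-partition topic gives each copy roughly 3 partitions, and the assignment rebalances automatically as members come and go.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupMember {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker address
    props.put("group.id", "order-processors");          // the same group.id is used by all 4 instances
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("orders")); // hypothetical 12-partition topic
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        // with 4 live members in the group, this instance owns roughly 3 of the 12 partitions
        System.out.println("owned: " + consumer.assignment() + ", fetched: " + records.count());
      }
    }
  }
}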

-hans

Re: kafka connection from docker

2019-10-17 Thread Hans Jespersen
This is a great blog post that explains how Kafka works with advertised 
listeners and Docker:

https://rmoff.net/2018/08/02/kafka-listeners-explained/

-hans

> On Oct 18, 2019, at 5:36 AM, Mich Talebzadeh  
> wrote:
> 
> I do not understand this.
> 
> You have a physical host running ZooKeeper locally and a broker running
> as well. These are using the default physical host ports like 2181, 9092, etc.
> 
> Then you have also installed a ZooKeeper and a Kafka broker in Docker?
> 
> docker run --net=host -d --name zookeeper -p 2181:2181 -p 2888:2888 -p
> 3888:3888 jplock/zookeeper
> 
> docker run -d --net=host --name kafka_broker1 -p 9092:9092 -e
> KAFKA_ADVERTISED_HOST_NAME=50.140.197.220 -e ZOOKEEPER_IP=50.140.197.220 -e
> KAFKA_BROKER_ID=1 -e KAFKA_BROKER_PORT=9092 -e KAFKA_ADVERTISED_PORT=9092
> ches/kafka
> 
> OK, --net=host means that the Docker container will use the host ports as its own.
> Indeed, other mappings will be discarded. You can either map containers to
> host ports or use other ports, but not both!
> HTH
> 
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> 
> 
> 
> 
>> On Thu, 17 Oct 2019 at 22:13, Wang, Shuo  wrote:
>> 
>> Hi,
>> 
>> I have a question regarding connecting to kafka broker from docker.
>> 
>> I have zookeeper and kafka broker running on my local machine.
>> I have a docker container running on the same local machine with
>> --network=host
>> I want to send message from inside the docker container to my local kafka
>> broker.
>> 
>> From inside the Docker container, I can connect to ZooKeeper and see the existing
>> topics by running:
>> `./bin/kafka-topics.sh --zookeeper localhost:2181 --list`
>> 
>> But I cannot connect to the kafka broker with either the
>> `kafka-console-consumer.sh` or `kafka-console-producer.sh`,
>> 
>> by running : `bin/kafka-console-consumer.sh --bootstrap-server
>> localhost:9092 --topic test`
>> I get:
>> 
>> ```
>> [2019-10-17 19:12:04,097] WARN [Consumer clientId=consumer-1,
>> groupId=console-consumer-99825] Error connecting to node aictjt:9092 (id: 0
>> rack: null) (org.apache.kafka.clients.NetworkClient)
>> java.net.UnknownHostException: aictjt
>> at java.net.InetAddress.getAllByName0(InetAddress.java:1281)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1193)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1127)
>> at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
>> at
>> 
>> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
>> at
>> 
>> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
>> at
>> 
>> org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
>> at
>> 
>> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:943)
>> at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:68)
>> at
>> 
>> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1114)
>> at
>> 
>> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1005)
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:537)
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
>> at
>> 
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:259)
>> at
>> 
>> org.apache.kafka.clients.consumer.in

Re: Requirements

2019-09-13 Thread Hans Jespersen
Gwen Shapira published a great whitepaper with Reference Architectures for
all Kafka and Confluent components in big and small environments and for
bare metal, VMs, and all 3 major public clouds.

https://www.confluent.io/resources/apache-kafka-confluent-enterprise-reference-architecture/


On Fri, Sep 13, 2019 at 8:26 AM Peter Menapace <
peter.menap...@vanderlande.com> wrote:

> Hi all,
>
> I have a small small question. In my company we would like to use Apache
> Kafka with KSQL.
>
> And my small question is: which hardware requirements do you have to run
> Kafka and KSQL in small and big environments?
>
>
>
> Best regards,
>
>
>
> Peter
>
>
>


Re: Update Replication Factor

2019-06-17 Thread Hans Jespersen
Take a look at the Admin Client API here
https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html
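
Note that changing the replication factor goes through a partition reassignment; the AdminClient only gained a call for that (alterPartitionReassignments) in later releases (2.4+), so on 2.2 the kafka-reassign-partitions.sh tool is the usual route. Below is a sketch assuming the newer client, with hypothetical topic and broker ids.

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class IncreaseReplicationFactor {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical

    try (AdminClient admin = AdminClient.create(props)) {
      // assign partition 0 of "my-topic" to brokers 1, 2 and 3, i.e. replication factor 3
      Map<TopicPartition, Optional<NewPartitionReassignment>> plan = Collections.singletonMap(
          new TopicPartition("my-topic", 0),
          Optional.of(new NewPartitionReassignment(Arrays.asList(1, 2, 3))));
      admin.alterPartitionReassignments(plan).all().get(); // waits for the request to be accepted
    }
  }
}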

-hans


On Mon, Jun 17, 2019 at 4:27 PM shubhmeet kaur 
wrote:

> hi,
>
> I wish to update the replication factor of an already created topic through
> Java code using Kafka 2.2.0. I am having a hard time finding the correct
> API. Since I am new to Kafka, any help would be appreciated.
>
> Thank you
> Meet
>


Re: Customers are getting same emails for roughly 30-40 times

2019-05-24 Thread Hans Jespersen
It's not just the config; you need to change your code. 

kafka.auto.commit.interval.ms=3000 means that consumers only commit offsets 
every 3 seconds, so if there is any failure or rebalance they will reconsume up 
to 3 seconds of data per partition. That could be many hundreds or thousands 
of messages.

I would recommend you not use auto commit at all and instead manually commit 
offsets immediately after sending each email or batch of emails.
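
A minimal sketch of that pattern (the topic name and the sendEmail() call are hypothetical placeholders for your own code):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EmailConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical
    props.put("group.id", "email-senders");
    props.put("enable.auto.commit", "false");         // no time-based auto commit
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("email-requests"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          sendEmail(record.value()); // hypothetical: actually deliver this email
        }
        consumer.commitSync(); // commit only after the whole batch has really been sent
      }
    }
  }

  private static void sendEmail(String payload) {
    System.out.println("sending email: " + payload); // placeholder for the real mailer
  }
}

With this structure, a crash can only cause re-sending of the emails in the batch that was in flight, not 3 seconds' worth of messages per partition.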

-hans

> On May 24, 2019, at 4:35 AM, ASHOK MACHERLA  wrote:
> 
> Dear Team
> 
> 
> 
> First of all thanks for reply on this issue.
> 
> 
> 
> Right now we are using these configurations at consumer side
> 
> 
> 
> kafka.max.poll.records=20
> 
> max.push.batch.size=100
> 
> enable.auto.commit=true
> 
> auto.offset.reset=latest
> 
> kafka.auto.commit.interval.ms=3000
> 
> kafka.session.timeout.ms=1
> 
> kafka.request.timeout.ms=3000
> 
> kafka.heartbeat.interval.ms=3000
> 
> kafka.max.poll.interval.ms=30
> 
> 
> 
> 
> 
> can you please suggest us to change the above config parameters .
> 
> 
> 
> 
> 
> We are using one Kafka topic with 10 partitions and 10 consumers, we are 
> sending lakhs of emails to the customers ,
> 
> Are that many partitions and consumers enough?
> 
> 
> 
> Or do I have to increase the partitions and consumers?
> 
> 
> 
> Please suggest  us ..
> 
> 
> 
> 
> 
> in consumer logs , its showing
> 
> the consumer group is rebalancing before the commit, because the group is already 
> rebalancing
> 
> 
> 
> Sent from Outlook.
> 
> 
> 
> 
> From: Vincent Maurin 
> Sent: Friday, May 24, 2019 3:51:23 PM
> To: users@kafka.apache.org
> Subject: Re: Customers are getting same emails for roughly 30-40 times
> 
> It also seems you are using an "at least once" strategy (maybe with
> auto-commit, or committing after sending the email).
> Maybe "at most once" could be a valid business strategy here?
> 
> - at least once (you will deliver all the emails, but you could deliver
> duplicates)
> consumeMessages
> sendEmails
> commitOffsets
> 
> - at most once (you will never deliver duplicates, but you might never
> deliver an given email)
> consumeMessages
> commitOffsets
> sendEmails
> 
> Ideally, you could do "exactly once", but it is hard to achieve in the
> scenario, Kafka -> External system. The usual strategy here is to have an
> idempotent operation in combination with a "at least once" strategy
> 
> Best,
> Vincent
> 
> On Fri, May 24, 2019 at 10:39 AM Liam Clarke 
> wrote:
> 
>> Consumers will rebalance if you add partitions, add consumers to the group
>> or if a consumer leaves the group.
>> 
>> Consumers will leave the group after not communicating with the server for
>> a period set by session.timeout.ms. This is usually due to an exception in
>> the code polling with the consumer, or message processing code taking too
>> long.
>> 
>> If your consumers are reprocessing messages thus causing emails to send, it
>> implies that they weren't able to commit their offsets before
>> failing/timing out.
>> 
>> We had a similar issue in a database sink that consumed from Kafka and
>> duplicated data because it took too long, hit the session timeout, and
>> then wasn't able to commit its offsets.
>> 
>> So I'd look closely at your consuming code and log every possible source of
>> exceptions.
>> 
>> Kind regards,
>> 
>> Liam Clarke
>> 
>>> On Fri, 24 May 2019, 7:37 pm ASHOK MACHERLA,  wrote:
>>> 
>>> Dear Team Member
>>> 
>>> Currently we are using Kafka 0.10.1, zookeeper 3.4.6 versions. In our
>>> project we have to send bulk emails to customers for this purpose we are
>>> using Kafka cluster setup.
>>> 
>>> But customers are getting the same emails roughly 30-40 times. This is
>>> a very bad thing. In this situation our consumer group is showing
>>> rebalancing. Might that be the reason?
>>> Currently we are using one topic for this. We have 10 partitions and 10
>>> consumers.
>>> I hope we have enough partitions and consumers as well.
>>> But I don't know exactly how many partitions & consumers are required to
>>> overcome this issue.
>>> 
>>> Can you please suggest us to fix this issue.
>>> 
>>> If anything changes required in Kafka side as well as consumer side??
>>> How to stop rebalancing issue??
>>> Please suggest us, Thanks
>>> 
>>> 
>>> 
>>> Sent from Outlook.
>>> 
>>> 
>> 


Re: Performance Testing Using Consumer-Perf-Test

2019-05-15 Thread Hans Jespersen
1) Are all 10 publishers producing to the same topic? What level of ACKs do
you have set? How many partitions are in your topic? Are all 10 consumers
in the same consumer group or are they supposed to be independent consumers
that each get the full set of messages published?
2) It depends on what you are measuring (latency, throughput, or something else).
If you publish first then your consumer has to consume either from the
beginning or in real-time and will only get messages published AFTER it
successfully subscribes. With 10 consumers you could generate a lot of
rebalancing if you don't start and balance them ahead of time.

-hans





On Wed, May 15, 2019 at 8:45 AM M. Manna  wrote:

> Hello,
>
> I am trying to do some performance testing using Kafka-Consumer-Perf-Test.
> Could someone please help me understand whether my setup is correct?
>
> 1) I would like to run a benchmark test to have 10 publishers publishing
> 100 messages (4MB) each and 10 subscribers.
>
> 2) For the above, do I need to run PRoducer first and then Consumer? Or, is
> it okay just to run consumer-perf-test ?
>
> Thanks,
>


Re: Kafka Connect - HDFS or FileStream

2019-05-13 Thread Hans Jespersen
Can you just use kafka-console-consumer and redirect the output into a
file?

-hans


On Mon, May 13, 2019 at 1:55 PM Vinay Jain  wrote:

> Hi
>
> The data needs to be transferred to some other system in another network, and
> due to some security reasons, the other systems cannot be exposed. So the
> available mechanism is file-based integration. Is there a production-ready
> Kafka Connect adapter which can create files in a local directory?
>
> Regards
> Vinay Jain
>
> On Mon, May 13, 2019 at 3:41 PM Robin Moffatt  wrote:
>
> > Can you explain more about why you're writing a file with the data?
> > Presumably, this is for another application to consume; could it not take
> > the data from Kafka directly, whether with a native client or over the
> REST
> > proxy?
> > Oftentimes local files are unnecessary 'duct tape' for integration that
> can
> > be done in a better way.
> >
> >
> > --
> >
> > Robin Moffatt | Developer Advocate | ro...@confluent.io | @rmoff
> >
> >
> > On Mon, 13 May 2019 at 01:35, Vinay Jain  wrote:
> >
> > > Hi
> > >
> > > I would like to consume a topic and save the AVRO messages in a local
> > > directory in AVRO file format. As per the Kafka Connect File Stream
> > > documentation, it is not for production use.
> > >
> > > Other Option I am thinking to use Kafka Connect - HDFS Sink but I am
> not
> > > sure if it can also write to the Local directory if we pass in the
> > variable
> > > hdfs.url the URL for local file system instead of HDFS path.
> > >
> > > Will this work or are there any other ready made options which can be
> > used
> > > for the same.
> > >
> > > Regards
> > >
> > > Vinay
> > >
> >
>


Re: Source Connector Task in a distributed env

2019-04-24 Thread Hans Jespersen
Your connector sounds a lot like this one
https://github.com/jcustenborder/kafka-connect-spooldir

I do not think you can run such a connector in distributed mode though.
Typically something like this runs in standalone mode to avoid conflicts.

-hans


On Wed, Apr 24, 2019 at 1:08 AM Venkata S A  wrote:

> Hello Team,
>
>   I am developing a custom Source Connector that watches a
> given directory for any new files. My question is in a Distributed
> environment, how will the tasks in different nodes handle the file Queue?
>
>   Referring to this sample
> <
> https://github.com/DataReply/kafka-connect-directory-source/tree/master/src/main/java/org/apache/kafka/connect/directory
> >
> ,
> poll() in SourceTask is polling the directory at specified interval for a
> new files and fetching the files in a Queue as below:
>
> Queue queue = ((DirWatcher) task).getFilesQueue();
> >
>
> So, when in a 3-node cluster, this is run individually by each task. But
> then, how does synchronization happen between all the tasks on
> different nodes to avoid duplicate reading of files into Kafka?
>
>
> Thank you,
> Venkata S
>


Re: Something like a unique key to prevent same record from being inserted twice?

2019-04-03 Thread Hans Jespersen
OK, what you are describing is different from pruning accidental duplicate messages, 
which is what the idempotent publish feature does.

You are describing a situation where multiple independent messages just happen 
to have the same contents (both key and value).

Removing those messages is an application-specific function, as you can imagine 
applications which would not want independent but identical messages to be 
removed (for example temperature sensor readings, heartbeat messages, or other 
telemetry data that has repeated but independent values).

Your best bet is to write a simple intermediate processor that implements your 
message pruning algorithm of choice and republishes (or not) to another topic 
that your consumers read from. It's a stateful app because it needs to remember 
one or more past messages, but that can be done using the Kafka Streams processor 
API and the embedded RocksDB state store that comes with Kafka Streams (or as a 
UDF in KSQL).
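
As a rough illustration (not your exact pruning algorithm), here is a sketch of such an intermediate processor using the Kafka Streams Transformer API with a key-value state store; it drops a record only when it has the same value as the previous record seen for the same key. All topic/application names are hypothetical, and it assumes records have non-null keys and values.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class PruningApp {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pruning-app");       // hypothetical names throughout
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    StreamsBuilder builder = new StreamsBuilder();
    StoreBuilder<KeyValueStore<String, String>> seenStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("seen"), Serdes.String(), Serdes.String()); // RocksDB-backed store
    builder.addStateStore(seenStore);

    builder.<String, String>stream("input-topic")
        .transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
          private KeyValueStore<String, String> seen;

          @Override
          @SuppressWarnings("unchecked")
          public void init(ProcessorContext context) {
            seen = (KeyValueStore<String, String>) context.getStateStore("seen");
          }

          @Override
          public KeyValue<String, String> transform(String key, String value) {
            String previous = seen.get(key); // last value seen for this key
            seen.put(key, value);
            // returning null drops the record; forward only if it differs from the previous one
            return value.equals(previous) ? null : KeyValue.pair(key, value);
          }

          @Override
          public void close() {}
        }, "seen")
        .to("pruned-topic");

    new KafkaStreams(builder.build(), props).start();
  }
}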

You can alternatively write your consuming apps to implement similar message 
pruning functionality themselves and avoid one extra component in the end-to-end 
architecture.

-hans

> On Apr 2, 2019, at 7:28 PM, jim.me...@concept-solutions.com 
>  wrote:
> 
> 
> 
>> On 2019/04/02 22:43:31, jim.me...@concept-solutions.com 
>>  wrote: 
>> 
>> 
>>> On 2019/04/02 22:25:16, jim.me...@concept-solutions.com 
>>>  wrote: 
>>> 
>>> 
>>>> On 2019/04/02 21:59:21, Hans Jespersen  wrote: 
>>>> yes. Idempotent publish uses a unique messageID to discard potential 
>>>> duplicate messages caused by failure conditions when  publishing.
>>>> 
>>>> -hans  
>>>> 
>>>>> On Apr 1, 2019, at 9:49 PM, jim.me...@concept-solutions.com 
>>>>>  wrote:
>>>>> 
>>>>> Does Kafka have something that behaves like a unique key so a producer 
>>>>> can’t write the same value to a topic twice?
>>> 
>>> Hi Hans,
>>> 
>>>Is there some documentation or an example with source code where I can 
>>> learn more about this feature and how it is implemented?
>>> 
>>> Thanks,
>>> Jim
>> 
>> By the way I tried this...
>> echo "key1:value1" | ~/kafka/bin/kafka-console-producer.sh --broker-list 
>> localhost:9092 --topic TestTopic --property "parse.key=true" --property 
>> "key.separator=:" --property "enable.idempotence=true" > /dev/null
>> 
>> And... that didn't seem to do the trick - after running that command 
>> multiple times I did receive key1 value1 for as many times as I had run the 
>> prior command.
>> 
>> Maybe it is the way I am setting the flags...
>> Recently I saw that someone did this...
>> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
>> --producer-property enable.idempotence=true --request-required-acks -1
> 
> Also... the reason for my question is that we are going to have two JMS 
> topics with nearly redundant data in them have the UNION written to Kafka for 
> further processing.
> 


Re: Something like a unique key to prevent same record from being inserted twice?

2019-04-02 Thread Hans Jespersen
yes. Idempotent publish uses a unique messageID to discard potential duplicate 
messages caused by failure conditions when  publishing.
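
In the Java producer that is just a config switch; a minimal sketch (broker address is hypothetical, topic/key/value taken from this thread):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class IdempotentProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical
    props.put("enable.idempotence", "true"); // producer id + per-partition sequence numbers let the broker drop duplicates from retries
    props.put("acks", "all");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("TestTopic", "key1", "value1"));
    }
  }
}

Note that this only de-duplicates automatic retries of a single send; two deliberate sends of the same key and value still produce two records, which is the case discussed in the 2019-04-03 reply above.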

-hans  

> On Apr 1, 2019, at 9:49 PM, jim.me...@concept-solutions.com 
>  wrote:
> 
> Does Kafka have something that behaves like a unique key so a producer can’t 
> write the same value to a topic twice?


Re: Need help to find references to antipatterns/pitfalls/incorrect ways to use Kafka

2019-04-01 Thread Hans Jespersen
Yes but you have more than 1 POS terminal per location so you still don't need 
20,000 partitions. Just one per location. How many locations do you have?

It doesn't matter anyway, since you can build a Kafka cluster with up to 200,000 
partitions if you use the latest versions of Kafka.

https://blogs.apache.org/kafka/entry/apache-kafka-supports-more-partitions

“As a rule of thumb, we recommend each broker to have up to 4,000 partitions 
and each cluster to have up to 200,000 partitions”

-hans 

> On Apr 1, 2019, at 2:02 AM, Alexander Kuterin  wrote:
> 
> Thanks, Hans!
> We use location specific SKU pricing and send specific price lists to the
> specific POS terminal.
> 
> пн, 1 апр. 2019 г., 3:01 Hans Jespersen :
> 
>> Doesn’t every one of the 20,000 POS terminals want to get the same price
>> list messages? If so then there is no need for 20,000 partitions.
>> 
>> -hans
>> 
>>> On Mar 31, 2019, at 7:24 PM,  
>> wrote:
>>> 
>>> Hello!
>>> 
>>> 
>>> 
>>> I ask for your help in connection with the my recent task:
>>> 
>>> - Price lists are delivered to 20,000 points of sale with a frequency of
>> <10
>>> price lists per day.
>>> 
>>> - The order in which the price lists follow is important. It is also
>>> important that the price lists are delivered to the point of sale online.
>>> 
>>> - At each point of sale, an agent application is deployed, which
>> processes
>>> the received price lists.
>>> 
>>> 
>>> 
>>> This task is not particularly difficult. Help in solving the task is not
>>> required.
>>> 
>>> 
>>> 
>>> The difficulty is that Kafka in our company is a new "silver bullet", and
>>> the project manager requires me to implement the following technical
>>> decision:
>>> 
>>> deploy 20,000 Kafka consumer instances (one instance for each point of
>> sale)
>>> for one topic partitioned into 20,000 partitions - one partition per
>>> consumer.
>>> 
>>> Technical problems obtained in experiments with this technical decision
>> do
>>> not convince him.
>>> 
>>> 
>>> 
>>> Please give me references to the books/documents/blogposts. which clearly
>>> shows that Kafka not intended for this way to use (references to other
>>> anti-patterns/pitfalls will be useful).
>>> 
>>> My own attempts to find such references were unsuccessful.
>>> 
>>> 
>>> 
>>> Thank you!
>>> 
>>> 
>>> 
>> 


Re: Need help to find references to antipatterns/pitfalls/incorrect ways to use Kafka

2019-03-31 Thread Hans Jespersen
Doesn’t every one of the 20,000 POS terminals want to get the same price list 
messages? If so then there is no need for 20,000 partitions.

-hans

> On Mar 31, 2019, at 7:24 PM,   wrote:
> 
> Hello!
> 
> 
> 
> I ask for your help in connection with my recent task:
> 
> - Price lists are delivered to 20,000 points of sale with a frequency of <10
> price lists per day.
> 
> - The order in which the price lists follow is important. It is also
> important that the price lists are delivered to the point of sale online.
> 
> - At each point of sale, an agent application is deployed, which processes
> the received price lists.
> 
> 
> 
> This task is not particularly difficult. Help in solving the task is not
> required.
> 
> 
> 
> The difficulty is that Kafka in our company is a new "silver bullet", and
> the project manager requires me to implement the following technical
> decision: 
> 
> deploy 20,000 Kafka consumer instances (one instance for each point of sale)
> for one topic partitioned into 20,000 partitions - one partition per
> consumer.
> 
> Technical problems obtained in experiments with this technical decision do
> not convince him.
> 
> 
> 
> Please give me references to books/documents/blog posts which clearly
> show that Kafka is not intended to be used this way (references to other
> anti-patterns/pitfalls will be useful).
> 
> My own attempts to find such references were unsuccessful.
> 
> 
> 
> Thank you!
> 
> 
> 


Re: Question on performance data for Kafka vs NATS

2019-03-21 Thread Hans Jespersen
That's a 4.5-year-old benchmark, and it was run with a single broker node and 
only 1 producer and 1 consumer, all running on a single MacBook Pro. Definitely 
not the target production environment for Kafka. 

-hans

> On Mar 21, 2019, at 11:43 AM, M. Manna  wrote:
> 
> HI All,
> 
> https://nats.io/about/
> 
> this shows a general comparison of sender/receiver throughputs for NATS and
> other messaging systems, including our favourite Kafka.
> 
> It appears that Kafka, despite taking the 2nd place, has a very low
> throughput. My question is, where does Kafka win over NATS? is it the
> unique partitioning and delivery semantics? Or, is it something else.
> 
> From what I can see, NATS has traditional pub/sub and queuing. But it
> doesn't look like there is any proper retention system built for this.
> 
> Has anyone come across this already?
> 
> Thanks,


Re: Proxying the Kafka protocol

2019-03-19 Thread Hans Jespersen


You might want to take a look at kafka-proxy (see 
https://github.com/grepplabs/kafka-proxy).
It's a true Kafka protocol proxy and modifies the metadata, like advertised 
listeners, so it works when there is no IP routing between the client and the 
brokers.

-hans





> On Mar 19, 2019, at 8:19 AM, James Grant  wrote:
> 
> Hello,
> 
> We would like to expose a Kafka cluster running on one network to clients
> that are running on other networks without having to have full routing
> between the two networks. In this case these networks are in different AWS
> accounts but the concept applies more widely. We would like to access Kafka
> over a single (or very few) host names.
> 
> In addition we would like to filter incoming messages to enforce some level
> of data quality and also impose some access control.
> 
> A solution we are looking into is to provide a Kafka protocol level proxy
> that presents to clients as a single node Kafka cluster holding all the
> topics and partitions of the cluster behind it. This proxy would be able to
> operate in a load balanced cluster behind a single DNS entry and would also
> be able to intercept and filter/alter messages as they passed through.
> 
> The advantages we see in this approach over the HTTP proxy is that it
> presents the Kafka protocol whilst also meaning that we can use a typical
> TCP level load balancer that it is easy to route connections to. This means
> that we continue to use native Kafka clients.
> 
> Does anything like this already exist? Does anybody think it would useful?
> Does anybody know of any reason it would be impossible (or a bad idea) to
> do?
> 
> James Grant
> 
> Developer - Expedia Group



Re: Kafka - Connect for logs processing

2019-03-15 Thread Hans Jespersen
Take a look at kafka-connect-spooldir and see if it meets your needs.

https://www.confluent.io/connector/kafka-connect-spooldir/

This connector can monitor a directory and pick up any new files that are 
created. Great for picking up batch files, parsing them, and publishing each 
line as if it were published in realtime.

-hans

> On Mar 15, 2019, at 7:52 AM, Pulkit Manchanda  wrote:
> 
> Hi All,
> 
> I am building a data pipeline to send logs from one data source to the
> other node.
> I am using Kafka Connect standalone for this integration.
> Everything works fine, but the problem is that on Day 1 the log file is renamed to
> log_Day0 and a new log file log_Day1 is created.
> And my Kafka Connect doesn't process the new log file.
> Looking for a solution. Any help is appreciated.
> 
> Thanks
> Pulkit


Re: How to balance messages in kafka topics with newly added partitions?

2019-01-27 Thread Hans Jespersen
Yes but I find this even easier to do with KSQL. 

CREATE STREAM OUTPUTTOPIC AS SELECT * FROM INPUTTOPIC;

There are similar examples like this that also filter messages while copying, 
or change the message format while copying on the KSQL Recipe page here
https://www.confluent.io/stream-processing-cookbook/

There is even an example for repartitioning topics using the PARTITIONS 
parameter.
CREATE STREAM clickstream_new WITH (PARTITIONS=5) AS SELECT * from 
clickstream_raw;
-hans

> On Jan 27, 2019, at 9:24 AM, Ryanne Dolan  wrote:
> 
> You can use MirrorMaker to copy data between topics.
> 
> Ryanne
> 
>> On Sun, Jan 27, 2019, 7:12 AM jaaz jozz > 
>> Thanks, Sönke
>> Is there any available kafka tool to move messages between topics?
>> 
>> On Sun, Jan 27, 2019 at 2:40 PM Sönke Liebau
>>  wrote:
>> 
>>> Hi Jazz,
>>> 
>>> I'm afraid the only way of rebalancing old messages is indeed to
>>> rewrite them to the topic - thus creating duplication.
>>> Once a message has been written to a partition by Kafka this
>>> assignment is final, there is no way of moving it to another
>>> partition.
>>> 
>>> Changing the partition count of topics at a later time can be a huge
>>> headache, if you depend on partitioning. For this exact reason the
>>> general recommendation is to overpartition your topics a little when
>>> creating them, so that you can add consumers as the data volume
>>> increases.
>>> 
>>> In your case the best solution might be to delete and then recreate
>>> the topic with more partitions. Now you can rewrite all your data and
>>> it will result in a clean partitioning.
>>> 
>>> Hope this helps a little, feel free to get back to us if you have more
>>> questions!
>>> 
>>> Best regards,
>>> Sönke
>>> 
>>>> On Sun, Jan 27, 2019 at 1:21 PM jaaz jozz  wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> I have kafka cluster with certain topic that had too few partitions,
>> so a
>>>> large backlog of messages was collected. After i added additional
>>>> partitions, only the newly messages balanced between all the new
>>> partitions.
>>>> 
>>>> What is the preferred way to balance the "old" backlog of messages
>> inside
>>>> the original partitions across all the new partitions?
>>>> 
>>>> I thought of reading and writing again all the messages backlog to this
>>>> topic and update the offsets accordingly, but it will make duplication
>> of
>>>> messages if a new consumer group will start consuming from the
>> beginning
>>> of
>>>> this topic.
>>>> 
>>>> How can i solve this?
>>>> 
>>>> Thanks.
>>> 
>>> 
>>> 
>>> --
>>> Sönke Liebau
>>> Partner
>>> Tel. +49 179 7940878
>>> OpenCore GmbH & Co. KG - Thomas-Mann-Straße 8 - 22880 Wedel - Germany
>>> 
>> 


Re: How to acknowledge after consuming the message from Kafka topic?

2019-01-21 Thread Hans Jespersen
Do you mean this node-kafka from 4 years ago 
(https://github.com/sutoiku/node-kafka)?

If so, that's a very, very old client; it only supports Apache Kafka 0.8 and stores 
offsets in ZooKeeper (which Kafka 0.9 and above no longer does).

I recommend you use a more up to date nodejs kafka client than this one.

-hans

> On Jan 21, 2019, at 10:02 AM, Rahul Singh 
>  wrote:
> 
> I am using node-kafka. I have used consumer.commit to commit offsets but
> don't know why, when I restart the consumer, it consumes the committed offsets again.
> 
> Thanks
> 
>> On Mon, Jan 21, 2019, 10:24 PM Hans Jespersen > 
>> Are you using kafka-node or node-rdkafka? In either case you should call
>> Consumer.commit(cb) or something similar to manually commit offsets (aka
>> acknowledge messages).
>> 
>> Alternatively you can set a config parameter on the consumer to autoCommit.
>> 
>> https://github.com/SOHU-Co/kafka-node/blob/master/README.md#consumer
>> 
>> https://github.com/Blizzard/node-rdkafka/blob/master/README.md
>> 
>> -hans
>> 
>>> On Jan 21, 2019, at 5:17 AM, Rahul Singh <
>> rahul.si...@smartsensesolutions.com> wrote:
>>> 
>>> I am using in Node with node-kafka module.
>>> 
>>>> On Mon, Jan 21, 2019 at 6:45 PM M. Manna  wrote:
>>>> 
>>>> Please read KafkaConsumer javadoc - your answer is already there.
>>>> 
>>>> Thanks,
>>>> 
>>>> On Mon, 21 Jan 2019 at 13:13, Rahul Singh <
>>>> rahul.si...@smartsensesolutions.com> wrote:
>>>> 
>>>>> Hi All,
>>>>> 
>>>>> I am testing kafka locally, I am able to produce and consume message.
>>>> But,
>>>>> after consuming the message from topic I want to acknowledge.
>>>>> 
>>>>> Looking for solution. Please revert if anyone have.
>>>>> 
>>>>> Thanks & Regards
>>>>> Rahul Singh
>>>>> 
>>>> 
>> 


Re: How to acknowledge after consuming the message from Kafka topic?

2019-01-21 Thread Hans Jespersen
Are you using kafka-node or node-rdkafka? In either case you should call 
Consumer.commit(cb) or something similar to manually commit offsets (aka 
acknowledge messages). 

Alternatively you can set a config parameter on the consumer to autoCommit.

https://github.com/SOHU-Co/kafka-node/blob/master/README.md#consumer

https://github.com/Blizzard/node-rdkafka/blob/master/README.md

-hans

> On Jan 21, 2019, at 5:17 AM, Rahul Singh 
>  wrote:
> 
> I am using in Node with node-kafka module.
> 
>> On Mon, Jan 21, 2019 at 6:45 PM M. Manna  wrote:
>> 
>> Please read KafkaConsumer javadoc - your answer is already there.
>> 
>> Thanks,
>> 
>> On Mon, 21 Jan 2019 at 13:13, Rahul Singh <
>> rahul.si...@smartsensesolutions.com> wrote:
>> 
>>> Hi All,
>>> 
>>> I am testing kafka locally, I am able to produce and consume message.
>> But,
>>> after consuming the message from topic I want to acknowledge.
>>> 
>>> Looking for solution. Please revert if anyone have.
>>> 
>>> Thanks & Regards
>>> Rahul Singh
>>> 
>> 


Re: The asynchronous sending of a message returns no error if the Kafka server is not started

2018-07-18 Thread Hans Jespersen
That is expected behavior. Typically there are multiple Kafka brokers, so if 
one is down the client retries the send to a newly elected leader.

A send should not be considered successful until an ACK is received in the 
client from the Kafka cluster.

By default the ACK is handled asynchronously for performance, but send() returns a future so 
you can easily make it appear to be a synchronous publish. Examples are in the 
javadoc.

-hans 

> On Jul 18, 2018, at 7:45 AM, jingguo yao  wrote:
> 
> The asynchronous sending of a message returns no error even if the
> Kafka server is not started.
> 
> For all the following tests, the local Kafka server is stopped. First,
> consider this piece of code:
> 
> public static void main(String[] args) throws Exception {
>  Properties config = new Properties();
>  config.put("client.id", InetAddress.getLocalHost().getHostName());
>  config.put("bootstrap.servers", "localhost:9092");
>  config.put("acks", "all");
>  config.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
>  config.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> 
>  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>ProducerRecord<String, String> record = new
> ProducerRecord<>("test-topic", null, "a-little-message");
>producer.send(record, new Callback() {
>  @Override
>  public void onCompletion(RecordMetadata metadata, Exception exception) {
>if (exception != null) {
>  System.out.println("Exceptoin occurred!");
>  exception.printStackTrace(System.out);
>}
>  }
>});
>  }
> }
> 
> Running it will produce the following error:
> 
> Exception occurred!
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 6 ms.
> 
> Second, consider this piece of code:
> 
> public static void main(String[] args) throws Exception {
>  Properties config = new Properties();
>  config.put("client.id", InetAddress.getLocalHost().getHostName());
>  config.put("bootstrap.servers", "localhost:9092");
>  config.put("acks", "all");
>  config.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
>  config.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> 
>  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>ProducerRecord<String, String> record = new
> ProducerRecord<>("test-topic", null, "a-little-message");
>System.out.println("Sending a message...");
>producer.send(record).get();
>System.out.println("Message sent");
>  }
> }
> 
> Running it will produce the following error:
> 
> Sending a message...
> Exception in thread "main" java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.TimeoutException: Failed to update
> metadata after 6 ms.
> at 
> org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1168)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:859)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:797)
> at 
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:684)
> at com.xdf.foreign.KafkaTest.main(KafkaTest.java:46)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to
> update metadata after 6 ms.
> 
> Third, consider this piece of code:
> 
> public static void main(String[] args) throws Exception {
>  Properties config = new Properties();
>  config.put("client.id", InetAddress.getLocalHost().getHostName());
>  config.put("bootstrap.servers", "localhost:9092");
>  config.put("acks", "all");
>  config.put("key.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
>  config.put("value.serializer",
> "org.apache.kafka.common.serialization.StringSerializer");
> 
>  try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>ProducerRecord<String, String> record = new
> ProducerRecord<>("test-topic", null, "a-little-message");
>System.out.println("Sending a message...");
>producer.send(record);
>System.out.println("Message sent");
>  }
> }
> 
> Running it will produce no error. The following output will be
> produced:
> 
> Sending a message...
> Message sent
> 
> I know that the nature of asynchronous sending demands that the send
> method ignore connection errors to the Kafka server. But I think that
> it would be better to document this kind of behaviour somewhere.
> 
> -- 
> Jingguo


Re: Security for individual partitions

2018-06-25 Thread Hans Jespersen
Kafka ACLs are at the topic level, not partition level.

Probably better to make 10 topics of 1 partition each and use topic ACLs to 
control access.
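
If you want to script that, here is a sketch using the AdminClient ACL API (it assumes an authorizer is enabled on the brokers; the principal, host, and topic names are hypothetical):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class GrantTopicAcl {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical

    try (AdminClient admin = AdminClient.create(props)) {
      // allow only consumer1 to read topic-1; repeat per topic/consumer pair
      AclBinding binding = new AclBinding(
          new ResourcePattern(ResourceType.TOPIC, "topic-1", PatternType.LITERAL),
          new AccessControlEntry("User:consumer1", "*", AclOperation.READ, AclPermissionType.ALLOW));
      admin.createAcls(Collections.singleton(binding)).all().get();
    }
  }
}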

-hans

> On Jun 25, 2018, at 9:50 PM, Yash Ganthe  wrote:
> 
> Hi,
> 
> If I have a topic with 10 partitions, I would like each partition to be
> accessible to only certain consumers. Consumer 1 should be able to read
> from partition 1 but no other partition and so on. Is this possible in
> Kafka?
> 
> -Yash


Re: Is there expiration for committed Offset in the partition

2018-06-01 Thread Hans Jespersen
You should just recommit the same offsets sooner than every 24 hours (or 
whatever your commit topic retention period is set to). The expiry of offsets 
is based on the timestamp of the commits.
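
One way to do that from the consumer itself is sketched below (topic and group names are hypothetical): commit the current position on every poll cycle, even when nothing new arrived, so the commit timestamps stay fresh. In practice you would do this far less often than once per second.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class KeepOffsetsFresh {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical
    props.put("group.id", "my-group");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-topic"));
      while (true) {
        consumer.poll(Duration.ofSeconds(1)); // process any returned records here as usual
        // re-commit the current position even if no records arrived,
        // which refreshes the commit timestamp before the retention period expires
        Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
        for (TopicPartition tp : consumer.assignment()) {
          positions.put(tp, new OffsetAndMetadata(consumer.position(tp)));
        }
        consumer.commitSync(positions);
      }
    }
  }
}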

-hans

> On Jun 1, 2018, at 1:03 AM, Dinesh Subramanian  
> wrote:
> 
> Hi,
> 
> Facing duplication in the below scenario:
> 
> The last commit happened 3 days back in the consumer; after that no
> messages were produced to the topic, so no commits.
> So after 3 days I am stopping and restarting the consumer, and this time I
> faced a duplication issue in the consumer: as I have the consumer
> property "*auto.offset.reset
> = earliest*", it is consumed again from the beginning. Any help will be
> appreciated.
> 
> *Thanks & Regards,*
> 
> *Dinesh S*


Re: Round-Robin assignment when non-nullable record key

2018-05-31 Thread Hans Jespersen
Why don't you just put the metadata in the header and leave the key null so it 
defaults to round robin?
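
A minimal sketch of that approach (the header name/value, topic, and broker address are hypothetical); with a null key the default partitioner spreads the records across partitions:

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HeaderMetadataProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      // partition = null and key = null, so the default partitioner chooses the partition
      ProducerRecord<String, String> record =
          new ProducerRecord<>("example-topic", null, null, "the message body");
      record.headers().add("source-system", "billing".getBytes(StandardCharsets.UTF_8)); // metadata lives here
      producer.send(record);
    }
  }
}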

-hans

> On May 31, 2018, at 6:54 AM, M. Manna  wrote:
> 
> Hello,
> 
> I can see the this has been set as "KIP required".
> 
> https://issues.apache.org/jira/browse/KAFKA-
> 
> I have a use case where I simply want to use the key as some metadata
> information (but not really for any messages), but ideally would like to
> round-robin partition assignment. All I understand is, unless a hash
> collision happens, a key will have a specific partition
> 
> Could a PMC member please confirm if a KIP has been made available for
> voting etc., or is this still pending?
> 
> Also, is it meanwhile safe to extend DefaultPartitioner and use
> round-robin condition for both Null/non-null keys as override?
> 
> Regards,


Re: Facing Duplication Issue in kakfa

2018-05-28 Thread Hans Jespersen
Are you seeing 1) duplicate messages stored in a Kafka topic partition or 2) 
duplicate consumption and processing of a single message stored in a Kafka 
topic?

If it’s #1 then you can turn on the idempotent producer feature to get Exactly 
Once Semantics (EOS) while publishing.

If it’s #2 then you can examine more closely how your consumer is doing offset 
commits.
If you are committing offsets automatically by time then there is always a 
possibility that the last time window of messages your consumer did not yet 
commit will be received again when the consumer restarts. 

You can instead manually commit, possibly even after each message which will 
shrink the window of possible duplicate messages to 1, but at the cost of some 
performance. 

What many of the Kafka Sink Connectors do for exactly once processing is to 
store their offsets atomically with the data they write external to Kafka. For 
example a database connector would write the message data and the offsets to a 
database in one atomic write operation. Upon restart of the app it then rereads 
the offset from the database and resumes consumption from Kafka from the last 
offset point using seek() to reposition the Kafka offset for the consumer 
before the first call to poll()
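
A rough sketch of that pattern is below; the two database helpers are hypothetical stand-ins for a real transactional store, and the topic/partition names are made up.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExternalOffsetSink {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // hypothetical
    props.put("group.id", "db-sink");
    props.put("enable.auto.commit", "false");         // offsets live in the database, not in Kafka
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    TopicPartition tp = new TopicPartition("events", 0); // hypothetical topic/partition

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, loadLastOffsetFromDb(tp) + 1); // resume right after the last safely stored record
      while (true) {
        for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
          writeRowAndOffsetAtomically(r.value(), r.partition(), r.offset()); // one DB transaction
        }
      }
    }
  }

  // hypothetical helpers: a real implementation would use a transactional database
  private static long loadLastOffsetFromDb(TopicPartition tp) { return -1L; }
  private static void writeRowAndOffsetAtomically(String value, int partition, long offset) { }
}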

These are the techniques most people use to get end to end exactly once 
processing with no duplicates even in the event of a failure.


-hans

> On May 28, 2018, at 12:17 AM, Karthick Kumar  wrote:
> 
> Hi,
> 
> Facing duplication inconsistently while bouncing the Kafka producer and
> consumer in a Tomcat node. Any help will be appreciated to find out the root
> cause.
> 
> -- 
> With Regards,
> Karthick.K


Re: Can anyone help me to send messages in their original order?

2018-05-26 Thread Hans Jespersen
There are two concepts in Kafka that are not always familiar to people who have 
used other pub/sub systems. 

1) partitions: 

Kafka topics are partitioned which means a single topic is sharded into 
multiple pieces that are distributed across multiple brokers in the cluster for 
parallel processing.

Order is guaranteed per partition (not per topic).

You can think of each Kafka topic partition like an exclusive queue in 
traditional messaging systems, and order is not guaranteed when the data is 
spread out across multiple queues in traditional messaging either.

2) keys

Kafka messages have keys in addition to the value (i.e. the body) and the headers. When 
messages are published with the same key they will all be sent, in order, to 
the same partition.

If messages are published with a “null” key then they will be spread out round 
robin across all partitions (which is what you have done).


Conclusion 

You will see ordered delivery if you either use a key when you publish or 
create a topic with one partition.
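
For the test in this thread, here is a sketch of the second option in Java: publish every line with the same key (the choice of "test1.csv" as the key is hypothetical) so all the lines land on one partition, in order.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class OrderedProducer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    String[] lines = {"1, abc", "2, def", "8, vwx", "9, zzz"}; // abbreviated sample from test1.csv
    try (Producer<String, String> producer = new KafkaProducer<>(props)) {
      for (String line : lines) {
        // same key => same partition => strict ordering on the consumer side
        producer.send(new ProducerRecord<>("kafka-topic2", "test1.csv", line));
      }
    }
  }
}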


-hans

> On May 26, 2018, at 7:59 AM, Raymond Xie <xie3208...@gmail.com> wrote:
> 
> Thanks. By default, can you explain why I received the messages in the wrong 
> order? Note there are only 9 lines, from 1 to 9, but on the consumer side their 
> original order becomes messed up.
> 
> ~~~sent from my cell phone, sorry if there is any typo
> 
> Hans Jespersen <h...@confluent.io> wrote on Sat, May 26, 2018 at 12:16 AM:
>> If you create a topic with one partition they will be in order.
>> 
>> Alternatively if you publish with the same key for every message they will 
>> be in the same order even if your topic has more than 1 partition.
>> 
>> Either way above will work for Kafka.
>> 
>> -hans
>> 
>> > On May 25, 2018, at 8:56 PM, Raymond Xie <xie3208...@gmail.com> wrote:
>> > 
>> > Hello,
>> > 
>> > I just started learning Kafka and have the environment setup on my
>> > hortonworks sandbox at home vmware.
>> > 
>> > test.csv is what I want the producer to send out:
>> > 
>> > more test1.csv ./kafka-console-producer.sh --broker-list
>> > sandbox.hortonworks.com:6667 --topic kafka-topic2
>> > 
>> > 1, abc
>> > 2, def
>> > ...
>> > 8, vwx
>> > 9, zzz
>> > 
>> > What I received are all the content of test.csv, however, not in their
>> > original order;
>> > 
>> > kafka-console-consumer.sh --zookeeper 192.168.112.129:2181 --topic
>> > kafka-topic2
>> > 
>> > 2, def
>> > 1, abc
>> > ...
>> > 9, zzz
>> > 8, vwx
>> > 
>> > 
>> > I read from google that partition could be the feasible solution, however,
>> > my questions are:
>> > 
>> > 1. for small files like this one, shall I really do the partitioning? how
>> > small a partition would be acceptable to ensure the sequence?
>> > 2. for big files, each partition could still contain multiple lines, how to
>> > ensure all the lines in each partition won't get messed up on consumer 
>> > side?
>> > 
>> > 
>> > I also want to know what is the best practice to process large volume of
>> > data through kafka? There should be better way other than console command.
>> > 
>> > Thank you very much.
>> > 
>> > 
>> > 
>> > Sincerely yours,
>> > 
>> > Raymond


Re: Can anyone help me to send messages in their original order?

2018-05-25 Thread Hans Jespersen
If you create a topic with one partition they will be in order.

Alternatively if you publish with the same key for every message they will be 
in the same order even if your topic has more than 1 partition.

Either way above will work for Kafka.

-hans

> On May 25, 2018, at 8:56 PM, Raymond Xie <xie3208...@gmail.com> wrote:
> 
> Hello,
> 
> I just started learning Kafka and have the environment setup on my
> hortonworks sandbox at home vmware.
> 
> test.csv is what I want the producer to send out:
> 
> more test1.csv ./kafka-console-producer.sh --broker-list
> sandbox.hortonworks.com:6667 --topic kafka-topic2
> 
> 1, abc
> 2, def
> ...
> 8, vwx
> 9, zzz
> 
> What I received are all the content of test.csv, however, not in their
> original order;
> 
> kafka-console-consumer.sh --zookeeper 192.168.112.129:2181 --topic
> kafka-topic2
> 
> 2, def
> 1, abc
> ...
> 9, zzz
> 8, vwx
> 
> 
> I read from google that partition could be the feasible solution, however,
> my questions are:
> 
> 1. for small files like this one, shall I really do the partitioning? how
> small a partition would be acceptable to ensure the sequence?
> 2. for big files, each partition could still contain multiple lines, how to
> ensure all the lines in each partition won't get messed up on consumer side?
> 
> 
> I also want to know what is the best practice to process large volume of
> data through kafka? There should be better way other than console command.
> 
> Thank you very much.
> 
> 
> 
> Sincerely yours,
> 
> Raymond


Re: Kafka mirror maker help

2018-04-27 Thread Hans Jespersen
Sorry I hit send a bit too soon. I was so focused on the systemd part of
the email and not the Mirror Maker part.
Confluent packages include Mirror Maker, but the systemd scripts are set up
to use Confluent Replicator rather than Mirror Maker.
My apologies.

-hans

/**
 * Hans Jespersen, Director Systems Engineering, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Fri, Apr 27, 2018 at 11:56 AM, Hans Jespersen <h...@confluent.io> wrote:

> The latest Confluent packages now ship with systemd scripts. That is since
> Confluent Version 4.1 - which included Apache Kafka 1.1
>
> -hans
>
> /**
>  * Hans Jespersen, Director Systems Engineering, Confluent Inc.
>  * h...@confluent.io (650)924-2670
>  */
>
> On Fri, Apr 27, 2018 at 11:15 AM, Andrew Otto <o...@wikimedia.org> wrote:
>
>> Hiya,
>>
>> Saravanan, I saw you emailed my colleague Alex about WMF’s old debian
>> packaging.  I’ll reply here.
>>
>> We now use Confluent’s Kafka debian packaging which does not (or did not?)
>> ship with init scripts.  We don’t use Sys V init.d scripts anymore either,
>> but use systemd instead.  Our systemd service unit (ERB template format)
>> is
>> here:
>>
>> https://github.com/wikimedia/puppet/blob/production/modules/
>> confluent/templates/initscripts/kafka-mirror-instance.systemd.erb
>>
>>
>>
>> On Fri, Apr 27, 2018 at 1:35 AM, Amrit Jangid <jangid.ii...@gmail.com>
>> wrote:
>>
>> > You should share related info, such source-destination Kafka versions,
>> > sample Config or error if any.
>> >
>> > FYI,  Go through
>> > https://kafka.apache.org/documentation/#basic_ops_mirror_maker
>> >
>>
>
>


Re: Kafka mirror maker help

2018-04-27 Thread Hans Jespersen
The latest Confluent packages now ship with systemd scripts. That has been the case since
Confluent version 4.1, which included Apache Kafka 1.1.

-hans

/**
 * Hans Jespersen, Director Systems Engineering, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Fri, Apr 27, 2018 at 11:15 AM, Andrew Otto <o...@wikimedia.org> wrote:

> Hiya,
>
> Saravanan, I saw you emailed my colleague Alex about WMF’s old debian
> packaging.  I’ll reply here.
>
> We now use Confluent’s Kafka debian packaging which does not (or did not?)
> ship with init scripts.  We don’t use Sys V init.d scripts anymore either,
> but use systemd instead.  Our systemd service unit (ERB template format) is
> here:
>
> https://github.com/wikimedia/puppet/blob/production/
> modules/confluent/templates/initscripts/kafka-mirror-instance.systemd.erb
>
>
>
> On Fri, Apr 27, 2018 at 1:35 AM, Amrit Jangid <jangid.ii...@gmail.com>
> wrote:
>
> > You should share related info, such source-destination Kafka versions,
> > sample Config or error if any.
> >
> > FYI,  Go through
> > https://kafka.apache.org/documentation/#basic_ops_mirror_maker
> >
>


Re: Is Restart needed after change in trust store for Kafka 1.1 ?

2018-03-30 Thread Hans Jespersen
This is the KIP-226 for Dynamic Broker Configuration

https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration

and this is the JIRA for what is released in Apache Kafka 1.1.0

https://issues.apache.org/jira/browse/KAFKA-6240 

which seems to include dynamic reconfiguration of SSL keystores

https://issues.apache.org/jira/browse/KAFKA-6241 


-- 
/**
 * Hans Jespersen, Director Systems Engineering, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */









> On Mar 30, 2018, at 2:27 PM, Raghav <raghavas...@gmail.com> wrote:
> 
> Anyone ?
> 
> On Thu, Mar 29, 2018 at 6:11 PM, Raghav <raghavas...@gmail.com> wrote:
> 
>> Hi
>> 
>> We have a 3-node Kafka cluster running. From time to time, we have some changes
>> in the trust store and we restart Kafka to take the new changes into account. We
>> are on Kafka 0.10.x.
>> 
>> If we move to 1.1, would we still need to restart Kafka upon trust store
>> changes ?
>> 
>> Thanks.
>> 
>> --
>> Raghav
>> 
> 
> 
> 
> -- 
> Raghav



Re: Is Kafka Streams right for me ?

2018-03-13 Thread Hans Jespersen
"If your system is stateless and the transformations are not interdependent"
then I would just look at using Kafka Connect's Single Message Transform
(SMT) feature.

-hans

/**
 * Hans Jespersen, Director Systems Engineering, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Tue, Mar 13, 2018 at 9:18 AM, Jacob Sheck <shec0...@gmail.com> wrote:

> If you are augmenting streaming data with dimensional data, or if you can
> transform your data with a map, filter or join operation Streams will be a
> good option.  If your system is stateless and the transformations are not
> interdependent, you may want to look into using one of the queue
> technologies like AcitveMQ.
>
> On Tue, Mar 13, 2018 at 10:00 AM Sameer Rahmani <lxsame...@gmail.com>
> wrote:
>
> > Thanks Jacob. I'm using kafka as a distributed queue and my data pipeline
> > is a several components which connected together via a stream
> abstraction.
> > each component has a input and output stream. Basically the source of
> > this pipeline can be anything and the output can be anything to.
> >
> > On Tue, Mar 13, 2018 at 1:27 PM, Jacob Sheck <shec0...@gmail.com> wrote:
> >
> > > Sameer when you say that you need to "consume from and produce to a
> > topic"
> > > to me that seems like a good fit for Kafka Streams.  Streaming your
> data
> > > out of Kafka for a transform and back in has some fundamental costs and
> > > operational challenges involved.  Are the events in your stream
> > stateless?
> > > If it isn't stateless streams will ensure a consistent playback of
> events
> > > if needed.  Without knowing more about your pipeline it is hard to make
> > > recommendations.  Are you possibly using Kafka as a distributed queue?
> > >
> > > On Tue, Mar 13, 2018 at 6:29 AM Sameer Rahmani <lxsame...@gmail.com>
> > > wrote:
> > >
> > > > Hi folks,
> > > > I need to consume from and produce to a topic. I have my own data
> > > pipeline
> > > > to process the data.
> > > > So I was wondering beside the stores and StreamDSL what does Kafka
> > > Streams
> > > > brings to the table
> > > > that might be useful to me ?
> > > >
> > >
> >
>


Re: replica.fetch.max.bytes split message or not ?

2018-02-25 Thread Hans Jespersen
This is the KIP-74 write up if you want to learn more about the motivation and 
implementation of the fetch.max.bytes feature.

For example, the 5th message in your example can now be fetched even if it is
larger than fetch.max.bytes, which was a great improvement in large-message
handling. Previously the consumer could get stuck and not make progress.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

-hans

> On Feb 25, 2018, at 8:04 AM, adrien ruffie <adriennolar...@hotmail.fr> wrote:
> 
> Hi Waleed,
> 
> thank for you reply, that I thought too !
> but it was just to check if someone thought like me.
> 
> Best regards,
> Adrien
> 
> From: Waleed Fateem <waleed.fat...@gmail.com>
> Sent: Sunday, February 25, 2018 16:36:28
> To: users@kafka.apache.org
> Subject: Re: replica.fetch.max.bytes split message or not ?
> 
> I would say you will get that 5th message in the next request.
> 
> I don't believe under any circumstance a Kafka broker will send or receive
> a partial message.
> 
> On Feb 24, 2018 10:52 AM, "adrien ruffie" <adriennolar...@hotmail.fr> wrote:
> 
>> Hello,
>> 
>> 
>> I have found this description in only documentation (without taking into
>> account spelling errors 'byes of messages'), a question stays in my mind ...
>> 
>> what happens if the size does not fall correctly on a message number?
>> 
>> Example if in one parition I have 10(messages) of 1024 bytes --> I have
>> 10240 bytes.
>> 
>> But If I set the replica.fetch.max.bytes option to 5000,
>> 
>> the 5th message will be splitted of of 120 bytes, and send after ? Or just
>> 4 messages will be sent and the 5th will be sent in the next fetch request ?
>> 
>> 
>> Thank and best regards,
>> 
>> Adrien
>> 
>> replica.fetch.max.bytes 1024 * 1024 The number of byes of messages to
>> attempt to fetch for each partition in the fetch requests the replicas send
>> to the leader.
>> 
>> 


Re: Kafka Consumer Offsets unavailable during rebalancing

2018-02-04 Thread Hans Jespersen
Do the consumers in consumer group ‘X’ have a regex subscription that matches 
the newly created topic ‘C’?

If they do then they will only discover this new topic once their 
‘metadata.max.age.ms’  metadata refresh interval has passed, which defaults to 
5 minutes.

metadata.max.age.ms The period of time in milliseconds after which we force 
a refresh of metadata even if we haven't seen any partition leadership changes 
to proactively discover any new brokers or partitions
-hans 
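
As a concrete illustration, here is a minimal Java consumer sketch with a regex
subscription and a lowered metadata refresh interval. The broker address, group id,
and topic pattern are placeholders, and it assumes a reasonably recent (2.x or newer)
Java client for the single-argument subscribe(Pattern) and poll(Duration) calls.

import java.time.Duration;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RegexSubscriber {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "X");                       // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Refresh topic metadata every 30s instead of the 5 minute default,
        // so newly created topics that match the pattern are discovered sooner.
        props.put("metadata.max.age.ms", "30000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Regex subscription: any topic whose name starts with "events-" is picked up
            consumer.subscribe(Pattern.compile("events-.*"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r ->
                    System.out.printf("%s-%d@%d%n", r.topic(), r.partition(), r.offset()));
            }
        }
    }
}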


> On Feb 4, 2018, at 2:16 PM, Wouter Bancken <wouter.banc...@aca-it.be> wrote:
> 
> Hi Hans,
> 
> Thanks for the response!
> 
> However, I get this result for all topics, not just for the newly created
> topic.
> 
> Situation sketch:
> 1. I have a consumer group 'X' subscribed to topics 'A' and 'B' with
> partition assignments and lag information. Consumer group 'X' is "Stable".
> 2a. Topic 'C' is (being) created.
> 2b. During this creation, I do not have a partition assignment for consumer
> group 'X' for topics 'A' and 'B' but the consumer group is still "Stable".
> 3. A second later: I have a partition assignment for consumer group 'X' for
> topics 'A' and 'B' again and the consumer group is still "Stable".
> 
> I expected the state of consumer group 'X' during step 2b to be
> "PreparingRebalance" or "AwaitingSync".
> 
> Best regards,
> Wouter
> 
>> On 4 February 2018 at 21:25, Hans Jespersen <h...@confluent.io> wrote:
>> 
>> I believe this is expected behavior.
>> 
>> If there are no subscriptions to a new topic, and therefor no partition
>> assignments, and definitely no committed offsets, then lag is an undefined
>> concept. When the consumers subscribe to this new topic they may chose to
>> start at the beginning or end of the commit log so the lag cannot be
>> predicted in advance.
>> 
>> -hans
>> 
>>> On Feb 4, 2018, at 11:51 AM, Wouter Bancken <wouter.banc...@aca-it.be>
>> wrote:
>>> 
>>> Can anyone clarify if this is a bug in Kafka or the expected behavior?
>>> 
>>> Best regards,
>>> Wouter
>>> 
>>> 
>>> On 30 January 2018 at 21:04, Wouter Bancken <wouter.banc...@aca-it.be>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> I'm trying to write an external tool to monitor consumer lag on Apache
>>>> Kafka.
>>>> 
>>>> For this purpose, I'm using the kafka-consumer-groups tool to fetch the
>>>> consumer offsets.
>>>> 
>>>> When using this tool, partition assignments seem to be unavailable
>>>> temporarily during the creation of a new topic even if the consumer
>> group
>>>> has no subscription on this new topic. This seems to match the
>>>> documentation
>>>> <https://cwiki.apache.org/confluence/display/KAFKA/
>> Kafka+Client-side+Assignment+Proposal>
>>>> saying *"Topic metadata changes which have no impact on subscriptions
>>>> cause resync"*.
>>>> 
>>>> However, when this occurs I'd expect the state of the consumer to be
>>>> "PreparingRebalance" or "AwaitingSync" but it is simply "Stable".
>>>> 
>>>> Is this a bug in the tooling or is there a different way to obtain the
>>>> correct offsets for a consumer group during a rebalance?
>>>> 
>>>> I'm using Kafka 10.2.1 but I haven't found any related issues in recent
>>>> changelogs.
>>>> Best regards,
>>>> Wouter
>>>> 
>> 


Re: Kafka Consumer Offsets unavailable during rebalancing

2018-02-04 Thread Hans Jespersen
I believe this is expected behavior.

If there are no subscriptions to a new topic, and therefore no partition 
assignments, and definitely no committed offsets, then lag is an undefined 
concept. When the consumers subscribe to this new topic they may choose to start 
at the beginning or end of the commit log, so the lag cannot be predicted in 
advance.

-hans

> On Feb 4, 2018, at 11:51 AM, Wouter Bancken <wouter.banc...@aca-it.be> wrote:
> 
> Can anyone clarify if this is a bug in Kafka or the expected behavior?
> 
> Best regards,
> Wouter
> 
> 
> On 30 January 2018 at 21:04, Wouter Bancken <wouter.banc...@aca-it.be>
> wrote:
> 
>> Hi,
>> 
>> I'm trying to write an external tool to monitor consumer lag on Apache
>> Kafka.
>> 
>> For this purpose, I'm using the kafka-consumer-groups tool to fetch the
>> consumer offsets.
>> 
>> When using this tool, partition assignments seem to be unavailable
>> temporarily during the creation of a new topic even if the consumer group
>> has no subscription on this new topic. This seems to match the
>> documentation
>> <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Client-side+Assignment+Proposal>
>> saying *"Topic metadata changes which have no impact on subscriptions
>> cause resync"*.
>> 
>> However, when this occurs I'd expect the state of the consumer to be
>> "PreparingRebalance" or "AwaitingSync" but it is simply "Stable".
>> 
>> Is this a bug in the tooling or is there a different way to obtain the
>> correct offsets for a consumer group during a rebalance?
>> 
>> I'm using Kafka 10.2.1 but I haven't found any related issues in recent
>> changelogs.
>> Best regards,
>> Wouter
>> 


Re: Capturing and storing these Kafka events for query.

2018-01-11 Thread Hans Jespersen

Another approach would be to create the query first (in something like KSQL) 
and then send the Kafka data through the pre-existing streaming query. In this 
case the results would be going into various result topics.

Tools like KSQL also let you query historical data, but you need to be sure that 
this won't create a performance impact if you have a lot of data and are 
continually querying it over and over from the beginning to the end. It can work 
if the data set is small, but as Manoj already said so well, it might be better 
to materialize your data stream into a repository designed with indexes beyond 
those in Kafka for faster or more complex interactive queries. 

-hans




> On Jan 11, 2018, at 1:33 PM, Manoj Khangaonkar <khangaon...@gmail.com> wrote:
> 
> Hi,
> 
> If I understood the question correctly , then the better approach is to
> consume events from topic and store in
> your favorite database. Then query the database as needed.
> 
> Querying the topic for messages in kafka is not recommended as that will be
> a linear search.
> 
> regards
> 
> On Thu, Jan 11, 2018 at 8:55 AM, Maria Pilar <pilife...@gmail.com> wrote:
> 
>> Hi all,
>> 
>> I have a requirement to be able to capture and store events for query,
>> and I'm trying to choose the best option for that:
>> 
>> 1) Capture the events from a separate topic, store events a state, in
>> order to convert a stream to a table, that means materializing the
>> stream.
>> The option for it, that I'm thinking is to use an external state that
>> is maintained in an external datastore, often a NoSQL (e.g Cassandra).
>> That solution provides the advantage unlimited size and it can be
>> accessed from multiple instances of the application or from different
>> applications.
>> 
>> 2) Other options could be query direct in local o internal state or in
>> memory, however, to be able to query directly onto a topic, I would
>> need to search a record by partition and offset, I don't know exactly
>> how to implement that option if it's possible.
>> 
>> Cheers.
>> 
> 
> 
> 
> -- 
> http://khangaonkar.blogspot.com/



Re: Consumer client not able to receive messages when one of broker is pushed down in the cluster

2018-01-05 Thread Hans Jespersen
Check that your __consumer_offsets topic is also set up with a replication factor 
of 3 and has in-sync replicas. Often it gets created first on a one-node cluster 
with RF=1, and then when the cluster is expanded to 3 nodes the step to increase 
the replication factor of this topic gets missed.

-hans
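
One quick way to check this from code (besides kafka-topics.sh --describe) is a
small Java AdminClient sketch like the one below. The bootstrap address is a
placeholder, and it assumes brokers on 0.11 or newer where the AdminClient is
available.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class CheckOffsetsTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singleton("__consumer_offsets"))
                                         .all().get().get("__consumer_offsets");
            // Print replica and ISR counts per partition; every partition should
            // show 3 replicas and (when healthy) 3 in-sync replicas.
            desc.partitions().forEach(p ->
                System.out.printf("partition %d: replicas=%d isr=%d%n",
                        p.partition(), p.replicas().size(), p.isr().size()));
        }
    }
}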

> On Jan 5, 2018, at 8:48 AM, rAhul <c.rahulku...@gmail.com> wrote:
> 
> Hi,
> 
> I have a Apache kafka cluster with 3 nodes(say 1,2,3) with replication
> factor of 3 and partitions as 3.
> 
> When my producer client, consumer client and the cluster are running, able
> to transfer messages from producer to consumer without any issues.
> 
> Now I stopped leader node say node 1 from the cluster and now say node 2 is
> promoted as leader.
> 
> Message flow from producer to consumer works fine without any issues.
> 
> Now I started node 1 and stopped node 2, either node 1 or node 3 is
> promoted as leader.
> 
> Now producer able to send messages but consumer not able to receive
> messages.
> 
> I see consumer lag using kafka manager web console.
> 
> Again if I start node 2, consumer able to receive messages.
> 
> Please suggest how to overcome this issue and fix it.
> 
> Thanks.


Re: Seeking advice on Kafka Streams and Kafka Connect

2017-12-21 Thread Hans Jespersen
It might be possible to do all the transformations in #2 inside Kafka Connect. 
Connect has a simple one message at a time transformation capability called 
Single Message Transforms (SMT). There are built in Transformation functions 
that you can declaratively add to any existing connector via configuration 
properties and without coding. If the built in functions are insufficient you 
can write your own SMT functions in Java.

-hans
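
For reference, a custom SMT is just a small Java class implementing the Connect
Transformation interface. The sketch below (class and package-free naming are made
up for illustration) upper-cases String record values and passes everything else
through; note that an SMT operates on one record in, one record out, so a true
split of one message into many rows would still need Streams or a custom connector.

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical example SMT: upper-cases String values, leaves other values untouched.
public class UpperCaseValue<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof String)) {
            return record; // pass non-String values through unchanged
        }
        String upper = ((String) record.value()).toUpperCase();
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                record.valueSchema(), upper,
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no custom configuration in this sketch
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // nothing to configure in this sketch
    }

    @Override
    public void close() {
        // no resources to release
    }
}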

> On Dec 21, 2017, at 7:19 AM, Bill Bejeck <b...@confluent.io> wrote:
> 
> Hi Mads,
> 
> Great question and yes your use case here is an excellent fit for Kafka
> Streams and Kafka Connect.
> 
> For step 2 you could use a KStram#flatMap operation to split it up into
> multiple rows.
> 
> Regarding a Cassandra connector, there is an existing one:
> 
>   1. For some background try
>   
> https://www.confluent.io/blog/kafka-connect-cassandra-sink-the-perfect-match/
> 
>   2. To download the connector go to
>   https://www.confluent.io/product/connectors/
> 
> As for writing directly to Cassandra, you *could*, in theory, do so with a
> KStream.process or KStream.foreach call.
> 
> But you'd have to code that yourself which includes error handling, retry
> logic etc.   Additionally, it's usually not recommended to write out
> directly to external systems from Kafka Streams.
> 
> I'd say it's better to leverage Kafka Connect for that.
> 
> HTH,
> Bill
> 
> 
> On Thu, Dec 21, 2017 at 5:49 AM, Mads Tandrup <
> mads.tand...@schneider-electric.com> wrote:
> 
>> Hi
>> 
>> Sorry for the simple question. I’m just starting to learn about Kafka
>> streams and connect and I’m struggling to understand the exact difference
>> and which one to use. I’m coming from Apache Storm so forgive me if I make
>> false assumptions.
>> 
>> I have a use case where I have a Kafka topic with some messages. What I
>> need to do:
>> 1. Read the messages
>> 2. Split and map the message into a number of rows
>> 3. Write the rows to Cassandra
>> 
>> It seems the first 2 steps are a natural fit for Kafka Streams. But it
>> seems the way to write to Cassandra is to use Kafka Connect.
>> Is that correctly understood?
>> 
>> Is there any way to connect Kafka Streams and Kafka Connect without
>> writing it to a new kafka topic? Since the transformation in step 2 is so
>> simple it seems a waste to write it to disk.
>> 
>> Is there any other way I should consider?
>> 
>> Best regards,
>> Mads
>> 
>> 


Re: Kafka streams for golang

2017-12-19 Thread Hans Jespersen
You can call the REST endpoints in KSQL from any programming language. I
wrote some stuff in node.js to call KSQL this way and it works great. The
results don't even have to go to a Kafka topic, as the results of a POST
to /query all stream back over HTTP.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */
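
As an illustration of doing the same thing from a JVM language instead of node.js,
here is a rough Java sketch. The host, port (8088), endpoint path (/query), and the
JSON payload shape are assumptions based on the early KSQL REST API and should be
checked against the documentation for whichever KSQL version you run.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class KsqlRestQuery {
    public static void main(String[] args) throws Exception {
        // Assumed KSQL REST endpoint; adjust host/port for your deployment
        URL url = new URL("http://localhost:8088/query");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);

        // Assumed payload shape: a "ksql" field holding the streaming query
        String body = "{\"ksql\": \"SELECT * FROM pageviews;\", \"streamsProperties\": {}}";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // The response is a long-lived chunked stream of JSON rows
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}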

On Tue, Dec 19, 2017 at 12:52 PM, Victor Denisov <vic...@autonomic.ai>
wrote:

> Hi,
>
> Is anybody aware of any kafka streams work for golang?
>
> Thanks,
> --
> V.
>


Re: Failure to reset consumer offsets for specific topics

2017-10-25 Thread Hans Jespersen
I think you are just missing the --execute flag.

-hans

> On Oct 25, 2017, at 1:24 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> I wonder if you have hit KAFKA-5600.
> 
> Is it possible that you try out 0.11.0.1 ?
> 
> Thanks
> 
>> On Wed, Oct 25, 2017 at 1:15 PM, Dan Markhasin <minimi...@gmail.com> wrote:
>> 
>> I am using 0.11.0.0.
>> 
>> There is no difference configuration-wise - both have 10 partitions and 2
>> replicas. There are no errors in the logs, but looking in the data folder
>> it seems like Kafka is not updating the timeindex file for data1_log -
>> notice how the timeindex file for the current log segment is not being
>> updated.
>> 
>> bash-4.2$ pwd
>> /kafka/data/data1_log-1
>> bash-4.2$ ls -ltr | tail
>> -rw-rw-r-- 1 ibiuser it 1073731573 Oct 25 01:21 000337554984.log
>> -rw-rw-r-- 1 ibiuser it 943616 Oct 25 01:21 000337554984.index
>> -rw-rw-r-- 1 ibiuser it 1073734199 Oct 25 13:38 000339816017.log
>> -rw-rw-r-- 1 ibiuser it   10485756 Oct 25 13:38
>> 000341934289.timeindex
>> -rw-rw-r-- 1 ibiuser it 10 Oct 25 13:38
>> 000341934289.snapshot
>> -rw-rw-r-- 1 ibiuser it  0 Oct 25 13:38
>> 000339816017.timeindex
>> -rw-rw-r-- 1 ibiuser it 566712 Oct 25 13:38 000339816017.index
>> -rw-rw-r-- 1 ibiuser it 17 Oct 25 20:23 leader-epoch-checkpoint
>> -rw-rw-r-- 1 ibiuser it   10485760 Oct 25 23:03 000341934289.index
>> -rw-rw-r-- 1 ibiuser it  461590419 Oct 25 23:04 000341934289.log
>> 
>> For comparison, the beats topic:
>> 
>> bash-4.2$ cd ../beats-1
>> bash-4.2$ ls -ltr
>> total 3212088
>> -rw-rw-r-- 1 ibiuser it 17 Oct 25 00:23 leader-epoch-checkpoint
>> -rw-rw-r-- 1 ibiuser it 10 Oct 25 20:04
>> 000188672034.snapshot
>> -rw-rw-r-- 1 ibiuser it2773008 Oct 25 20:04
>> 000185224087.timeindex
>> -rw-rw-r-- 1 ibiuser it 1073741779 Oct 25 20:04 000185224087.log
>> -rw-rw-r-- 1 ibiuser it1967440 Oct 25 20:04 000185224087.index
>> -rw-rw-r-- 1 ibiuser it   10485760 Oct 25 23:03 000188672034.index
>> -rw-rw-r-- 1 ibiuser it   10485756 Oct 25 23:04
>> 000188672034.timeindex
>> -rw-rw-r-- 1 ibiuser it   50166645 Oct 25 23:04 000188672034.log
>> 
>> 
>> To give some context to why I'm even trying to reset the offsets, we had
>> encountered a strange situation earlier today:
>> 
>> 1) One of the brokers had a hardware failure, and had to be rebuilt from
>> scratch (data partition was gone)
>> 2) When it went down, we noticed a spike in lag in one particular consumer
>> group - it seems to have reset its offset to an earlier point in time (but
>> not the earliest offset of the topic); I have read other messages on this
>> mailing list of users who experienced the same behavior with 0.11.0.0
>> 3) The broker was reinstalled and rejoined the cluster with the same
>> broker.id (but with no data on it) - it rebalanced and eventually all
>> replicas became synced and the cluster was functioning normally.
>> 4) I then decided to bounce the same broker again to see if I can reproduce
>> the issue I saw in #2 - and as soon as the broker was restarted, the exact
>> same consumer group had its offset reset again and was lagging with
>> millions of records behind the current offset.
>> 5) I then tried to manually reset the consumer group's offset to a few
>> minutes before I restarted the broker, only to discover this strange
>> behavior where no matter which datetime value I provided, it kept resetting
>> to the latest offset.
>> 
>> 
>>> On 25 October 2017 at 22:48, Ted Yu <yuzhih...@gmail.com> wrote:
>>> 
>>> Do you mind providing a bit more information ?
>>> 
>>> Release of Kafka you use
>>> 
>>> Any difference between data1_log and the other, normal topic ?
>>> 
>>> Probably check the broker log where data1_log is hosted - see if there is
>>> some clue.
>>> 
>>> Thanks
>>> 
>>> On Wed, Oct 25, 2017 at 12:11 PM, Dan Markhasin <minimi...@gmail.com>
>>> wrote:
>>> 
>>>> I'm trying to use the kafka-consumer-groups.sh tool in order to rewind
>> a
>>>> consumer group's offset, however it seems to be returning the latest
>>> offset
>>>> regarding of the requested offset.
>>>> 
>>>> You can see in the below example that two consecutive commands to reset
>>> the
>

Re: Debugging invalid_request response from a .10.2 server for list offset api using librdkafka client

2017-09-27 Thread Hans Jespersen
The 0.8.1 protocol does not support target timestamps so it makes sense
that you would get an invalid request error if the client is sending a
Version 1 or Version 2 Offsets Request. The only Offset Request that a
0.8.1 broker knows how to handle is a Version 0 Offsets Request.

From https://kafka.apache.org/protocol
INVALID_REQUEST 42 False This most likely occurs because of a request being
malformed by the client library or the message was sent to an incompatible
broker. See the broker logs for more details.

For more info on the 0.11 Kafka protocol and ListOffset Requests see

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI(AKAListOffset)

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */
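
For reference, this is roughly what such a timestamp lookup looks like from the Java
client (offsetsForTimes() issues the newer ListOffsets request under the covers).
The broker address and topic name below are placeholders, and the broker must be
running 0.10.1+ with a message format that carries timestamps.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class OffsetsForTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "timestamp-lookup");        // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("mytopic", 0);
            long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L;
            // Ask the broker for the earliest offset whose timestamp is >= oneHourAgo
            Map<TopicPartition, OffsetAndTimestamp> result =
                    consumer.offsetsForTimes(Collections.singletonMap(tp, oneHourAgo));
            OffsetAndTimestamp ot = result.get(tp);
            System.out.println(ot == null ? "no offset found" : "offset=" + ot.offset());
        }
    }
}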

On Wed, Sep 27, 2017 at 10:20 AM, Vignesh <vignesh.v...@gmail.com> wrote:

> Correction in above mail, we get 42 - INVALID_REQUEST, not 43.
> Few other data points
>
> Server has following configs set
>
> inter.broker.protocol.version=0.8.1
>
> log.message.format.version=0.8.1
>
>
>
> My understanding is that we should get unsupported message format with
> above configurations, why do we get invalid_request?
>
>
> Thanks,
>
> Vignesh.
>
>
>
>
> On Wed, Sep 27, 2017 at 9:51 AM, Vignesh <vignesh.v...@gmail.com> wrote:
>
> > Hi,
> >
> > We are using LibrdKafka library version .11.0 and calling List Offset API
> > with a timestamp on a 0.10.2 kafka server installed in a windows machine.
> >
> > This request returns an error code, 43 - INVALID_REQUEST.
> >
> > We have other local installations of Kafka version 0.10.2 (also on
> > Windows) and are able to use the library successfully.
> >
> > Are there any settings on this specific server that is causing this
> error?
> > Which logs can we enable and look at to get additional details about what
> > is wrong with the request?
> >
> > Thanks,
> > Vignesh.
> >
>


Re: KSQL with Apache Kafka

2017-09-19 Thread Hans Jespersen
Those prerequisites are just for the Confluent CLI used in the quickstart. The 
Apache Kafka and Zookeeper versions included in the Confluent distribution are 
the latest and the same as the Apache Kafka download so it will work. You will 
just need to start Zookeeper and Kafka with the shell scripts in the ./bin 
directory rather than just typing “confluent start” as it says in the 
quickstart documentation.

-hans

> On Sep 19, 2017, at 8:41 PM, Koert Kuipers <ko...@tresata.com> wrote:
> 
> we are using the other components of confluent platform without installing
> the confluent platform, and its no problem at all. i dont see why it would
> be any different with this one.
> 
>> On Tue, Sep 19, 2017 at 1:38 PM, Buntu Dev <buntu...@gmail.com> wrote:
>> 
>> Based on the prerequisites mentioned on Github, Confluent platform seems to
>> be required for using KSQL:
>> 
>> https://github.com/confluentinc/ksql/blob/0.1.x/
>> docs/quickstart/quickstart-non-docker.md#non-docker-setup-for-ksql
>> 
>> 
>> Did anyone try KSQL against vanilla Apache Kafka?
>> 
>> 
>> Thanks!
>> 


Re: Flush Kafka topic

2017-08-23 Thread Hans Jespersen
In 0.11 and above, see the CLI command bin/kafka-delete-records.sh 

-hans




> On Aug 23, 2017, at 7:28 PM, Rahul Singh <rahulronit1...@gmail.com> wrote:
> 
> Hello all,
> 
> I am unable to purge the topic data from Kafka. Is there any class to flush
> all topic data.
> 
> Thank you



Re: Pinning clients to specific brokers

2017-08-23 Thread Hans Jespersen
We (Confluent) run Kafka as a SaaS-based cloud offering and we do not see any 
reason for this feature so I just don’t understand the motivation for it. 
Please explain.

-hans

-- 
/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */




> On Aug 23, 2017, at 12:42 AM, Mohit Chawla <mohit.chawla.bin...@gmail.com> 
> wrote:
> 
> Hey Hans,
> 
> Thanks for your reply.
> 
> In a cloud environment this can be useful. Perhaps if partitioning and
> replicas are selected appropriately, this could be possible ?
> 
> Thanks,
> Mohit
> 
> On Tuesday, August 22, 2017, Hans Jespersen <h...@confluent.io> wrote:
> 
>> Doing that doesn't really make sense in a Kafka cluster because the topic
>> partitions and their replicas are spread out across many brokers in the
>> cluster. That's what enables the parallel processing and fault tolerance
>> features of Kafka.
>> 
>> -hans
>> 
>>> On Aug 22, 2017, at 3:14 AM, Mohit Chawla <mohit.chawla.bin...@gmail.com
>> <javascript:;>> wrote:
>>> 
>>> Hi folks,
>>> 
>>> Is it possible to pin kafka clients to use only specific brokers
>> throughout
>>> their lifetime and not just for the initial bootstrapping ?
>>> 
>>> Thanks,
>>> Mohit
>> 



Re: Pinning clients to specific brokers

2017-08-22 Thread Hans Jespersen
Doing that doesn't really make sense in a Kafka cluster because the topic 
partitions and their replicas are spread out across many brokers in the 
cluster. That's what enables the parallel processing and fault tolerance 
features of Kafka.

-hans

> On Aug 22, 2017, at 3:14 AM, Mohit Chawla <mohit.chawla.bin...@gmail.com> 
> wrote:
> 
> Hi folks,
> 
> Is it possible to pin kafka clients to use only specific brokers throughout
> their lifetime and not just for the initial bootstrapping ?
> 
> Thanks,
> Mohit


Re: How to clear a particular partition?

2017-08-18 Thread Hans Jespersen
Yes thanks Manikumar! I just tested this and it is indeed all in and working 
great in 0.11! I thought I would have to wait until 1.0 to be able to use and 
recommend this in production.

I published 100 messages 

seq 100 | ./bin/kafka-console-producer.sh --broker-list localhost:9092 
--topic mytest

then deleted 90 of those 100 messages with the new kafka-delete-records.sh 
command line tool

./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 
--offset-json-file ./offsetfile.json

where offsetfile.json contains

{"partitions": [{"topic": “mytest", "partition": 0, "offset": 90}], 
"version":1 }

and then consume the messages from the beginning to verify that 90 of the 100 
messages are indeed deleted.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 
--topic mytest --from-beginning
91
92
93
94
95
96
97
98
99
100


-hans
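
The same operation is also exposed programmatically in later Java clients (the
AdminClient deleteRecords() call, added around 1.1). A minimal sketch follows, with
a placeholder broker address and reusing the topic and offset from the example above.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

public class DeleteOldRecords {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            TopicPartition tp = new TopicPartition("mytest", 0);
            // Delete everything before offset 90, same as the offsetfile.json example above
            admin.deleteRecords(Collections.singletonMap(tp, RecordsToDelete.beforeOffset(90L)))
                 .all()
                 .get();
        }
    }
}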




> On Aug 18, 2017, at 10:32 AM, Manikumar <manikumar.re...@gmail.com> wrote:
> 
> This feature got released in Kafka 0.11.0.0. You can
> use kafka-delete-records.sh script to delete data.
> 
> On Sun, Aug 13, 2017 at 11:27 PM, Hans Jespersen <h...@confluent.io> wrote:
> 
>> This is an area that is being worked on. See KIP-107 for details.
>> 
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 107:+Add+purgeDataBefore()+API+in+AdminClient>
>> 
>> -hans
>> 
>> 
>>> On Aug 10, 2017, at 10:52 AM, Sven Ludwig <s_lud...@gmx.de> wrote:
>>> 
>>> Hello,
>>> 
>>> assume that all producers and consumers regarding a topic-partition have
>> been shutdown.
>>> 
>>> Is it possible in this situation to empty that topic-partition, while
>> the other topic-partitions keep working?
>>> 
>>> Like for example, is it possible to trigger a log truncation to 0 on the
>> leader for that partition using some admin tool?
>>> 
>>> Kind Regards,
>>> Sven
>>> 
>> 
>> 



Re: How to clear a particular partition?

2017-08-13 Thread Hans Jespersen
This is an area that is being worked on. See KIP-107 for details.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
 

-hans


> On Aug 10, 2017, at 10:52 AM, Sven Ludwig <s_lud...@gmx.de> wrote:
> 
> Hello,
>  
> assume that all producers and consumers regarding a topic-partition have been 
> shutdown.
>  
> Is it possible in this situation to empty that topic-partition, while the 
> other topic-partitions keep working?
>  
> Like for example, is it possible to trigger a log truncation to 0 on the 
> leader for that partition using some admin tool?
>  
> Kind Regards,
> Sven
>  



Re: Adding partitons | Unaffected producers

2017-08-04 Thread Hans Jespersen
See the producer param called metadata.max.age.ms which is "The period of time 
in milliseconds after which we force a refresh of metadata even if we haven't 
seen any partition leadership changes to proactively discover any new brokers 
or partitions."

-hans

> On Aug 4, 2017, at 5:17 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> 
> According to Kafka docs, producer decides on which partition the data shall
> reside. I am aware that neither broker nor producer needs to be restarted
> to detect added partitions.
> 
> Would like to understand if there is some frequency through which producer
> detects new partitions.
> 
> Though consumers were not made partiton aware, any possible reasons for the
> same.
> 
> -Sameer.


Re: kafka connect

2017-07-15 Thread Hans Jespersen
If you are looking for connectors there is a good curated list of connectors 
here https://www.confluent.io/product/connectors/ 

I don't see tcp on the list but the general naming scheme for open source 
connectors on github is to call them “kafka-connect-*”. A quick search will 
yield a few “kafka-connect-tcp” connectors like this one 
https://github.com/dhanuka84/kafka-connect-tcp 


-hans



> On Jul 4, 2017, at 10:26 AM, Clay Teahouse <clayteaho...@gmail.com> wrote:
> 
> Hello All,
> 
> I have a few questions regarding kafka connect. I'd appreciate your replies.
> 
> 1) Is there a kafka connector for listening to tcp sockets?
> 
> 2) If there a protobuf converter that can deal with variable length
> messages? Meaning read the prefix that specifies the length and use the
> specified length to read the actual message?
> 
> thanks
> Clay



Re: about Exactly-once Semantics

2017-07-02 Thread Hans Jespersen
When you write the msg results to MySQL, you include the offset of the message 
with the results. This can be done in one atomic write transaction. Then if 
your application crashes, when it starts back up, it should read the offset 
stored with the last message results in the database, then seek() to that 
offset, and continue consuming with exactly once semantics.

This is how many of the exactly once Kafka Connect Sink Connectors work today.

-hans
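
A rough Java sketch of that pattern follows. The saveResultAndOffset() and
loadLastOffset() helpers are hypothetical stand-ins for your own DB code (they must
write and read the result and the offset in a single transaction); the broker
address, topic, and group id are placeholders, and it assumes a 2.x+ Java client
for poll(Duration).

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class DbSinkWithExactlyOnce {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "db-sink");                 // placeholder group id
        props.put("enable.auto.commit", "false");         // offsets live in the DB, not in Kafka
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("mytopic", 0);
            consumer.assign(Collections.singleton(tp));
            // Resume from the offset stored with the last committed DB transaction
            consumer.seek(tp, loadLastOffset(tp) + 1);

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    // One atomic DB transaction: the processed result AND the offset together
                    saveResultAndOffset(process(record.value()), tp, record.offset());
                }
            }
        }
    }

    // --- hypothetical helpers, not part of any Kafka API ---
    static String process(String value) { return value.toUpperCase(); }
    static long loadLastOffset(TopicPartition tp) { return -1L; /* read from the DB */ }
    static void saveResultAndOffset(String result, TopicPartition tp, long offset) { /* one DB txn */ }
}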

> On Jul 1, 2017, at 11:28 PM, fuyou <fuyou...@gmail.com> wrote:
> 
> I read the great blog about kafka Exactly-once Semantics
> <https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/>
> .
> my question is when consumer receive msg,and process the msg  result save
> to db(eg.MySQL),how to keep Exactly-once Semantics.
> 
> I think the kafka Exactly-once Semantics useful when consumer  process the
> msg result save to kafka.
> 
> am i right ?
> 
> thanks .
> -- 
>   =
> 
>  fuyou001
> Best Regards


Re: Requires suggestions for Producer request throttling

2017-06-29 Thread Hans Jespersen
Request quotas were just added in 0.11. Does that help in your use case?

https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+Request+rate+quotas

-hans

> On Jun 29, 2017, at 12:55 AM, sukumar.np <sukumar...@zohocorp.com> wrote:
> 
> Hi Team,
> 
> 
> 
> We are having a Kafka cluster with multiple Topics in it and shared with 
> multiple services(clients). Each service will have multiple events source 
> from where they will be pushing messages to Kafka brokers. Once a service 
> starts producing message at a high rate, it will affect other 
> services(clients) because it will fill the disk quickly which will bring the 
> cluster down. So, we want to throttle Producer request once it crosses the 
> specified threshold(Which can be set on Topic basis, not for each service).
> 
> 
> 
> After checking with Quota feature available in Kafka we found that It allows 
> pushing data to a queue and will keep responses in delay queue(if it requires 
> getting throttled). If we apply quota for our use case then below problems 
> can happen:
> 
> a).  Since quota observes message rate for a window and starts to throttle 
> the producer responses, meanwhile all incoming messages will be added to the 
> queue. It may fill the disks quickly as there are many producers for the same 
> Topics and will create an outage for Kafka service.
> 
> b). For sync Producers, because of throttling response will be delayed, which 
> will result in hang the user-thread or app-servers.
> 
> 
> 
> So we don't want to go for applying quota for our use case. Can you please 
> share some suggestions to handle this use-case in our Kafka broker. Like, 
> before messages get appended to log, it should validate for throttling and if 
> it requires being throttled. Throttling mechanism should be either slow down 
> the request rate up to specified time frame or throw some generic exception 
> from broker side to clients.
> 
> 
> 
> Our Kafka setup like,
> 
> Having 3 brokers in a cluster and each Topic has replication factor 3 and 
> using Kafka-0.10.0.1.
> 
> 
> 
> Looking forward to your suggestions.
> 
> 
> 
> Thanks
> 
> Sukumar N
> 
> 
> 
> 
> 


Re: question about document

2017-06-27 Thread Hans Jespersen
Correct. The use of the word "server" in that sentence is meant as broker (or 
KafkaServer as it shows up in the 'jps' command) not as a physical or virtual 
machine.

-hans

> On Jun 27, 2017, at 1:22 AM, James <896066...@qq.com> wrote:
> 
> Hello,
>At https://kafka.apache.org/intro, I found a sentence:
>Each partition has one server which acts as the "leader" and zero or more 
> servers which act as "followers". 
>In my opinion, it is possible that more than one brokers exist on a 
> machine with different ports and different broker.id, so I think the below is 
> more appropriate:
>  Each partition has one broker which acts as the "leader" and zero or 
> more brokers which act as "followers". 
> Thank you!


Re: help!Kafka failover do not work as expected in Kafka quick start tutorial

2017-06-22 Thread Hans Jespersen
Do you list all three brokers on your consumers bootstrap-server list?

-hans

> On Jun 22, 2017, at 5:15 AM, 夏昀 <kingdomm...@126.com> wrote:
> 
> hello:
> I am trying the quickstart of kafka documentation,link is, 
> https://kafka.apache.org/quickstart. when I moved to Step 6: Setting up a 
> multi-broker cluster,I have deployed 3 kafka broker instance.I killed either 
> server-1 or server-2, everything goes well as the document says. But when I 
> killed the firet broker where brokeID=0, the consumer can't read the new 
> records produced by producer. When I restart the broker 0,consumer can 
> display new messages. why the system work well when broker1 or broker 2 is 
> killed,but can't work when broker 0 is killed?
> can you explain this for me, thank you very much!


Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL Permission of OffsetFetch

2017-06-17 Thread Hans Jespersen

Offset commit is something that is done in the act of consuming (or reading) 
Kafka messages. Yes, technically it is a write to the Kafka consumer offsets topic, 
but it's much easier for administrators to think of ACLs in terms of whether the 
user is allowed to write (Produce) or read (Consume) messages, and not the 
lower-level semantics, which are that consuming is actually reading AND writing 
(albeit only to the offsets topic).

-hans




> On Jun 17, 2017, at 10:59 AM, Viktor Somogyi <viktor.somo...@cloudera.com> 
> wrote:
> 
> Hi Vahid,
> 
> +1 for OffsetFetch from me too.
> 
> I also wanted to ask the strangeness of the permissions, like why is
> OffsetCommit a Read operation instead of Write which would intuitively make
> more sense to me. Perhaps any expert could shed some light on this? :)
> 
> Viktor
> 
> On Tue, Jun 13, 2017 at 2:38 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com <mailto:vahidhashem...@us.ibm.com>> wrote:
> 
>> Hi Michal,
>> 
>> Thanks a lot for your feedback.
>> 
>> Your statement about Heartbeat is fair and makes sense. I'll update the
>> KIP accordingly.
>> 
>> --Vahid
>> 
>> 
>> 
>> 
>> From:Michal Borowiecki <michal.borowie...@openbet.com>
>> To:users@kafka.apache.org, Vahid S Hashemian <
>> vahidhashem...@us.ibm.com>, d...@kafka.apache.org
>> Date:06/13/2017 01:35 AM
>> Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
>> Permission of OffsetFetch
>> --
>> 
>> 
>> 
>> Hi Vahid,
>> 
>> +1 wrt OffsetFetch.
>> 
>> The "Additional Food for Thought" mentions Heartbeat as a non-mutating
>> action. I don't think that's true as the GroupCoordinator updates the
>> latestHeartbeat field for the member and adds a new object to the
>> heartbeatPurgatory, see completeAndScheduleNextHeartbeatExpiration()
>> called from handleHeartbeat()
>> 
>> NB added dev mailing list back into CC as it seems to have been lost along
>> the way.
>> 
>> Cheers,
>> 
>> Michał
>> 
>> 
>> On 12/06/17 18:47, Vahid S Hashemian wrote:
>> Hi Colin,
>> 
>> Thanks for the feedback.
>> 
>> To be honest, I'm not sure either why Read was selected instead of Write
>> for mutating APIs in the initial design (I asked Ewen on the corresponding
>> JIRA and he seemed unsure too).
>> Perhaps someone who was involved in the design can clarify.
>> 
>> Thanks.
>> --Vahid
>> 
>> 
>> 
>> 
>> From:   Colin McCabe *<cmcc...@apache.org <mailto:cmcc...@apache.org>>* 
>> <cmcc...@apache.org <mailto:cmcc...@apache.org>>
>> To: *users@kafka.apache.org <mailto:users@kafka.apache.org>* 
>> <users@kafka.apache.org <mailto:users@kafka.apache.org>>
>> Date:   06/12/2017 10:11 AM
>> Subject:Re: [DISCUSS] KIP-163: Lower the Minimum Required ACL
>> Permission of OffsetFetch
>> 
>> 
>> 
>> Hi Vahid,
>> 
>> I think you make a valid point that the ACLs controlling group
>> operations are not very intuitive.
>> 
>> This is probably a dumb question, but why are we using Read for mutating
>> APIs?  Shouldn't that be Write?
>> 
>> The distinction between Describe and Read makes a lot of sense for
>> Topics.  A group isn't really something that you "read" from in the same
>> way as a topic, so it always felt kind of weird there.
>> 
>> best,
>> Colin
>> 
>> 
>> On Thu, Jun 8, 2017, at 11:29, Vahid S Hashemian wrote:
>> 
>> Hi all,
>> 
>> I'm resending my earlier note hoping it would spark some conversation
>> this
>> time around :)
>> 
>> Thanks.
>> --Vahid
>> 
>> 
>> 
>> 
>> From:   "Vahid S Hashemian" *<vahidhashem...@us.ibm.com 
>> <mailto:vahidhashem...@us.ibm.com>>*
>> <vahidhashem...@us.ibm.com <mailto:vahidhashem...@us.ibm.com>>
>> To: dev *<d...@kafka.apache.org <mailto:d...@kafka.apache.org>>* 
>> <d...@kafka.apache.org <mailto:d...@kafka.apache.org>>, "Kafka User"
>> 
>> *<users@kafka.apache.org <mailto:users@kafka.apache.org>>* 
>> <users@kafka.apache.org <mailto:users@kafka.apache.org>>
>> 
>> Date:   05/30/2017 08:33 AM
>> Subject:KIP-163: Lower the Minimum Required ACL Permission of
>> OffsetFetch
>> 
>> 
>> 
>> Hi,
>> 
>> I started a new KIP

Re: Async Non Blocking Kafka Producer

2017-06-07 Thread Hans Jespersen
If you are setting acks=0 then you don't care about losing data even when the 
cluster is up. The only way to get at-least-once is acks=all.

> On Jun 7, 2017, at 1:12 PM, Ankit Jain <ankitjainc...@gmail.com> wrote:
> 
> Thanks hans.
> 
> It would work but producer will start loosing the data even the Cluster is
> available.
> 
> Thanks
> Ankit Jain
> 
>> On Wed, Jun 7, 2017 at 12:56 PM, Hans Jespersen <h...@confluent.io> wrote:
>> 
>> Try adding props.put("max.block.ms", "0");
>> 
>> -hans
>> 
>> 
>> 
>>> On Jun 7, 2017, at 12:24 PM, Ankit Jain <ankitjainc...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> We want to use the non blocking Kafka producer. The producer thread
>> should
>>> not block if the Kafka is cluster is down or not reachable.
>>> 
>>> Currently, we are setting following properties but the Producer thread is
>>> still blocking if the Kafka cluster goes gown or unreachable.
>>> 
>>> * props.put("block.on.buffer.full", "false");*
>>> * props.put("acks", "0");*
>>> 
>>> --
>>> Thanks
>> 
>> 
> 
> 
> -- 
> Thanks,
> Ankit Jain


Re: Async Non Blocking Kafka Producer

2017-06-07 Thread Hans Jespersen
Try adding props.put("max.block.ms", "0");

-hans
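
Putting the two replies together, here is a minimal Java producer sketch that does
not block the calling thread and surfaces failures through the send callback (or an
immediate exception) instead. The broker address and topic are placeholders; note
the earlier point that at-least-once delivery still requires acks=all.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NonBlockingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "0");   // never block the sending thread on metadata/buffer
        props.put("acks", "all");         // at-least-once delivery still requires acks=all

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            try {
                producer.send(new ProducerRecord<>("mytopic", "key", "value"), (metadata, exception) -> {
                    if (exception != null) {
                        // Decide here: drop, retry later, or spool somewhere outside Kafka
                        System.err.println("send failed: " + exception);
                    }
                });
            } catch (Exception e) {
                // With max.block.ms=0 some failures surface synchronously as well
                System.err.println("send rejected immediately: " + e);
            }
            producer.flush();
        }
    }
}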



> On Jun 7, 2017, at 12:24 PM, Ankit Jain <ankitjainc...@gmail.com> wrote:
> 
> Hi,
> 
> We want to use the non blocking Kafka producer. The producer thread should
> not block if the Kafka is cluster is down or not reachable.
> 
> Currently, we are setting following properties but the Producer thread is
> still blocking if the Kafka cluster goes gown or unreachable.
> 
> * props.put("block.on.buffer.full", "false");*
> * props.put("acks", "0");*
> 
> -- 
> Thanks



Re: Data in kafka topic in Json format

2017-06-02 Thread Hans Jespersen
You have shared the Kafka connect properties but not the source connector 
config.
Which source connector are you using? Does it override the default settings you 
provided?
Are you running the connector in standalone mode or distributed mode?
Also what are you using to consume the messages and see the message format?

-hans



> On Jun 2, 2017, at 9:10 AM, Mina Aslani <aslanim...@gmail.com> wrote:
> 
> Hi Hans,
> 
> Thank you for your quick response, appreciate it.
> 
> In *kafka-connect* docker, I see below settings in
> *kafka-connect.properties* file in *kafka-connect *directory:
> 
> key.converter.schemas.enable=false
> key.converter.schema.registry.url=http://kafka-schema-registry:
> value.converter.schema.registry.url=http://kafka-schema-registry:
> 
> key.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> 
> internal.value.converter.schemas.enable=false
> rest.advertised.host.name=kafka-connect
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> And the settings in *schema-registry *directory of *kafka-connect *docker
> are as
> 
> https://github.com/confluentinc/schema-registry/tree/master/config
> 
> Should I consider any other settings for *kafka-connect* or
> *schema-registry* to get the real json object NOT string
> formatted/stringified json which has extra "\"  and is not json any more?
> 
> Best regards,
> Mina
> 
> On Fri, Jun 2, 2017 at 11:18 AM, Hans Jespersen <h...@confluent.io> wrote:
> 
>> 
>> My earlier comment still applies but in Kafka Connect the equivalent of a
>> serializer/deserializer (serdes) is called a “converter”.
>> Check which converter you have configured for your source connector and if
>> it is overriding whatever the default converter is configured for the
>> connect worker it is running in.
>> 
>> -hans
>> 
>> 
>> 
>> 
>>> On Jun 2, 2017, at 8:12 AM, Mina Aslani <aslanim...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I would like to add that I use kafka-connect and schema-registery
>> version `
>>> 3.2.1-6`.
>>> 
>>> Best regards,
>>> Mina
>>> 
>>> On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani <aslanim...@gmail.com>
>> wrote:
>>> 
>>>> Hi.
>>>> 
>>>> Is there any way that I get the data into a Kafka topic in Json format?
>>>> The source that I ingest the data from have the data in Json format,
>>>> however when I look that data in the kafka topic, schema and payload
>> fields
>>>> are added and data is not in json format.
>>>> 
>>>> I want to avoid implementing a transformer to get the data from the
>> topic
>>>> and publishes Json in another one.
>>>> 
>>>> Your input is appreciated.
>>>> 
>>>> Best regards,
>>>> Mina
>>>> 
>> 
>> 



Re: Data in kafka topic in Json format

2017-06-02 Thread Hans Jespersen

My earlier comment still applies, but in Kafka Connect the equivalent of a 
serializer/deserializer (serdes) is called a “converter”.
Check which converter you have configured for your source connector and whether it 
is overriding the default converter configured for the Connect 
worker it is running in.

-hans




> On Jun 2, 2017, at 8:12 AM, Mina Aslani <aslanim...@gmail.com> wrote:
> 
> Hi,
> 
> I would like to add that I use kafka-connect and schema-registery version `
> 3.2.1-6`.
> 
> Best regards,
> Mina
> 
> On Fri, Jun 2, 2017 at 10:59 AM, Mina Aslani <aslanim...@gmail.com> wrote:
> 
>> Hi.
>> 
>> Is there any way that I get the data into a Kafka topic in Json format?
>> The source that I ingest the data from have the data in Json format,
>> however when I look that data in the kafka topic, schema and payload fields
>> are added and data is not in json format.
>> 
>> I want to avoid implementing a transformer to get the data from the topic
>> and publishes Json in another one.
>> 
>> Your input is appreciated.
>> 
>> Best regards,
>> Mina
>> 



Re: Data in kafka topic in Json format

2017-06-02 Thread Hans Jespersen

Check which serializer you have configured in your producer. You are probably 
using an Avro serializer, which will add the schema and encode the payload as 
Avro data. You can use a String serializer or a ByteArray serializer and the 
data will either be Base64 encoded or not encoded at all.

-hans
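
For example, a minimal Java producer sketch using the plain String serializer, so
the JSON text lands in the topic exactly as provided. The broker address, topic
name, and sample JSON payload are placeholders.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PlainJsonProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        // Plain String serializers: no schema envelope, no Avro encoding
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String json = "{\"user\":\"mina\",\"action\":\"login\"}";
            producer.send(new ProducerRecord<>("mytopic", json));
            producer.flush(); // make sure it is delivered before the producer closes
        }
    }
}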



> On Jun 2, 2017, at 7:59 AM, Mina Aslani <aslanim...@gmail.com> wrote:
> 
> Hi.
> 
> Is there any way that I get the data into a Kafka topic in Json format?
> The source that I ingest the data from have the data in Json format,
> however when I look that data in the kafka topic, schema and payload fields
> are added and data is not in json format.
> 
> I want to avoid implementing a transformer to get the data from the topic
> and publishes Json in another one.
> 
> Your input is appreciated.
> 
> Best regards,
> Mina



Re: Java APIs for ZooKeeper related operations

2017-05-30 Thread Hans Jespersen
Target is sometime in June. Apache Kafka releases are every 4 months, so 
February, June, and October of each year.

-hans


> On May 30, 2017, at 3:58 PM, Raghav <raghavas...@gmail.com> wrote:
> 
> Hans
> 
> When will this version (0.11) be available ?
> 
> On Tue, May 30, 2017 at 3:54 PM, Hans Jespersen <h...@confluent.io 
> <mailto:h...@confluent.io>> wrote:
> Probably important to read and understand these enhancements coming in 0.11
> 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations
>  
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations>
> 
> -hans
> 
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io <mailto:h...@confluent.io> (650)924-2670 
> <tel:%28650%29924-2670>
>  */
> 
> On Tue, May 30, 2017 at 3:50 PM, Mohammed Manna <manme...@gmail.com 
> <mailto:manme...@gmail.com>> wrote:
> 
> > 1) For issue no. 1 I think you might find the AdminUtils useful This link
> > <https://stackoverflow.com/questions/16946778/how-can-we- 
> > <https://stackoverflow.com/questions/16946778/how-can-we->
> > create-a-topic-in-kafka-from-the-ide-using-api>
> > should
> > help you understand.
> >
> > I haven't got around using ACL for Kafka yet (as I am still doing PoC
> > myself) - so probably some other power user can chime in?
> >
> > KR,
> >
> > On 30 May 2017 at 23:35, Raghav <raghavas...@gmail.com 
> > <mailto:raghavas...@gmail.com>> wrote:
> >
> > > Hi
> > >
> > > I want to know if there are Java APIs for the following. I want to be
> > able
> > > to do these things programmatically in our Kafka cluster. Using command
> > > line tools seems a bit hacky. Please advise the right way to do, and any
> > > pointers to Library in Java, Python or Go.
> > >
> > > 1. Creating topic with a given replication and partition.
> > > 2. Push ACLs into Kafka Cluster
> > > 3. Get existing ACL info from Kafka Cluster
> > >
> > > Thanks.
> > >
> > > Raghav
> > >
> >
> 
> 
> 
> -- 
> Raghav



Re: Java APIs for ZooKeeper related operations

2017-05-30 Thread Hans Jespersen
Probably important to read and understand these enhancements coming in 0.11

https://cwiki.apache.org/confluence/display/KAFKA/KIP-117%3A+Add+a+public+AdminClient+API+for+Kafka+admin+operations

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */
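
Once on 0.11, items 1 and 3 on that list look roughly like the Java sketch below.
The bootstrap address, topic name, and partition/replication counts are
placeholders; pushing ACLs (item 2) is also possible via AdminClient.createAcls(),
omitted here for brevity, and listing ACLs requires an authorizer to be configured
on the brokers.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;

public class AdminClientExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // 1. Create a topic with 6 partitions and a replication factor of 3
            admin.createTopics(Collections.singleton(new NewTopic("my-topic", 6, (short) 3)))
                 .all().get();

            // 3. List every ACL currently known to the cluster
            for (AclBinding acl : admin.describeAcls(AclBindingFilter.ANY).values().get()) {
                System.out.println(acl);
            }
        }
    }
}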

On Tue, May 30, 2017 at 3:50 PM, Mohammed Manna <manme...@gmail.com> wrote:

> 1) For issue no. 1 I think you might find the AdminUtils useful This link
> <https://stackoverflow.com/questions/16946778/how-can-we-
> create-a-topic-in-kafka-from-the-ide-using-api>
> should
> help you understand.
>
> I haven't got around using ACL for Kafka yet (as I am still doing PoC
> myself) - so probably some other power user can chime in?
>
> KR,
>
> On 30 May 2017 at 23:35, Raghav <raghavas...@gmail.com> wrote:
>
> > Hi
> >
> > I want to know if there are Java APIs for the following. I want to be
> able
> > to do these things programmatically in our Kafka cluster. Using command
> > line tools seems a bit hacky. Please advise the right way to do, and any
> > pointers to Library in Java, Python or Go.
> >
> > 1. Creating topic with a given replication and partition.
> > 2. Push ACLs into Kafka Cluster
> > 3. Get existing ACL info from Kafka Cluster
> >
> > Thanks.
> >
> > Raghav
> >
>


Re: Trouble with querying offsets when using new consumer groups API

2017-05-30 Thread Hans Jespersen
I can confirm that in 0.10.2.1 I get offset information for disconnected 
consumers. The note in the output is a bit misleading because it also works 
with non-Java clients as long as they implement the new consumer. For example, 
below is what I get when using the blizzard/node-rdkafka client, which wraps 
librdkafka. 

[bin] $ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe 
--group node-red-rdkafka-groupid
Note: This will only show information about consumers that use the Java 
consumer API (non-ZooKeeper-based consumers).


TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                             HOST        CLIENT-ID
mytopic  0          4               4               0    -0b713303-e69e-47d9-baf7-70df49ff48a6   /10.20.0.1



-hans


> On May 30, 2017, at 10:57 AM, Jerry George <jerr...@gmail.com> wrote:
> 
> Thank you Hans and Vahid.
> 
> That was definitely of great help. Much appreciated!
> 
> Regards,
> Jerry
> 
> On Tue, May 30, 2017 at 1:53 PM, Vahid S Hashemian <
> vahidhashem...@us.ibm.com> wrote:
> 
>> Hi Jerry,
>> 
>> The behavior you are expecting is implemented in 0.10.2 through KIP-88 (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> 88%3A+OffsetFetch+Protocol+Update
>> ) and KAFKA-3853 (https://issues.apache.org/jira/browse/KAFKA-3853).
>> Starting from this release when you query a consumer group (new consumer
>> only) you'll see the stored offset corresponding to a topic partition even
>> if there is no active consumer consuming from it.
>> 
>> I hope this helps.
>> --Vahid
>> 
>> 
>> 
>> 
>> From:   Jerry George <jerr...@gmail.com>
>> To: users@kafka.apache.org
>> Date:   05/26/2017 06:55 AM
>> Subject:Trouble with querying offsets when using new consumer
>> groups API
>> 
>> 
>> 
>> Hi
>> 
>> I had question about the new consumer APIs.
>> 
>> I am having trouble retrieving the offsets once the consumers are
>> *disconnected* when using new consumer v2 API. Following is what I am
>> trying to do,
>> 
>> *bin/kafka-consumer-groups.sh -new-consumer --bootstrap-server kafka:9092
>> --group group --describe*
>> 
>> If I query this when the consumers are connected, there is no problem.
>> However, once the consumers are disconnected it says there is no such
>> group, though the offsets are retained in __consumer_offsets.
>> 
>> The offset retention policy is default; i.e. 1440 minutes, I believe.
>> 
>> Once the consumers are reconnected, I am able to query the offsets once
>> again.
>> 
>> Could anyone here please help me understand why this is?
>> 
>> Kafka: 0.10.1
>> Consumer Library: sarama golang library
>> 
>> Regards,
>> Jerry
>> 
>> 
>> 
>> 
>> 



Re: Trouble with querying offsets when using new consumer groups API

2017-05-30 Thread Hans Jespersen
It is definitely expected behavior that the new consumer version of 
kafka-consumer-groups.sh --describe only returns metadata for ‘active’ members. 
It will print an error message if the consumer group you provide has no active 
members.

https://github.com/confluentinc/kafka/blob/trunk/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala#L88
 

As to the reason for this I am not certain. My guess is that since offsets are 
now stored in a partitioned Kafka topic (__consumer_offsets), keyed by 
group+topic+partition, it was deemed too problematic or 
resource intensive to support queries for inactive consumers, because the tool 
would have to read every message in every partition of the __consumer_offsets topic in 
order to find a complete list of the offsets for a given inactive consumer. 

-hans


> On May 30, 2017, at 8:09 AM, Jerry George <jerr...@gmail.com> wrote:
> 
> Hi Abhimanyu,
> 
> No, actually waiting for someone with operational experience to reply on
> the list. Thank you for bumping the question though :)
> 
> If anyone in the list has experience increasing the retention or if this is
> expected behaviour, could kindly suggest an alternative?
> 
> 
> Regards,
> Jerry
> 
> On Sat, May 27, 2017 at 8:18 AM, Abhimanyu Nagrath <
> abhimanyunagr...@gmail.com> wrote:
> 
>> Hi Jerry,
>> 
>> I am also facing the same issue. Did you found the solution?
>> 
>> Regards,
>> Abhimanyu
>> 
>> On Fri, May 26, 2017 at 7:24 PM, Jerry George <jerr...@gmail.com> wrote:
>> 
>>> Hi
>>> 
>>> I had question about the new consumer APIs.
>>> 
>>> I am having trouble retrieving the offsets once the consumers are
>>> *disconnected* when using new consumer v2 API. Following is what I am
>>> trying to do,
>>> 
>>> *bin/kafka-consumer-groups.sh -new-consumer --bootstrap-server kafka:9092
>>> --group group --describe*
>>> 
>>> If I query this when the consumers are connected, there is no problem.
>>> However, once the consumers are disconnected it says there is no such
>>> group, though the offsets are retained in __consumer_offsets.
>>> 
>>> The offset retention policy is default; i.e. 1440 minutes, I believe.
>>> 
>>> Once the consumers are reconnected, I am able to query the offsets once
>>> again.
>>> 
>>> Could anyone here please help me understand why this is?
>>> 
>>> Kafka: 0.10.1
>>> Consumer Library: sarama golang library
>>> 
>>> Regards,
>>> Jerry
>>> 
>> 



Re: [E] Re: Kafka Configuration Question

2017-05-29 Thread Hans Jespersen
Not sure why you would need 7 zookeepers; 3 or 5 is more common. But since you 
have set up 7, zookeeper will not be active unless a “quorum” of the 
zookeepers is up and running. That means you need to have 4 or more zookeepers 
up and running first, and then start your kafka brokers.

Also, you only included one server.properties file. You will need to give each 
of the 7 brokers a unique broker.id and host.name. In fact host.name is 
deprecated, and it looks like you might be incorrectly setting them all to 
“0.0.0.0” when you should instead be setting listeners to 
PLAINTEXT://0.0.0.0:9092 to get the broker to listen 
to all interfaces. 

From https://kafka.apache.org/documentation/ 
host.name   DEPRECATED: only used when `listeners` is not set. Use 
`listeners` instead. hostname of broker. If this is set, it will only bind to 
this address. If this is not set, it will bind to all interfaces
listeners   Listener List - Comma-separated list of URIs we will listen on 
and the listener names. If the listener name is not a security protocol, 
listener.security.protocol.map must also be set. Specify hostname as 0.0.0.0 to 
bind to all interfaces. Leave hostname empty to bind to default interface. 
Examples of legal listener lists: PLAINTEXT://myhost:9092,SSL://:9091 
CLIENT://0.0.0.0:9092,REPLICATION://localhost:
Lastly you have set 
advertised.listeners=PLAINTEXT://kafka-1.site-s2.mon.com:9092 which would override the value of 
listeners and host.name and force the broker to only listen on port 9092 on the 
IP address for kafka-1.site-s2.mon.com. Is 
this what you really want to do?

advertised.listeners - Listeners to publish to ZooKeeper for clients to use, if 
different than the listeners above. In IaaS environments, this may need to be different 
from the interface to which the broker binds. If this is not set, the value for 
`listeners` will be used.

If you set all this up properly and still get an error, then please include the errors 
from both the zookeeper and kafka brokers to help in debugging further.

-hans

-- 
/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */




> On May 29, 2017, at 1:33 AM, Bennett, Conrad 
> <conrad.benn...@verizonwireless.com.INVALID> wrote:
> 
> Hello – anyone had a chance to take a look at this for me please? My cluster 
> still isn’t up successfully. Thanks in advance 
> Conrad Bennett Jr.
> From: Conrad Bennett <conrad.benn...@verizonwireless.com 
> <mailto:conrad.benn...@verizonwireless.com>>
> Date: Friday, May 26, 2017 at 12:48 PM
> To: SenthilKumar K <senthilec...@gmail.com <mailto:senthilec...@gmail.com>>
> Subject: Re: [E] Re: Kafka Configuration Question
> 
> Hi SenthilKumar,
> 
> Did you get a chance to review the configuration? I am still unsuccessful 
> with getting the cluster up. 
> Conrad Bennett Jr.
> From: Conrad Bennett <conrad.benn...@verizonwireless.com 
> <mailto:conrad.benn...@verizonwireless.com>>
> Date: Thursday, May 25, 2017 at 10:16 AM
> To: SenthilKumar K <senthilec...@gmail.com <mailto:senthilec...@gmail.com>>
> Subject: Re: [E] Re: Kafka Configuration Question
> 
> Here is the attachment. 
> Conrad Bennett Jr.
> 
> From: SenthilKumar K <senthilec...@gmail.com <mailto:senthilec...@gmail.com>>
> Date: Thursday, May 25, 2017 at 3:09 AM
> To: Conrad Bennett <conrad.benn...@verizonwireless.com 
> <mailto:conrad.benn...@verizonwireless.com>>
> Subject: [E] Re: Kafka Configuration Question
> 
> Hi , Seems to be you missed attachment ..
> 
> On Thu, May 25, 2017 at 10:55 AM, Bennett, Conrad 
> <conrad.benn...@verizonwireless.com.invalid 
> <mailto:conrad.benn...@verizonwireless.com.invalid>> wrote:
>> Hello,
>> 
>> I’m hoping someone could provide me with some assistance please. I am in the 
>> process of attempting to standing up a Kafka cluster and I have 7 nodes all 
>> of which has kafka and zookeeper installed. I have attached my 
>> server.properties file to verify whether or not I have anything 
>> misconfigured but each time I try to start the kafka service it fails with 
>> the error timed out connecting to zookeeper but the zookeeper process is up 
>> and running. Also during my research I read in order to achieve better 
>> performance separate drives for kafka data should be configure, but in the 
>> configuration file I didn’t understand where exactly that should be 
>> configure. Any assistance would be greatly appreciated. Thanks in advance 
>> 
>> kafka: { version: 0.10.1.1 }
>> 
>> zkper: { version: 3.4.9 }
>> 
>> Conrad Bennett Jr.
> 
> 



Re: Producer Async Issue

2017-05-27 Thread Hans Jespersen
So you just want to not block, and to silently throw away the messages and lose them 
forever? Kafka's persistence is all in the broker, so there is no client-side storing of 
data on disk. The client will send in async mode until the client memory buffer is full. 
Only then does it block, and this is by design, because at that point it's up to your app 
to decide to either throw the messages away, stop publishing, or store them somewhere 
outside of Kafka.
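
If you really do want to make that decision in the app and just drop records when Kafka is unreachable or the buffer is full, a minimal, hypothetical sketch with the Java producer (topic and servers are placeholders) looks like this:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FireAndForgetProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "0");   // fail immediately instead of blocking on metadata or a full buffer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        try {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Record was lost: " + exception); // broker down, request expired, etc.
                }
            });
        } catch (Exception e) {
            System.err.println("Record dropped without buffering: " + e); // send() threw because it could not even buffer
        } finally {
            producer.close();
        }
    }
}

Any record that fails this way is gone for good.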

The easiest solution would be to run more than one broker so that they are 
fault tolerant and will take over for any failed broker nodes.

-hans

> On May 27, 2017, at 12:40 PM, Abhimanyu Nagrath <abhimanyunagr...@gmail.com> 
> wrote:
> 
> HI Hans,
> 
> What exactly I meant by asynchronous is that when my Kafka broker is down
> and I am trying to produce the message . It is getting stuck till the
> configured max.block.ms and after that further code is executed. What I am
> looking for is that whether the broker is down or not it should not get
> stuck.
> 
> 
> 
> Regards,
> Abhimanyu
> 
>> On Sat, May 27, 2017 at 10:30 PM, Hans Jespersen <h...@confluent.io> wrote:
>> 
>> The producer is asynchronous (assuming you mean the Java Producer)
>> 
>> https://kafka.apache.org/0102/javadoc/index.html?org/apache/
>> kafka/clients/producer/KafkaProducer.html
>> 
>> -hans
>> 
>>> On May 27, 2017, at 5:15 AM, Abhimanyu Nagrath <
>> abhimanyunagr...@gmail.com> wrote:
>>> 
>>> Hi,
>>> I am using Kafka 0.10.2 single node cluster and I want to know
>>> Kafka producer completely asynchronous. So what configuration I need to
>>> change in order to make producer completely asynchronous.
>>> 
>>> 
>>> 
>>> Regards,
>>> Abhimanyu
>> 


Re: Producer Async Issue

2017-05-27 Thread Hans Jespersen
The producer is asynchronous (assuming you mean the Java Producer)

https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

-hans

> On May 27, 2017, at 5:15 AM, Abhimanyu Nagrath <abhimanyunagr...@gmail.com> 
> wrote:
> 
> Hi,
> I am using Kafka 0.10.2 single node cluster and I want to know
> Kafka producer completely asynchronous. So what configuration I need to
> change in order to make producer completely asynchronous.
> 
> 
> 
> Regards,
> Abhimanyu


Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Hans Jespersen

If the last message (or all the messages) in the earliest segment has no timestamp, the 
broker will use the filesystem timestamp for expiring it.
Since the file timestamps on your 3 brokers got reset, it will be 
log.retention.hours=24 (1 day) before these segments can be deleted (unless you reset 
the file timestamps back to something over a day ago).
Even though later segments have timestamps in the messages, they cannot be expired until 
all the earlier segments are deleted, so they are stuck waiting for 24 hours as well.

The latest distribution of Kafka is 0.10.2.1 so if you can, you should also 
probably upgrade to a newer version but that is a separate discussion.

-hans




> On May 25, 2017, at 11:50 AM, Milind Vaidya <kava...@gmail.com> wrote:
> 
> In  short it should work regardless as per "During the migration phase, if
> the first message in a segment does not have a timestamp, the log rolling
> will still be based on the (current time - create time of the segment)."
> 
> But that is not happening This is also for 3 out of 6 brokers.
> The 3 good ones deleted the data properly but these 3 do not show the same
> behaviour.
> 
> I came across this JIRA : https://issues.apache.org/jira/browse/KAFKA-3802 
> <https://issues.apache.org/jira/browse/KAFKA-3802>
> 
> It says it is fixed in next version 0.10.0.1
> <https://issues.apache.org/jira/browse/KAFKA/fixforversion/12334962 
> <https://issues.apache.org/jira/browse/KAFKA/fixforversion/12334962>>. I
> even tried that. On QA hosts it retains TS for .log files across restart.
> But when tried the new version on one of the prod host, same old story.
> 
> So internal or File system ts, it should get deleted when expired. What
> could be other reason and way out ot  this ?
> 
> On Thu, May 25, 2017 at 10:43 AM, Hans Jespersen <h...@confluent.io 
> <mailto:h...@confluent.io>> wrote:
> 
>> I quoted the wrong paragraph in my earlier response. The same KIP has a
>> section on log retention as well.
>> 
>> "Enforce time based log retention
>> 
>> To enforce time based log retention, the broker will check from the oldest
>> segment forward to the latest segment. For each segment, the broker checks
>> the last time index entry of a log segment. The timestamp will be the
>> latest timestamp of the messages in the log segment. So if that timestamp
>> expires, the broker will delete the log segment. The broker will stop at
>> the first segment which is not expired. i.e. the broker will not expire a
>> segment even if it is expired, unless all the older segment has been
>> expired."
>> 
>> If none of the messages in a segment has a timestamp, last modified time
>> will be used.
>> 
>> -hans
>> 
>> /**
>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> * h...@confluent.io (650)924-2670
>> */
>> 
>> On Thu, May 25, 2017 at 9:53 AM, Hans Jespersen <h...@confluent.io> wrote:
>> 
>>> 0.10.x format messages have timestamps within them so retention and
>>> expiring of messages isn't entirely based on the filesystem timestamp of
>>> the log segments anymore.
>>> 
>>> From KIP-33 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>> 33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-
>>> Enforcetimebasedlogrolling
>>> 
>>> "Enforce time based log rolling
>>> 
>>> Currently time based log rolling is based on the creating time of the log
>>> segment. With this KIP, the time based rolling would be changed to only
>>> based on the message timestamp. More specifically, if the first message
>> in
>>> the log segment has a timestamp, A new log segment will be rolled out if
>>> timestamp in the message about to be appended is greater than the
>> timestamp
>>> of the first message in the segment + log.roll.ms. When
>>> message.timestamp.type=CreateTime, user should set
>>> max.message.time.difference.ms appropriately together with log.roll.ms
>> to
>>> avoid frequent log segment roll out.
>>> 
>>> During the migration phase, if the first message in a segment does not
>>> have a timestamp, the log rolling will still be based on the (current
>> time
>>> - create time of the segment)."
>>> 
>>> -hans
>>> 
>>> /**
>>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>>> * h...@confluent.io <mailto:h...@confluent.io> (650)924-2670 
>>> <(650)%20924-2670>
>>> */
>>> 
>>> On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya <kava...@

Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Hans Jespersen
I quoted the wrong paragraph in my earlier response. The same KIP has a
section on log retention as well.

"Enforce time based log retention

To enforce time based log retention, the broker will check from the oldest
segment forward to the latest segment. For each segment, the broker checks
the last time index entry of a log segment. The timestamp will be the
latest timestamp of the messages in the log segment. So if that timestamp
expires, the broker will delete the log segment. The broker will stop at
the first segment which is not expired. i.e. the broker will not expire a
segment even if it is expired, unless all the older segment has been
expired."

If none of the messages in a segment has a timestamp, last modified time
will be used.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Thu, May 25, 2017 at 9:53 AM, Hans Jespersen <h...@confluent.io> wrote:

> 0.10.x format messages have timestamps within them so retention and
> expiring of messages isn't entirely based on the filesystem timestamp of
> the log segments anymore.
>
> From KIP-33 - https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-
> Enforcetimebasedlogrolling
>
> "Enforce time based log rolling
>
> Currently time based log rolling is based on the creating time of the log
> segment. With this KIP, the time based rolling would be changed to only
> based on the message timestamp. More specifically, if the first message in
> the log segment has a timestamp, A new log segment will be rolled out if
> timestamp in the message about to be appended is greater than the timestamp
> of the first message in the segment + log.roll.ms. When
> message.timestamp.type=CreateTime, user should set
> max.message.time.difference.ms appropriately together with log.roll.ms to
> avoid frequent log segment roll out.
>
> During the migration phase, if the first message in a segment does not
> have a timestamp, the log rolling will still be based on the (current time
> - create time of the segment)."
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670 <(650)%20924-2670>
>  */
>
> On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya <kava...@gmail.com> wrote:
>
>> I have 6 broker cluster.
>>
>> I upgraded it from 0.8.1.1 to 0.10.0.0.
>>
>> Kafka Producer to cluster to consumer (apache storm) upgrade went smooth
>> without any errors.
>> Initially keeping protocol to 0.8 and after clients were upgraded it was
>> promoted to 0.10.
>>
>> Out of 6 brokers, 3 are honouring  log.retention.hours. For other 3 when
>> broker is restarted the time stamp for segment changes to current time.
>> That leads to segments not getting deleted hence disk gets full.
>>
>> du -khc /disk1/kafka-broker/topic-1
>>
>> 71G /disk1/kafka-broker/topic-1
>>
>> 71G total
>>
>> Latest segment timestamp : May 25 07:34
>>
>> Oldest segment timestamp : May 25 07:16
>>
>>
>> It is impossible that 71 GB data was collected in mere 15 mins of
>> time. The log.retention.hours=24
>> and this is not new broker so oldest data should be around 24 hrs old.
>>
>> As mentioned above only 3 out of 6 are showing same behaviour.  Why is
>> this
>> happening ?
>>
>
>


Re: 0.10.0.0 cluster : segments getting latest ts

2017-05-25 Thread Hans Jespersen
0.10.x format messages have timestamps within them so retention and
expiring of messages isn't entirely based on the filesystem timestamp of
the log segments anymore.

From KIP-33 -
https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-Enforcetimebasedlogrolling

"Enforce time based log rolling

Currently time based log rolling is based on the creating time of the log
segment. With this KIP, the time based rolling would be changed to only
based on the message timestamp. More specifically, if the first message in
the log segment has a timestamp, A new log segment will be rolled out if
timestamp in the message about to be appended is greater than the timestamp
of the first message in the segment + log.roll.ms. When
message.timestamp.type=CreateTime, user should set
max.message.time.difference.ms appropriately together with log.roll.ms to
avoid frequent log segment roll out.

During the migration phase, if the first message in a segment does not have
a timestamp, the log rolling will still be based on the (current time -
create time of the segment)."

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Thu, May 25, 2017 at 12:44 AM, Milind Vaidya <kava...@gmail.com> wrote:

> I have 6 broker cluster.
>
> I upgraded it from 0.8.1.1 to 0.10.0.0.
>
> Kafka Producer to cluster to consumer (apache storm) upgrade went smooth
> without any errors.
> Initially keeping protocol to 0.8 and after clients were upgraded it was
> promoted to 0.10.
>
> Out of 6 brokers, 3 are honouring  log.retention.hours. For other 3 when
> broker is restarted the time stamp for segment changes to current time.
> That leads to segments not getting deleted hence disk gets full.
>
> du -khc /disk1/kafka-broker/topic-1
>
> 71G /disk1/kafka-broker/topic-1
>
> 71G total
>
> Latest segment timestamp : May 25 07:34
>
> Oldest segment timestamp : May 25 07:16
>
>
> It is impossible that 71 GB data was collected in mere 15 mins of
> time. The log.retention.hours=24
> and this is not new broker so oldest data should be around 24 hrs old.
>
> As mentioned above only 3 out of 6 are showing same behaviour.  Why is this
> happening ?
>


Re: Kafka Read Data from All Partition Using Key or Timestamp

2017-05-25 Thread Hans Jespersen
The timeindex was added in 0.10.1, so I think you need to use the new Consumer API to 
access this functionality. Specifically, you should call offsetsForTimes().

https://kafka.apache.org/0102/javadoc/org/apache/kafka/clients/consumer/Consumer.html#offsetsForTimes(java.util.Map)
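
A minimal sketch of that call with the Java consumer (topic, timestamp, and servers are placeholders), seeking every partition to the first offset at or after a given CreateTime:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class SeekByTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "timestamp-search");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        long queryTime = 1495718896912L;   // the timestamp to search for
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor("my-topic")) {
                query.put(new TopicPartition(p.topic(), p.partition()), queryTime);
            }
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
            consumer.assign(query.keySet());
            offsets.forEach((tp, oat) -> {
                if (oat != null) {          // null means no message at or after that timestamp
                    consumer.seek(tp, oat.offset());
                }
            });
            // consumer.poll(...) now returns records starting at those offsets
        }
    }
}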

-hans

> On May 25, 2017, at 6:39 AM, SenthilKumar K <senthilec...@gmail.com> wrote:
> 
> I did an experiment on searching messages using timestamps ..
> 
> Step 1: Used Producer with Create Time ( CT )
> Step 2 : Verify whether it reflects in Kafka or not
>  .index  .log
>  .timeindex
>These three files in disk and seems to be time_index working .
> 
> Step 3: Let's look into data
>offset: 121 position: 149556 *CreateTime*: 1495718896912 isvalid:
> true payloadsize: 1194 magic: 1 compresscodec: NONE crc: 1053048980
> keysize: 8
> 
>  Looks good ..
> Step 4 :  Check .timeindex file .
>  timestamp: 1495718846912 offset: 116
>  timestamp: 1495718886912 offset: 120
>  timestamp: 1495718926912 offset: 124
>  timestamp: 1495718966912 offset: 128
> 
> So all set for Querying data using timestamp ?
> 
> Kafka version : kafka_2.11-0.10.2.1
> 
> Here is the code i'm using to search query -->
> https://gist.github.com/senthilec566/bc8ed1dfcf493f0bb5c473c50854dff9
> 
> requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(queryTime,
> 1));
> If i pass my own timestamp , always getting zero result ..
> *Same question asked here too
> **https://stackoverflow.com/questions/31917134/how-to-use-unix-timestamp-to-get-offset-using-simpleconsumer-api
> <https://stackoverflow.com/questions/31917134/how-to-use-unix-timestamp-to-get-offset-using-simpleconsumer-api>*
> .
> 
> 
> Also i could notice below error in index file:
> 
> *Found timestamp mismatch* in
> :/home/user/kafka-logs/topic-0/.timeindex
> 
>  Index timestamp: 0, log timestamp: 1495717686913
> 
> *Found out of order timestamp* in
> :/home/user/kafka-logs/topic-0/.timeindex
> 
>  Index timestamp: 0, Previously indexed timestamp: 1495719406912
> 
> Not sure what is missing here :-( ... Pls advise me here!
> 
> 
> Cheers,
> Senthil
> 
> On Thu, May 25, 2017 at 3:36 PM, SenthilKumar K <senthilec...@gmail.com>
> wrote:
> 
>> Thanks a lot Mayuresh. I will look into SearchMessageByTimestamp feature
>> in Kafka ..
>> 
>> Cheers,
>> Senthil
>> 
>> On Thu, May 25, 2017 at 1:12 PM, Mayuresh Gharat <
>> gharatmayures...@gmail.com> wrote:
>> 
>>> Hi Senthil,
>>> 
>>> Kafka does allow search message by timestamp after KIP-33 :
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+
>>> Add+a+time+based+log+index#KIP-33-Addatimebasedlogindex-S
>>> earchmessagebytimestamp
>>> 
>>> The new consumer does provide you a way to get offsets by timestamp. You
>>> can use these offsets to seek to that offset and consume from there. So if
>>> you want to consume between a range you can get the start and end offset
>>> based on the timestamps, seek to the start offset and consume and process
>>> the data till you reach the end offset.
>>> 
>>> But these timestamps are either CreateTime(when the message was created
>>> and you will have to specify this when you do the send()) or
>>> LogAppendTime(when the message was appended to the log on the kafka broker)
>>> : https://kafka.apache.org/0101/javadoc/org/apache/kafka/clien
>>> ts/producer/ProducerRecord.html
>>> 
>>> Kafka does not look at the fields in your data (key/value) for giving
>>> back you the data. What I meant was it will not look at the timestamp
>>> specified by you in the actual data payload.
>>> 
>>> Thanks,
>>> 
>>> Mayuresh
>>> 
>>> On Thu, May 25, 2017 at 12:43 PM, SenthilKumar K <senthilec...@gmail.com>
>>> wrote:
>>> 
>>>> Hello Dev Team, Pls let me know if any option to read data from Kafka
>>>> (all
>>>> partition ) using timestamp . Also can we set custom offset value to
>>>> messages ?
>>>> 
>>>> Cheers,
>>>> Senthil
>>>> 
>>>> On Wed, May 24, 2017 at 7:33 PM, SenthilKumar K <senthilec...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Hi All ,  We have been using Kafka for our Use Case which helps in
>>>>> delivering real time raw logs.. I have a requireme

Re: Why do I need to specify replication factor when creating a topic?

2017-05-11 Thread Hans Jespersen

If you enable auto topic creation then that is exactly what will happen.

There are pros and cons to creating topics with default values, but if you feel strongly 
that this is the way you want Kafka to work, it is entirely possible to set up the system 
to work that way.
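
For what it's worth, much later releases did add something along these lines: starting with Kafka 2.4 (KIP-464) the Java AdminClient lets you omit both values so the broker's default.replication.factor and num.partitions are used. A rough sketch, assuming a 2.4+ client and broker, with a placeholder topic name:

import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicWithBrokerDefaults {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Optional.empty() means "fall back to the broker-side defaults"
            NewTopic topic = new NewTopic("my-topic", Optional.empty(), Optional.empty());
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}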

-hans




> On May 11, 2017, at 4:07 PM, Jeff Widman <j...@netskope.com> wrote:
> 
> When creating a new topic, why do I need to specify the replication factor
> and number of partitions?
> 
> I'd rather than when omitted, Kafka defaults to the value set in
> server.properties.
> 
> Was this an explicit design decision?



Re: Does Kafka producer waits till previous batch returns responce before sending next one?

2017-04-30 Thread Hans Jespersen
Yes you understand correctly that batch == request

-hans

> On Apr 30, 2017, at 11:58 AM, Petr Novak <oss.mli...@gmail.com> wrote:
> 
> Thank you a lot.
>  
> How requests in max.in.flight.requests.per.connection relates to batches? 1 
> request precisely means 1 batch? It would make sense if I think about it. 
> Just to ensure I understand correctly.
>  
> Petr
>  
> From: Michal Borowiecki [mailto:michal.borowie...@openbet.com] 
> Sent: 30. dubna 2017 20:51
> To: users@kafka.apache.org
> Subject: Re: Does Kafka producer waits till previous batch returns responce 
> before sending next one?
>  
> Yes, that's what the docs say in both places:
> 
> max.in.flight.requests.per.connection
> The maximum number of unacknowledged requests the client will send on a 
> single connection before blocking. Note that if this setting is set to be 
> greater than 1 and there are failed sends, there is a risk of message 
> re-ordering due to retries (i.e., if retries are enabled).
>  
> retries
> Setting a value greater than zero will cause the client to resend any record 
> whose send fails with a potentially transient error. Note that this retry is 
> no different than if the client resent the record upon receiving the error. 
> Allowing retries without setting max.in.flight.requests.per.connection to 1 
> will potentially change the ordering of records because if two batches are 
> sent to a single partition, and the first fails and is retried but the second 
> succeeds, then the records in the second batch may appear first.
> Cheers,
> Michał
> 
>  
> On 30/04/17 19:32, Jun MA wrote:
> Does this mean that if the client have retry > 0 and 
> max.in.flight.requests.per.connection > 1, then even if the topic only have 
> one partition, there’s still no guarantee of the ordering?
>  
> Thanks,
> Jun
> On Apr 30, 2017, at 7:57 AM, Hans Jespersen <h...@confluent.io> wrote:
>  
> There is a parameter that controls this behavior called max.in. 
> flight.requests.per.connection
>  
> If you set max.in. flight.requests.per.connection = 1 then the producer waits 
> until previous produce requests returns a response before sending the next 
> one (or retrying). The retries parameter controller the number of times to 
> attempt to produce a batch after a failure.
>  
> If flight.requests.per.connection = 1 and retries is get to the maximum then 
> ordering is guaranteed.
>  
> If there is a timeout then the producer library would try again and again to 
> produce the message and will not skip over to try and produce the next 
> message.
>  
> If you set flight.requests.per.connection > 1 (I think the default is 5) then 
> you can get a commit log with messages out of order wrt the original 
> published order (because retries are done in parallel rather then in series)
>  
> -hans
>  
>  
>  
> On Apr 30, 2017, at 3:13 AM, Petr Novak <oss.mli...@gmail.com> wrote:
>  
> Hello,
>  
> Does Kafka producer waits till previous batch returns response before
> sending next one? Do I assume correctly that it does not when retries can
> change ordering?
>  
>  
>  
> Hence batches delay is introduced only by producer internal send loop time
> and linger?
>  
>  
>  
> If a timeout would be localized only to a single batch send request for some
> reason, does it affect the next batch (assuming this batch can go through
> successfully)?
>  
>  
>  
> Many thanks,
>  
> Petr
>  
>  
>  
> -- 
> 
> 
> Michal Borowiecki
> Senior Software Engineer L4
> 
>  


Re: Does Kafka producer waits till previous batch returns responce before sending next one?

2017-04-30 Thread Hans Jespersen
There is a parameter that controls this behavior called 
max.in.flight.requests.per.connection.

If you set max.in.flight.requests.per.connection = 1 then the producer waits until the 
previous produce request returns a response before sending the next one (or retrying). 
The retries parameter controls the number of times to attempt to produce a batch after a 
failure.

If max.in.flight.requests.per.connection = 1 and retries is set to the maximum then 
ordering is guaranteed.

If there is a timeout then the producer library will try again and again to produce the 
message and will not skip over it to try and produce the next message.

If you set max.in.flight.requests.per.connection > 1 (I think the default is 5) then 
you can get a commit log with messages out of order wrt the original published order 
(because retries are done in parallel rather than in series).
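
A minimal sketch of those strict-ordering settings as Java producer properties (values shown are just illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class StrictOrderingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");                                   // wait for the in-sync replicas to acknowledge
        props.put("retries", Integer.toString(Integer.MAX_VALUE));  // keep retrying failed batches
        props.put("max.in.flight.requests.per.connection", "1");    // one unacknowledged request at a time preserves order
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... send records; with these settings the log order matches the send order
        producer.close();
    }
}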

-hans



> On Apr 30, 2017, at 3:13 AM, Petr Novak <oss.mli...@gmail.com> wrote:
> 
> Hello,
> 
> Does Kafka producer waits till previous batch returns response before
> sending next one? Do I assume correctly that it does not when retries can
> change ordering?
> 
> 
> 
> Hence batches delay is introduced only by producer internal send loop time
> and linger?
> 
> 
> 
> If a timeout would be localized only to a single batch send request for some
> reason, does it affect the next batch (assuming this batch can go through
> successfully)?
> 
> 
> 
> Many thanks,
> 
> Petr
> 


Re: How does replication affect kafka quota?

2017-04-24 Thread Hans Jespersen

Replication will not affect the user's quota, as it is done under a separate replication 
quota (which you can control independently). The user should still see a 50 MBps maximum 
rate enforced into each broker.

-hans



> On Apr 23, 2017, at 11:39 PM, Archie <anubhavnidhi1...@gmail.com> wrote:
> 
> So by specifying a kafka quota for a user as 50 MBps, I can make sure it
> can write on a partition at broker X (imagine this user has only 1
> partition at this broker) at a max rate of 50 MBps. Now if the partition
> has a replica on another broker Y, will the user still be able to write
> data at rate 50MBps? Or will replicas slow down the user's write rate?
> 
> 
> Thanks,
> Archie



Re: Re: Re: ZK and Kafka failover testing

2017-04-19 Thread Hans Jespersen
The kafka-console-producer.sh defaults to acks=1 so just be careful with
using those tools for too much debugging. Your output is helpful though.

https://github.com/apache/kafka/blob/5a2fcdd6d480e9f003cc49a59d5952ba4c515a71/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L185

-hans

On Wed, Apr 19, 2017 at 3:52 PM, Shrikant Patel <spa...@pdxinc.com> wrote:

> Just to add, I see below behavior repeat with even command line console
> producer and consumer that come with Kafka.
>
> Thanks,
> Shri
> __
> Shrikant Patel  |  817.367.4302
> Enterprise Architecture Team
> PDX-NHIN
>
>
> -Original Message-
> From: Shrikant Patel
> Sent: Wednesday, April 19, 2017 5:49 PM
> To: users@kafka.apache.org
> Subject: RE: [EXTERNAL] Re: Re: ZK and Kafka failover testing
>
> Thanks Jeff, Onur, Jun, Hans. I am learning a lot from your response.
>
> Just to summarize briefly my steps, 5 node Kafka and ZK cluster.
> 1. ZK cluster has all node working. Consumer is down.
> 2. Bring down majority of ZK nodes.
> 3. Thing are functional no issue (no dup or lost message) 4. Now first
> kafka node come down.
> 5. My issue start happening - as you see below producer says message with
> key 34 and 35 failed.
> 6. Bring majority of ZK node up.
> 7. Other kafka nodes assumes leadership for node 1's topic.
> 8. Bring consumer up, it starts consuming from the last offset and I see
> duplicates. I see message 34 (3 times) and 35 (4 times)
>
>
> Jeff, in my case I don’t see issue with kafka cluster recovering, once the
> majority ZK nodes are up, other Kafka takes up leadership for down node
> immediately.
> Onur, as Jun mentioned since I have acks=all, I am not seeing any messages
> being lost.
>
> Jun, Hans, I had same thought of trying to eliminate the consumer getting
> duplicate because of incorrectly acking the message. In next run of this
> test case, I was not run client at all. My consumer, producer properties
> are in first email in this thread. As I understand RetriableException is
> for temporary issue and I would like retry to see if issue resolves itself
> by then, hence producer has retries =3
>
> Producer log
>
> *** Publisher #  Paritition - 12 Key - 26 Value - value 26
>  *** Publisher #  Paritition - 13 Key - 27 Value - value 27
>  *** Publisher #  Paritition - 14 Key - 28 Value - value 28
>  *** Publisher #  Paritition - 0 Key - 29 Value - value 29
>  *** Publisher #  Paritition - 1 Key - 30 Value - value 30
>  *** Publisher #  Paritition - 2 Key - 31 Value - value 31
>  *** Publisher #  Paritition - 3 Key - 32 Value - value 32
>  *** Publisher #  Paritition - 4 Key - 33 Value - value 33
>  *** Publisher #  Paritition - 5 Key - 34 Value - value 34
> 2017-04-19 16:39:08.008  WARN 399580 --- [| shri-producer]
> o.a.k.clients.producer.internals.Sender  : Got error produce response
> with correlation id 37 on topic-partition ${topic-name}-5, retrying (2
> attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:39:39.128  WARN 399580 --- [| shri-producer]
> o.a.k.clients.producer.internals.Sender  : Got error produce response
> with correlation id 39 on topic-partition ${topic-name}-5, retrying (1
> attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:40:10.271  WARN 399580 --- [| shri-producer]
> o.a.k.clients.producer.internals.Sender  : Got error produce response
> with correlation id 41 on topic-partition ${topic-name}-5, retrying (0
> attempts left). Error: NETWORK_EXCEPTION
> 2017-04-19 16:40:41.419 ERROR 399580 --- [| shri-producer] 
> o.s.k.support.LoggingProducerListener
>   : Exception thrown when sending a message with key='34' and
> payload='value 34' to topic ${topic-name} and partition 5:
> org.apache.kafka.common.errors.NetworkException: The server disconnected
> before a response was received.
> 2017-04-19 16:42:50.621  INFO 399580 --- [pool-1-thread-1]
> c.p.p.SpringKafkaPublisher_Simple: *** Failed to
> publish  Paritition - 5 Key - 34 Value - value 34
> java.util.concurrent.ExecutionException: 
> org.springframework.kafka.core.KafkaProducerException:
> Failed to send; nested exception is 
> org.apache.kafka.common.errors.NetworkException:
> The server disconnected before a response was received.
> 2017-04-19 16:42:51.001  INFO 399580 --- [pool-1-thread-1]
> c.p.p.SpringKafkaPublisher_Simple: *** Publisher
> #  Paritition - 6 Key - 35 Value - value 35
> 2017-04-19 16:43:21.010  WARN 399580 --- [| shri-producer]
> o.a.k.clients.producer.internals.Sender  : Got error produce response
> w

Re: Re: ZK and Kafka failover testing

2017-04-19 Thread Hans Jespersen
The OP was asking about duplicate messages, not lost messages, so I think we are
discussing two different possible scenarios. Whenever someone says they see
duplicate messages it's always good practice to first double check ack mode,
in-flight requests, and retries. It's also important to check whether the messages
are really duplicated in the Kafka log, or whether the consumer is just seeing the
same message reprocessed several times due to some other issue with offset commits.
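
To illustrate that last point, a minimal, hypothetical sketch of an at-least-once consumer: offsets are committed only after processing, so a crash or rebalance between processing and the commit makes the same records show up again even though they are stored only once in the Kafka log.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("enable.auto.commit", "false");   // commit manually, after processing
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value()); // "process" the record
                }
                consumer.commitSync();  // anything processed but not yet committed will be re-delivered after a failure
            }
        }
    }
}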

-hans

On Wed, Apr 19, 2017 at 10:19 AM, Onur Karaman <onurkaraman.apa...@gmail.com
> wrote:

> If this is what I think it is, it has nothing to do with acks,
> max.in.flight.requests.per.connection, or anything client-side and is
> purely about the kafka cluster.
>
> Here's a simple example involving a single zookeeper instance, 3 brokers, a
> KafkaConsumer and KafkaProducer (neither of these clients interact with
> zookeeper).
> 1. start up zookeeper:
> > ./bin/zookeeper-server-start.sh config/zookeeper.properties
>
> 2. start up some brokers:
> > ./bin/kafka-server-start.sh config/server0.properties
> > ./bin/kafka-server-start.sh config/server1.properties
> > ./bin/kafka-server-start.sh config/server2.properties
>
> 3 create a topic:
> > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t
> --partitions 1 --replication-factor 3
>
> 4. start a console consumer (this needs to happen before step 5 so we can
> write __consumer_offsets metadata to zookeeper):
> > ./bin/kafka-console-consumer.sh --broker-list
> localhost:9090,localhost:9091,localhost:9092 --topic t
>
> 5. kill zookeeper
>
> 6. start a console producer and produce some messages:
> > ./bin/kafka-console-producer.sh --broker-list
> localhost:9090,localhost:9091,localhost:9092 --topic t
>
> 7. notice the size of the broker logs grow with each message you send:
> > l /tmp/kafka-logs*/t-0
>
> 8. notice the consumer consuming the messages being produced
>
> Basically, zookeeper can be completely offline and your brokers will append
> to logs and process client requests just fine as long as it doesn't need to
> interact with zookeeper. Today, the only way a broker knows to stop
> accepting requests is when it receives instruction from the controller.
>
> I first realized this last July when debugging a small production data loss
> scenario that was a result of this[1]. Maybe this is an attempt at leaning
> towards availability over consistency. Personally I think that brokers
> should stop accepting requests when it disconnects from zookeeper.
>
> [1] The small production data loss scenario happens when accepting requests
> during the small window in between a broker's zookeeper session expiration
> and when the controller instructs the broker to stop accepting requests.
> During this time, the broker still thinks it leads partitions that are
> currently being led by another broker, effectively resulting in a window
> where the partition is led by two brokers. Clients can continue sending
> requests to the old leader, and for producers with low acknowledgement
> settings (like ack=1), their messages will be lost without the client
> knowing, as the messages are being appended to the phantom leader's logs
> instead of the true leader's logs.
>
> On Wed, Apr 19, 2017 at 7:56 AM, Shrikant Patel <spa...@pdxinc.com> wrote:
>
> > While we were testing, our producer had following configuration
> > max.in.flight.requests.per.connection=1, acks= all and retries=3.
> >
> > The entire producer side set is below. The consumer has manual offset
> > commit, it commit offset after it has successfully processed the message.
> >
> > Producer setting
> > bootstrap.servers​= {point the F5 VS fronting Kafka cluster}
> > key.serializer= {appropriate value as per your cases}
> > value.serializer= {appropriate value as per your case}
> > acks= all
> > retries=3
> > ssl.key.password= {appropriate value as per your case}
> > ssl.keystore.location= {appropriate value as per your case}
> > ssl.keystore.password= {appropriate value as per your case}
> > ssl.truststore.location= {appropriate value as per your case}
> > ssl.truststore.password= {appropriate value as per your case}
> > batch.size=16384​
> > client.id= {appropriate value as per your case, may help with debugging}
> > max.block.ms​=65000
> > request.timeout.ms=3
> > security.protocol= SSL
> > ssl.enabled.protocols=TLSv1.2
> > ssl.keystore.type=JKS
> > ssl.protocol=TLSv1.2
> > ssl.truststore.type=JKS
> > max.in.flight.requests.per.connection=1
> > metadata.fetch.timeout.ms=6
> > reconnect.backoff.ms=1000
> > retry.backoff.ms​=1000
>

Re: ZK and Kafka failover testing

2017-04-18 Thread Hans Jespersen
When you publish, is acks=0,1 or all (-1)?
What is max.in.flight.requests.per.connection (default is 5)?

It sounds to me like your publishers are using acks=0 and so they are not
actually succeeding in publishing (i.e. you are getting no acks) but they
will retry over and over and will have up to 5 retries in flight, so when
the broker comes back up, you are getting 4 or 5 copies of the same
message.

Try setting max.in.flight.requests.per.connection=1 to get rid of duplicates
Try setting acks=all to ensure the messages are being persisted by the
leader and all the available replicas in the kafka cluster.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Tue, Apr 18, 2017 at 4:10 PM, Shrikant Patel <spa...@pdxinc.com> wrote:

> Hi All,
>
> I am seeing strange behavior between ZK and Kafka. We have 5 node in ZK
> and Kafka cluster each. Kafka version - 2.11-0.10.1.1
>
> The min.insync.replicas is 3, replication.factor is 5 for all topics,
> unclean.leader.election.enable is false. We have 15 partitions for each
> topic.
>
> The step we are following in our testing.
>
>
> * My understanding is that ZK needs aleast 3 out of 5 server to be
> functional. Kafka could not be functional without zookeeper. In out
> testing, we bring down 3 ZK nodes and don't touch Kafka nodes. Kafka is
> still functional, consumer\producer can still consume\publish from Kafka
> cluster. We then bring down all ZK nodes, Kafka consumer\producers are
> still functional. I am not able to understand why Kafka cluster is not
> failing as soon as majority of ZK nodes are down. I do see error in Kafka
> that it cannot connection to ZK cluster.
>
>
>
> * With all or majority of ZK node down, we bring down 1 Kafka
> nodes (out of 5, so 4 are running). And at that point the consumer and
> producer start failing. My guess is the new leadership election cannot
> happen without ZK.
>
>
>
> * Then we bring up the majority of ZK node up. (1st Kafka is still
> down) Now the Kafka cluster become functional, consumer and producer now
> start working again. But Consumer sees big junk of message from kafka, and
> many of them are duplicates. It's like these messages were held up
> somewhere, Where\Why I don't know?  And why the duplicates? I can
> understand few duplicates for messages that consumer would not commit
> before 1st node when down. But why so many duplicates and like 4 copy for
> each message. I cannot understand this behavior.
>
> Appreciate some insight about our issues. Also if there are blogs that
> describe the ZK and Kafka failover scenario behaviors, that would be
> extremely helpful.
>
> Thanks,
> Shri
>
>


Re: Kafka MTLS Support?

2017-04-12 Thread Hans Jespersen

Are you asking about Multiplexed Transport Layer Security (MTLS) - 
https://en.wikipedia.org/wiki/Multiplexed_Transport_Layer_Security - or Mutual TLS 
authentication (mTLS) - https://en.wikipedia.org/wiki/Mutual_authentication? And can you 
provide more information about the motivation for your question?

-hans




> On Apr 12, 2017, at 1:50 AM, Sriram Srinivasaraghavan (srirsri2) 
> <srirs...@cisco.com> wrote:
> 
> Team,
> 
> Would like to know if MTLS is supported as a security protocol, for 
> inter-broker communications and clients to broker communications?
> Could someone throw some light?
> 
> Regards,
> Sriram S



Re: Kafka producer and consumer within on sync execution

2017-04-09 Thread Hans Jespersen
You posted the same question to Stack Overflow so I answered it there:

https://stackoverflow.com/questions/43302857/handling-sync-api-call-rest-spring-and-async-message-kafka-in-the-same-execu/43312070#43312070

-hans




> On Apr 8, 2017, at 8:49 PM, Rams N <99ram...@gmail.com> wrote:
> 
> Hi,
> I've an usecase to respond to an API call to the client which should happen
> in sync. But within the api execution, the system A need to publish a kafka
> message to an different system B and which responds to another kafka topic.
> The response must be consumed by A and should respond to the client as API
> response.
> 
> So, here this problem has a async pub-sub model and also API handling,
> which is a sync.
> 
> Any suggestions on implementing this highly appreciated.
> 
> thanks
> Rams



Re: Kafka connector

2017-04-06 Thread Hans Jespersen
If you want both N2 and N3 to get all the same messages (rather than each
getting an exclusive partitioned subset of the data) then you need to
configure N2 and N3 to be in unique Kafka consumer groups which I believe
is driven off the "name" of the N2 and N3 connectors. Make sure N2 and N3
have different names.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Thu, Apr 6, 2017 at 4:26 PM, Tushar Sudhakar Jee <tus...@levyx.com>
wrote:

> Hello Sir/Ma'am,
> I was trying to write a simple case of using kafka connector. My setup
> involves using three nodes N1,N2 and N3.
> N1 is the source and N2, N3 are the sink nodes in my case.
> I am writing data to a text file(say input.txt) on Node N1 and using the
> standalone kafka connector I wish to see a text file with content similar
> to input.txt on the nodes N2 and N3.
>
> I am using the REST API to make changes in* topic name, file name and
> tasks.max*.
>
> However, during the experiments I ran I was unable to get a complete copy
> of the input.txt on both nodes(N2 and N3) at the *same time. *
> *Also tuning the value of tasks.max on nodes (N2 and N3) for the sink
> connector decided on which node data would be sent. *
>
> So, my question is whether I am wrong in expecting such an outcome?
> If so then what should I be expecting as a result of the experiment?
> If not then how do I get my desired outcome?
>
>
> Regards,
>
> --
>
> *Tushar Sudhakar Jee *| Software Engineer
>


Re: How to increase network throughput of Kafka cluster?

2017-04-01 Thread Hans Jespersen
Then you will need even more parallel producers to saturate a 10 GigE network (if you 
don't hit your disk I/O limit first).

-hans

> On Apr 1, 2017, at 3:15 PM, Archie <anubhavnidhi1...@gmail.com> wrote:
> 
> My replication factor is 1.
> 
> Thanks,
> Archie
> 
>> On Sat, Apr 1, 2017 at 3:51 PM, Hans Jespersen <h...@confluent.io> wrote:
>> 
>> What replication factor are you using? If you have a default replication
>> factor = 3 then a publish rate of 1.4 Gbps is actually 1.4 Gbps *3 = 4.2
>> Gbps of network traffic. If you are also consuming at the same time then
>> it’s actually 4.2 Gbps + 1.4 Gbps = 5.6 Gbps.
>> 
>> You would completely saturate the network if you added a second producer
>> and consumer at those rates (if your storage system can keep up to the
>> network bandwidth).
>> 
>> -hans
>> 
>> 
>> 
>>> On Apr 1, 2017, at 10:25 AM, Archie <anubhavnidhi1...@gmail.com> wrote:
>>> 
>>> I have set up my kafka cluster in a network with 9.3 Gbps links. And am
>>> using the bin/kafka-producer-perf-test.sh script to test the throughput
>>> performance of kafka for a topic which has 1 partition.
>>> 
>>> Right now I am only able to use 1.4 Gbps of network link, i.e. the script
>>> returns a throughput of 1.4 Gbps (179 MBps)
>>> 
>>> My basic question is how can I increase the throughput of kafka to
>>> completely saturate the network?
>>> 
>>> Thanks,
>>> Archie
>> 
>> 


Re: How to increase network throughput of Kafka cluster?

2017-04-01 Thread Hans Jespersen
What replication factor are you using? If you have a default replication factor 
= 3 then a publish rate of 1.4 Gbps is actually 1.4 Gbps *3 = 4.2 Gbps of 
network traffic. If you are also consuming at the same time then it’s actually 
4.2 Gbps + 1.4 Gbps = 5.6 Gbps.  

You would completely saturate the network if you added a second producer and consumer at 
those rates (if your storage system can keep up with the network bandwidth).

-hans



> On Apr 1, 2017, at 10:25 AM, Archie <anubhavnidhi1...@gmail.com> wrote:
> 
> I have set up my kafka cluster in a network with 9.3 Gbps links. And am
> using the bin/kafka-producer-perf-test.sh script to test the throughput
> performance of kafka for a topic which has 1 partition.
> 
> Right now I am only able to use 1.4 Gbps of network link, i.e. the script
> returns a throughput of 1.4 Gbps (179 MBps)
> 
> My basic question is how can I increase the throughput of kafka to
> completely saturate the network?
> 
> Thanks,
> Archie



Re: Which is True? Kafka site vs Confluent 3.2 site upgrade doc details contradiction regarding 0.10.2 clients backward compatible to resp. 0.10.0 vs 0.10.1?

2017-04-01 Thread Hans Jespersen
They are both true. The Apache text is talking about the compatibility of the 
Producer/Consumer API and the Confluent text is talking about the Streams API.

-hans

> On Mar 31, 2017, at 11:46 PM, Roger Vandusen 
> <roger.vandu...@ticketmaster.com> wrote:
> 
> Read below and answer: So which is the source of truth ?
> Is 0.10.2.0 compatible to 0.10.0 or 0.10.1?
> 
> Which site needs correction?
> 
> 
> From current Kafka docs.
> Statement from Kafka site:
> https://kafka.apache.org/documentation/#upgrade
> 
> Starting with version 0.10.2, Java clients (producer and consumer) have 
> acquired the ability to communicate with older brokers.
> Version 0.10.2 clients can talk to version 0.10.0 or newer brokers.
> However, if your brokers are older than 0.10.0, you must upgrade all the 
> brokers in the Kafka cluster before upgrading your clients.
> Version 0.10.2 brokers support 0.8.x and newer clients. Before 0.10.2, Kafka 
> is backward compatible,
> which means that clients from Kafka 0.8.x releases (CP 1.0.x) will work with 
> brokers from Kafka 0.9.x, 0.10.0, 0.10.1 and 0.10.2
> releases (CP 2.0.x, 3.0.x, 3.1.x and 3.2.x), but not vice-versa.
> This means you always need to plan upgrades such that all brokers are 
> upgraded before clients.
> 
> 
> Hmm...do some more reading and research on Confluent site found this which 
> seems to contradict the above statement:
> http://docs.confluent.io/3.2.0/streams/upgrade-guide.html
> 
> 
> Upgrading from CP 3.1.x (Kafka 0.10.1.x-cp2) to CP 3.2.0 (Kafka 0.10.2.0-cp1)
> Compatibility
> Kafka Streams applications built with CP 3.2.0 (Kafka 0.10.2.0-cp1) are 
> forward and backward compatible with certain Kafka clusters.
> 
> Compatibility Matrix:
> 
>   Streams API (rows) \ Kafka Broker (columns) | 3.0.x / 0.10.0.x | 3.1.x / 0.10.1.x | 3.2.0 / 0.10.2.0
>   3.0.x / 0.10.0.x                            | compatible       | compatible       | compatible
>   3.1.x / 0.10.1.x                            |                  | compatible       | compatible
>   3.2.0 / 0.10.2.0                            |                  | compatible       | compatible
> 
> 
> EMPHASIS ON CONTRADICTION BELOW from site desciption:
> 
> Backward-compatible to CP 3.1.x clusters (Kafka 0.10.1.x-cp2):
> This is the first release allowing to upgrade your Kafka Streams application 
> without a broker upgrade.
> New Kafka Streams applications built with CP 3.2.0 (Kafka 0.10.2.x-cp1) will 
> work with older Kafka clusters running CP 3.1.x (Kafka 0.10.1.x-cp2).
> Kafka clusters running CP 3.0.x (Kafka 0.10.0.x-cp1) are not compatible with 
> new CP 3.2.0 Kafka Streams applications though.
> 
> -Roger
> 


Re: How to assign client-id to a particular kafka producer or topic?

2017-03-31 Thread Hans Jespersen
username comes from authenticated clients.
client.id can be assigned by any client (no authentication required).

It's hard to enforce a quota on a client.id when the clients can just change the code to 
use a different client.id, hence the recent enhancement to add user quotas based on the 
authenticated username, which cannot be changed without the consent of the broker 
administrator.
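
For example, with the Java producer the client.id is just another property (the value here is arbitrary):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ClientIdProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "perf-test-client-1");   // quotas configured for this client-id will apply
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}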

-hans



> On Mar 31, 2017, at 9:16 AM, Archie  wrote:
> 
> Thanks for the reply. I had few more questions regarding quotas
> 
> What is the difference between the user-quota and client-quota? How can we
> assign user-id to producers?
> 
> Also is it possible to assign client-id to a topic (or) partitions
> belonging to a topic? If yes how can we do that?
> 
> 
> 
> 
> Thanks,
> Archie
> 
> On Fri, Mar 31, 2017 at 11:05 AM, Manikumar 
> wrote:
> 
>> you can pass client-id using --producer-props option.
>> ex: --producer-props client.id=id1
>> 
>> On Fri, Mar 31, 2017 at 9:32 PM, Archie 
>> wrote:
>> 
>>> I know that quotas are based on client-id
>>> 
>>> Basically I want to run the kafka-producer-perf-test with a particular
>>> client id to test whether the quotas work properly
>>> 
>>> My question is how can I assign a client-id for a particular producer
>> (or)
>>> partition?
>>> 
>>> 
>>> 
>>> Thanks,
>>> Archie
>>> 
>> 



Re: kafka not throwing any exception not any response in call back

2017-03-30 Thread Hans Jespersen
In your producer have you set acks to be “all” and retries to be something 
higher than “0”?


props.put("acks", "all"); 
props.put("retries", 2147483647);
Also, if the order of the messages matters, you should set the max in-flight requests to 
1 so retries happen and succeed before any other messages are sent.

props.put("max.in.flight.requests.per.connection", 1);
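
To actually see why a send did not make it, look at the outcome of each send explicitly, either by blocking on the returned Future or by logging the exception in the callback. A minimal sketch (topic and servers are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class LoggingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", "2147483647");
        props.put("max.in.flight.requests.per.connection", "1");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();    // a failed send shows up here, not as a thrown exception
                } else {
                    System.out.println("acked at offset " + metadata.offset());
                }
            });
            producer.flush();   // forces the send (and its callback) to complete before the app exits
        }
    }
}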

-hans


> On Mar 30, 2017, at 7:15 AM, Laxmi Narayan <nit.dgp...@gmail.com> wrote:
> 
> Hi ,
>  I am using kafka 10.2 and sometime my producer does not sends me any ACK
> and in that case data is also not pushed.
> 
> Whenever I get ACK I am able to consume data.
> 
> But this is happening quiet often with me and i have no clue why data is
> not being pushed inside and my request ends gracefully and no exception no
> response inside callback code.
> 
> Any extra param or config to enable debug for such cases ?
> 
> 
> Keep learning keep moving .



Re: kafka is not accepting number of partitions from configuration

2017-03-26 Thread Hans Jespersen
The num.partitions parameter is a server/broker config but you are using it as 
a client/producer parameter so it will not work and will be ignored.

http://stackoverflow.com/questions/22152269/how-to-specify-number-of-partitions-on-kafka-2-8

I assume the CLI command you are using is the administrative kafka-topics.sh 
tool which talks directly to zookeeper and the Kafka brokers to create or 
modify topics in the Kafka cluster. This will work to create a topic before you 
start your producer app.
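
If you want to create the topic programmatically instead of with the CLI, a rough sketch using the Java AdminClient (available from Kafka 0.11 onwards; topic name and counts are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // partitions and replication factor are set here, not via producer properties
            NewTopic topic = new NewTopic("my-topic", 999, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}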

-hans

> On Mar 26, 2017, at 2:00 AM, Laxmi Narayan <nit.dgp...@gmail.com> wrote:
> 
> ​Hi ,
> Kafka not accepting number of partitions from config , while from CLI it is
> accepting.
> 
> What am I missing here ?
> 
> 
> props.put("bootstrap.servers",  kafkaConstants.Bootstrap_Servers);
> props.put("enable.auto.commit", kafkaConstants.Enable_Auto_Commit);
> props.put("auto.commit.interval.ms",
> kafkaConstants.Auto_Commit_Interval_Ms);
> props.put("session.timeout.ms", kafkaConstants.Session_Timeout_Ms);
> props.put("linger.ms",  "1");
> 
> props.put("key.deserializer",   kafkaConstants.Key_Deserializer);
> props.put("value.deserializer", kafkaConstants.Value_Deserializer);
> 
> props.put("key.serializer", kafkaConstants.Key_Serializer);
> props.put("value.serializer",   kafkaConstants.Value_Serializer);
> 
> props.put("partitioner.class",  kafkaConstants.Partitioner_Class);
> props.put("num.partitions", 999);
> props.put("group.id",   kafkaConstants.KafkaGroupId);
> 
> 
> 
> ​
> 
> 
> Keep learning keep moving .


Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Hans Jespersen
Yes, and yes!

-hans



> On Mar 21, 2017, at 7:45 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> 
> That would require
> 
> - Knowing the current window's id (or some other identifier) to
> differentiate it from other windows
> 
> - Being able to process individual messages in a window
> 
> Are those 2 things possible w/ kafka streams? (java)
> 
> On Tue, Mar 21, 2017 at 7:43 PM, Hans Jespersen <h...@confluent.io> wrote:
> 
>> While it's not exactly the same as the window start/stop time you can
>> store (in the state store) the earliest and latest timestamps of any
>> messages in each window and use that as a good approximation for the window
>> boundary times.
>> 
>> -hans
>> 
>>> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>> 
>>> Yeah, windowing seems perfect, if only I could find out the current
>>> window's start time (so I can log the current bucket's start & end times)
>>> and process window messages individually rather than as aggregates.
>>> 
>>> It doesn't seem like i can get this metadata from ProcessorContext
>> though,
>>> from looking over the javadocs
>>> 
>>>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io>
>> wrote:
>>>> 
>>>> Ali,
>>>> 
>>>> what you describe is (roughly!) how Kafka Streams implements the
>> internal
>>>> state stores to support windowing.
>>>> 
>>>> Some users have been following a similar approach as you outlined, using
>>>> the Processor API.
>>>> 
>>>> 
>>>> 
>>>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com>
>> wrote:
>>>>> 
>>>>> It would be helpful to know the 'start' and 'end' of the current
>>>> metadata,
>>>>> so if an out of order message arrives late, and is being processed in
>>>>> foreach(), you'd know which window / bucket it belongs to, and can
>> handle
>>>>> it accordingly.
>>>>> 
>>>>> I'm guessing that's not possible at the moment.
>>>>> 
>>>>> (My use case is, i receive a stream of messages. Messages need to be
>>>> stored
>>>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
>>>> gap
>>>>> of 30 mins or more since the last message (under a key), a new
>> 'session'
>>>>> (bucket) should be started, and future messages should belong to that
>>>>> 'session', until the next 30+ min gap).
>>>>> 
>>>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io>
>>>>> wrote:
>>>>> 
>>>>>>> Can windows only be used for aggregations, or can they also be used
>>>> for
>>>>>> foreach(),
>>>>>> and such?
>>>>>> 
>>>>>> As of today, you can use windows only in aggregations.
>>>>>> 
>>>>>>> And is it possible to get metadata on the message, such as whether or
>>>>>> not its
>>>>>> late, its index/position within the other messages, etc?
>>>>>> 
>>>>>> If you use the Processor API of Kafka Streams, you can have access to
>>>> an
>>>>>> incoming record's topic, partition, offset, etc. via the so-called
>>>>>> ProcessorContext (which is updated for every new incoming record):
>>>>>> 
>>>>>> http://docs.confluent.io/current/streams/javadocs/org/
>>>>>> apache/kafka/streams/processor/Processor.html
>>>>>> - You can get/store a reference to the ProcessorContext from
>>>>>> `Processor#init()`.
>>>>>> 
>>>>>> http://docs.confluent.io/current/streams/javadocs/org/
>>>>>> apache/kafka/streams/processor/ProcessorContext.html
>>>>>> - The context can then be used within `Processor#process()` when you
>>>>>> process a new record.  As I said, the context is updated behind the
>>>>> scenes
>>>>>> to match the record that is currently being processed.
>>>>>> 
>>>>>> 
>>>>>> Best,
>>>>>> Michael
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Ak

Re: Out of order message processing with Kafka Streams

2017-03-21 Thread Hans Jespersen
While it's not exactly the same as the window start/stop time, you can store (in 
the state store) the earliest and latest timestamps of any messages in each 
window and use that as a good approximation for the window boundary times.  
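
A rough sketch of that idea against the classic (0.10.x-era) Processor API; the
store names ("earliest-ts", "latest-ts"), the String/Long serdes, and the topology
wiring are assumptions, and both stores would have to be registered and connected
to this processor when the topology is built.

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Sketch only: tracks the earliest and latest record timestamp seen per key
// as an approximation of the window boundaries discussed above.
public class WindowBoundsProcessor implements Processor<String, String> {

    private ProcessorContext context;
    private KeyValueStore<String, Long> earliest;
    private KeyValueStore<String, Long> latest;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.earliest = (KeyValueStore<String, Long>) context.getStateStore("earliest-ts");
        this.latest = (KeyValueStore<String, Long>) context.getStateStore("latest-ts");
    }

    @Override
    public void process(String key, String value) {
        long ts = context.timestamp();                     // timestamp of the current record
        Long lo = earliest.get(key);
        Long hi = latest.get(key);
        if (lo == null || ts < lo) earliest.put(key, ts);  // new lower bound for this key
        if (hi == null || ts > hi) latest.put(key, ts);    // new upper bound for this key
        context.forward(key, value);                       // pass the record downstream unchanged
    }

    @Override
    public void punctuate(long timestamp) { }              // not used in this sketch

    @Override
    public void close() { }
}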

-hans

> On Mar 20, 2017, at 1:00 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
> 
> Yeah, windowing seems perfect, if only I could find out the current
> window's start time (so I can log the current bucket's start & end times)
> and process window messages individually rather than as aggregates.
> 
> It doesn't seem like i can get this metadata from ProcessorContext though,
> from looking over the javadocs
> 
>> On Tue, Mar 21, 2017 at 12:38 AM, Michael Noll <mich...@confluent.io> wrote:
>> 
>> Ali,
>> 
>> what you describe is (roughly!) how Kafka Streams implements the internal
>> state stores to support windowing.
>> 
>> Some users have been following a similar approach as you outlined, using
>> the Processor API.
>> 
>> 
>> 
>>> On Mon, Mar 20, 2017 at 7:54 PM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>> 
>>> It would be helpful to know the 'start' and 'end' of the current
>> metadata,
>>> so if an out of order message arrives late, and is being processed in
>>> foreach(), you'd know which window / bucket it belongs to, and can handle
>>> it accordingly.
>>> 
>>> I'm guessing that's not possible at the moment.
>>> 
>>> (My use case is, i receive a stream of messages. Messages need to be
>> stored
>>> and sorted into 'buckets', to indicate 'sessions'. Each time there's a
>> gap
>>> of 30 mins or more since the last message (under a key), a new 'session'
>>> (bucket) should be started, and future messages should belong to that
>>> 'session', until the next 30+ min gap).
>>> 
>>> On Mon, Mar 20, 2017 at 11:44 PM, Michael Noll <mich...@confluent.io>
>>> wrote:
>>> 
>>>>> Can windows only be used for aggregations, or can they also be used
>> for
>>>> foreach(),
>>>> and such?
>>>> 
>>>> As of today, you can use windows only in aggregations.
>>>> 
>>>>> And is it possible to get metadata on the message, such as whether or
>>>> not its
>>>> late, its index/position within the other messages, etc?
>>>> 
>>>> If you use the Processor API of Kafka Streams, you can have access to
>> an
>>>> incoming record's topic, partition, offset, etc. via the so-called
>>>> ProcessorContext (which is updated for every new incoming record):
>>>> 
>>>> http://docs.confluent.io/current/streams/javadocs/org/
>>>> apache/kafka/streams/processor/Processor.html
>>>> - You can get/store a reference to the ProcessorContext from
>>>> `Processor#init()`.
>>>> 
>>>> http://docs.confluent.io/current/streams/javadocs/org/
>>>> apache/kafka/streams/processor/ProcessorContext.html
>>>> - The context can then be used within `Processor#process()` when you
>>>> process a new record.  As I said, the context is updated behind the
>>> scenes
>>>> to match the record that is currently being processed.
>>>> 
>>>> 
>>>> Best,
>>>> Michael
>>>> 
>>>> 
>>>> 
>>>> 
>>>> On Mon, Mar 20, 2017 at 5:59 PM, Ali Akhtar <ali.rac...@gmail.com>
>>> wrote:
>>>> 
>>>>> Can windows only be used for aggregations, or can they also be used
>> for
>>>>> foreach(), and such?
>>>>> 
>>>>> And is it possible to get metadata on the message, such as whether or
>>> not
>>>>> its late, its index/position within the other messages, etc?
>>>>> 
>>>>> On Mon, Mar 20, 2017 at 9:44 PM, Michael Noll <mich...@confluent.io>
>>>>> wrote:
>>>>> 
>>>>>> And since you asked for a pointer, Ali:
>>>>>> http://docs.confluent.io/current/streams/concepts.html#windowing
>>>>>> 
>>>>>> 
>>>>>> On Mon, Mar 20, 2017 at 5:43 PM, Michael Noll <
>> mich...@confluent.io>
>>>>>> wrote:
>>>>>> 
>>>>>>> Late-arriving and out-of-order data is only treated specially for
>>>>>> windowed
>>>>>>> aggregations.
>>>>>>> 
>>>>>>> For sta

Re: validate identity of producer in each record

2017-03-20 Thread Hans Jespersen
Nothing on the broker today, but if you use the Kafka Connect API in 0.10.2 and 
above there is a pluggable interface called Transformations. 

See org.apache.kafka.connect.transforms in 
https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/connect

Source Connector transformations happen before storage in the Kafka log and 
Sink Connector transformations happen afterwards for consumers.
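
A bare-bones skeleton of that pluggable interface; the class name and the
pass-through body are placeholders, and any real validation or enrichment logic
would go inside apply().

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Placeholder transform: currently returns every record unchanged.
public class IdentityStampTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // read any transform-specific settings here
    }

    @Override
    public R apply(R record) {
        // a real transform would inspect or rewrite the record here,
        // e.g. validate a field or stamp the record with extra data
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() { }
}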

-hans

> On Mar 20, 2017, at 6:52 PM, Matt Magoffin <apache@msqr.us> wrote:
> 
> Thanks, Hans.
> 
> Signing messages is a good idea. Other than that, is there possibly an 
> extension point in Kafka itself on the receiving of records, before they are 
> stored/distributed? I was thinking along the lines of
> 
> org.apache.kafka.clients.producer.ProducerInterceptor
> 
> but on the server side?
> 
> — m@
> 
>> On 21/03/2017, at 12:22 PM, Hans Jespersen <h...@confluent.io> wrote:
>> 
>> You can configure Kafka with ACLs that only allow certain users to
>> produce/consume to certain topics but if multiple producers are allowed to
>> produce to a shared topic then you cannot identify them without adding
>> something to the messages.
>> 
>> For example, you can have each producer digitally sign (or encrypt) each
>> message and include the signature as a separate field (i.e. separate from
>> the original message body). Then the consumers can independently check that
>> the signature is valid and that the message comes from a known/valid
>> publisher. This pattern is similar to how signed email messages work.
>> 
>> -hans
> 


Re: validate identity of producer in each record

2017-03-20 Thread Hans Jespersen
You can configure Kafka with ACLs that only allow certain users to
produce/consume to certain topics but if multiple producers are allowed to
produce to a shared topic then you cannot identify them without adding
something to the messages.

For example, you can have each producer digitally sign (or encrypt) each
message and include the signature as a separate field (i.e. separate from
the original message body). Then the consumers can independently check that
the signature is valid and that the message comes from a known/valid
publisher. This pattern is similar to how signed email messages work.
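
A minimal sketch of that signing pattern using only the JDK's java.security
classes; the in-memory key pair, the Base64 encoding, and the idea of shipping
the signature next to the body are illustrative assumptions (a real deployment
would distribute and pin the producers' public keys out of band).

import java.nio.charset.StandardCharsets;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.Signature;
import java.util.Base64;

public class SignedMessageExample {
    public static void main(String[] args) throws Exception {
        KeyPair pair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
        byte[] body = "order-42 shipped".getBytes(StandardCharsets.UTF_8);

        // Producer side: sign the message body with the producer's private key
        // and send the Base64 signature as a separate field alongside the body.
        Signature signer = Signature.getInstance("SHA256withRSA");
        signer.initSign(pair.getPrivate());
        signer.update(body);
        String sig = Base64.getEncoder().encodeToString(signer.sign());

        // Consumer side: verify the signature with the producer's known public key.
        Signature verifier = Signature.getInstance("SHA256withRSA");
        verifier.initVerify(pair.getPublic());
        verifier.update(body);
        boolean valid = verifier.verify(Base64.getDecoder().decode(sig));
        System.out.println("signature valid? " + valid);
    }
}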

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Mon, Mar 20, 2017 at 3:54 PM, Matt Magoffin <apache@msqr.us> wrote:

> Hello,
>
> I am new to Kafka and am looking for a way for consumers to be able to
> identify the producer of each message in a topic. There are a large number
> of producers (let's say on the order of millions), and each producer would
> be connecting via SSL and using a unique client certificate. Essentially
> I'd like consumers to know the certificate of the producer of any given
> message.
>
> The producer identity of the message must not be forgeable, i.e. producer
> A must not be allowed to generate records that appear to consumers to be
> from producer B.
>
> Is it possible for Kafka to be configured to perform this type of identity
> validation? For example:
>
> * reject records that contain a certificate identity that differs from the
> producer connection’s client certificate
> * inject the producer connection’s certificate identity into each record
>
> Or would a proxy application need to sit in front of Kafka to perform one
> of these functions?
>
> Thank you in advance for offering any advice,
> Matt
>
>


Re: Out of order message processing with Kafka Streams

2017-03-18 Thread Hans Jespersen
Yes, stream processing and CEP are subtly different things. 

Kafka Streams helps you write stateful apps and allows that state to be 
preserved on disk (a local State store) as well as distributed for HA or for 
parallel partitioned processing (via Kafka topic partitions and consumer 
groups) as well as in memory (as a performance enhancement).

However a classical CEP engine with a pre-modeled state machine and pattern 
matching rules is something different from stream processing.

It is of course possible to build a CEP system on top of Kafka Streams and get 
the best of both worlds.

-hans

> On Mar 18, 2017, at 11:36 AM, Sabarish Sasidharan <sabarish@gmail.com> 
> wrote:
> 
> Hans
> 
> What you state would work for aggregations, but not for state machines and
> CEP.
> 
> Regards
> Sab
> 
>> On 19 Mar 2017 12:01 a.m., "Hans Jespersen" <h...@confluent.io> wrote:
>> 
>> The only way to make sure A is consumed first would be to delay the
>> consumption of message B for at least 15 minutes which would fly in the
>> face of the principles of a true streaming platform so the short answer to
>> your question is "no" because that would be batch processing not stream
>> processing.
>> 
>> However, Kafka Streams does handle late arriving data. So if you had some
>> analytics that computes results on a time window or a session window then
>> Kafka streams will compute on the stream in real time (processing message
>> B) and then later when message A arrives it will put that message back into
>> the right temporal context and publish an amended result for the proper
>> time/session window as if message B were consumed in the timestamp order
>> before message A. The end result of this flow is that you eventually get
>> the same results you would get in a batch processing system but with the
>> added benefit of getting intermediate results at much lower latency.
>> 
>> -hans
>> 
>> /**
>> * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>> * h...@confluent.io (650)924-2670
>> */
>> 
>>> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>> 
>>> Is it possible to have Kafka Streams order messages correctly by their
>>> timestamps, even if they arrived out of order?
>>> 
>>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
>>> timestamp of 5:15 PM, are sent.
>>> 
>>> Message B arrives sooner than Message A, due to network issues.
>>> 
>>> Is it possible to make sure that, across all consumers of Kafka Streams
>>> (even if they are across different servers, but have the same consumer
>>> group), Message A is consumed first, before Message B?
>>> 
>>> Thanks.
>>> 
>> 


Re: Out of order message processing with Kafka Streams

2017-03-18 Thread Hans Jespersen
The only way to make sure A is consumed first would be to delay the
consumption of message B for at least 15 minutes, which would fly in the
face of the principles of a true streaming platform, so the short answer to
your question is "no" because that would be batch processing, not stream
processing.

However, Kafka Streams does handle late arriving data. So if you had some
analytics that computes results on a time window or a session window then
Kafka streams will compute on the stream in real time (processing message
B) and then later when message A arrives it will put that message back into
the right temporal context and publish an amended result for the proper
time/session window as if message B were consumed in the timestamp order
before message A. The end result of this flow is that you eventually get
the same results you would get in a batch processing system but with the
added benefit of getting intermediate results at much lower latency.
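
A small sketch of such a windowed aggregation, written against a newer (2.x-era)
Kafka Streams API, so method names may differ from the version discussed in this
thread; the topic name, window size, and bootstrap address are placeholders.

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

public class LateDataWindowCount {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
               .count()
               .toStream()
               // A late record for an already-emitted window triggers an
               // updated (amended) count for that same window.
               .foreach((windowedKey, count) ->
                       System.out.println(windowedKey + " -> " + count));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "late-data-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        new KafkaStreams(builder.build(), props).start();
    }
}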

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:

> Is it possible to have Kafka Streams order messages correctly by their
> timestamps, even if they arrived out of order?
>
> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
> timestamp of 5:15 PM, are sent.
>
> Message B arrives sooner than Message A, due to network issues.
>
> Is it possible to make sure that, across all consumers of Kafka Streams
> (even if they are across different servers, but have the same consumer
> group), Message A is consumed first, before Message B?
>
> Thanks.
>


Re: Out of order message processing with Kafka Streams

2017-03-18 Thread Hans Jespersen
Sorry, I mixed up Message A and B with respect to the question, but the answer
is still valid.

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Sat, Mar 18, 2017 at 11:07 AM, Hans Jespersen <h...@confluent.io> wrote:

> The only way to make sure A is consumed first would be to delay the
> consumption of message B for at least 15 minutes which would fly in the
> face of the principles of a true streaming platform so the short answer to
> your question is "no" because that would be batch processing not stream
> processing.
>
> However, Kafka Streams does handle late arriving data. So if you had some
> analytics that computes results on a time window or a session window then
> Kafka streams will compute on the stream in real time (processing message
> B) and then later when message A arrives it will put that message back into
> the right temporal context and publish an amended result for the proper
> time/session window as if message B were consumed in the timestamp order
> before message A. The end result of this flow is that you eventually get
> the same results you would get in a batch processing system but with the
> added benefit of getting intermediate results at much lower latency.
>
> -hans
>
> /**
>  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
>  * h...@confluent.io (650)924-2670 <(650)%20924-2670>
>  */
>
> On Sat, Mar 18, 2017 at 10:29 AM, Ali Akhtar <ali.rac...@gmail.com> wrote:
>
>> Is it possible to have Kafka Streams order messages correctly by their
>> timestamps, even if they arrived out of order?
>>
>> E.g, say Message A with a timestamp of 5:00 PM and Message B with a
>> timestamp of 5:15 PM, are sent.
>>
>> Message B arrives sooner than Message A, due to network issues.
>>
>> Is it possible to make sure that, across all consumers of Kafka Streams
>> (even if they are across different servers, but have the same consumer
>> group), Message A is consumed first, before Message B?
>>
>> Thanks.
>>
>
>


Re: kafka-topics[.sh]: fail to support connecting via broker / v0.10 style

2017-03-17 Thread Hans Jespersen
It can be updated once the Kafka Admin API is available and does everything
over the Kafka wire protocol that the current kafka-topics command does by
talking directly to zookeeper, for example creating a topic or deleting a
topic. Unfortunately it has to remain this way for just a little while
longer.
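
For reference, the Admin API that eventually shipped (0.11 and later) does this
over the Kafka wire protocol with only a bootstrap.servers address; a minimal
sketch, with the topic name, partition count, and replication factor as
placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Create a topic by talking to the brokers, no ZooKeeper connect string needed.
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}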

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Fri, Mar 17, 2017 at 1:20 PM, Andrew Pennebaker <
andrew.penneba...@gmail.com> wrote:

> If I understand Kafka correctly, since v0.9 / v0.10, users are often
> recommended to connect consumers to the Kafka cluster via bootstrap.servers
> AKA broker node addresses.
>
> However, the kafka-topics shell script fails to support this interface,
> still requiring the legacy zookeeper connect string.
>
> Could this be updated?
>
> --
> Cheers,
> Andrew
>


Re: Performance and Encryption

2017-03-15 Thread Hans Jespersen
You are correct that a Kafka broker is not just writing to one file. Jay Kreps 
wrote a great blog post with lots of links to even greater detail on the topic 
of Kafka and disk write performance. Still a good read many years later.

https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
 

-hans


> On Mar 15, 2017, at 7:51 AM, Nicolas MOTTE <nicolas.mo...@amadeus.com> wrote:
> 
> Ok that makes sense, thanks !
> 
> The next question I have regarding performance is about the way Kafka writes 
> in the data files.
> I often hear Kafka is very performant because it writes in an append-only 
> fashion.
> So even with hard disk (not SSD) we get a great performance because it writes 
> in sequence.
> 
> I could understand that if Kafka was only writing to one file.
> But in reality it's writing to N files, N being the number of partitions 
> hosted by the broker.
> So even though it appends the data to each file, overall I assume it is not 
> writing in sequence on the disk.
> 
> Am I wrong ?
> 
> -Original Message-
> From: Tauzell, Dave [mailto:dave.tauz...@surescripts.com] 
> Sent: 08 March 2017 22:09
> To: users@kafka.apache.org
> Subject: RE: Performance and Encryption
> 
> I think because the product batches messages which could be for different 
> topics.
> 
> -Dave
> 
> -Original Message-
> From: Nicolas MOTTE [mailto:nicolas.mo...@amadeus.com]
> Sent: Wednesday, March 8, 2017 2:41 PM
> To: users@kafka.apache.org
> Subject: Performance and Encryption
> 
> Hi everyone,
> 
> I understand one of the reasons why Kafka is performant is by using zero-copy.
> 
> I often hear that when encryption is enabled, then Kafka has to copy the data 
> in user space to decode the message, so it has a big impact on performance.
> 
> If it is true, I don't get why the message has to be decoded by Kafka. I 
> would assume that whether the message is encrypted or not, Kafka simply 
> receives it, appends it to the file, and when a consumer wants to read it, it 
> simply reads at the right offset...
> 
> Also I'm wondering if it's the case if we don't use keys (pure queuing system 
> with key=null).
> 
> Cheers
> Nico
> 
> 



Re: Common Identity between brokers

2017-03-14 Thread Hans Jespersen
This might be useful reading as it outlines why Cluster ID was added and lists 
a few ways that clusters can be identified prior to that feature enhancement.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-78%3A+Cluster+Id 
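
If the brokers are ever upgraded to 0.10.1 or later, the KIP-78 cluster id can
also be read programmatically; a minimal sketch with the newer AdminClient (the
bootstrap address is a placeholder).

import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class ClusterIdExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Brokers that belong to the same cluster report the same cluster id.
        try (AdminClient admin = AdminClient.create(props)) {
            String clusterId = admin.describeCluster().clusterId().get();
            System.out.println("cluster id: " + clusterId);
        }
    }
}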

-hans




> On Mar 14, 2017, at 11:20 AM, Sumit Maheshwari <sumitm.i...@gmail.com> wrote:
> 
> Can anyone answer the above query?
> 
> On Mon, Mar 13, 2017 at 3:41 PM, Sumit Maheshwari <sumitm.i...@gmail.com>
> wrote:
> 
>> Hi,
>> 
>> How can we identify if a set of brokers (nodes) belong to same cluster?
>> I understand we can use the zookeeper where all the brokers pointing to
>> same zookeeper URL's belong to same cluster.
>> But is there a common identity between brokers which can help identify if
>> brokers belong to same cluster?
>> 
>> I have seen in recent Kafka release there is concept of clusterId but that
>> is available from 0.10.1.0. I am using little older version of Kafka.
>> 
>> Thanks,
>> Sumit
>> 



Re: Kafka Retention Policy to Indefinite

2017-03-14 Thread Hans Jespersen
I am saying that replication quotas will mitigate one of the potential
downsides of setting an infinite retention policy.

There is no clear-cut yes/no best practice rule for setting an extremely
large retention policy. It is clearly a valid configuration and there are
people who run this way.

The issues have more to do with the amount of data you expect to be stored
over the life of the system. If you have a Kafka cluster with petabytes of
data in it and a consumer comes along and blindly consumes from the
beginning, they will be getting a lot of data. So much so that this might
be considered an anti-pattern because their apps might not behave as they
expect and the network bandwidth used by lots of clients operating this way
may be considered bad practice.

Another way to avoid collecting too much data is to use compacted topics,
which are a special kind of topic that keeps the latest value for each key
forever, but removes the older messages with the same key in order to
reduce the total amount of messages stored.
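
A small sketch of creating such a compacted topic, using the AdminClient that
shipped after this thread (0.11 and later); the topic name and partition/replica
counts are placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // cleanup.policy=compact keeps only the latest value per key instead
            // of deleting data by age or size.
            NewTopic topic = new NewTopic("latest-state", 6, (short) 3)
                    .configs(Collections.singletonMap("cleanup.policy", "compact"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}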

How much data do you expect to store in your largest topic over the life of
the cluster?

-hans





/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Tue, Mar 14, 2017 at 10:36 AM, Joe San <codeintheo...@gmail.com> wrote:

> So that means with replication quotas, I can set the retention policy to be
> infinite?
>
> On Tue, Mar 14, 2017 at 6:25 PM, Hans Jespersen <h...@confluent.io> wrote:
>
> > You might want to use the new replication quotas mechanism (i.e. network
> > throttling) to make sure that replication traffic doesn't negatively
> impact
> > your production traffic.
> >
> > See for details:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 73+Replication+Quotas
> >
> > This feature was added in 0.10.1
> >
> > -hans
> >
> > /**
> >  * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
> >  * h...@confluent.io (650)924-2670
> >  */
> >
> > On Tue, Mar 14, 2017 at 10:09 AM, Joe San <codeintheo...@gmail.com>
> wrote:
> >
> > > Dear Kafka Users,
> > >
> > > What are the arguments against setting the retention policy on a Kafka
> > > topic to infinite? I was in an interesting discussion with one of my
> > > colleagues where he was suggesting to set the retention policy for a
> > topic
> > > to be indefinite.
> > >
> > > So how does this play up when adding new broker partitions? Say, I have
> > > accumulated in my topic some gigabytes of data and now I realize that I
> > > have to scale up by adding another partition. Now is this going to pose
> > me
> > > a problem? The partition rebalance has to happen and I'm not sure what
> > the
> > > implications are with rebalancing a partition that has gigabytes of
> data.
> > >
> > > Any thoughts on this?
> > >
> > > Thanks and Regards,
> > > Jothi
> > >
> >
>


Re: Kafka Retention Policy to Indefinite

2017-03-14 Thread Hans Jespersen
You might want to use the new replication quotas mechanism (i.e. network
throttling) to make sure that replication traffic doesn't negatively impact
your production traffic.

See for details:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-73+Replication+Quotas

This feature was added in 0.10.1

-hans

/**
 * Hans Jespersen, Principal Systems Engineer, Confluent Inc.
 * h...@confluent.io (650)924-2670
 */

On Tue, Mar 14, 2017 at 10:09 AM, Joe San <codeintheo...@gmail.com> wrote:

> Dear Kafka Users,
>
> What are the arguments against setting the retention policy on a Kafka
> topic to infinite? I was in an interesting discussion with one of my
> colleagues where he was suggesting to set the retention policy for a topic
> to be indefinite.
>
> So how does this play up when adding new broker partitions? Say, I have
> accumulated in my topic some gigabytes of data and now I realize that I
> have to scale up by adding another partition. Now is this going to pose me
> a problem? The partition rebalance has to happen and I'm not sure what the
> implications are with rebalancing a partition that has gigabytes of data.
>
> Any thoughts on this?
>
> Thanks and Regards,
> Jothi
>


Re: Question on Metadata

2017-03-14 Thread Hans Jespersen
You may also be interested to try out the new Confluent JMS client for Kafka. 
It implements the JMS 1.1 API along with all the JMS metadata fields and 
access methods. It does this by putting/getting the JMS metadata into the body 
of an underlying Kafka message which is defined with a special JMS AVRO schema 
that includes both the JMS metadata as well as the JMS message body (which can 
be any of the JMS message types).
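
For the Kafka-native metadata, every consumed record does carry its topic,
partition, offset, and timestamp; a minimal sketch with a newer consumer API
(the topic name, group id, and bootstrap address are placeholders).

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RecordMetadataExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "metadata-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> r : records) {
                // per-record metadata carried by Kafka itself
                System.out.printf("topic=%s partition=%d offset=%d timestamp=%d%n",
                        r.topic(), r.partition(), r.offset(), r.timestamp());
            }
        }
    }
}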

-hans


> On Mar 14, 2017, at 9:26 AM, Robert Quinlivan <rquinli...@signal.co> wrote:
> 
> Did you look at the ConsumerRecord
> <https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html>
> class?
> 
> On Tue, Mar 14, 2017 at 11:09 AM, Syed Mudassir Ahmed <
> smudas...@snaplogic.com> wrote:
> 
>> Can anyone help?
>> 
>> -- Forwarded message --
>> From: "Syed Mudassir Ahmed" <smudas...@snaplogic.com>
>> Date: Mar 14, 2017 12:28 PM
>> Subject: Question on Metadata
>> To: <users@kafka.apache.org>
>> Cc:
>> 
>> Hi guys,
>>  When we consume a JMS message, we get a Message object that has methods
>> to fetch implicit metadata provided by JMS server.
>> http://docs.oracle.com/javaee/6/api/javax/jms/Message.html.  There are
>> methods to fetch that implicit metadata such as Expiration, Correlation ID,
>> etc.
>> 
>>  Is there a way to fetch any such implicit metadata while consuming a
>> kafka message?  Any pointers to do that would be greatly appreciated.
>> 
>>  Thanks in advance.
>> 
>> --
>> Thanks,
>> Syed.
>> 
> 
> 
> 
> -- 
> Robert Quinlivan
> Software Engineer, Signal


