Re: KafkaStream Merging two topics is not working for custom datatypes

2016-10-10 Thread Ratha v
I checked my target topic and I see fewer messages than in the source topic. (If
the source topic has 5 messages, I see only 2 messages in my target topic.) What
settings do I need to change?

And when I try to consume messages from the target topic, I get a
ClassCastException:

java.lang.ClassCastException: java.lang.String cannot be cast to
xx.yy.core.kafkamodels.KafkaPayload;

receivedPayload = (KafkaPayload) consumerRecord.value();


I merge the two topics like this:

KStreamBuilder builder = new KStreamBuilder();
KStream kafkaPayloadStream = builder.stream(sourceTopics);
kafkaPayloadStream.to(targetTopic);
streams = new KafkaStreams(builder, properties);
streams.start();


Why do I see a ClassCastException when consuming the message?
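
(For reference, one likely culprit, sketched below with assumed class names
KafkaPayloadSerde / KafkaPayloadDeserializer: if neither the Streams app nor the
downstream consumer is told about the custom serde, the default String
serializer/deserializer is used and the cast to KafkaPayload fails.)

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// Sketch only; KafkaPayloadSerde and KafkaPayloadDeserializer stand in for the
// custom (de)serialization classes for the POJO.
// In the Streams app, pass the value serde explicitly when reading and writing:
KStream<String, KafkaPayload> stream =
    builder.stream(Serdes.String(), new KafkaPayloadSerde(), sourceTopics);
stream.to(Serdes.String(), new KafkaPayloadSerde(), targetTopic);

// In the plain consumer, use the matching value deserializer:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "payload-consumer");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", KafkaPayloadDeserializer.class.getName());
KafkaConsumer<String, KafkaPayload> consumer = new KafkaConsumer<>(props);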


On 11 October 2016 at 15:19, Ratha v  wrote:

> Hi all;
> I have a custom datatype defined (a POJO class).
> I copy messages from one topic to another topic.
> I do not see any messages in my target topic.
> This works for string messages, but not for my custom message.
> What might be the cause?
> I followed this sample [1]
> [1]
> https://github.com/apache/kafka/blob/trunk/streams/
> examples/src/main/java/org/apache/kafka/streams/examples/
> pipe/PipeDemo.java
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: Data loss when ack != -1

2016-10-10 Thread Justin Lin
Hi Andrew,

Thank you for your reply. I did some experiments with the setting of ack ==
-1 and had a few seconds of downtime, but it's way better than losing
messages, so I will go with that. Thanks again for your help.

-- Justin

On Mon, Oct 10, 2016 at 6:49 AM, Andrew Grasso 
wrote:

> Hi Justin,
>
> Setting the required acks to -1 does not require that all assigned brokers
> are available, only that all members of the ISR are available. If a broker
> goes down, the producer is able to commit messages once the faulty broker
> is evicted from the ISR. This can continue even if only one broker is
> alive, in which case only that broker will be eligible to be leader. If
> you'd like to ensure that all committed messages are present on at least N
> machines, set min.insync.replicas to N and required acks to -1.
>
> -Andrew
>
> On Fri, Oct 7, 2016 at 5:05 PM, Justin Lin 
> wrote:
>
> > Hi everyone,
> >
> > I am currently running kafka 0.8.1.1 in a cluster, with 6 brokers, and I set
> > the replication factor to 3. My producer sets the ack to 2 when producing
> > messages. I recently came across a bad situation where I had to reboot one
> > broker machine by shutting down the power, and that caused data loss.
> >
> > This is what actually happened.
> >
> > Producer 1 (PD1) sends message M100 to Partition 10 (leader h1, ISR h1,
> > h2, h3), and since ack == 2, as long as two brokers have acknowledged,
> > M100 is considered committed and ready for the consumer. So h1 and h2 got
> > M100, and consumer C1 pulls M100 down and handles the message. So far so
> > good; we are just waiting for h3 to catch up.
> > But before that, h1 gets shut down and h3, while still in the ISR, doesn't
> > get the chance to get M100. So partition 88 will choose a new leader from h2
> > and h3. And somehow (randomly) it chooses h3, so M100 in h2 will be
> > truncated and the data is lost.
> > But this is not the worst part, because consumer C1 already got M100. After
> > C1 handled the message, it committed its offset (100) back to a key-value
> > store and started to pull message 101 from the new leader h3. Since h3
> > doesn't have M100, it responded with the error "Offset out of bound".
> > Now producer PD1 keeps producing messages to partition 88; say it produces
> > two messages (M1 and M2). The offsets of M1 and M2 in h3 are 100 and 101.
> > Now consumer C1 pulls the messages from h3 at offset 101 and sees one
> > message, M2. Thus M1 will never be processed by the consumer.
> >
> > This is extremely bad because the producer got an acknowledgement but the
> > consumer will never be able to process the message.
> >
> > I googled a bit on how to solve the problem. Most of the posts suggest
> > changing the ack to -1 (all). That is also prone to failure, since now if
> > one broker is down, producers will lose the ability to produce any data.
> >
> > I want to seek more wisdom from the community on how to solve this
> > problem. Any idea or previous experience is welcome.
> >
> > Thanks ahead.
> >
> > --
> > come on
> >
>



-- 
come on


KafkaStream Merging two topics is not working for custom datatypes

2016-10-10 Thread Ratha v
Hi all;
I have a custom datatype defined (a POJO class).
I copy messages from one topic to another topic.
I do not see any messages in my target topic.
This works for string messages, but not for my custom message.
What might be the cause?
I followed this sample [1]
[1]
https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java


-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to merge two topics in kafka?

2016-10-10 Thread Ratha v
I checked my targetTopic for available messages, but it says '0'. What
might be causing the issue here when merging two topics with custom-type messages?
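
(For reference, a minimal copy/merge topology with an explicit custom serde might
look like the sketch below; MyPojoSerde and the topic names are placeholders, not
code from this thread.)

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

Properties props = new Properties();   // application.id, bootstrap.servers, etc.
KStreamBuilder builder = new KStreamBuilder();

// Read both source topics with the custom value serde and write everything
// into one target topic, which the second process then consumes.
KStream<String, MyPojo> merged =
    builder.stream(Serdes.String(), new MyPojoSerde(), "topicA", "topicB");
merged.to(Serdes.String(), new MyPojoSerde(), "merged-topic");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();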

On 11 October 2016 at 14:44, Ratha v  wrote:

> Thanks, this demo works perfectly for string messages.
>
> I have a custom message type defined (a Java POJO class), and I have a SerDe
> implemented for it.
> Now, after merging sourceTopic --> targetTopic,
> I could not consume the messages; the consumer does not return any
> messages.
>
> What might be the cause?
>
> On 10 October 2016 at 17:54, Sachin Mittal  wrote:
>
>> Check this example
>> https://github.com/apache/kafka/blob/trunk/streams/examples/
>> src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java
>>
>> On Mon, Oct 10, 2016 at 11:34 AM, Ratha v  wrote:
>>
>> > Hi Sachin;
>> > I went through the KStream/KTable documentation. My scenario is very
>> > simple: I want to merge two topics (i.e. send the messages available in
>> > topic A to topic B; in my case I'll have only a single message in that
>> > topicA).
>> >
>> > Do I need Stateful processing (KStream)?
>> >
>> > Thanks.
>> >
>> > On 10 October 2016 at 13:10, Ratha v  wrote:
>> >
>> > > Thank you. I'll try the solution.
>> > > But in the high-level consumer API, will topics be created automatically?
>> > > We are not using ZooKeeper?
>> > >
>> > > On 10 October 2016 at 12:34, Sachin Mittal 
>> wrote:
>> > >
>> > >> You can use topicA.leftJoin(topicB).to(new-topic).
>> > >>
>> > >> You can then consume messages from that new topic via the second
>> > >> process. Note that you need to create all three topics in ZooKeeper first.
>> > >>
>> > >> On 10 Oct 2016 5:19 a.m., "Ratha v"  wrote:
>> > >>
>> > >> Hi all;
>> > >>
>> > >> I have two topics in the broker. Without consuming from one topic, I
>> > want
>> > >> to merge both topics, and will consume messages from the second
>> topic.
>> > >>
>> > >> It is because I have two processes: one process pushes messages to
>> > >> topic A, and the second process, once it has finished processing, wants
>> > >> to merge both topicA and topicB. Then another process will consume
>> > >> messages from the merged topic.
>> > >>
>> > >> How can I merge both topics in high level kafka APIs. I use Kafka
>> > >> 0.10.0.1.
>> > >>
>> > >>
>> > >> Thanks.
>> > >> --
>> > >> -Ratha
>> > >> http://vvratha.blogspot.com/
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > -Ratha
>> > > http://vvratha.blogspot.com/
>> > >
>> >
>> >
>> >
>> > --
>> > -Ratha
>> > http://vvratha.blogspot.com/
>> >
>>
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: How to merge two topics in kafka?

2016-10-10 Thread Ratha v
Thanks, this demo works perfectly for string messages.

I have a custom message type defined (a Java POJO class), and I have a SerDe
implemented for it.
Now, after merging sourceTopic --> targetTopic,
I could not consume the messages; the consumer does not return any
messages.

What might be the cause?

On 10 October 2016 at 17:54, Sachin Mittal  wrote:

> Check this example
> https://github.com/apache/kafka/blob/trunk/streams/
> examples/src/main/java/org/apache/kafka/streams/examples/
> pipe/PipeDemo.java
>
> On Mon, Oct 10, 2016 at 11:34 AM, Ratha v  wrote:
>
> > Hi Sachin;
> > I went through the KStream/KTable documentation. My scenario is very
> > simple: I want to merge two topics (i.e. send the messages available in
> > topic A to topic B; in my case I'll have only a single message in that
> > topicA).
> >
> > Do I need Stateful processing (KStream)?
> >
> > Thanks.
> >
> > On 10 October 2016 at 13:10, Ratha v  wrote:
> >
> > > Thank you. I'll try the solution.
> > > But in the high-level consumer API, will topics be created automatically?
> > > We are not using ZooKeeper?
> > >
> > > On 10 October 2016 at 12:34, Sachin Mittal  wrote:
> > >
> > >> You can use topicA.leftJoin(topicB).to(new-topic).
> > >>
> > >> You can then consume messages from that new topic via the second
> > >> process. Note that you need to create all three topics in ZooKeeper first.
> > >>
> > >> On 10 Oct 2016 5:19 a.m., "Ratha v"  wrote:
> > >>
> > >> Hi all;
> > >>
> > >> I have two topics in the broker. Without consuming from one topic, I
> > want
> > >> to merge both topics, and will consume messages from the second topic.
> > >>
> > >> It is because I have two processes: one process pushes messages to
> > >> topic A, and the second process, once it has finished processing, wants
> > >> to merge both topicA and topicB. Then another process will consume
> > >> messages from the merged topic.
> > >>
> > >> How can I merge both topics in high level kafka APIs. I use Kafka
> > >> 0.10.0.1.
> > >>
> > >>
> > >> Thanks.
> > >> --
> > >> -Ratha
> > >> http://vvratha.blogspot.com/
> > >>
> > >
> > >
> > >
> > > --
> > > -Ratha
> > > http://vvratha.blogspot.com/
> > >
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> >
>



-- 
-Ratha
http://vvratha.blogspot.com/


Re: [VOTE] 0.10.1.0 RC1

2016-10-10 Thread Jason Gustafson
The documentation is mostly fixed now: http://kafka.apache.org/
0101/documentation.html. Thanks to Derrick Or for all the help. Let me know
if anyone notices any additional problems.

-Jason

On Mon, Oct 10, 2016 at 1:10 PM, Jason Gustafson  wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the second candidate for release of Apache Kafka 0.10.1.0. This is
> a minor release that includes great new features including throttled
> replication, secure quotas, time-based log searching, and queryable state
> for Kafka Streams. A full list of the content can be found here:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
>
> One quick note on the docs. Because of all the recent improvements, the
> documentation is still a bit out of sync with what's visible on the Kafka
> homepage. This should be fixed soon (definitely before the release is
> finalized).
>
> Release notes for the 0.10.1.0 release:
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/RELEASE_NOTES.html
> 
>
> *** Please download, test and vote by Thursday, Oct 13, 1pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/
> 
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/javadoc/
> 
>
> * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc1 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
> 6eda15a97ffe17d636c390c0e0b28c8349993941
>
> * Documentation:
> http://kafka.apache.org/0101/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0101/protocol.html
>
> * Tests:
> Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/59/
> System tests: http://testing.confluent.io/confluent-kafka-0-10-1-system-
> test-results/?prefix=2016-10-10--001.1476110532--apache--0.10.1--e696f17/
>
> Thanks,
>
> Jason
>


Re: Tuning for high RAM and 10GBe

2016-10-10 Thread Christopher Stelly
With that link I came across the producer-perf-test tool, which is quite useful
since it removes the Go (Sarama) variable and lets me tweak settings quickly.

As you suggested Eno, I attempted to copy the LinkedIn settings. With 100
byte records, I get up to about 600,000 records/second. Still not quite
what they were able to do with cheap hardware.

As far as throughput, I tend to max out at around 200MB/s on a single
producer (record size 1, no acks, linger 100, batch size 100 and
with compression), along with a generous HEAP_OPT env setting of 50G.
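
(For comparison, the knobs described above map roughly to these Java-producer
settings; Sarama exposes similar options. A sketch with illustrative values, not
recommendations:)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("acks", "0");                          // no acks, as in the test above
props.put("linger.ms", 100);                     // wait up to 100 ms to fill batches
props.put("batch.size", 262144);                 // larger batches amortize request overhead
props.put("compression.type", "snappy");         // trade CPU for network/disk throughput
props.put("buffer.memory", 268435456L);          // 256 MB of client-side buffering
props.put("key.serializer", ByteArraySerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());

KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("perf-test", new byte[1024]));   // e.g. 1 KB payload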

If I do the above settings with 4 producers, things start to slow down. It
seems to add up to around 500MB/s, which is about what the SSD can write
at.

Could this number be improved if I let the memory take care of this instead
of flushing to disk? I understand that Kafka likes to flush often, but even
relaxing broker's flush settings I can't seem to make an impact on this
500MB/s number (w/ 4 producers). After all, we have a lot of RAM to play
with. The hackish solution would be to make a tempfs mount and store
kafka-logs there, but that seems like the wrong approach. Any thoughts? Do
you think flushing is my hold up at this point?

Again, thanks!

On Mon, Oct 10, 2016 at 12:45 PM, Christopher Stelly 
wrote:

> Sure, good ideas. I'll try multiple producers, localhost and LAN, to see
> if there is any difference.
>
> Yep, Gwen, the Sarama client. Anything to worry about there outside of
> setting the producer configs (which would you set?) and number of buffered
> channels? (currently, buffered channels up to 10k).
>
> Thanks!
>
> On Mon, Oct 10, 2016 at 12:04 PM, Gwen Shapira  wrote:
>
>> Out of curiosity - what is "Golang's Kafka interface"? Are you
>> referring to Sarama client?
>>
>> On Sun, Oct 9, 2016 at 9:28 AM, Christopher Stelly 
>> wrote:
>> > Hello,
>> >
>> > The last thread available regarding 10GBe is about 2 years old, with no
>> > obvious recommendations on tuning.
>> >
>> > Is there a more complex tuning guide than the example production config
>> > available on Kafka's main site? Anything other than the list of possible
>> > configs?
>> >
>> > I currently have access to a rather substantial academic cluster to test
>> > on, including multiple machines with the following hardware:
>> >
>> > 10GBe NICs
>> > 250GB RAM each
>> > SSDs on each
>> > (also, optional access to single NVMe)
>> >
>> > Using Golang's Kafka interface, I can only seem to get about 80MB/s on
>> the
>> > producer pushing to logs on the localhost, using no replication and
>> reading
>> > from/logging to SSD. If it helps, I can post my configs. I've tried
>> > fiddling with a bunch of broker configs as well as producer configs,
>> > raising the memory limits, max message size, io&network threads etc.
>> >
>> > Since the last post from 2014 indicates that there is no public
>> > benchmarking for 10GBe, I'd be happy to run benchmarks /publish results
>> on
>> > this hardware if we can get it tuned up properly.
>> >
>> > What kind of broker/producer/consumer settings would you recommend?
>> >
>> > Thanks!
>> > - chris
>>
>>
>>
>> --
>> Gwen Shapira
>> Product Manager | Confluent
>> 650.450.2760 | @gwenshap
>> Follow us: Twitter | blog
>>
>
>


[VOTE] 0.10.1.0 RC1

2016-10-10 Thread Jason Gustafson
Hello Kafka users, developers and client-developers,

This is the second candidate for release of Apache Kafka 0.10.1.0. This is
a minor release that includes great new features including throttled
replication, secure quotas, time-based log searching, and queryable state
for Kafka Streams. A full list of the content can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.

One quick note on the docs. Because of all the recent improvements, the
documentation is still a bit out of sync with what's visible on the Kafka
homepage. This should be fixed soon (definitely before the release is
finalized).

Release notes for the 0.10.1.0 release:
http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/RELEASE_NOTES.html


*** Please download, test and vote by Thursday, Oct 13, 1pm PT

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/


* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/javadoc/


* Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc1 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=6eda15a97ffe17d636c390c0e0b28c8349993941

* Documentation:
http://kafka.apache.org/0101/documentation.html

* Protocol:
http://kafka.apache.org/0101/protocol.html

* Tests:
Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/59/
System tests:
http://testing.confluent.io/confluent-kafka-0-10-1-system-test-results/?prefix=2016-10-10--001.1476110532--apache--0.10.1--e696f17/

Thanks,

Jason


Frequent Consumer Rebalance/ Commit fail exception

2016-10-10 Thread Misra, Rahul
Hi,

I have a custom Kafka consumer which reads messages from a topic and hands over
the processing of the messages to a different thread. While the messages
are being processed, it pauses the topic and keeps polling the Kafka topic (to
maintain heartbeats), and it commits offsets using commitSync() once the
processing thread returns success.
This consumer is the only consumer in its group. Auto commit for offsets is set
to false.
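
(For reference, that loop looks roughly like the sketch below; the topic name, the
worker executor, and the rebalance listener are placeholders. With the 0.9/0.10
clients, a commit that arrives after the group has already rebalanced, typically
because of a long gap between poll() calls, raises exactly this CommitFailedException.)

// Sketch only: consumer, worker (an ExecutorService), running, process(...) and
// rebalanceListener are assumed to be set up elsewhere.
consumer.subscribe(Collections.singletonList("my-topic"), rebalanceListener);

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    if (records.count() > 0) {
        Future<?> result = worker.submit(() -> process(records));

        // Pause the assigned partitions and keep polling so heartbeats continue
        // while the worker thread processes the batch.
        consumer.pause(consumer.assignment().toArray(new TopicPartition[0]));  // 0.9.x varargs signature;
                                                                               // newer clients take a Collection
        while (!result.isDone()) {
            consumer.poll(0);
        }
        consumer.resume(consumer.assignment().toArray(new TopicPartition[0]));

        // Commit only after the batch was processed successfully.
        consumer.commitSync();
    }
}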

The consumer also registers the onPartitionsAssigned() and 
onPartitionsRevoked() listeners.

Recently I observed that the consumer frequently crashes (if consuming large 
number of messages) with the following exception:


org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)

None of the rebalance listeners were called before this exception.
Could somebody suggest why this rebalancing is being triggered, always while 
committing the offsets (or is the actual issue somewhere else?)

Regards,
Rahul Misra



Re: Tuning for high RAM and 10GBe

2016-10-10 Thread Christopher Stelly
Sure, good ideas. I'll try multiple producers, localhost and LAN, to see if
there is any difference.

Yep, Gwen, the Sarama client. Anything to worry about there outside of
setting the producer configs (which would you set?) and number of buffered
channels? (currently, buffered channels up to 10k).

Thanks!

On Mon, Oct 10, 2016 at 12:04 PM, Gwen Shapira  wrote:

> Out of curiosity - what is "Golang's Kafka interface"? Are you
> referring to Sarama client?
>
> On Sun, Oct 9, 2016 at 9:28 AM, Christopher Stelly 
> wrote:
> > Hello,
> >
> > The last thread available regarding 10GBe is about 2 years old, with no
> > obvious recommendations on tuning.
> >
> > Is there a more complex tuning guide than the example production config
> > available on Kafka's main site? Anything other than the list of possible
> > configs?
> >
> > I currently have access to a rather substantial academic cluster to test
> > on, including multiple machines with the following hardware:
> >
> > 10GBe NICs
> > 250GB RAM each
> > SSDs on each
> > (also, optional access to single NVMe)
> >
> > Using Golang's Kafka interface, I can only seem to get about 80MB/s on
> the
> > producer pushing to logs on the localhost, using no replication and
> reading
> > from/logging to SSD. If it helps, I can post my configs. I've tried
> > fiddling with a bunch of broker configs as well as producer configs,
> > raising the memory limits, max message size, io&network threads etc.
> >
> > Since the last post from 2014 indicates that there is no public
> > benchmarking for 10GBe, I'd be happy to run benchmarks /publish results
> on
> > this hardware if we can get it tuned up properly.
> >
> > What kind of broker/producer/consumer settings would you recommend?
> >
> > Thanks!
> > - chris
>
>
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: difficulty to delete a topic because of its syntax

2016-10-10 Thread Gwen Shapira
Just note that in general doing what Todd advises is pretty risky.
We've seen controllers get into all kinds of weird situations when
topics were deleted from ZK directly (including getting stuck in an
infinite loop, deleting unrelated topics and all kinds of strangeness)
- we have no tests for those scenarios so behavior can get really
unexpected.

On Thu, Oct 6, 2016 at 2:12 AM, Hamza HACHANI  wrote:
> Thanks Todd,
>
>
> I've resolved it by using what you told me.
>
> Thanks very much. But I think there is a problem with Kafka allowing
> topic and log names to be saved with a space in them, as I showed in the
> images.
>
> Have a good day to you all.
>
>
> Hamza
>
> 
> From: Hamza HACHANI
> Sent: Wednesday, 5 October 2016 19:23:00
> To: users@kafka.apache.org
> Subject: RE: difficulty to delete a topic because of its syntax
>
>
> Hi,
>
> Attached the files showing what i'm talking about.
>
>
> Hamza
>
> 
> From: Todd S
> Sent: Wednesday, 5 October 2016 07:25:48
> To: users@kafka.apache.org
> Subject: Re: difficulty to delete a topic because of its syntax
>
> You *could* go in to zookeeper and nuke the topic, then delete the files on
> disk
>
> Slightly more risky but it should work
>
> On Wednesday, 5 October 2016, Manikumar  wrote:
>
>> Kafka doesn't support white spaces in topic names.  Only support '.', '_'
>> and '-' these are allowed.
>> Not sure how you got white space in topic name.
>>
>> On Wed, Oct 5, 2016 at 8:19 PM, Hamza HACHANI > >
>> wrote:
>>
>> > Well, awkwardly, when I list the topics I find it, but when I try to delete
>> > it, it says that this topic does not exist.
>> >
>> > 
>> > From: Ben Davison
>> > Sent: Wednesday, 5 October 2016 02:37:14
>> > To: users@kafka.apache.org
>> > Subject: Re: difficulty to delete a topic because of its syntax
>> >
>> > Try putting "" or '' around the string when running the command.
>> >
>> > On Wed, Oct 5, 2016 at 3:29 PM, Hamza HACHANI > >
>> > wrote:
>> >
>> > > It's between "the" and "metric"
>> > >
>> > > 
>> > > From: Ali Akhtar
>> > > Sent: Wednesday, 5 October 2016 02:16:33
>> > > To: users@kafka.apache.org
>> > > Subject: Re: difficulty to delete a topic because of its syntax
>> > >
>> > > I don't see a space in that topic name
>> > >
>> > > On Wed, Oct 5, 2016 at 6:42 PM, Hamza HACHANI > >
>> > > wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > I created a topic called device-connection-invert-key-value-the
>> > > > metric-changelog.
>> > > >
>> > > > I insist that there is a space in it.
>> > > >
>> > > >
>> > > >
>> > > > Now that I want to delete it, because my cluster can no longer work
>> > > > correctly, I can't do it, as it only reads the first part of the name
>> > > > (device-connection-invert-key-value-the), which obviously it doesn't
>> > > > find.
>> > > >
>> > > > Does somebody have a solution to delete it?
>> > > >
>> > > > Thanks in advance.
>> > > >
>> > > >
>> > > > Hamza
>> > > >
>> > > >
>> > >
>> >
>> > --
>> >
>> >
>> >
>>



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: Tuning for high RAM and 10GBe

2016-10-10 Thread Gwen Shapira
Out of curiosity - what is "Golang's Kafka interface"? Are you
referring to Sarama client?

On Sun, Oct 9, 2016 at 9:28 AM, Christopher Stelly  wrote:
> Hello,
>
> The last thread available regarding 10GBe is about 2 years old, with no
> obvious recommendations on tuning.
>
> Is there a more complex tuning guide than the example production config
> available on Kafka's main site? Anything other than the list of possible
> configs?
>
> I currently have access to a rather substantial academic cluster to test
> on, including multiple machines with the following hardware:
>
> 10GBe NICs
> 250GB RAM each
> SSDs on each
> (also, optional access to single NVMe)
>
> Using Golang's Kafka interface, I can only seem to get about 80MB/s on the
> producer pushing to logs on the localhost, using no replication and reading
> from/logging to SSD. If it helps, I can post my configs. I've tried
> fiddling with a bunch of broker configs as well as producer configs,
> raising the memory limits, max message size, io&network threads etc.
>
> Since the last post from 2014 indicates that there is no public
> benchmarking for 10GBe, I'd be happy to run benchmarks /publish results on
> this hardware if we can get it tuned up properly.
>
> What kind of broker/producer/consumer settings would you recommend?
>
> Thanks!
> - chris



-- 
Gwen Shapira
Product Manager | Confluent
650.450.2760 | @gwenshap
Follow us: Twitter | blog


Re: punctuate() bug

2016-10-10 Thread Guozhang Wang
Hello David,

Your observation is correct, the stream time reasoning is dependent on the
buffered records from each of the input topic-partitions, and hence is
"data-driven".

Currently, to get around this I'd recommend letting the producer send
certain "marker" messages periodically to ensure stream time advances; and
for the near future we are working on improving our stream time reasoning
logic, hopefully resolving such data-driven dependencies in the next
release:

https://issues.apache.org/jira/browse/KAFKA-3514
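
(A minimal sketch of such a periodic marker producer; the topic name, interval,
and marker payload are placeholders:)

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Send a no-op "marker" record every 10 seconds so the topic's stream time
// keeps advancing even when no real data arrives.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(
    () -> producer.send(new ProducerRecord<>("otherwise-idle-topic", "marker", "")),
    0, 10, TimeUnit.SECONDS);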


Guozhang


On Sat, Oct 8, 2016 at 12:29 PM, David Garcia  wrote:

> Actually, I think the bug is more subtle.  What happens when a consumed
> topic stops receiving messages?  The smallest timestamp will always be the
> static timestamp of this topic.
>
> -David
>
> On 10/7/16, 5:03 PM, "David Garcia"  wrote:
>
> Ok I found the bug.  Basically, if there is an empty topic (in the
> list of topics being consumed), any partition-group with partitions from
> the topic will always return -1 as the smallest timestamp (see
> PartitionGroup.java).
>
> To reproduce, simply start a kstreams consumer with one or more empty
> topics.  Punctuate will never be called.
>
> -David
>
> On 10/7/16, 1:11 PM, "David Garcia"  wrote:
>
> Yeah, this is possible.  We have run the application (and have
> confirmed data is being received) for over 30 mins…with a 60-second timer.
> So, do we need to just rebuild our cluster with bigger machines?
>
> -David
>
> On 10/7/16, 11:18 AM, "Michael Noll"  wrote:
>
> David,
>
> punctuate() is still data-driven at this point, even when
> you're using the
> WallClock timestamp extractor.
>
> To use an example: Imagine you have configured punctuate() to
> be run every
> 5 seconds.  If there's no data being received for a minute,
> then punctuate
> won't be called -- even though you probably would have
> expected this to
> happen 12 times during this 1 minute.
>
> (FWIW, there's an ongoing discussion to improve punctuate(),
> part of which
> is motivated by the current behavior that arguably is not very
> intuitive to
> many users.)
>
> Could this be the problem you're seeing?  See also the related
> discussion
> at
> http://stackoverflow.com/questions/39535201/kafka-
> problems-with-timestampextractor
> .
>
>
>
>
>
>
> On Fri, Oct 7, 2016 at 6:07 PM, David Garcia <
> dav...@spiceworks.com> wrote:
>
> > Hello, I’m sure this question has been asked many times.
> > We have a test-cluster (confluent 3.0.0 release) of 3 aws
> m4.xlarges.  We
> > have an application that needs to use the punctuate()
> function to do some
> > work on a regular interval.  We are using the WallClock
> extractor.
> > Unfortunately, the method is never called.  I have checked
> the
> > filedescriptor setting for both the user as well as the
> process, and
> > everything seems to be fine.  Is this a known bug, or is
> there something
> > obvious I’m missing?
> >
> > One note, the application used to work on this cluster, but
> now it’s not
> > working.  Not really sure what is going on?
> >
> > -David
> >
>
>
>
>
>
>
>


-- 
-- Guozhang


Re: Hard delete topics

2016-10-10 Thread Radoslaw Gruchalski
Sachin, do you have delete.topic.enable=true in your server.properties?

–
Best regards,
Radek Gruchalski
ra...@gruchalski.com


On October 10, 2016 at 4:11:36 PM, Sachin Mittal (sjmit...@gmail.com) wrote:

Hi,
We are doing some testing and need to frequently wipe out the kafka logs
and delete the topic completely. There is no problem starting/stopping
zookeeper and server multiple times.

So what is the best way of purging a topic and removing its reference from
zookeeper entirely.

I can physically delete the logs dir, but what else I need to do to remove
the topic.

when I do:
kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-name
it only marks the topic for deletion but does not delete it.

Thanks
Sachin


Re: Tuning for high RAM and 10GBe

2016-10-10 Thread Eno Thereska
Hi Chris,

I think the first step would be to set up the system so we can easily identify 
the bottlenecks. With your setup I'm currently worried about 2 things:

1. the system is not being driven hard enough. In particular 1 producer might 
not be enough. I'd recommend running 3 producer processes.
2. CPU interference between producers and Kafka itself, since they are on the 
same box. Given that you have 10Gbps you should probably set up the producer on 
a different machine to Kafka (so a 3 machine setup: producer, zk, 1 kafka 
broker). 

With these two in place, the next thing to look at would be perf metrics to see
whether CPU, network or storage is the bottleneck. Would you have access to those
metrics while the test is running?

Also, for calibration, I'd run a test with small (100 byte) values. That way we
can compare to
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
for example.

Does this help?

Thanks
Eno


> On 9 Oct 2016, at 18:19, Christopher Stelly  wrote:
> 
> Hey Eno, thanks
> 
> Yes, localhost would mean testing SSD, maybe moving to a remote producer in
> the future over 10GBe but baby steps first.
> 
> My record size is pretty flexible - I've tested from 4k to 16M and the
> results are +- 10MB/s.
> 
> SSD is a samsung 850, claiming 100k IOPS and >500MB/s read/write.
> 
> Again, thanks!
> 
> On Sun, Oct 9, 2016 at 12:55 PM, Eno Thereska 
> wrote:
> 
>> Hi Chris,
>> 
>> A couple of things: looks like you're primarily testing the SSD if you're
>> running on the localhost, right? The 10GBe shouldn't matter in this case.
>> 
>> The performance will depend a lot on the record sizes you are using. To
>> get a ballpark number if would help to know the SSD type, would you be
>> willing to share it? We can take it from there.
>> 
>> Thanks
>> Eno
>> 
>>> On 9 Oct 2016, at 17:28, Christopher Stelly  wrote:
>>> 
>>> Hello,
>>> 
>>> The last thread available regarding 10GBe is about 2 years old, with no
>>> obvious recommendations on tuning.
>>> 
>>> Is there a more complex tuning guide than the example production config
>>> available on Kafka's main site? Anything other than the list of possible
>>> configs?
>>> 
>>> I currently have access to a rather substantial academic cluster to test
>>> on, including multiple machines with the following hardware:
>>> 
>>> 10GBe NICs
>>> 250GB RAM each
>>> SSDs on each
>>> (also, optional access to single NVMe)
>>> 
>>> Using Golang's Kafka interface, I can only seem to get about 80MB/s on
>> the
>>> producer pushing to logs on the localhost, using no replication and
>> reading
>>> from/logging to SSD. If it helps, I can post my configs. I've tried
>>> fiddling with a bunch of broker configs as well as producer configs,
>>> raising the memory limits, max message size, io&network threads etc.
>>> 
>>> Since the last post from 2014 indicates that there is no public
>>> benchmarking for 10GBe, I'd be happy to run benchmarks /publish results
>> on
>>> this hardware if we can get it tuned up properly.
>>> 
>>> What kind of broker/producer/consumer settings would you recommend?
>>> 
>>> Thanks!
>>> - chris
>> 
>> 



Re: difficulty to delete a topic because of its syntax

2016-10-10 Thread Avi Flax

> On Oct 6, 2016, at 09:59, Ismael Juma  wrote:
> 
> On Thu, Oct 6, 2016 at 2:51 PM, Avi Flax  wrote:
>> 
>> Does this mean that the next release (after 0.10.1.0, maybe ~Feb?) might
>> remove altogether the requirement that Streams apps be able to access
>> ZooKeeper directly?
> 
> 
> That's the plan. See the following PR for details:
> 
> https://github.com/apache/kafka/pull/1884

Excellent! Thank you!


Software Architect @ Park Assist
We’re hiring! http://tech.parkassist.com/jobs/

Fwd: Content based enhancement for Apache Kafka

2016-10-10 Thread Janagan Sivagnanasundaram
Publisher/Subscriber systems can be divided into two categories.
1) Topic based model
2) Content-based model - provides more accurate results compared to the topic-based
model, since subscribers are interested in the content of the message rather
than in subscribing to a topic and getting all the messages.

Kafka uses a topic-based subscription model, so I thought of enhancing the
Kafka framework to a content-based model. For this enhancement, I
have come up with two ideas.

The first is, without modifying Kafka, to add a separate layer between the
Kafka broker and the subscriber which can be used to filter the messages
from the producers according to interests externally specified by the
subscribers (like a string match/search).

The second is to extract the keywords of the messages produced by producers
and attach the keyword list as a header to each message before sending it. For
this we can use any POS tagger. After that, let the subscribers specify their
interests externally and check whether those interests match the header of the
message, without analyzing the entire message. If there is a match, deliver the
corresponding message to the subscribers; otherwise reject it.

The second method is usually more time-consuming than the first.
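
(A rough sketch of the first approach, purely for illustration; the topic names,
the interest registry, and the matching rule are made up:)

import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Externally registered interests: subscriber id -> keyword.
Map<String, String> interests = new HashMap<>();
interests.put("subscriber-1", "error");

Properties cProps = new Properties();
cProps.put("bootstrap.servers", "localhost:9092");
cProps.put("group.id", "content-filter");
cProps.put("key.deserializer", StringDeserializer.class.getName());
cProps.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
consumer.subscribe(Collections.singletonList("all-messages"));

Properties pProps = new Properties();
pProps.put("bootstrap.servers", "localhost:9092");
pProps.put("key.serializer", StringSerializer.class.getName());
pProps.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(pProps);

while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(100)) {
        for (Map.Entry<String, String> e : interests.entrySet()) {
            if (record.value().contains(e.getValue())) {          // simple string match
                producer.send(new ProducerRecord<>("filtered-" + e.getKey(), record.key(), record.value()));
            }
        }
    }
}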

Are there any other ideas for performing the above content-based enhancement in an
efficient way? Or any other optimizations that we can add to the proposed
architectures?

Thanks.

Regards,
Janagan S.


Kafka producer 0.9.0.x if not closed.

2016-10-10 Thread Usman Liaqat
Hi ,

I am using kafka 0.9.0.x and in a multithreaded system I have created only
one instance of Kafka producer.

When I call producer.close(), it only closes the communication and can no longer
send messages to the topic.

But why is its object still valid if we cannot send requests through it?

Moreover, I understand that Kafka producer threads that are opened and not closed
will leak resources.

Why can't Kafka have a timer on the producer thread so that, if it is idle, it is
closed and made available for GC, and recreated when needed?

 

Thanks 

Usman.



Re: punctuate() never called

2016-10-10 Thread David Garcia
Thanks for the responses. I was able to identify a bug in how the times are
obtained (offsets resolved as unknown cause the issue):

“Actually, I think the bug is more subtle.  What happens when a consumed topic 
stops receiving messages?  The smallest timestamp will always be the static 
timestamp of this topic.

-David

On 10/7/16, 5:03 PM, "David Garcia"  wrote:

Ok I found the bug.  Basically, if there is an empty topic (in the list of 
topics being consumed), any partition-group with partitions from the topic will 
always return -1 as the smallest timestamp (see PartitionGroup.java).

To reproduce, simply start a kstreams consumer with one or more empty 
topics.  Punctuate will never be called.

-David ”

On 10/10/16, 1:55 AM, "Michael Noll"  wrote:

> We have run the application (and have confirmed data is being received)
for over 30 mins…with a 60-second timer.

Ok, so your app does receive data but punctuate() still isn't being called.
:-(


> So, do we need to just rebuild our cluster with bigger machines?

That's worth trying out.  See

http://www.confluent.io/blog/design-and-deployment-considerations-for-deploying-apache-kafka-on-aws/
for some EC2 instance types recommendations.

But I'd also suggest to look into the logs of (1) your application, (2) the
log files of the Kafka broker(s), and (3) the log files of ZooKeeper to see
whether you see anything suspicious?

Sorry for not being able to provide more actionable feedback at this
point.  Typically we have seen such issues only (but not exclusively) in
cases where there have been problems in the environment in which your
application is running and/or the environment of the Kafka clusters.
Unfortunately these environment problems are a bit tricky to debug remotely
via the mailing list.

-Michael





On Fri, Oct 7, 2016 at 8:11 PM, David Garcia  wrote:

> Yeah, this is possible.  We have run the application (and have confirmed
> data is being received) for over 30 mins…with a 60-second timer.  So, do 
we
> need to just rebuild our cluster with bigger machines?
>
> -David
>
> On 10/7/16, 11:18 AM, "Michael Noll"  wrote:
>
> David,
>
> punctuate() is still data-driven at this point, even when you're using
> the
> WallClock timestamp extractor.
>
> To use an example: Imagine you have configured punctuate() to be run
> every
> 5 seconds.  If there's no data being received for a minute, then
> punctuate
> won't be called -- even though you probably would have expected this 
to
> happen 12 times during this 1 minute.
>
> (FWIW, there's an ongoing discussion to improve punctuate(), part of
> which
> is motivated by the current behavior that arguably is not very
> intuitive to
> many users.)
>
> Could this be the problem you're seeing?  See also the related
> discussion
> at
> http://stackoverflow.com/questions/39535201/kafka-problems-with-
> timestampextractor
> .
>
>
>
>
>
>
> On Fri, Oct 7, 2016 at 6:07 PM, David Garcia 
> wrote:
>
> > Hello, I’m sure this question has been asked many times.
> > We have a test-cluster (confluent 3.0.0 release) of 3 aws
> m4.xlarges.  We
> > have an application that needs to use the punctuate() function to do
> some
> > work on a regular interval.  We are using the WallClock extractor.
> > Unfortunately, the method is never called.  I have checked the
> > filedescriptor setting for both the user as well as the process, and
> > everything seems to be fine.  Is this a known bug, or is there
> something
> > obvious I’m missing?
> >
> > One note, the application used to work on this cluster, but now it’s
> not
> > working.  Not really sure what is going on?
> >
> > -David
> >
>
>
>




Hard delete topics

2016-10-10 Thread Sachin Mittal
Hi,
We are doing some testing and need to frequently wipe out the kafka logs
and delete the topic completely. There is no problem starting/stopping
zookeeper and server multiple times.

So what is the best way of purging a topic and removing its reference from
zookeeper entirely.

I can physically delete the logs dir, but what else I need to do to remove
the topic.

when I do:
kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic-name
it only marks the topic for deletion but does not delete it.

Thanks
Sachin


Re: Data loss when ack != -1

2016-10-10 Thread Andrew Grasso
Hi Justin,

Setting the required acks to -1 does not require that all assigned brokers
are available, only that all members of the ISR are available. If a broker
goes down, the producer is able to commit messages once the faulty broker
is evicted from the ISR. This can continue even if only one broker is
alive, in which case only that broker will be eligible to be leader. If
you'd like to ensure that all committed messages are present on at least N
machines, set min.insync.replicas to N and required acks to -1.
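
(For completeness, with a 0.8.2+ broker and the Java producer, that combination
looks roughly like the sketch below; values are illustrative:)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

// Topic side (or broker default): min.insync.replicas=2 on a replication-factor-3
// topic means every committed message is on at least 2 replicas.
// Producer side:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("acks", "all");    // equivalent to -1: wait for the full ISR
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);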

-Andrew

On Fri, Oct 7, 2016 at 5:05 PM, Justin Lin  wrote:

> Hi everyone,
>
> I am currently running kafka 0.8.1.1 in a cluster, with 6 brokers, and I set
> the replication factor to 3. My producer sets the ack to 2 when producing
> messages. I recently came across a bad situation where I had to reboot one
> broker machine by shutting down the power, and that caused data loss.
>
> This is what actually happened.
>
> Producer 1 (PD1) sends message M100 to Partition 10 (leader h1, ISR h1,
> h2, h3), and since ack == 2, as long as two brokers have acknowledged,
> M100 is considered committed and ready for the consumer. So h1 and h2 got
> M100, and consumer C1 pulls M100 down and handles the message. So far so
> good; we are just waiting for h3 to catch up.
> But before that, h1 gets shut down and h3, while still in the ISR, doesn't
> get the chance to get M100. So partition 88 will choose a new leader from h2
> and h3. And somehow (randomly) it chooses h3, so M100 in h2 will be
> truncated and the data is lost.
> But this is not the worst part, because consumer C1 already got M100. After
> C1 handled the message, it committed its offset (100) back to a key-value
> store and started to pull message 101 from the new leader h3. Since h3
> doesn't have M100, it responded with the error "Offset out of bound".
> Now producer PD1 keeps producing messages to partition 88; say it produces
> two messages (M1 and M2). The offsets of M1 and M2 in h3 are 100 and 101.
> Now consumer C1 pulls the messages from h3 at offset 101 and sees one
> message, M2. Thus M1 will never be processed by the consumer.
>
> This is extremely bad because the producer got an acknowledgement but the
> consumer will never be able to process the message.
>
> I googled a bit on how to solve the problem. Most of the posts suggest
> changing the ack to -1 (all). That is also prone to failure, since now if
> one broker is down, producers will lose the ability to produce any data.
>
> I want to seek more wisdom from the community on how to solve this
> problem. Any idea or previous experience is welcome.
>
> Thanks ahead.
>
> --
> come on
>


Re: Understanding how joins work in Kafka streams

2016-10-10 Thread Eno Thereska
Hi Sachin,

Yes, it will be called each time a key is modified; it will do this continuously
until you stop the app.
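
(A minimal sketch matching the pseudo-code quoted below; listSerde stands in for
whatever custom Serde is used for the List values, and the method names are from
the 0.10.0.x API:)

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

// listSerde: your Serde<List<String>> implementation.
KStreamBuilder builder = new KStreamBuilder();

KTable<String, List<String>> left  = builder.table(Serdes.String(), listSerde, "topic-1");
KTable<String, List<String>> right = builder.table(Serdes.String(), listSerde, "topic-2");

// ValueJoiner: merge the two lists; the right side may be null in a left join.
KTable<String, List<String>> joined = left.leftJoin(right, (l, r) -> {
    List<String> merged = new ArrayList<>(l);
    if (r != null) {
        merged.addAll(r);
    }
    return merged;
});

joined.toStream().to(Serdes.String(), listSerde, "new-result-topic");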

Eno
> On 9 Oct 2016, at 16:50, Sachin Mittal  wrote:
> 
> Hi,
> It is actually a KTable-KTable join.
> 
> I have a stream (K1, A) which is aggregated as (Key, List<A>), hence it
> creates a KTable.
> I have another stream (K2, B) which is aggregated as (Key, List<B>), hence
> it creates another KTable.
> 
> Then I have
> KTable (Key, List<A>).leftJoin( KTable(Key, List<B>), ValueJoiner {
>     new List = merge(List<A>, List<B>)
>     return (Key, new List)
> }).to("new-result-topic")
> 
> So what I understand is that every time ValueJoiner is called, it picks the
> latest modified List<A> and List<B> for a key, merges them, and then
> updates the new-result-topic with the new modified list for the same key.
> 
> So then when I do KStream("new-result-topic").forEach((key, List) -> {
>     // this callback is called multiple times for the same key kx and each time
>     // it contains the new modified List (as and when it gets modified by the
>     // above process)
> });
> 
> So please let me know if my understanding is correct. I suppose it will be
> called every time a key is modified or it buffers the changes and calls it
> once in a given time span.
> 
> Thanks
> Sachin
> 
> 
> 
> On Sun, Oct 9, 2016 at 3:07 PM, Eno Thereska  wrote:
> 
>> Hi Sachin,
>> 
>> Some comments inline:
>> 
>>> On 9 Oct 2016, at 08:19, Sachin Mittal  wrote:
>>> 
>>> Hi,
>>> I needed some light on how joins actually work on continuous stream of
>> data.
>>> 
>>> Say I have 2 topics which I need to join (left join).
>>> Data record in each topic is aggregated like (key, value) <=> (string,
>> list)
>>> 
>>> Topic 1
>>> key1: [A01, A02, A03, A04 ..]
>>> Key2: [A11, A12, A13, A14 ..]
>>> 
>>> 
>>> Topic 2
>>> key1: [B01, B02, B03, B04 ..]
>>> Key2: [B11, B12, B13, B14 ..]
>>> 
>>> 
>>> Joined topic
>>> Key1: [A01, B01...]
>>> Key2: [A11, B11 ...]
>>> 
>>> Now let us say I get 2 records [Key1: A05] & [Key1: B05]
>>> So as per aggregation they are appended to the Topic 1 and Topic 2.
>>> 
>>> I assume this will again call the join operation and the records would
>> get
>>> appended to Key1 data? Let me know if my understanding is correct here.
>> 
>> Yes, that is correct. The join operation is continuously called each time
>> there are new records consumed from the topic. The consuming happens
>> continuously too.
>> 
>>> 
>>> If I am reading the joined topic using foreach will I again get record
>> for
>>> key1 with new appended data in the original list so now my record is
>>> Key1: [A01, B01..., A05, B05 ... ]
>> 
>> Correct.
>> 
>> 
>>> 
>>> What I wanted to ask was in case of reading each record from a topic, if
>>> the value against that key is modified will it be read again (if it was
>>> read before also)?
>>> Or the record is read only once via that stream program?
>> 
>> So this depends on how the value for a key is modified. I'm assuming a new
>> record with the new value is produced to the topic. There will be two broad
>> options here:
>> 
>> - if you are doing a KSTream-KStream join, the "time" when the new value
>> is updated will matter (these kinds of joins are done with a time boundary,
>> e.g., join everything within a time difference of 10 minutes). E.g., say
>> the join result so far is Key1: [A01, B01..., A05, B05 ... ]. If the value
>> for Key1 is now [B06] then the output will depend on the time of the join.
>> - if you are doing a KStream-KTable join, it depends on whether the value
>> change happens on the KStream or KTable.
>> 
>> Before going further, could you clarify if you'll have a KStream-KStream
>> join or a KStream-KTable join?
>> 
>> Thanks
>> Eno
>> 
>>> 
>>> Please let me know how such a scenario works.
>>> 
>>> Thanks
>>> Sachin
>> 
>> 



Re: Support for Kafka

2016-10-10 Thread Jens Rantil
Hi Syed,

Apache Kafka runs on a JVM. I think the question you should ask is -- which
JVM does Apache Kafka require in production*? It doesn't really depend on
anything on a specific Linux distribution.

* ...and I don't have that answer ;-)

Cheers,
Jens

On Wednesday, October 5, 2016, Syed Hussaini <
syed.hussa...@theexchangelab.com> wrote:

> Dear Kafka team.
>
> I am in the implementation stage of a Kafka cluster and am looking to find
> out whether Apache Kafka is supported on Ubuntu 16.04 LTS – Xenial.
>
>
>
> Would be great if you please let us know.
>
>
>
>
>
> [image: The Exchange Lab] 
>
> *Syed Hussaini*
> Infrastructure Engineer
>
> 1 Neathouse Place
> 5th Floor
> London, England, SW1V 1LH
>
>
> syed.hussa...@theexchangelab.com
> 
>
> T 0203 701 3177
>
>
> --
>
> Follow us on Twitter: @exchangelab  | Visit
> us on LinkedIn: The Exchange Lab
> 
>
>
>
>
>


-- 
Jens Rantil
Backend engineer
Tink AB

Email: jens.ran...@tink.se
Phone: +46 708 84 18 32
Web: www.tink.se

Facebook  Linkedin

 Twitter 


Reply: Re: I found kafka lost message

2016-10-10 Thread yangyuqi
Hi Guozhang,
First of all, thank you for answering my question and giving me some suggestions.
But I'm sure I have read some of the introductions to Kafka.

In my producer, my code is (C code):

res = rd_kafka_conf_set(kafka_conf, "request.required.acks", "-1", NULL, 0);
res = rd_kafka_topic_conf_set(topic_conf, "produce.offset.report", "true", errstr, sizeof(errstr));

In my consumer, my code is (kafka-python):

self.__consumer = KafkaConsumer(
    bootstrap_servers=self.__kafka_config["hosts"],
    group_id=self.__kafka_config["group"],
    auto_offset_reset="earliest",
)
self.__consumer.subscribe([self.__kafka_config["data_topic"]])
for message in self.__consumer:

I think this code is fairly common. What is your suggestion about it?

Finally, I must explain: messages are not lost often. Sometimes a couple of days
go by before we find one or two lost messages, but on some days we may find over
20 lost messages. We have over 1,200,000 messages per day.

So, do you have any suggestions? By the way, can you speak Chinese?

Thank you very much & best wishes,
Jerry




----- Original Message -----
From: Guozhang Wang
To: "users@kafka.apache.org" , yangy...@sina.com
Subject: Re: I found kafka lost message
Date: 10 October 2016, 01:25

Jerry,
Message loss scenarios are usually due to misconfigured producers and consumers.
Have you read the client config docs?
http://kafka.apache.org/documentation#producerconfigs

http://kafka.apache.org/documentation#newconsumerconfigs


If not, I'd suggest reading those first and seeing if you can tune some of
these configs to get better delivery guarantees.
Guozhang

On Fri, Oct 7, 2016 at 9:48 PM,   wrote:
Hello everybody, I built a Kafka cluster (including 5 domains) using
kafka_2.11-0.10.0.0 and kafka-python-1.3.1. I created a topic with 100 partitions
and 2 replicas, then used 20 consumers to receive messages.

But I found that sometimes Kafka loses messages! I found some partitions' offsets
lost at the consumer. Afterwards, I added a global index to every message from the
producer to confirm this problem, and I also found that the global index had been
broken!

Why does Kafka lose messages? What can I do to fix the problem?

Thanks!
Jerry









-- 
-- Guozhang





Re: sFlow/NetFlow/Pcap Plugin for Kafka Producer

2016-10-10 Thread Michael Noll
Aris,

I am not aware of an out of the box tool for Pcap->Kafka ingestion (in my
case back then we wrote our own).  Maybe others know.



On Monday, October 10, 2016, Aris Risdianto  wrote:

> Thank you for answer Michael.
>
> Actually, I have made a simple producer from Pcap to Kafka. Since it is not
> structured, it is difficult for a consumer to process further. But I
> will take a look at Avro as you mentioned.
>
> I just wondering, if there are any proper implementation for this
> requirement, because I couldn't find any tool in the kafka ecosystem page.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem
>
>
> Best Regards,
> Aris.
>
>
> On Mon, Oct 10, 2016 at 6:55 PM, Michael Noll  > wrote:
>
> > Aris,
> >
> > even today you can already use Kafka to deliver Netflow/Pcap/etc.
> messages,
> > and people are already using it for that (I did that in previous projects
> > of mine, too).
> >
> > Simply encode your Pcap/... messages appropriately (I'd recommend to
> take a
> > look at Avro, which allows you to structure your data similar to e.g.
> > Pcap's native format [1]), and then write the encoded messages to Kafka.
> > Your downstream applications can then read the encoded messages back from
> > Kafka, decode, and commence processing.
> >
> > That was a brief summary to get you started, feel free to take a look at
> > the Apache Kafka docs at kafka.apache.org and/or ask further questions
> > here.
> >
> > -Michael
> >
> >
> >
> >
> > [1] https://wiki.wireshark.org/Development/LibpcapFileFormat
> >
> > On Mon, Oct 10, 2016 at 11:19 AM, Aris Risdianto  > wrote:
> >
> > > ​Hello,
> > >
> > >
> > > ​Is there any plan or implementation to use Kafka for delivering
> > > sFlow/NetFlow/Pcap messages?
> > >
> > >
> > > Best Regards,
> > > Aris.
> > >
> >
>


-- 
*Michael G. Noll*
Product Manager | Confluent
+1 650 453 5860 | @miguno 
Follow us: Twitter  | Blog



Re: sFlow/NetFlow/Pcap Plugin for Kafka Producer

2016-10-10 Thread Aris Risdianto
Thank you for answer Michael.

Actually, I have made a simple producer from Pcap to Kafka. Since it is not
structured, it is difficult for a consumer to process further. But I
will take a look at Avro as you mentioned.

I just wondering, if there are any proper implementation for this
requirement, because I couldn't find any tool in the kafka ecosystem page.

https://cwiki.apache.org/confluence/display/KAFKA/Ecosystem


Best Regards,
Aris.


On Mon, Oct 10, 2016 at 6:55 PM, Michael Noll  wrote:

> Aris,
>
> even today you can already use Kafka to deliver Netflow/Pcap/etc. messages,
> and people are already using it for that (I did that in previous projects
> of mine, too).
>
> Simply encode your Pcap/... messages appropriately (I'd recommend to take a
> look at Avro, which allows you to structure your data similar to e.g.
> Pcap's native format [1]), and then write the encoded messages to Kafka.
> Your downstream applications can then read the encoded messages back from
> Kafka, decode, and commence processing.
>
> That was a brief summary to get you started, feel free to take a look at
> the Apache Kafka docs at kafka.apache.org and/or ask further questions
> here.
>
> -Michael
>
>
>
>
> [1] https://wiki.wireshark.org/Development/LibpcapFileFormat
>
> On Mon, Oct 10, 2016 at 11:19 AM, Aris Risdianto  wrote:
>
> > ​Hello,
> >
> >
> > ​Is there any plan or implementation to use Kafka for delivering
> > sFlow/NetFlow/Pcap messages?
> >
> >
> > Best Regards,
> > Aris.
> >
>


Re: sFlow/NetFlow/Pcap Plugin for Kafka Producer

2016-10-10 Thread Michael Noll
Aris,

even today you can already use Kafka to deliver Netflow/Pcap/etc. messages,
and people are already using it for that (I did that in previous projects
of mine, too).

Simply encode your Pcap/... messages appropriately (I'd recommend taking a
look at Avro, which allows you to structure your data similarly to e.g.
Pcap's native format [1]), and then write the encoded messages to Kafka.
Your downstream applications can then read the encoded messages back from
Kafka, decode, and commence processing.
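
(To make that concrete, a rough sketch; the Avro schema, field names, and topic
are made up for illustration:)

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

// A tiny record per captured packet, loosely mirroring a pcap record header.
Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Packet\",\"fields\":["
  + "{\"name\":\"ts_sec\",\"type\":\"long\"},"
  + "{\"name\":\"ts_usec\",\"type\":\"long\"},"
  + "{\"name\":\"data\",\"type\":\"bytes\"}]}");

byte[] rawPacketBytes = new byte[]{};   // placeholder for the captured frame

GenericRecord packet = new GenericData.Record(schema);
packet.put("ts_sec", 1476086400L);
packet.put("ts_usec", 0L);
packet.put("data", ByteBuffer.wrap(rawPacketBytes));

// Avro binary-encode the record...
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(schema).write(packet, encoder);
encoder.flush();

// ...and produce the encoded bytes to Kafka.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", ByteArraySerializer.class.getName());
props.put("value.serializer", ByteArraySerializer.class.getName());
new KafkaProducer<byte[], byte[]>(props).send(new ProducerRecord<>("pcap-packets", out.toByteArray()));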

That was a brief summary to get you started, feel free to take a look at
the Apache Kafka docs at kafka.apache.org and/or ask further questions here.

-Michael




[1] https://wiki.wireshark.org/Development/LibpcapFileFormat

On Mon, Oct 10, 2016 at 11:19 AM, Aris Risdianto  wrote:

> ​Hello,
>
>
> ​Is there any plan or implementation to use Kafka for delivering
> sFlow/NetFlow/Pcap messages?
>
>
> Best Regards,
> Aris.
>


sFlow/NetFlow/Pcap Plugin for Kafka Producer

2016-10-10 Thread Aris Risdianto
Hello,


Is there any plan or implementation to use Kafka for delivering
sFlow/NetFlow/Pcap messages?


Best Regards,
Aris.


Re: Convert a KStream to KTable

2016-10-10 Thread Michael Noll
Elias,

yes, that is correct.

I also want to explain why:

One can always convert a KTable to a KStream (note: this is the opposite
direction of what you want to do) because one only needs to iterate through
the table to generate the stream.

To convert a KStream into a KTable (what you want to do) the user must tell
Kafka how multiple values of the same key should be processed to end up
with just a single value for that key (a KTable has only a single value for
each key).  This step -- the semantics desired by the user of how to
"squash" multiple values into a single value -- is something that Kafka
cannot second-guess.  Some users want to compute a sum and thus add all the
values for a key into a final "sum" value;  others may want to compute
MAX(), and so on.  This explains why there's no KStream#toTable()
equivalent to KTable#toStream().
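
(A minimal sketch of the reduceByKey route Elias mentions, using the 0.10.0.x
method names (newer releases use groupByKey().reduce(...) instead); the reducer
here simply keeps the latest value per key:)

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

KStreamBuilder builder = new KStreamBuilder();

KStream<String, String> stream =
    builder.stream(Serdes.String(), Serdes.String(), "input-topic");

// The Reducer is the user-supplied "squash" rule: here, the latest value wins.
KTable<String, String> table = stream.reduceByKey(
    (aggValue, newValue) -> newValue,
    Serdes.String(), Serdes.String(), "latest-by-key-store");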

Hope this helps!
Michael





On Sat, Oct 8, 2016 at 5:03 AM, Elias Levy 
wrote:

> I am correct in assuming there is no way to convert a KStream into a
> KTable, similar to KTable.toStream() but in the reverse direction, other
> than using KSteam.reduceByKey and a Reducer or looping back through Kafka
> and using KStreamBuilder.table?
>


Re: Kafka null keys - OK or a problem?

2016-10-10 Thread Michael Noll
Depends on which partitioner you are using, see [1] and [2].  From what I
understand the `NewHashPartitioner` comes closest to the behavior of Kafka
Java producer, but instead of going round-robin for null-keyed messages it
picks a partition at random.



[1] https://godoc.org/github.com/Shopify/sarama#Partitioner
[2] https://github.com/Shopify/sarama/blob/master/partitioner.go
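
(For completeness, in the Java producer the same idea looks like the sketch below;
the topic name is a placeholder and `producer` is assumed to be a configured
KafkaProducer<String, String>. The same pattern applies to Sarama.)

// Null key: the Java DefaultPartitioner spreads these round-robin across partitions.
producer.send(new ProducerRecord<String, String>("events", null, "payload"));

// Explicit random key, per Gwen's suggestion, if the client at hand does not
// spread null-keyed messages evenly on its own.
String randomKey = java.util.UUID.randomUUID().toString();
producer.send(new ProducerRecord<>("events", randomKey, "payload"));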



On Mon, Oct 10, 2016 at 8:51 AM, Ali Akhtar  wrote:

> Hey Michael,
>
> We're using this one: https://github.com/Shopify/sarama
>
> Any ideas how that one works?
>
> On Mon, Oct 10, 2016 at 11:48 AM, Michael Noll 
> wrote:
>
> > FYI: Kafka's new Java producer (which ships with Kafka) the behavior is
> as
> > follows:  If no partition is explicitly specified (to send the message
> to)
> > AND the key is null, then the DefaultPartitioner [1] will assign messages
> > to topic partitions in a round-robin fashion.  See the javadoc and also
> the
> > little bit of code in [1] for details.
> >
> > Not sure which Go client you're using exactly so I can't tell whether
> your
> > Go client follows the behavior of Kafka's Java producer.
> >
> > -Michael
> >
> >
> >
> >
> > [1]
> > https://github.com/apache/kafka/blob/trunk/clients/src/
> > main/java/org/apache/kafka/clients/producer/internals/
> > DefaultPartitioner.java
> >
> >
> > On Mon, Oct 10, 2016 at 7:53 AM, Ali Akhtar 
> wrote:
> >
> > > If keys are null, what happens in terms of partitioning, is the load
> > spread
> > > evenly..?
> > >
> > > On Mon, Oct 10, 2016 at 7:59 AM, Gwen Shapira 
> wrote:
> > >
> > > > Kafka itself supports null keys. I'm not sure about the Go client you
> > > > use, but Confluent's Go client also supports null keys
> > > > (https://github.com/confluentinc/confluent-kafka-go/).
> > > >
> > > > If you decide to generate keys and you want even spread, a random
> > > > number generator is probably your best bet.
> > > >
> > > > Gwen
> > > >
> > > > On Sun, Oct 9, 2016 at 6:05 PM, Ali Akhtar 
> > wrote:
> > > > > A kafka producer written elsewhere that I'm using, which uses the
> Go
> > > > kafka
> > > > > driver, is sending messages where the key is null.
> > > > >
> > > > > Is this OK - or will this cause issues due to partitioning not
> > > happening
> > > > > correctly?
> > > > >
> > > > > What would be a good way to generate keys in this case, to ensure
> > even
> > > > > partition spread?
> > > > >
> > > > > Thanks.
> > > >
> > > >
> > > >
> > > > --
> > > > Gwen Shapira
> > > > Product Manager | Confluent
> > > > 650.450.2760 | @gwenshap
> > > > Follow us: Twitter | blog
> > > >
> > >
> >
>