Re: Mirror Maker - Message Format Issue?

2016-10-12 Thread Craig Swift
Hello,

Just to close this issue out. The 0.8.2 producer going to the 0.10 cluster was
the root issue. The mirror maker, by default, was unable to produce the
message to the destination cluster. The workaround was to include a
MirrorMakerMessageHandler
that did nothing but repackage the message again. In the future it might be
nice if the mirror handled this auto-magically, but at least the ability to
alter the behavior provided an easy fix. Hope this helps someone else,
thanks.
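
For anyone who hits the same error, a minimal sketch of what such a handler can
look like (not the exact handler used here; it assumes the
MirrorMakerMessageHandler interface nested in kafka.tools.MirrorMaker and
kafka.consumer.BaseConsumerRecord from the 0.10 tools jar):

import java.util.Collections;
import java.util.List;
import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;
import org.apache.kafka.clients.producer.ProducerRecord;

// Rebuild each record with only topic/key/value so the destination producer
// assigns its own timestamp instead of failing on the consumed timestamp of -1.
public class RepackagingHandler implements MirrorMaker.MirrorMakerMessageHandler {
    @Override
    public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
        return Collections.singletonList(
                new ProducerRecord<byte[], byte[]>(record.topic(), record.key(), record.value()));
    }
}

The handler is plugged in via mirror maker's --message.handler option, with the
class on the classpath.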

Craig

Hello,
>
> I think we're misunderstanding the docs on some level and I need a little
> clarification. We have the following setup:
>
> 1) 0.8.2 producer -> writing to Kafka 0.10.0.1 cluster w/ version 0.10
> message format (source cluster).
> 2) 0.10.0.1 mirror using the 'new consumer' reading from the source
> cluster and writing to Kafka 0.10.0.1 cluster w/version 0.8.2 message
> format (destination cluster). We need some of the features like SSL, hence
> using the new consumer.
> 3) Lots of old 0.8.2 consumers reading from the destination cluster that
> still need to be upgraded.
>
> We're seeing errors from the mirror maker when trying to produce to the
> destination cluster like the following:
>
> java.lang.IllegalArgumentException: Invalid timestamp -1
> at org.apache.kafka.clients.producer.ProducerRecord.<init>
> (ProducerRecord.java:60)
>
> Is the root problem the 0.8.2 producer sending data to the source cluster
> or the new 0.10 mirror writing data to the destination cluster in 0.8.2
> format? From the docs we were under the impression that the data would be
> stored in the source cluster in 0.10 format regardless of the producer and
> the mirror could produce to the destination cluster regardless of its
> message format setting.
>
> Is this current setup non-functional or is there a way to make this work?
> For example, if the mirror producing is the issue could we implement a
> custom MirrorMakerMessageHandler? Any advice and clarification would be
> helpful, thanks.
>
> Craig
>


Re: HL7 messages to Kafka consumer

2016-10-12 Thread Artem Ervits
Nifi HL7 processor is built using HAPI API, which supports z-segments
http://hl7api.sourceforge.net/xref/ca/uhn/hl7v2/examples/CustomModelClasses.html


On Wed, Oct 12, 2016 at 10:10 PM, Martin Gainty  wrote:

>
>
>
> > From: dbis...@gmail.com
> > Date: Wed, 12 Oct 2016 20:42:04 -0400
> > Subject: RE: HL7 messages to Kafka consumer
> > To: users@kafka.apache.org
> >
> > I did it with HAPI API and Kafka producer way back when and it worked
> well.
> > Times have changed, If you consider using Apache Nifi, besides native HL7
> > processor,
> MG>since this is where i get 99% of the applications i work on I have to
> ask will Nifi process Z segments?
> MG>if Nifi does not process Z segments you might want to delay being
> a Nifi evangelist and go with
> MG>aforementioned solution
>  you can push to Kafka by dragging a processor on canvas. HL7
> > processor also is built on HAPI API. Here's an example but instead of
> Kafka
> > it's pushing to Solr, replacing solr processor with Kafka will do a
> trick.
> MG>kafka server.properties does support a zk provider so kafka server can
> ingest resultset(s) from zk
> # Zookeeper #
> # Zookeeper connection string (see zookeeper docs for details).
> # This is a comma separated host:port pairs, each corresponding to a zk
> # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
> # You can also append an optional chroot string to the urls to specify the
> # root directory for all kafka znodes.
> zookeeper.connect=localhost:2181
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=6000
> MG>kafka's clear advantage over zk is to control flow by pausing or
> resuming partitions for your kafka consumer
> MG>a possible side-effect of relying only on the zk provider would be to disable this
> control-flow capability of kafka
> > Old and new consumer API is available.
> >
> > https://community.hortonworks.com/articles/20318/visualize-
> patients-complaints-to-their-doctors-usi.html
> >
> > On Oct 12, 2016 4:33 PM, "Martin Gainty"  wrote:
> >
> > > provisionally accomplished task by embedding A01,A03 and A08 HL7
> > > Event-types into SOAP 1.2 Envelopes
> > > I remember having difficulty transporting over a non-dedicated
> transport
> > > such as what Kafka implements
> > > Producer Embeds Fragment1 into SOAPEnvelope
> > > Producer Sends Fragment1-SOAPEnvelope of A01
> > > Consumer pulls Fragment1 of A01 from SOAP1.2 Body and places
> SOAPEnvelope
> > > into cache
> > > Consumer quiesces connection presumably so other SOAP 1.2 messages can
> be
> > > transported
> > > Consumer re-activates connection when sufficient bandwidth
> detected (higher
> > > priority SOAP1.2 envelopes have been transmitted)
> > > Producer Embed Fragment2 into SOAPEnvelope
> > >
> > > Producer Sends Fragment2-SOAPEnvelope of A01
> > > Consumer pulls Fragment2 of A01 from SOAP1.2Body and places into cache
> > > When Consumer detects EOT Consumer aggregates n Fragments from cache to
> > > all-inclusive A01 event
> > > Consumer parses A01 to segments
> > > Consumer parses attributes of each segment
> > > Consumer insert(s)/update(s) segment-attribute(s)  into database
> > > Consumer displays updated individual segment-attributes to UI and or
> > > displays inserted segment-attributes to UI
> > >
> > > Clear?Martin
> > > __
> > >
> > >
> > >
> > > > From: samglo...@cloudera.com
> > > > Date: Wed, 12 Oct 2016 09:22:32 -0500
> > > > Subject: HL7 messages to Kafka consumer
> > > > To: users@kafka.apache.org
> > > >
> > > > Has anyone done this?   I'm working with medical hospital company
> that
> > > > wants to ingest HL7 messages into Kafka cluster, topics.
> > > >
> > > > Any guidance appreciated.
> > > >
> > > > --
> > > > *Sam Glover*
> > > > Solutions Architect
> > > >
> > > > *M*   512.550.5363 samglo...@cloudera.com
> > > > 515 Congress Ave, Suite 1212 | Austin, TX | 78701
> > > > Celebrating a decade of community accomplishments
> > > > cloudera.com/hadoop10
> > > > #hadoop10
> > >
>
>


RE: HL7 messages to Kafka consumer

2016-10-12 Thread Martin Gainty



> From: dbis...@gmail.com
> Date: Wed, 12 Oct 2016 20:42:04 -0400
> Subject: RE: HL7 messages to Kafka consumer
> To: users@kafka.apache.org
> 
> I did it with HAPI API and Kafka producer way back when and it worked well.
> Times have changed, If you consider using Apache Nifi, besides native HL7
> processor,
MG>since this is where i get 99% of the applications i work on I have to ask 
will Nifi process Z segments?
MG>if Nifi does not process Z segments you might want to delay being a
Nifi evangelist and go with 
MG>aforementioned solution
 you can push to Kafka by dragging a processor on canvas. HL7
> processor also is built on HAPI API. Here's an example but instead of Kafka
> it's pushing to Solr, replacing solr processor with Kafka will do a trick.
MG>kafka server.properties does support a zk provider so kafka server can 
ingest resultset(s) from zk 
# Zookeeper #
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
MG>kafka's clear advantage over zk is to control flow by pausing or resuming
partitions for your kafka consumer
MG>a possible side-effect of relying only on the zk provider would be to disable this
control-flow capability of kafka
> Old and new consumer API is available.
> 
> https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi.html
> 
> On Oct 12, 2016 4:33 PM, "Martin Gainty"  wrote:
> 
> > provisionally accomplished task by embedding A01,A03 and A08 HL7
> > Event-types into SOAP 1.2 Envelopes
> > I remember having difficulty transporting over a non-dedicated transport
> > such as what Kafka implements
> > Producer Embeds Fragment1 into SOAPEnvelope
> > Producer Sends Fragment1-SOAPEnvelope of A01
> > Consumer pulls Fragment1 of A01 from SOAP1.2 Body and places SOAPEnvelope
> > into cache
> > Consumer quiesces connection presumably so other SOAP 1.2 messages can be
> > transported
> > Consumer re-activates connection when sufficient bandwidth detected (higher
> > priority SOAP1.2 envelopes have been transmitted)
> > Producer Embed Fragment2 into SOAPEnvelope
> >
> > Producer Sends Fragment2-SOAPEnvelope of A01
> > Consumer pulls Fragment2 of A01 from SOAP1.2Body and places into cache
> > When Consumer detects EOT Consumer aggregates n Fragments from cache to
> > all-inclusive A01 event
> > Consumer parses A01 to segments
> > Consumer parses attributes of each segment
> > Consumer insert(s)/update(s) segment-attribute(s)  into database
> > Consumer displays updated individual segment-attributes to UI and or
> > displays inserted segment-attributes to UI
> >
> > Clear?Martin
> > __
> >
> >
> >
> > > From: samglo...@cloudera.com
> > > Date: Wed, 12 Oct 2016 09:22:32 -0500
> > > Subject: HL7 messages to Kafka consumer
> > > To: users@kafka.apache.org
> > >
> > > Has anyone done this?   I'm working with medical hospital company that
> > > wants to ingest HL7 messages into Kafka cluster, topics.
> > >
> > > Any guidance appreciated.
> > >
> > > --
> > > *Sam Glover*
> > > Solutions Architect
> > >
> > > *M*   512.550.5363 samglo...@cloudera.com
> > > 515 Congress Ave, Suite 1212 | Austin, TX | 78701
> > > Celebrating a decade of community accomplishments
> > > cloudera.com/hadoop10
> > > #hadoop10
> >
  

RE: HL7 messages to Kafka consumer

2016-10-12 Thread Artem Ervits
I did it with the HAPI API and a Kafka producer way back when and it worked well.
Times have changed: if you consider using Apache Nifi, besides the native HL7
processor, you can push to Kafka by dragging a processor onto the canvas. The HL7
processor is also built on the HAPI API. Here's an example, but instead of Kafka
it's pushing to Solr; replacing the Solr processor with Kafka will do the trick.
Both the old and new consumer APIs are available.

https://community.hortonworks.com/articles/20318/visualize-patients-complaints-to-their-doctors-usi.html
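
For anyone taking the plain HAPI + producer route mentioned above, a bare-bones
sketch (sample ADT^A01 message; broker and topic names are placeholders):

import java.util.Properties;
import ca.uhn.hl7v2.HL7Exception;
import ca.uhn.hl7v2.model.Message;
import ca.uhn.hl7v2.parser.PipeParser;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Hl7ToKafka {
    public static void main(String[] args) throws HL7Exception {
        String hl7 = "MSH|^~\\&|HIS|RIH|EKG|EKG|199904140038||ADT^A01|12345|P|2.3\r"
                   + "PID|||12345||Doe^John";

        // Parse (and thereby validate) the pipe-delimited HL7 v2 message with HAPI.
        PipeParser parser = new PipeParser();
        Message message = parser.parse(hl7);

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Re-encode the parsed message and publish it; MSH-10 (the message
            // control id) would make a reasonable record key if ordering matters.
            producer.send(new ProducerRecord<String, String>("hl7-adt", parser.encode(message)));
        }
    }
}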

On Oct 12, 2016 4:33 PM, "Martin Gainty"  wrote:

> provisionally accomplished task by embedding A01,A03 and A08 HL7
> Event-types into SOAP 1.2 Envelopes
> I remember having difficulty transporting over a non-dedicated transport
> such as what Kafka implements
> Producer Embeds Fragment1 into SOAPEnvelope
> Producer Sends Fragment1-SOAPEnvelope of A01
> Consumer pulls Fragment1 of A01 from SOAP1.2 Body and places SOAPEnvelope
> into cache
> Consumer quiesces connection presumably so other SOAP 1.2 messages can be
> transported
> Consumer re-activates connection when sufficient bandwidth detected (higher
> priority SOAP1.2 envelopes have been transmitted)
> Producer Embed Fragment2 into SOAPEnvelope
>
> Producer Sends Fragment2-SOAPEnvelope of A01
> Consumer pulls Fragment2 of A01 from SOAP1.2Body and places into cache
> When Consumer detects EOT Consumer aggregates n Fragments from cache to
> all-inclusive A01 event
> Consumer parses A01 to segments
> Consumer parses attributes of each segment
> Consumer insert(s)/update(s) segment-attribute(s)  into database
> Consumer displays updated individual segment-attributes to UI and or
> displays inserted segment-attributes to UI
>
> Clear?Martin
> __
>
>
>
> > From: samglo...@cloudera.com
> > Date: Wed, 12 Oct 2016 09:22:32 -0500
> > Subject: HL7 messages to Kafka consumer
> > To: users@kafka.apache.org
> >
> > Has anyone done this?   I'm working with medical hospital company that
> > wants to ingest HL7 messages into Kafka cluster, topics.
> >
> > Any guidance appreciated.
> >
> > --
> > *Sam Glover*
> > Solutions Architect
> >
> > *M*   512.550.5363 samglo...@cloudera.com
> > 515 Congress Ave, Suite 1212 | Austin, TX | 78701
> > Celebrating a decade of community accomplishments
> > cloudera.com/hadoop10
> > #hadoop10
>


Manually update consumer offset stored in Kafka

2016-10-12 Thread Yifan Ying
Hi,

In old consumers, we use the following command line tool to manually update
offsets stored in zk:

*./kafka-run-class.sh kafka.tools.UpdateOffsetsInZK [latest | earliest]
[consumer.properties file path] [topic]*

But it doesn't work with offsets stored in Kafka. How can I update the
Kafka offsets to latest?
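
For reference, a minimal sketch of one way to do this with the new consumer
(a sketch only: run it while the group's real consumers are stopped; group and
topic names are placeholders):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class ResetGroupToLatest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");            // the group whose offsets should move
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("my-topic")) {
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            consumer.assign(partitions);
            consumer.seekToEnd(partitions);           // 0.10.x signature; 0.9 uses varargs

            // position() resolves the seek; committing the result writes the new
            // offsets to the __consumer_offsets topic for this group.
            Map<TopicPartition, OffsetAndMetadata> latest = new HashMap<>();
            for (TopicPartition tp : partitions) {
                latest.put(tp, new OffsetAndMetadata(consumer.position(tp)));
            }
            consumer.commitSync(latest);
        }
    }
}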

Yifan

-- 
Yifan


RE: HL7 messages to Kafka consumer

2016-10-12 Thread Martin Gainty
provisionally accomplished task by embedding A01,A03 and A08 HL7 Event-types 
into SOAP 1.2 Envelopes
I remember having difficulty transporting over a non-dedicated transport such 
as what Kafka implements
Producer Embeds Fragment1 into SOAPEnvelope
Producer Sends Fragment1-SOAPEnvelope of A01
Consumer pulls Fragment1 of A01 from SOAP1.2 Body and places SOAPEnvelope into 
cache
Consumer quiesces connection presumably so other SOAP 1.2 messages can be 
transported
Consumer re-activates connection when sufficient bandwidth is detected (higher
priority SOAP1.2 envelopes have been transmitted)
Producer Embed Fragment2 into SOAPEnvelope

Producer Sends Fragment2-SOAPEnvelope of A01
Consumer pulls Fragment2 of A01 from SOAP1.2Body and places into cache
When Consumer detects EOT Consumer aggregates n Fragments from cache to 
all-inclusive A01 event
Consumer parses A01 to segments
Consumer parses attributes of each segment 
Consumer insert(s)/update(s) segment-attribute(s)  into database
Consumer displays updated individual segment-attributes to UI and or displays 
inserted segment-attributes to UI

Clear?
Martin
__ 



> From: samglo...@cloudera.com
> Date: Wed, 12 Oct 2016 09:22:32 -0500
> Subject: HL7 messages to Kafka consumer
> To: users@kafka.apache.org
> 
> Has anyone done this?   I'm working with medical hospital company that
> wants to ingest HL7 messages into Kafka cluster, topics.
> 
> Any guidance appreciated.
> 
> -- 
> *Sam Glover*
> Solutions Architect
> 
> *M*   512.550.5363 samglo...@cloudera.com
> 515 Congress Ave, Suite 1212 | Austin, TX | 78701
> Celebrating a decade of community accomplishments
> cloudera.com/hadoop10
> #hadoop10
  

Re: Understanding out of order message processing w/ Streaming

2016-10-12 Thread Ali Akhtar
Thanks Matthias.

So, if I'm understanding this right, Kafka will not discard messages
which arrive out of order.

What it will do is show messages in the order in which they arrive.

But if they arrive out of order, I have to detect / process that myself in
the processor logic.

Is that correct?

Thanks.

On Wed, Oct 12, 2016 at 11:37 PM, Matthias J. Sax 
wrote:

> -BEGIN PGP SIGNED MESSAGE-
> Hash: SHA512
>
> Last question first: A KTable is basically an infinite window over the
> whole stream providing a single result (that gets updated when new
> data arrives). If you use windows, you cut the overall stream into
> finite subsets and get a result per window. Thus, I guess you do not
> need windows (if I understood your use case correctly).
>
> However, in the current state of the Kafka Streams DSL, you will not be able to
> use KTable (directly -- see the suggestion to fix this below) because it
> does (currently) not allow you to access the timestamp of the current
> record (thus, you cannot know if a record is late or not). You will
> need to use the Processor API, which allows you to access the current
> record's timestamp via the Context object given in init().
>
> Your reasoning about partitions and Streams instances is correct.
> However, the following two are not
>
> > - Because I'm using a KTable, the timestamp of the messages is
> > extracted, and I'm not shown the older bid because I've already
> > processed the later bid. The older bid is ignored.
>
> and
>
> > - Because of this, the replica already knows which timestamps it
> > has processed, and is able to ignore the older messages.
>
> Late-arriving records are not dropped but processed regularly. Thus,
> your KTable aggregate function will be called for the late-arriving
> record, too (but as described above, you currently have no way to know
> it is a late record).
>
>
> Last but not least, your last statement is a valid concern:
>
> > Also, what will happen if bid 2 arrived and got processed, and then
> > the particular replica crashed, and was restarted. The restarted
> > replica won't have any memory of which timestamps it has previously
> > processed.
> >
> > So if bid 2 got processed, replica crashed and restarted, and then
> > bid 1 arrived, what would happen in that case?
>
> In order to make this work, you would need to store the timestamp in
> your store next to the actual data. Thus, you can compare the timestamp
> of the latest result (safely stored in operator state) with the
> timestamp of the current record.
>
> Does this make sense?
>
> To fix your issue, you could add a .transformValues() before your KTable,
> which allows you to access the timestamp of a record. If you add this
> timestamp to your value and pass it to the KTable afterwards, you can
> access it and it also gets stored reliably.
>
> record => transformValues => {value, timestamp} => aggregate
>
> Hope this helps.
>
> - -Matthias
>
>
> On 10/11/16 9:12 PM, Ali Akhtar wrote:
> > P.S, does my scenario require using windows, or can it be achieved
> > using just KTable?
> >
> > On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar 
> > wrote:
> >
> >> Heya,
> >>
> >> Say I'm building a live auction site, with different products.
> >> Different users will bid on different products. And each time
> >> they do, I want to update the product's price, so it should
> >> always have the latest price in place.
> >>
> >> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
> >> the same product 100 ms later.
> >>
> >> The second bid arrives first and the price is updated to $5. Then
> >> the first bid arrives. I want the price to not be updated in this
> >> case, as this bid is older than the one I've already processed.
> >>
> >> Here's my understanding of how I can achieve this with Kafka
> >> Streaming - is my understanding correct?
> >>
> >> - I have a topic for receiving bids. The topic has N partitions,
> >> and I have N replicas of my application which hooks up w/ Kafka
> >> Streaming, up and running.
> >>
> >> - I assume each replica of my app will listen to a different
> >> partition of the topic.
> >>
> >> - A user makes a bid on product A.
> >>
> >> - This is pushed to the topic with the key bid_a
> >>
> >> - Another user makes a bid. This is also pushed with the same key
> >> (bid_a)
> >>
> >> - The 2nd bid arrives first, and gets processed. Then the first
> >> (older) bid arrives.
> >>
> >> - Because I'm using a KTable, the timestamp of the messages is
> >> extracted, and I'm not shown the older bid because I've already
> >> processed the later bid. The older bid is ignored.
> >>
> >> - All bids on product A go to the same topic partition, and hence
> >> the same replica of my app, because they all have the key bid_a.
> >>
> >> - Because of this, the replica already knows which timestamps it
> >> has processed, and is able to ignore the older messages.
> >>
> >> Is the above understandning correct?
> >>
> >> Also, what will happen if bid 2 arrived 

Re: [kafka-clients] [VOTE] 0.10.1.0 RC2

2016-10-12 Thread Dana Powers
+1; all kafka-python integration tests pass.

-Dana


On Wed, Oct 12, 2016 at 10:41 AM, Jason Gustafson  wrote:
> Hello Kafka users, developers and client-developers,
>
> One more RC for 0.10.1.0. I think we're getting close!
>
> Release plan:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
>
> Release notes for the 0.10.1.0 release:
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html
>
> *** Please download, test and vote by Saturday, Oct 15, 11am PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> http://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/
>
> * Javadoc:
> http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/
>
> * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=8702d66434b86092a3738472f9186d6845ab0720
>
> * Documentation:
> http://kafka.apache.org/0101/documentation.html
>
> * Protocol:
> http://kafka.apache.org/0101/protocol.html
>
> * Tests:
> Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
> System tests:
> http://confluent-kafka-0-10-1-system-test-results.s3-us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/
>
> Thanks,
>
> Jason
>
> --
> You received this message because you are subscribed to the Google Groups
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to kafka-clients+unsubscr...@googlegroups.com.
> To post to this group, send email to kafka-clie...@googlegroups.com.
> Visit this group at https://groups.google.com/group/kafka-clients.
> To view this discussion on the web visit
> https://groups.google.com/d/msgid/kafka-clients/CAJDuW%3DDk7Mi6ZsiniHcdbCCBdBhasjSeb7_N3EW%3D97OrfvFyew%40mail.gmail.com.
> For more options, visit https://groups.google.com/d/optout.


Re: Understanding out of order message processing w/ Streaming

2016-10-12 Thread Matthias J. Sax
-BEGIN PGP SIGNED MESSAGE-
Hash: SHA512

Last question first: A KTable is basically an infinite window over the
whole stream providing a single result (that gets updated when new
data arrives). If you use windows, you cut the overall stream into
finite subsets and get a result per window. Thus, I guess you do not
need windows (if I understood your use case correctly).

However, in the current state of the Kafka Streams DSL, you will not be able to
use KTable (directly -- see the suggestion to fix this below) because it
does (currently) not allow you to access the timestamp of the current
record (thus, you cannot know if a record is late or not). You will
need to use the Processor API, which allows you to access the current
record's timestamp via the Context object given in init().

Your reasoning about partitions and Streams instances is correct.
However, the following two are not

> - Because I'm using a KTable, the timestamp of the messages is
> extracted, and I'm not shown the older bid because I've already
> processed the later bid. The older bid is ignored.

and

> - Because of this, the replica already knows which timestamps it
> has processed, and is able to ignore the older messages.

Late-arriving records are not dropped but processed regularly. Thus,
your KTable aggregate function will be called for the late-arriving
record, too (but as described above, you currently have no way to know
it is a late record).


Last but not least, your last statement is a valid concern:

> Also, what will happen if bid 2 arrived and got processed, and then
> the particular replica crashed, and was restarted. The restarted
> replica won't have any memory of which timestamps it has previously
> processed.
> 
> So if bid 2 got processed, replica crashed and restarted, and then
> bid 1 arrived, what would happen in that case?

In order to make this work, you would need to store the timestamp in
your store next to the actual data. Thus, you can compare the timestamp
of the latest result (safely stored in operator state) with the
timestamp of the current record.

Does this make sense?

To fix your issue, you could add a .transformValues() before your KTable,
which allows you to access the timestamp of a record. If you add this
timestamp to your value and pass it to the KTable afterwards, you can
access it and it also gets stored reliably.

record => transformValues => {value, timestamp} => aggregate
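
A rough sketch of that chain in code (assuming the 0.10.1 DSL; Bid and
TimestampedBid are made-up types, and a Serde for TimestampedBid would still be
needed in a real application):

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

// Made-up value types, just for the sketch.
class Bid { double amount; }
class TimestampedBid {
    final Bid bid; final long timestamp;
    TimestampedBid(Bid bid, long timestamp) { this.bid = bid; this.timestamp = timestamp; }
}

class LatestBidTopology {
    static KTable<String, TimestampedBid> build(KStreamBuilder builder) {
        // Assumes a Serde for Bid is configured as the default value serde.
        KStream<String, Bid> bids = builder.stream("bids");

        // Attach the record timestamp (taken from the ProcessorContext) to each value.
        ValueTransformerSupplier<Bid, TimestampedBid> stamper =
            () -> new ValueTransformer<Bid, TimestampedBid>() {
                private ProcessorContext context;
                @Override public void init(ProcessorContext context) { this.context = context; }
                @Override public TimestampedBid transform(Bid bid) {
                    return new TimestampedBid(bid, context.timestamp());
                }
                @Override public TimestampedBid punctuate(long timestamp) { return null; }
                @Override public void close() { }
            };
        KStream<String, TimestampedBid> stamped = bids.transformValues(stamper);

        // Keep only the newest bid per key; in a real application pass a Serde for
        // TimestampedBid, e.g. via groupByKey(keySerde, valueSerde).
        return stamped
            .groupByKey()
            .reduce((current, candidate) ->
                candidate.timestamp >= current.timestamp ? candidate : current,
                "latest-bids");
    }
}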

Hope this helps.

- -Matthias


On 10/11/16 9:12 PM, Ali Akhtar wrote:
> P.S, does my scenario require using windows, or can it be achieved
> using just KTable?
> 
> On Wed, Oct 12, 2016 at 8:56 AM, Ali Akhtar 
> wrote:
> 
>> Heya,
>> 
>> Say I'm building a live auction site, with different products.
>> Different users will bid on different products. And each time
>> they do, I want to update the product's price, so it should
>> always have the latest price in place.
>> 
>> Example: Person 1 bids $3 on Product A, and Person 2 bids $5 on
>> the same product 100 ms later.
>> 
>> The second bid arrives first and the price is updated to $5. Then
>> the first bid arrives. I want the price to not be updated in this
>> case, as this bid is older than the one I've already processed.
>> 
>> Here's my understanding of how I can achieve this with Kafka
>> Streaming - is my understanding correct?
>> 
>> - I have a topic for receiving bids. The topic has N partitions,
>> and I have N replicas of my application which hooks up w/ Kafka
>> Streaming, up and running.
>> 
>> - I assume each replica of my app will listen to a different
>> partition of the topic.
>> 
>> - A user makes a bid on product A.
>> 
>> - This is pushed to the topic with the key bid_a
>> 
>> - Another user makes a bid. This is also pushed with the same key
>> (bid_a)
>> 
>> - The 2nd bid arrives first, and gets processed. Then the first
>> (older) bid arrives.
>> 
>> - Because I'm using a KTable, the timestamp of the messages is
>> extracted, and I'm not shown the older bid because I've already
>> processed the later bid. The older bid is ignored.
>> 
>> - All bids on product A go to the same topic partition, and hence
>> the same replica of my app, because they all have the key bid_a.
>> 
>> - Because of this, the replica already knows which timestamps it
>> has processed, and is able to ignore the older messages.
>> 
>> Is the above understandning correct?
>> 
>> Also, what will happen if bid 2 arrived and got processed, and
>> then the particular replica crashed, and was restarted. The
>> restarted replica won't have any memory of which timestamps it
>> has previously processed.
>> 
>> So if bid 2 got processed, replica crashed and restarted, and
>> then bid 1 arrived, what would happen in that case?
>> 
>> Thanks.
>> 
> 
-BEGIN PGP SIGNATURE-
Comment: GPGTools - https://gpgtools.org

iQIcBAEBCgAGBQJX/oLPAAoJECnhiMLycopP8akP/3Fo24Xeu1/0LuNdBuwTlJd7
6r9WrSiSbpiVlWoA1dRjSrkjQoUOwgAD6vXji5Jb8BIT5tMi57KQVrTmXWz/feuy
6qJIvfxj8vYdFLTcTOYZKWCEHQK1am2SGkFEeZKY0BbABNqwWzx6lWAJxKlxoBcn

How to specify starting position for consumers when using dynamic group re-balancing?

2016-10-12 Thread Marina
Hi, 
Is it possible to start 0.9 or 0.10 consumers from a specified offset, while 
still using consumer groups with dynamic re-balancing? 

Here is what have found so far: 
Case 1: If we use consumer.assign(…)  method to manually assign partitions to 
consumers - we can do all below actions: 
consumer.seek(topicPartition, offset); or:
consumer.seekToBeginning(); 
consumer.seekToEnd(); 

Basically, we have full control over which position to start the consumer from, 
BUT at the expense of not having the partition re-assignment done dynamically 
by Kafka 

Case 2: If we use the consumer.subscribe(…) method - Kafka will manage the
re-balancing, however, we cannot do any of the three options above … :(
So, we tried the following to “hack” around it - at the consumer start up time, 
*before* entering the poll() loop: 

// get the coordinator from the private field of the consumer:
ConsumerCoordinator coordinator = (ConsumerCoordinator)
FieldUtils.readField(consumer, "coordinator", true);
// make sure all partitions are already assigned
coordinator.ensurePartitionAssignment();
// get the list of partitions assigned to this specific consumer:
Set<TopicPartition> assignedTopicPartitions = consumer.assignment();
// now we can go ahead and do the same three actions (seek(), seekToEnd() or
// seekToBeginning()) on those partitions only for this consumer as above.
for (TopicPartition assignedPartition : assignedTopicPartitions) {
    consumer.seek(assignedPartition, offset); // or whatever
    ...
}
// now start the poll() loop:
while (true) {
    ConsumerRecords records = consumer.poll(pollIntervalMs);
    for (ConsumerRecord record : records) {
        // processMessage(record.value(), record.offset());
    }
}

This feels too hacky for my taste, and, also, I am not sure if this logic will
hold during the actual re-balancing, when, say, new consumers are added to the
group.

Could somebody validate this approach or suggest a better way to accomplish
what we need?
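
For comparison, a minimal sketch of the usual listener-based alternative
(standard 0.9/0.10 consumer API; topic and group names are placeholders): keep
subscribe() so Kafka still manages re-balancing, and seek inside
onPartitionsAssigned() so the positioning is re-applied on every (re)assignment:

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekOnAssign {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");            // placeholder group
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // optionally commit progress here before the partitions are taken away
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Runs on every (re)assignment, so it also holds when consumers join or leave.
                // seek(tp, offsetFromZookeeper) would work here just as well.
                consumer.seekToBeginning(partitions); // 0.10.x signature; 0.9 uses varargs
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // processMessage(record.value(), record.offset());
            }
        }
    }
}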


thanks! 

Marina


[VOTE] 0.10.1.0 RC2

2016-10-12 Thread Jason Gustafson
Hello Kafka users, developers and client-developers,

One more RC for 0.10.1.0. I think we're getting close!

Release plan: https://cwiki.apache.org/confluence/display/KAFKA/Rele
ase+Plan+0.10.1.

Release notes for the 0.10.1.0 release:
http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/RELEASE_NOTES.html

*** Please download, test and vote by Saturday, Oct 15, 11am PT

Kafka's KEYS file containing PGP keys we use to sign the release:
http://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/

* Javadoc:
http://home.apache.org/~jgus/kafka-0.10.1.0-rc2/javadoc/

* Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc2 tag:
https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
8702d66434b86092a3738472f9186d6845ab0720

* Documentation:
http://kafka.apache.org/0101/documentation.html

* Protocol:
http://kafka.apache.org/0101/protocol.html

* Tests:
Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/68/
System tests: http://confluent-kafka-0-10-1-system-test-results.s3-
us-west-2.amazonaws.com/2016-10-11--001.1476197348--apache--0.10.1--d981dd2/

Thanks,

Jason


Re: Process to enable log compaction on a cluster

2016-10-12 Thread Mario Ricci
Sathya,

Did you ever figure out what to do here?

On Mon, Jul 4, 2016 at 12:19 AM Sathyakumar Seshachalam <
sathyakumar_seshacha...@trimble.com> wrote:

> My other followup question is: am I right in assuming that per-topic
> retention minutes or cleanup policy only have an effect when
> log.cleaner.enable=false?
> So in other words, if I choose to truncate topic data, then the __consumer_offsets
> topic will also be either deleted or compacted?
>
> On Mon, Jul 4, 2016 at 11:38 AM, Sathyakumar Seshachalam <
> sathyakumar_seshacha...@trimble.com> wrote:
>
> > Ok, I am in a situation where all kafka nodes are going to run out of space.
> > This is because I had been running an uncompacted __consumer_offset topic
> and
> > topics that retained everything.
> >
> > Now at a place, where I can afford to compact __consumer_offset topic and
> > also delete certain topics. I would like to know the right process to do
> > this.
> >
> > Now, since I have close to 1.8T of data in the __consumer_offset topic and
> > more in the topic data, any log compaction and log deletion/truncation is
> > going to take time. Should I do this node by node? Will Kafka's
> replication
> > get in the way? (I have read that uncompacted data from the leader is
> sent
> > to the followers.)
> >
> > Is there a clean process for this for a 3-node Kafka cluster? Last time
> I
> > triggered a log compaction on all 3 nodes simultaneously, all
> consumers
> > broke (I raised this in the same email group and got an answer to
> increase
> > the memory). Eventually they self-healed, but this caused some serious
> > disruption to the service, so before trying I want to make sure, there
> is a
> > cleaner process here.
> >
> > Any help/pointers will be greatly appreciated.
> >
> > Thanks,
> > Sathya
> >
> >
> >
>
-- 

Mario Ricci | Software Engineer | Trimble Navigation Limited | VirtualSite
Solutions | Office: +1 (303) 635-8604 / x228604


delete topic causing spikes in fetch/metadata requests

2016-10-12 Thread sunil kalva
We are using kafka 0.8.2.2 (client and server). Whenever we delete a topic
we see a lot of errors in the broker logs like the ones below, and there is also
a spike in fetch/metadata requests. Can I correlate these errors with the topic
delete, or is it a known issue? Since there is a spike in metadata and fetch
requests, broker throughput has come down.

--
[2016-10-12 16:04:55,054] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,056] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,057] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,059] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,060] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,062] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,064] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,065] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,067] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,068] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,070] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,072] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,073] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to
202816546. (kafka.server.ReplicaManager)
[2016-10-12 16:04:55,075] ERROR [Replica Manager on Broker 4]: Error when
processing fetch request for partition [xyz,0] offset 161946645 from
consumer with correlation id 0. Possible cause: Request for offset
161946645 but we only have log segments in the range 185487049 to

Re: In Kafka Streaming, Serdes should use Optionals

2016-10-12 Thread Guozhang Wang
Haha, I feel the same pain with you man.

On Tue, Oct 11, 2016 at 8:59 PM, Ali Akhtar  wrote:

> Thanks. That filter() method is a good solution. But whenever I look at it,
> I feel an empty spot in my heart which can only be filled by:
> filter(Optional::isPresent)
>
> On Wed, Oct 12, 2016 at 12:15 AM, Guozhang Wang 
> wrote:
>
> > Ali,
> >
> > We are working on moving from Java7 to Java8 in Apache Kafka, and the
> > Streams client is one of the motivations doing so. Stay tuned on the
> > mailing list when it will come.
> >
> > Currently Streams won't automatically filter out null values for you
> since
> > in some other cases they may have semantic meanings and cannot be simply
> > ignored; you can, though, apply a simple filter such like "filter((key,
> > value) => value != null)" before your processor lambda operator, if it
> > looks clearer in your code.
> >
> > Guozhang
> >
> >
> > On Sun, Oct 9, 2016 at 3:14 PM, Ali Akhtar  wrote:
> >
> > > It isn't a fatal error. It should be logged as a warning, and then the
> > > stream should be continued w/ the next message.
> > >
> > > Checking for null is 'ok', in the sense that it gets the job done, but
> > > after java 8's release, we really should be using optionals.
> > >
> > > Hopefully we can break compatibility w/ the bad old days soon and move
> > into
> > > the future.
> > >
> > > (If there's a way to do the null check automatically, i.e before
> calling
> > > the lambda, please let me know).
> > >
> > > On Sun, Oct 9, 2016 at 11:14 PM, Guozhang Wang 
> > wrote:
> > >
> > > > Ali,
> > > >
> > > > In your scenario, if serde fails to parse the bytes should that be
> > > treated
> > > > as a fatal failure or it is expected?
> > > >
> > > > In the former case, instead of returning a null I think it is better
> to
> > > > throw a runtime exception in order to let the whole client to stop
> and
> > > > notify the error; in the latter case, returning and checking null
> looks
> > > > fine to me.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Oct 7, 2016 at 3:12 PM, Ali Akhtar 
> > wrote:
> > > >
> > > > > Hey G,
> > > > >
> > > > > Looks like the only difference is a valueSerde parameter.
> > > > >
> > > > > How does that prevent having to look for nulls in the consumer?
> > > > >
> > > > > E.g, I wrote a custom Serde which converts the messages (which are
> > json
> > > > > strings) into a Java class using Jackson.
> > > > >
> > > > > If the json parse fails, it sends back a null.
> > > > >
> > > > > When I'm reading this stream, in my callback, how would I prevent
> > > having
> > > > to
> > > > > check if the serialized value isn't null?
> > > > >
> > > > > On Sat, Oct 8, 2016 at 1:07 AM, Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > Hello Ali,
> > > > > >
> > > > > > We do have corresponding overloaded functions for most of
> KStream /
> > > > > KTable
> > > > > > operators to avoid enforcing users to specify "null"; in these
> > cases
> > > > the
> > > > > > default serdes specified in the configs are then used. For
> example:
> > > > > >
> > > > > >  <T> KTable<K, T> aggregate(Initializer<T> initializer,
> > > > > >    Aggregator<K, V, T> adder,
> > > > > >    Aggregator<K, V, T> subtractor,
> > > > > >    Serde<T> aggValueSerde,
> > > > > >    String storeName);
> > > > > >
> > > > > > /**
> > > > > >  * .. using default serializers and deserializers.
> > > > > >  */
> > > > > >  <T> KTable<K, T> aggregate(Initializer<T> initializer,
> > > > > >    Aggregator<K, V, T> adder,
> > > > > >    Aggregator<K, V, T> subtractor,
> > > > > >    String storeName);
> > > > > >
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 7, 2016 at 9:20 AM, Michael Noll <
> mich...@confluent.io
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Ali, the Apache Kafka project still targets Java 7, which means
> > we
> > > > > can't
> > > > > > > use Java 8 features just yet.
> > > > > > >
> > > > > > > FYI: There's on ongoing conversation about when Kafka would
> move
> > > from
> > > > > > Java
> > > > > > > 7 to Java 8.
> > > > > > >
> > > > > > > On Fri, Oct 7, 2016 at 6:14 PM, Ali Akhtar <
> ali.rac...@gmail.com
> > >
> > > > > wrote:
> > > > > > >
> > > > > > > > Since we're using Java 8 in most cases anyway, Serdes /
> > > > Serialiazers
> > > > > > > should
> > > > > > > > use options, to avoid having to deal with the lovely nulls.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
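
For reference, a sketch of the filter-before-processing pattern from the quoted
reply above (Event is a made-up type whose custom Serde returns null when the
JSON parse fails):

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// Hypothetical value type whose Serde yields null on a failed deserialization.
class Event { String payload; }

class NullFilteringExample {
    static KStream<String, Event> validEvents(KStreamBuilder builder) {
        // Assumes an Event Serde is configured as the default value serde.
        KStream<String, Event> events = builder.stream("events");

        // Drop records whose value failed deserialization (the Serde returned null)
        // before any downstream operator has to deal with them. The Optional flavour
        // wished for above would read:
        //   events.mapValues(Optional::ofNullable)
        //         .filter((key, value) -> value.isPresent())
        //         .mapValues(Optional::get);
        return events.filter((key, value) -> value != null);
    }
}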



-- 
-- Guozhang


Re: [VOTE] 0.10.1.0 RC1

2016-10-12 Thread Jason Gustafson
FYI: I'm cutting another RC this morning due to
https://issues.apache.org/jira/browse/KAFKA-4290. Hopefully this is the
last!

-Jason

On Mon, Oct 10, 2016 at 8:20 PM, Jason Gustafson  wrote:

> The documentation is mostly fixed now: http://kafka.apache.org/0
> 101/documentation.html. Thanks to Derrick Or for all the help. Let me
> know if anyone notices any additional problems.
>
> -Jason
>
> On Mon, Oct 10, 2016 at 1:10 PM, Jason Gustafson 
> wrote:
>
>> Hello Kafka users, developers and client-developers,
>>
>> This is the second candidate for release of Apache Kafka 0.10.1.0. This
>> is a minor release that includes great new features including throttled
>> replication, secure quotas, time-based log searching, and queryable state
>> for Kafka Streams. A full list of the content can be found here:
>> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1.
>>
>> One quick note on the docs. Because of all the recent improvements, the
>> documentation is still a bit out of sync with what's visible on the Kafka
>> homepage. This should be fixed soon (definitely before the release is
>> finalized).
>>
>> Release notes for the 0.10.1.0 release:
>> http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/RELEASE_NOTES.html
>> 
>>
>> *** Please download, test and vote by Thursday, Oct 13, 1pm PT
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> http://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/
>> 
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/
>>
>> * Javadoc:
>> http://home.apache.org/~jgus/kafka-0.10.1.0-rc1/javadoc/
>> 
>>
>> * Tag to be voted upon (off 0.10.1 branch) is the 0.10.1.0-rc1 tag:
>> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=
>> 6eda15a97ffe17d636c390c0e0b28c8349993941
>>
>> * Documentation:
>> http://kafka.apache.org/0101/documentation.html
>>
>> * Protocol:
>> http://kafka.apache.org/0101/protocol.html
>>
>> * Tests:
>> Unit tests: https://builds.apache.org/job/kafka-0.10.1-jdk7/59/
>> System tests: http://testing.confluent.io/co
>> nfluent-kafka-0-10-1-system-test-results/?prefix=2016-10-10-
>> -001.1476110532--apache--0.10.1--e696f17/
>>
>> Thanks,
>>
>> Jason
>>
>
>


Re: JVM crash when closing persistent store (rocksDB)

2016-10-12 Thread Eno Thereska
Depending on how voting goes, the tentative date is Oct 17th:
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.10.1 


Thanks
Eno
> On 12 Oct 2016, at 16:00, Damian Guy  wrote:
> 
> The 0.10.1 release will hopefully be out within the next couple of weeks.
> 
> On Wed, 12 Oct 2016 at 15:52 Pierre Coquentin  >
> wrote:
> 
>> Ok it works against the trunk and the branch 0.10.1, both have a dependency
>> to rocksdb 4.9.0 vs 4.4.1 for kafka 0.10.0.
>> Do you know when 0.10.1 will be released ?
>> 
>> On Tue, Oct 11, 2016 at 9:39 PM, Pierre Coquentin <
>> pierre.coquen...@gmail.com> wrote:
>> 
>>> Hi,
>>> 
>>> I already tried to store rocks db files somewhere else by specifying the
>>> kafka state dir properties, but no luck, same behavior.
>>> I will try to run with the trunk tomorrow to see if it stops correctly,
>>> and I will keep you informed. There must be something with my
>> configuration,
>>> because I googled and saw nothing about this problem.
>>> 
>>> On Tue, Oct 11, 2016 at 9:28 PM, Eno Thereska 
>>> wrote:
>>> 
 Hi Pierre,
 
 I tried the exact code on MacOs and am not getting any errors. Could you
 check if all the directories in /tmp where Kafka Streams writes the
>> RocksDb
 files are empty? I'm wondering if there is some bad state left over.
 
 Finally looks like you are running 0.10.0, could you try running trunk
>> to
 see if the problem still exists? I'm running trunk. I know there were a
 couple of RocksDb fixes after 0.10.0
 
 Thanks
 Eno
 
> On 11 Oct 2016, at 16:41, Pierre Coquentin <
>> pierre.coquen...@gmail.com>
 wrote:
> 
> Hi,
> 
> I have a simple test where I create a topology builder with one topic,
 one
> processor using a persistent store, then I create a kafka streams,
>> start
> it, wait a bit, then close. Each time, the jvm crashes (seg fault)
>> when
> flushing the data. Anyone has already met this kind of problem ?
> 
> OS:
> Ubuntu 16.04
> 
> JVM:
> openjdk version "1.8.0_91"
> OpenJDK Runtime Environment (build 1.8.0_91-8u91-b14-3ubuntu1~16.
 04.1-b14)
> OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
> 
> Kafka:
> kafka_2.11-0.10.0.1
> 
> The java code to reproduce the problem:
> 
> public static void main(String[] args) throws InterruptedException {
>   Map<String, Object> configs = new HashMap<>();
>   configs.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> "localhost:2181");
>   configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> "localhost:9092");
>   configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>   configs.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> StringSerde.class.getName());
>   configs.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> StringSerde.class.getName());
> 
>   TopologyBuilder builder = new TopologyBuilder()
>   .addSource("source", "test")
>   .addProcessor("processor", () -> new Processor<String, String>() {
> 
>   private KeyValueStore<String, String> kvStore;
> 
>   public void init(ProcessorContext context) {
>   kvStore = (KeyValueStore<String, String>)
> context.getStateStore("store");
>   }
> 
>   public void process(String key, String value) {
> 
>   }
> 
>   public void close() {
>   kvStore.close();
>   }
> 
>   @Override
>   public void punctuate(long timestamp) {
> 
> 
>   }
>   }, "source")
> .addStateStore(Stores.create("store").withKeys(new
> StringSerde()).withValues(new StringSerde()).persistent().build(),
> "processor");
> 
> 
>   KafkaStreams streams = new KafkaStreams(builder, new
> StreamsConfig(configs));
>   streams.start();
>   TimeUnit.SECONDS.sleep(20);
>   streams.close();
>   }
> 
> 
> There is the log:
> 
> 
> 11.10.2016 17:27:11 [main] INFO
> [org.apache.kafka.streams.StreamsConfig:178] StreamsConfig values:
> replication.factor = 1
> num.standby.replicas = 0
> metric.reporters = []
> commit.interval.ms = 3
> bootstrap.servers = [localhost:9092]
> state.dir = /tmp/kafka-streams
> partition.grouper = class
> org.apache.kafka.streams.processor.DefaultPartitionGrouper
> state.cleanup.delay.ms = 6
> poll.ms = 100
> zookeeper.connect = localhost:2181
> key.serde = class org.apache.kafka.common.serial
 

Re: JVM crash when closing persistent store (rocksDB)

2016-10-12 Thread Damian Guy
The 0.10.1 release will hopefully be out within the next couple of weeks.

On Wed, 12 Oct 2016 at 15:52 Pierre Coquentin 
wrote:

> Ok it works against the trunk and the branch 0.10.1, both have a dependency
> to rocksdb 4.9.0 vs 4.4.1 for kafka 0.10.0.
> Do you know when 0.10.1 will be released ?
>
> On Tue, Oct 11, 2016 at 9:39 PM, Pierre Coquentin <
> pierre.coquen...@gmail.com> wrote:
>
> > Hi,
> >
> > I already tried to store rocks db files somewhere else by specifying the
> > kafka state dir properties, but no luck, same behavior.
> > I will try to run with the trunk tomorrow to see if it stops correctly,
> > and I will keep you informed. There must be something with my
> configuration,
> > because I googled and saw nothing about this problem.
> >
> > On Tue, Oct 11, 2016 at 9:28 PM, Eno Thereska 
> > wrote:
> >
> >> Hi Pierre,
> >>
> >> I tried the exact code on MacOs and am not getting any errors. Could you
> >> check if all the directories in /tmp where Kafka Streams writes the
> RocksDb
> >> files are empty? I'm wondering if there is some bad state left over.
> >>
> >> Finally looks like you are running 0.10.0, could you try running trunk
> to
> >> see if the problem still exists? I'm running trunk. I know there were a
> >> couple of RocksDb fixes after 0.10.0
> >>
> >> Thanks
> >> Eno
> >>
> >> > On 11 Oct 2016, at 16:41, Pierre Coquentin <
> pierre.coquen...@gmail.com>
> >> wrote:
> >> >
> >> > Hi,
> >> >
> >> > I have a simple test where I create a topology builder with one topic,
> >> one
> >> > processor using a persistent store, then I create a kafka streams,
> start
> >> > it, wait a bit, then close. Each time, the jvm crashes (seg fault)
> when
> >> > flushing the data. Anyone has already met this kind of problem ?
> >> >
> >> > OS:
> >> > Ubuntu 16.04
> >> >
> >> > JVM:
> >> > openjdk version "1.8.0_91"
> >> > OpenJDK Runtime Environment (build 1.8.0_91-8u91-b14-3ubuntu1~16.
> >> 04.1-b14)
> >> > OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
> >> >
> >> > Kafka:
> >> > kafka_2.11-0.10.0.1
> >> >
> >> > The java code to reproduce the problem:
> >> >
> >> > public static void main(String[] args) throws InterruptedException {
> >> >Map configs = new HashMap<>();
> >> >configs.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
> >> > "localhost:2181");
> >> >configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> >> > "localhost:9092");
> >> >configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
> >> >configs.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> >> > StringSerde.class.getName());
> >> >configs.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> >> > StringSerde.class.getName());
> >> >
> >> >TopologyBuilder builder = new TopologyBuilder()
> >> >.addSource("source", "test")
> >> >.addProcessor("processor", () -> new Processor >> > String>() {
> >> >
> >> >private KeyValueStore kvStore;
> >> >
> >> >public void init(ProcessorContext context) {
> >> >kvStore = (KeyValueStore)
> >> > context.getStateStore("store");
> >> >}
> >> >
> >> >public void process(String key, String value) {
> >> >
> >> >}
> >> >
> >> >public void close() {
> >> >kvStore.close();
> >> >}
> >> >
> >> >@Override
> >> >public void punctuate(long timestamp) {
> >> >
> >> >
> >> >}
> >> >}, "source")
> >> >  .addStateStore(Stores.create("store").withKeys(new
> >> > StringSerde()).withValues(new StringSerde()).persistent().build(),
> >> > "processor");
> >> >
> >> >
> >> >KafkaStreams streams = new KafkaStreams(builder, new
> >> > StreamsConfig(configs));
> >> >streams.start();
> >> >TimeUnit.SECONDS.sleep(20);
> >> >streams.close();
> >> >}
> >> >
> >> >
> >> > There is the log:
> >> >
> >> >
> >> > 11.10.2016 17:27:11 [main] INFO
> >> > [org.apache.kafka.streams.StreamsConfig:178] StreamsConfig values:
> >> > replication.factor = 1
> >> > num.standby.replicas = 0
> >> > metric.reporters = []
> >> > commit.interval.ms = 3
> >> > bootstrap.servers = [localhost:9092]
> >> > state.dir = /tmp/kafka-streams
> >> > partition.grouper = class
> >> > org.apache.kafka.streams.processor.DefaultPartitionGrouper
> >> > state.cleanup.delay.ms = 6
> >> > poll.ms = 100
> >> > zookeeper.connect = localhost:2181
> >> > key.serde = class org.apache.kafka.common.serial
> >> ization.Serdes$StringSerde
> >> > metrics.sample.window.ms = 3
> >> > buffered.records.per.partition = 1000
> >> > value.serde = class org.apache.kafka.common.serial
> >> ization.Serdes$StringSerde
> >> > timestamp.extractor = class
> >> > 

Re: JVM crash when closing persistent store (rocksDB)

2016-10-12 Thread Pierre Coquentin
Ok it works against the trunk and the branch 0.10.1, both have a dependency
to rocksdb 4.9.0 vs 4.4.1 for kafka 0.10.0.
Do you know when 0.10.1 will be released ?

On Tue, Oct 11, 2016 at 9:39 PM, Pierre Coquentin <
pierre.coquen...@gmail.com> wrote:

> Hi,
>
> I already tried to store rocks db files somewhere else by specifying the
> kafka state dir properties, but no luck, same behavior.
> I will try to run with the trunk tomorrow to see if it stops correctly,
> and I will keep you informed. There must be something with my configuration,
> because I googled and saw nothing about this problem.
>
> On Tue, Oct 11, 2016 at 9:28 PM, Eno Thereska 
> wrote:
>
>> Hi Pierre,
>>
>> I tried the exact code on MacOs and am not getting any errors. Could you
>> check if all the directories in /tmp where Kafka Streams writes the RocksDb
>> files are empty? I'm wondering if there is some bad state left over.
>>
>> Finally looks like you are running 0.10.0, could you try running trunk to
>> see if the problem still exists? I'm running trunk. I know there were a
>> couple of RocksDb fixes after 0.10.0
>>
>> Thanks
>> Eno
>>
>> > On 11 Oct 2016, at 16:41, Pierre Coquentin 
>> wrote:
>> >
>> > Hi,
>> >
>> > I have a simple test where I create a topology builder with one topic,
>> one
>> > processor using a persistent store, then I create a kafka streams, start
>> > it, wait a bit, then close. Each time, the jvm crashes (seg fault) when
>> > flushing the data. Anyone has already met this kind of problem ?
>> >
>> > OS:
>> > Ubuntu 16.04
>> >
>> > JVM:
>> > openjdk version "1.8.0_91"
>> > OpenJDK Runtime Environment (build 1.8.0_91-8u91-b14-3ubuntu1~16.
>> 04.1-b14)
>> > OpenJDK 64-Bit Server VM (build 25.91-b14, mixed mode)
>> >
>> > Kafka:
>> > kafka_2.11-0.10.0.1
>> >
>> > The java code to reproduce the problem:
>> >
>> > public static void main(String[] args) throws InterruptedException {
>> >Map configs = new HashMap<>();
>> >configs.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG,
>> > "localhost:2181");
>> >configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>> > "localhost:9092");
>> >configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
>> >configs.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>> > StringSerde.class.getName());
>> >configs.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>> > StringSerde.class.getName());
>> >
>> >TopologyBuilder builder = new TopologyBuilder()
>> >.addSource("source", "test")
>> >.addProcessor("processor", () -> new Processor> > String>() {
>> >
>> >private KeyValueStore kvStore;
>> >
>> >public void init(ProcessorContext context) {
>> >kvStore = (KeyValueStore)
>> > context.getStateStore("store");
>> >}
>> >
>> >public void process(String key, String value) {
>> >
>> >}
>> >
>> >public void close() {
>> >kvStore.close();
>> >}
>> >
>> >@Override
>> >public void punctuate(long timestamp) {
>> >
>> >
>> >}
>> >}, "source")
>> >  .addStateStore(Stores.create("store").withKeys(new
>> > StringSerde()).withValues(new StringSerde()).persistent().build(),
>> > "processor");
>> >
>> >
>> >KafkaStreams streams = new KafkaStreams(builder, new
>> > StreamsConfig(configs));
>> >streams.start();
>> >TimeUnit.SECONDS.sleep(20);
>> >streams.close();
>> >}
>> >
>> >
>> > There is the log:
>> >
>> >
>> > 11.10.2016 17:27:11 [main] INFO
>> > [org.apache.kafka.streams.StreamsConfig:178] StreamsConfig values:
>> > replication.factor = 1
>> > num.standby.replicas = 0
>> > metric.reporters = []
>> > commit.interval.ms = 3
>> > bootstrap.servers = [localhost:9092]
>> > state.dir = /tmp/kafka-streams
>> > partition.grouper = class
>> > org.apache.kafka.streams.processor.DefaultPartitionGrouper
>> > state.cleanup.delay.ms = 6
>> > poll.ms = 100
>> > zookeeper.connect = localhost:2181
>> > key.serde = class org.apache.kafka.common.serial
>> ization.Serdes$StringSerde
>> > metrics.sample.window.ms = 3
>> > buffered.records.per.partition = 1000
>> > value.serde = class org.apache.kafka.common.serial
>> ization.Serdes$StringSerde
>> > timestamp.extractor = class
>> > org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
>> > num.stream.threads = 1
>> > metrics.num.samples = 2
>> > application.id = test
>> > client.id =
>> >
>> > 11.10.2016 17:27:11 [main] INFO
>> > [org.apache.kafka.streams.processor.internals.StreamThread:170]
>> Creating
>> > producer client for stream thread [StreamThread-1]
>> > 11.10.2016 17:27:11 [main] INFO
>> > [org.apache.kafka.clients.producer.ProducerConfig:178] 

Kafka-connect cannot find configuration in config.storage.topic

2016-10-12 Thread Kristoffer Sjögren
Hi

We have noticed that kafka-connect cannot find its connector
configuration after a few weeks have passed. The web UI reports that no
connectors are available even though the configuration records are
still available in config.storage.topic. It's possible to start the
connectors again by curling the configuration back in.

I looked through the KafkaConfigBackingStore.java source and tried it
locally, and it looks like it does the right thing. But maybe this is
related to compaction?

Any ideas what could go wrong here?

Cheers,
-Kristoffer
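
In case it's useful to anyone hitting the same thing, here is a minimal sketch of
re-posting a connector configuration through the Connect REST API from Java, the
programmatic equivalent of the curl workaround mentioned above. The endpoint,
connector name and config values are placeholders, not the actual setup:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class ConnectorRestarter {
    public static void main(String[] args) throws Exception {
        // Illustrative connector config; in practice this would be the JSON
        // recovered from config.storage.topic or from source control.
        String json = "{ \"name\": \"my-connector\", \"config\": {"
                + " \"connector.class\": \"org.apache.kafka.connect.file.FileStreamSourceConnector\","
                + " \"tasks.max\": \"1\","
                + " \"file\": \"/tmp/input.txt\","
                + " \"topic\": \"connect-test\" } }";

        // POST the configuration back to the Connect worker's REST endpoint.
        HttpURLConnection conn =
                (HttpURLConnection) new URL("http://localhost:8083/connectors").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(json.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Connect REST response: " + conn.getResponseCode());
        conn.disconnect();
    }
}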


HL7 messages to Kafka consumer

2016-10-12 Thread Samuel Glover
Has anyone done this? I'm working with a hospital company that wants to
ingest HL7 messages into Kafka cluster topics.

Any guidance appreciated.
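
For what it's worth, a minimal sketch of one way to do this with the HAPI parser
and the plain Java producer follows. The topic name, bootstrap servers and sample
message are placeholders, and error handling is reduced to the bare minimum:

import java.util.Properties;

import ca.uhn.hl7v2.HL7Exception;
import ca.uhn.hl7v2.model.Message;
import ca.uhn.hl7v2.parser.PipeParser;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Hl7ToKafka {

    public static void main(String[] args) {
        // Plain string producer; the HL7 message is forwarded in pipe-delimited form.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        // Sample ADT message, for illustration only.
        String rawHl7 = "MSH|^~\\&|SENDING_APP|SENDING_FAC|REC_APP|REC_FAC|"
                + "20161012120000||ADT^A01|MSG00001|P|2.3\rPID|1||12345^^^MRN||DOE^JOHN";

        PipeParser parser = new PipeParser();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Parse to validate/normalize the message, then re-encode and publish.
            Message parsed = parser.parse(rawHl7);
            producer.send(new ProducerRecord<>("hl7-adt", parsed.encode())); // topic is a placeholder
        } catch (HL7Exception e) {
            // A malformed message could be routed to an error topic instead.
            e.printStackTrace();
        }
    }
}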

-- 
*Sam Glover*
Solutions Architect

*M*   512.550.5363 samglo...@cloudera.com
515 Congress Ave, Suite 1212 | Austin, TX | 78701
Celebrating a decade of community accomplishments
cloudera.com/hadoop10
#hadoop10


Re: Force producer topic metadata refresh.

2016-10-12 Thread Ismael Juma
Hi Alexandru,

I think your issue will be fixed by KAFKA-4254. There's a PR available, and
it should be merged shortly. Can you please verify?

Thanks,
Ismael

On Wed, Oct 12, 2016 at 11:00 AM, Alexandru Ionita <
alexandru.ion...@gmail.com> wrote:

> OK. then my question is: why is not the producer trying to recover from
> this error by updating its topic metadata right away instead of waiting for
> the "metadata.max.age.ms" to expire?
>
> 2016-10-12 11:43 GMT+02:00 Manikumar :
>
> > we have similar setting "metadata.max.age.ms" in new producer api.
> > Its default value is 300sec.
> >
> > On Wed, Oct 12, 2016 at 3:04 PM, Alexandru Ionita <
> > alexandru.ion...@gmail.com> wrote:
> >
> > > Hello kafka users!!
> > >
> > > I'm trying implement/use a mechanism to make a Kafka producer
> > imperatively
> > > update its topic metadata for a particular topic.
> > >
> > > Here is the use case:
> > >
> > > we are adding partitions on topics programmatically because we want to
> > very
> > > strictly control how messages are published to particular partitions.
> > >
> > > We are using AdminUtils.addPartitions to achieve this.
> > > We then store the ID of the newly added partition in Zookeeper so that
> we
> > > persist a mapping to a partition ID for our particular domain key.
> > >
> > > The problem we are facing right now is that the Kafka producer won't
> > > refresh its topic metadata until after a while, preventing the producer
> > > from posting to those partitions by throwing an error :
> > >
> > > Caused by: java.lang.IllegalArgumentException: Invalid partition given
> > > with
> > > record: 56 is not in the range [0...55].
> > > at
> > > org.apache.kafka.clients.producer.KafkaProducer.
> > > partition(KafkaProducer.java:717)
> > > ~[kafka-clients-0.10.0.1.jar:na]
> > > at
> > > org.apache.kafka.clients.producer.KafkaProducer.doSend(
> > > KafkaProducer.java:459)
> > > ~[kafka-clients-0.10.0.1.jar:na]
> > > at
> > > org.apache.kafka.clients.producer.KafkaProducer.send(
> > > KafkaProducer.java:430)
> > > ~[kafka-clients-0.10.0.1.jar:na]
> > > at
> > > org.apache.kafka.clients.producer.KafkaProducer.send(
> > > KafkaProducer.java:353)
> > > ~[kafka-clients-0.10.0.1.jar:na]
> > >
> > > As I somewhere read (https://github.com/SOHU-Co/kafka-node/issues/175
> ),
> > > the
> > > producer should try to recover from such error by pulling the latest
> > > version of the topic metadata.
> > >
> > > This doesn't happening and I will keep getting those errors for like 60
> > > seconds until the producer eventually will be able to publish to that
> > > partition.
> > >
> > > In the previous version of kafka (0.8) there was a producer setting
> > called
> > > topic.metadata.refresh.interval.ms that was aimed to make the producer
> > > pull
> > > that information. This is what I found related to that setting in the
> 0.8
> > > documentation: "The producer generally refreshes the topic metadata
> from
> > > brokers when there is a failure (partition missing, leader not
> > > available...)
> > > "
> > >
> > > Any ideas and comments on this are much appreciated.
> > > Thanks
> > >
> >
>


Re: Force producer topic metadata refresh.

2016-10-12 Thread Alexandru Ionita
OK, then my question is: why doesn't the producer try to recover from
this error by updating its topic metadata right away, instead of waiting
for "metadata.max.age.ms" to expire?

2016-10-12 11:43 GMT+02:00 Manikumar :

> we have similar setting "metadata.max.age.ms" in new producer api.
> Its default value is 300sec.
>
> On Wed, Oct 12, 2016 at 3:04 PM, Alexandru Ionita <
> alexandru.ion...@gmail.com> wrote:
>
> > Hello kafka users!!
> >
> > I'm trying implement/use a mechanism to make a Kafka producer
> imperatively
> > update its topic metadata for a particular topic.
> >
> > Here is the use case:
> >
> > we are adding partitions on topics programmatically because we want to
> very
> > strictly control how messages are published to particular partitions.
> >
> > We are using AdminUtils.addPartitions to achieve this.
> > We then store the ID of the newly added partition in Zookeeper so that we
> > persist a mapping to a partition ID for our particular domain key.
> >
> > The problem we are facing right now is that the Kafka producer won't
> > refresh its topic metadata until after a while, preventing the producer
> > from posting to those partitions by throwing an error :
> >
> > Caused by: java.lang.IllegalArgumentException: Invalid partition given
> > with
> > record: 56 is not in the range [0...55].
> > at
> > org.apache.kafka.clients.producer.KafkaProducer.
> > partition(KafkaProducer.java:717)
> > ~[kafka-clients-0.10.0.1.jar:na]
> > at
> > org.apache.kafka.clients.producer.KafkaProducer.doSend(
> > KafkaProducer.java:459)
> > ~[kafka-clients-0.10.0.1.jar:na]
> > at
> > org.apache.kafka.clients.producer.KafkaProducer.send(
> > KafkaProducer.java:430)
> > ~[kafka-clients-0.10.0.1.jar:na]
> > at
> > org.apache.kafka.clients.producer.KafkaProducer.send(
> > KafkaProducer.java:353)
> > ~[kafka-clients-0.10.0.1.jar:na]
> >
> > As I somewhere read (https://github.com/SOHU-Co/kafka-node/issues/175),
> > the
> > producer should try to recover from such error by pulling the latest
> > version of the topic metadata.
> >
> > This doesn't happening and I will keep getting those errors for like 60
> > seconds until the producer eventually will be able to publish to that
> > partition.
> >
> > In the previous version of kafka (0.8) there was a producer setting
> called
> > topic.metadata.refresh.interval.ms that was aimed to make the producer
> > pull
> > that information. This is what I found related to that setting in the 0.8
> > documentation: "The producer generally refreshes the topic metadata from
> > brokers when there is a failure (partition missing, leader not
> > available...)
> > "
> >
> > Any ideas and comments on this are much appreciated.
> > Thanks
> >
>


Re: Force producer topic metadata refresh.

2016-10-12 Thread Manikumar
We have a similar setting, "metadata.max.age.ms", in the new producer API.
Its default value is 300 seconds.
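
For reference, a minimal sketch of how that setting is passed to the producer;
the values are illustrative only, and lowering it trades extra metadata requests
for picking up newly added partitions sooner:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerWithFasterMetadataRefresh {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        // Default is 300000 ms (5 minutes); a lower value makes the producer
        // refresh its topic metadata more often.
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "10000");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... use the producer ...
        producer.close();
    }
}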

On Wed, Oct 12, 2016 at 3:04 PM, Alexandru Ionita <
alexandru.ion...@gmail.com> wrote:

> Hello kafka users!!
>
> I'm trying implement/use a mechanism to make a Kafka producer imperatively
> update its topic metadata for a particular topic.
>
> Here is the use case:
>
> we are adding partitions on topics programmatically because we want to very
> strictly control how messages are published to particular partitions.
>
> We are using AdminUtils.addPartitions to achieve this.
> We then store the ID of the newly added partition in Zookeeper so that we
> persist a mapping to a partition ID for our particular domain key.
>
> The problem we are facing right now is that the Kafka producer won't
> refresh its topic metadata until after a while, preventing the producer
> from posting to those partitions by throwing an error :
>
> Caused by: java.lang.IllegalArgumentException: Invalid partition given
> with
> record: 56 is not in the range [0...55].
> at
> org.apache.kafka.clients.producer.KafkaProducer.
> partition(KafkaProducer.java:717)
> ~[kafka-clients-0.10.0.1.jar:na]
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(
> KafkaProducer.java:459)
> ~[kafka-clients-0.10.0.1.jar:na]
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(
> KafkaProducer.java:430)
> ~[kafka-clients-0.10.0.1.jar:na]
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(
> KafkaProducer.java:353)
> ~[kafka-clients-0.10.0.1.jar:na]
>
> As I somewhere read (https://github.com/SOHU-Co/kafka-node/issues/175),
> the
> producer should try to recover from such error by pulling the latest
> version of the topic metadata.
>
> This doesn't happening and I will keep getting those errors for like 60
> seconds until the producer eventually will be able to publish to that
> partition.
>
> In the previous version of kafka (0.8) there was a producer setting called
> topic.metadata.refresh.interval.ms that was aimed to make the producer
> pull
> that information. This is what I found related to that setting in the 0.8
> documentation: "The producer generally refreshes the topic metadata from
> brokers when there is a failure (partition missing, leader not
> available...)
> "
>
> Any ideas and comments on this are much appreciated.
> Thanks
>


Force producer topic metadata refresh.

2016-10-12 Thread Alexandru Ionita
Hello kafka users!!

I'm trying to implement/use a mechanism to make a Kafka producer
imperatively update its topic metadata for a particular topic.

Here is the use case:

we are adding partitions to topics programmatically because we want to
control very strictly how messages are published to particular partitions.

We are using AdminUtils.addPartitions to achieve this.
We then store the ID of the newly added partition in Zookeeper so that we
persist a mapping to a partition ID for our particular domain key.

The problem we are facing right now is that the Kafka producer won't
refresh its topic metadata until some time has passed, which prevents the
producer from posting to those new partitions and makes it throw an error:

Caused by: java.lang.IllegalArgumentException: Invalid partition given with
record: 56 is not in the range [0...55].
at org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:717) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:459) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430) ~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353) ~[kafka-clients-0.10.0.1.jar:na]

As I read somewhere (https://github.com/SOHU-Co/kafka-node/issues/175), the
producer should try to recover from such an error by pulling the latest
version of the topic metadata.

This doesn't happen, and I keep getting those errors for about 60 seconds
until the producer is eventually able to publish to that partition.

In the previous version of Kafka (0.8) there was a producer setting called
topic.metadata.refresh.interval.ms that was aimed at making the producer
pull that information. This is what I found related to that setting in the
0.8 documentation: "The producer generally refreshes the topic metadata
from brokers when there is a failure (partition missing, leader not
available...)"

Any ideas and comments on this are much appreciated.
Thanks
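
Until a proper fix is in place, one way to ride out the stale-metadata window is
to retry the send until the producer's metadata catches up. This is a sketch only,
under the assumption that the error is the synchronous IllegalArgumentException
shown above; the topic name, partition number and timings are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendWithPartitionRetry {

    // Retries a send to a freshly added partition until the producer's
    // metadata catches up (bounded by metadata.max.age.ms in the worst case).
    static void sendWhenPartitionKnown(KafkaProducer<String, String> producer,
                                       String topic, int partition, String key, String value)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + 60_000; // illustrative upper bound
        while (true) {
            try {
                producer.send(new ProducerRecord<>(topic, partition, key, value));
                return;
            } catch (IllegalArgumentException staleMetadata) {
                // "Invalid partition given with record": metadata not refreshed yet.
                if (System.currentTimeMillis() > deadline) {
                    throw staleMetadata;
                }
                Thread.sleep(500);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            sendWhenPartitionKnown(producer, "my-topic", 56, "domain-key", "payload");
        }
    }
}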


Re: KafkaStream Merging two topics is not working fro custom datatypes

2016-10-12 Thread Michael Noll
Happy to hear it works now for you, Ratha.

-Michael


On Wed, Oct 12, 2016 at 6:06 AM, Ratha v  wrote:

> Sorry, my fault. In the KafkaConsumer I had messed up the
> 'value.deserializer' property.
> Now things are working fine.
> Thanks a lot.
>
> On 12 October 2016 at 14:10, Ratha v  wrote:
>
> > HI Michael;
> > Sorry , after setting "auto.offset.reset"  to 'earliest' , I see messages
> > in my 'targetTopic'.
> > But still I get my class cast exception issue, when I consume message
> from
> > the 'targetTopic'. (To consume message I use KafkaConsumer highlevel API)
> >
> > *ConsumerRecords records = consumer.poll(Long.MAX_VALUE);*
> >
> >
> >
> > *Exception*
> >
> > *java.lang.ClassCastException: java.lang.String cannot be cast to
> > xxx.core.kafkamodels.KafkaPayload at
> > xx.core.listener.KafkaMessageListener.receiveData(
> KafkaMessageListener.java:108)
> > ~[classes/:?]*
> >
> > at xx.core.listener.KafkaMessageListenerThread.process(
> > KafkaMessageListenerThread.java:68) ~[classes/:?]
> >
> > at xx.core.listener.KafkaMessageListenerThread.lambda$run$1(
> > KafkaMessageListenerThread.java:50) ~[classes/:?]
> >
> > at java.lang.Iterable.forEach(Iterable.java:75) ~[?:1.8.0_66]
> >
> > at com.leightonobrien.core.listener.KafkaMessageListenerThread.run(
> > KafkaMessageListenerThread.java:50) [classes/:?]
> >
> > at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
> > [?:1.8.0_66]
> >
> > at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_66]
> >
> > at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> > [?:1.8.0_66]
> >
> > at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> > [?:1.8.0_66]
> >
> > at java.lang.Thread.run(Thread.java:745) [?:1.8.0_66]
> >
> >
> >
> > On 12 October 2016 at 13:19, Ratha v  wrote:
> >
> >> HI Michael;
> >>
> >> Really appreciate the clear explanation.
> >> I modified my code as you mentioned. I have written a custom Serde,
> >> serializer and deserializer.
> >> But now the problem I see is that the two topics are not merged;
> >> messages in the 'sourceTopic' are not passed to the 'targetTopic'
> >> (the 'targetTopic' has 0 messages).
> >> I do not see any exceptions.
> >>
> >> Here are my custom Serde, serializer/deserializer and the logic. I also
> >> have a properties file where I defined the following parameters:
> >>
> >> *bootstrap.servers=xx.com \:9092,xx.com
> >> \:9092,xx.com \:9092*
> >>
> >> *key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde*
> >>
> >> *value.serde=xx.kafkamodels.KafkaPayloadSerdes$KafkaPayloadSerde*
> >>
> >> *application.id =stream-pipe*
> >>
> >>
> >> Do you see any issue here? Why are messages not written to the
> >> 'targetTopic'?
> >>
> >>
> >>
> >> *LOGIC*
> >>
> >> /**
> >>
> >> * create stream from source topics and write it to the target topic
> >>
> >> * @param sourceTopics
> >>
> >> * @param targetTopic
> >>
> >> */
> >>
> >> public void write(String[] sourceTopics, String targetTopic) {
> >>
> >>  KafkaStreams streams = null;
> >>
> >>  KStreamBuilder builder = new KStreamBuilder();
> >>
> >>   try {
> >>
> >>    KStream<String, KafkaPayload> kafkaPayloadStream =
> >>        builder.stream(stringSerde, kafkaPayloadSerde, sourceTopics);
> >>
> >>    kafkaPayloadStream.to(stringSerde, kafkaPayloadSerde, targetTopic);
> >>
> >>streams = new KafkaStreams(builder, properties);
> >>
> >>streams.start();
> >>
> >>Thread.sleep(5000L);
> >>
> >>   } catch (InterruptedException e) {
> >>
> >>   log.warn(e);
> >>
> >>  } catch (Exception e) {
> >>
> >>  log.error("Topic merge failed. ",e);
> >>
> >>   } finally {
> >>
> >>if (streams != null) {
> >>
> >>streams.close();
> >>
> >>  }
> >>
> >> }
> >>
> >> }
> >>
> >>
> >>
> >>
> >> *SERDE*
> >>
> >>
> >> public class KafkaPayloadSerdes {
> >>
> >>     static private class WrapperSerde<T> implements Serde<T> {
> >>         final private Serializer<T> serializer;
> >>         final private Deserializer<T> deserializer;
> >>
> >>         public WrapperSerde(Serializer<T> serializer,
> >>                             Deserializer<T> deserializer) {
> >>             this.serializer = serializer;
> >>             this.deserializer = deserializer;
> >>         }
> >>
> >>         @Override
> >>         public void configure(Map<String, ?> configs, boolean isKey) {
> >>             serializer.configure(configs, isKey);
> >>             deserializer.configure(configs, isKey);
> >>         }
> >>
> >>         @Override
> >>         public void close() {
> >>             serializer.close();
> >>             deserializer.close();
> >>         }
> >>
> >>         @Override
> >>         public Serializer<T> serializer() {
> >>             return serializer;
> >>         }
> >>
> >>         @Override
> >>         public Deserializer<T> deserializer() {
> >>             return deserializer;
> >>         }
> >>     }
> >>
> >> static public final class KafkaPayloadSerde extends
> >> 
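
For reference, a minimal sketch of how a Serde for a custom payload type can look,
assuming a simple JSON mapping via Jackson; the class names and fields below are
illustrative, not the actual xx.kafkamodels code:

import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// Illustrative payload type standing in for the real KafkaPayload class.
class KafkaPayload {
    public String id;
    public String body;
}

class KafkaPayloadSerializer implements Serializer<KafkaPayload> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, KafkaPayload data) {
        try {
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize KafkaPayload", e);
        }
    }

    @Override
    public void close() { }
}

class KafkaPayloadDeserializer implements Deserializer<KafkaPayload> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public KafkaPayload deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : mapper.readValue(data, KafkaPayload.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to deserialize KafkaPayload", e);
        }
    }

    @Override
    public void close() { }
}

public class KafkaPayloadSerdes {
    // Serdes.serdeFrom wires the serializer/deserializer pair into a Serde,
    // avoiding the hand-written WrapperSerde boilerplate shown above.
    public static Serde<KafkaPayload> kafkaPayloadSerde() {
        return Serdes.serdeFrom(new KafkaPayloadSerializer(), new KafkaPayloadDeserializer());
    }
}

Note that if the Serde is supplied through the value.serde config property rather
than passed to builder.stream(...), it needs to be a public class with a
no-argument constructor, which is what the WrapperSerde subclass in the quoted
code provides.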

Re: How can I delete a topic programatically?

2016-10-12 Thread Ratha v
Thank you..

On 12 October 2016 at 16:30, Jianbin Wei  wrote:

> You can check this
>
> http://kafka.apache.org/documentation.html#basic_ops_add_topic
>
> But from our experience it is best to delete topics one by one, i.e., make
> sure Kafka is in good shape before and after deleting a topic, before
> working on the next one.
>
> Regards,
>
> -- Jianbin
>
> > On Oct 11, 2016, at 9:26 PM, Ratha v  wrote:
> >
> > Thanks. Which bash script do I need to run?
> >
> >> On 12 October 2016 at 15:20, Ali Akhtar  wrote:
> >>
> >> The last time I tried, I couldn't find a way to do it, other than to
> >> trigger the bash script for topic deletion programmatically.
> >>
> >>> On Wed, Oct 12, 2016 at 9:18 AM, Ratha v 
> wrote:
> >>>
> >>> Hi all;
> >>>
> >>> I have two topics (source and target). I do some processing on the
> >>> messages available in the source topic and I merge both topics.
> >>> That is:
> >>>
> >>> builder.stream(sourceTopic).to(targetTopic)
> >>>
> >>> Once merged I no longer require the sourceTopic. I want to delete it.
> >>>
> >>> How can I do that programmatically in Java? I use the high-level
> >>> client APIs, Kafka v0.10.0.1.
> >>>
> >>>
> >>> Thanks
> >>> --
> >>> -Ratha
> >>> http://vvratha.blogspot.com/
> >>>
> >>
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
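
For completeness, deleting a topic from Java in 0.10.x can be done with the
internal kafka.admin.AdminUtils API, sketched below. This is not a public client
API, the ZooKeeper settings are placeholders, and the brokers need
delete.topic.enable=true for the deletion to actually happen rather than the
topic merely being marked for deletion:

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public class TopicDeleter {
    public static void main(String[] args) {
        // zkUrl, session timeout and connection timeout are placeholders.
        ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, false);
        try {
            // Marks the topic for deletion; the controller performs the actual cleanup.
            AdminUtils.deleteTopic(zkUtils, "sourceTopic");
        } finally {
            zkUtils.close();
        }
    }
}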