Re: question about KIP-150 - Kafka-Streams Cogroup

2017-11-10 Thread Guozhang Wang
Matt is right. KIP-150 did not make it to 1.0.0. So it may need to wait for
the next minor release (1.1.0).


Guozhang

On Fri, Nov 10, 2017 at 10:25 AM, Matt Farmer  wrote:

> The JIRA ticket for its implementation still appears to be open, so I'd
> guess it's not in 1.0
>
> On Fri, Nov 10, 2017 at 12:28 PM Artur Mrozowski  wrote:
>
> > Hi,
> > I have a question about  KIP-150. Has that functionality been released in
> > version 1.0 or is it planned for version 1.1?
> >
> > Here it says 1.1
> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams
> >
> > but in this blog post it seems to be part of 1.0.
> > https://www.confluent.io/blog/apache-kafka-goes-1-0/
> >
> > Has anyone worked with it in current version 1.0 and could point me to
> some
> > examples?
> >
> > Best Regards
> > Artur Mrozowski
> >
>



-- 
-- Guozhang


Re: [VOTE] 0.11.0.2 RC0

2017-11-10 Thread Rajini Sivaram
Resending to include kafka-clients.

On Sat, Nov 11, 2017 at 12:37 AM, Rajini Sivaram 
wrote:

> Hello Kafka users, developers and client-developers,
>
>
> This is the first candidate for release of Apache Kafka 0.11.0.2.
>
>
> This is a bug fix release and it includes fixes and improvements from 16
> JIRAs, including a few critical bugs.
>
>
> Release notes for the 0.11.0.2 release:
>
> http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/RELEASE_NOTES.html
>
>
> *** Please download, test and vote by Wednesday the 15th of November, 8PM PT
>
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
>
> http://kafka.apache.org/KEYS
>
>
> * Release artifacts to be voted upon (source and binary):
>
> http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/
>
>
> * Maven artifacts to be voted upon:
>
> https://repository.apache.org/content/groups/staging/
>
>
> * Javadoc:
>
> http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/javadoc/
>
>
> * Tag to be voted upon (off 0.11.0 branch) is the 0.11.0.2 tag:
>
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=25639822d6e23803c599cba35ad3dc1a2817b404
>
>
>
> * Documentation:
>
> Note the documentation can't be pushed live due to changes that will not
> go live until the release. You can manually verify by downloading
>
> http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/kafka_2.11-0.11.0.2-site-docs.tgz
>
>
>
> * Protocol:
>
> http://kafka.apache.org/0110/protocol.html
>
>
> * Successful Jenkins builds for the 0.11.0 branch:
>
> Unit/integration tests: https://builds.apache.org/job/kafka-0.11.0-jdk7/333/
>
>
>
>
> Thanks,
>
>
> Rajini
>
>


[VOTE] 0.11.0.2 RC0

2017-11-10 Thread Rajini Sivaram
Hello Kafka users, developers and client-developers,


This is the first candidate for release of Apache Kafka 0.11.0.2.


This is a bug fix release and it includes fixes and improvements from 16 JIRAs,
including a few critical bugs.


Release notes for the 0.11.0.2 release:

http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/RELEASE_NOTES.html


*** Please download, test and vote by Wednesday the 15th of November, 8PM PT


Kafka's KEYS file containing PGP keys we use to sign the release:

http://kafka.apache.org/KEYS
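
One way to check the signatures, assuming the artifact names under the
release directory below (commands shown for illustration):

# import the release signing keys, then verify a downloaded artifact
gpg --import KEYS
gpg --verify kafka_2.11-0.11.0.2.tgz.asc kafka_2.11-0.11.0.2.tgz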


* Release artifacts to be voted upon (source and binary):

http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/


* Maven artifacts to be voted upon:

https://repository.apache.org/content/groups/staging/


* Javadoc:

http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/javadoc/


* Tag to be voted upon (off 0.11.0 branch) is the 0.11.0.2 tag:

https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=25639822d6e23803c599cba35ad3dc1a2817b404



* Documentation:

Note the documentation can't be pushed live due to changes that will not
go live until the release. You can manually verify by downloading

http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/kafka_2.11-0.11.0.2-site-docs.tgz



* Protocol:

http://kafka.apache.org/0110/protocol.html


* Successful Jenkins builds for the 0.11.0 branch:

Unit/integration tests: https://builds.apache.org/job/kafka-0.11.0-jdk7/333/




Thanks,


Rajini


Re: Listeners and reference/docs

2017-11-10 Thread Thomas Stringer
Yep I'm familiar with that. Just curious where it's documented that, for
instance, the CLIENT listener is for client connections.

On Fri, Nov 10, 2017, 12:08 PM Kaufman Ng  wrote:

> This is related to another config, "listener.security.protocol.map" (since
> version 0.10.2.0). The names CLIENT, PLAINTEXT, etc. are defined in a
> name-protocol mapping. So each name you use in the listeners property (e.g.
> CLIENT) must have an entry in the protocol map, which determines which
> protocol to use (e.g. CLIENT:SASL_PLAINTEXT).
>
> The idea is that the same protocol can be used by multiple listeners. For
> example, you might have SASL/SSL on two listeners (on different network
> interfaces or ports).
>
> This is part of KIP-103, you can see more details there:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic
>
> On Fri, Nov 10, 2017 at 10:10 AM, Thomas Stringer 
> wrote:
>
> > I've been working with Kafka broker listeners and I'm curious whether
> > there is any documentation that explains what all of them apply to, such
> > as CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the
> > documentation, but is it just inferred what these listeners apply to?
> >
> > Thank you in advance!
> >
>
>
>
> --
> Kaufman Ng
> +1 646 961 8063
> Solutions Architect | Confluent | www.confluent.io
>


Re: How do I gracefully handle stream joins where the other side never appears?

2017-11-10 Thread Matthias J. Sax
Messages that don't find a join partner are dropped.

For each incoming message, we do the following:
 1. insert it into its window store
 2. look up the other window store for matching records
    a) if matching records are found, compute the join and emit

Note that we maintain all records in the window store until retention
time passes. Thus, if no matching join record ever arrives, a record
will eventually be dropped.

There is no API to be notified about this.

With regard to left/outer join: your observation is correct -- we need
to implement it this way, as it's unclear for how long to delay the
computation and wait for a matching record. Note that Streams is able
to handle late data; thus, the only "safe" way to avoid a "double" call
would be to change the implementation to wait until retention time is
over (default is 24h) -- this implies way too high a latency and would
also result in out-of-order results.

To tackle this issue, you could implement a "de-duplication" operator
that consumes the join output stream. This stateful `Transformer` could
buffer all "early" (msg,null) and (null,msg) records for some time to see
if a "proper" join result arrives later. Using punctuation, you can
emit a (msg,null)/(null,msg) join result if you think(!) there will be no
"proper" join result anymore.

Note that there always might be a late join result, and thus this
approach has its own issues (of course, you could drop a late "proper"
join result in case you did already emit a (msg,null)/(null,msg) result).
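
A rough sketch of what such a de-duplication Transformer could look like
(all names and types here are illustrative; it assumes the outer join's
ValueJoiner emits a JoinResult whose left/right side may still be null,
and a connected state store named "join-buffer"):

import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

class JoinResult {
    final String left;   // null for a (null,msg) result
    final String right;  // null for a (msg,null) result
    JoinResult(final String left, final String right) {
        this.left = left;
        this.right = right;
    }
}

class JoinDedupTransformer
        implements Transformer<String, JoinResult, KeyValue<String, JoinResult>> {

    private ProcessorContext context;
    private KeyValueStore<String, JoinResult> buffer;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.buffer = (KeyValueStore<String, JoinResult>)
                context.getStateStore("join-buffer");
        // periodically flush buffered one-sided results; 10s is an
        // arbitrary example -- a real implementation would also track
        // how long each record has been buffered
        context.schedule(10_000L, PunctuationType.STREAM_TIME, this::flush);
    }

    @Override
    public KeyValue<String, JoinResult> transform(final String key,
                                                  final JoinResult value) {
        if (value.left == null || value.right == null) {
            buffer.put(key, value);  // one-sided result: hold it back
            return null;
        }
        buffer.delete(key);  // "proper" result arrived: drop the buffered one
        return KeyValue.pair(key, value);
    }

    private void flush(final long timestamp) {
        // copy first to avoid mutating the store while iterating it
        final List<KeyValue<String, JoinResult>> pending = new ArrayList<>();
        try (final KeyValueIterator<String, JoinResult> it = buffer.all()) {
            while (it.hasNext()) {
                pending.add(it.next());
            }
        }
        for (final KeyValue<String, JoinResult> entry : pending) {
            context.forward(entry.key, entry.value);  // emit one-sided result
            buffer.delete(entry.key);
        }
    }

    @Override
    @Deprecated
    public KeyValue<String, JoinResult> punctuate(final long timestamp) {
        return null;  // punctuation is handled via context.schedule() above
    }

    @Override
    public void close() {
    }
}

It could then be attached via something like
outerJoined.transform(JoinDedupTransformer::new, "join-buffer"), after
adding a corresponding store to the topology.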

Hope this helps.


-Matthias



On 11/9/17 2:53 PM, Thaler, Michael wrote:
> Hi all,
> 
> So let's say I have 2 topics coming that I want to join using KStream#join. I 
> set them up like so:
> 
> KStreamBuilder builder = new KStreamBuilder();
> KStream<String, String> a = builder.stream(TOPIC_A);
> KStream<String, String> b = builder.stream(TOPIC_B);
> 
> a.join(b, (msgA, msgB) -> msgA + msgB,
> JoinWindows.of(TimeUnit.HOURS.toMillis(1)))
>   .print();
> 
> So this works fine and joins the messages together. But what happens to 
> messages that don't find a join partner in the other topic within the window? 
> If I get a message in topic A and its partner doesn't occur in B, when and 
> how does the message get consumed? Is there a way to write my application so 
> that this is caught somehow and handled?
> 
> I'm aware that I could use a leftJoin instead, but that would call the merge 
> function twice, once with (msgA, null) and the second time with (msgA, msgB). 
> I'm trying to find a solution that only calls one or the other.
> 
> Is there a way to do this cleanly?
> 
> Thanks!
> --Michael Thaler
> 





Re: question about KIP-150 - Kafka-Streams Cogroup

2017-11-10 Thread Matt Farmer
The JIRA ticket for its implementation still appears to be open, so I'd
guess it's not in 1.0

On Fri, Nov 10, 2017 at 12:28 PM Artur Mrozowski  wrote:

> Hi,
> I have a question about  KIP-150. Has that functionality been released in
> version 1.0 or is it planned for version 1.1?
>
> Here it says 1.1
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams
>
> but in this blog post it seems to be part of 1.0.
> https://www.confluent.io/blog/apache-kafka-goes-1-0/
>
> Has anyone worked with it in current version 1.0 and could point me to some
> examples?
>
> Best Regards
> Artur Mrozowski
>


question about KIP-150 - Kafka-Streams Cogroup

2017-11-10 Thread Artur Mrozowski
Hi,
I have a question about  KIP-150. Has that functionality been released in
version 1.0 or is it planned for version 1.1?

Here it says 1.1
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams

but in this blog post it seems to be part of 1.0.
https://www.confluent.io/blog/apache-kafka-goes-1-0/

Has anyone worked with it in current version 1.0 and could point me to some
examples?

Best Regards
Artur Mrozowski


Re: Listeners and reference/docs

2017-11-10 Thread Kaufman Ng
This is related to another config, "listener.security.protocol.map" (since
version 0.10.2.0). The names CLIENT, PLAINTEXT, etc. are defined in a
name-protocol mapping. So each name you use in the listeners property (e.g.
CLIENT) must have an entry in the protocol map, which determines which
protocol to use (e.g. CLIENT:SASL_PLAINTEXT).

The idea is that the same protocol can be used by multiple listeners. For
example, you might have SASL/SSL on two listeners (on different network
interfaces or ports).

This is part of KIP-103; you can see more details there:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic
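
For illustration, a broker configuration along these lines (listener
names, hosts, and ports here are made up) gives every listener name an
entry in the protocol map:

listeners=CLIENT://0.0.0.0:9092,REPLICATION://0.0.0.0:9093
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT,REPLICATION:SSL
inter.broker.listener.name=REPLICATION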

On Fri, Nov 10, 2017 at 10:10 AM, Thomas Stringer 
wrote:

> I've been working with Kafka broker listeners and I'm curious whether there
> is any documentation that explains what all of them apply to, such as
> CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the
> documentation, but is it just inferred what these listeners apply to?
>
> Thank you in advance!
>



-- 
Kaufman Ng
+1 646 961 8063
Solutions Architect | Confluent | www.confluent.io


Re: why is the number of in-sync replicas less than min.insync.replicas?

2017-11-10 Thread Vitaliy Semochkin
Thank you very much, Stanislav,

We managed to fix it by deleting everything from the Kafka data dir and
the ZooKeeper data dir.

Regards,
Vitaliy.

On Fri, Nov 10, 2017 at 5:41 PM, Stas Chizhov  wrote:
> Hi, it looks like https://issues.apache.org/jira/browse/KAFKA-5970. Try
> restarting broker 1.
>
> Best regards,
> Stanislav.
>
> 2017-11-10 14:00 GMT+01:00 Vitaliy Semochkin :
>
>> Hi,
>>
>> I have a cluster with 3 brokers (0.11).
>> When I create a topic with min.insync.replicas=2 and replication-factor 2,
>> I see the number of in-sync replicas in the created topic is less than
>> min.insync.replicas.
>> Why do some partitions have fewer than 2 in-sync replicas? How do I prevent it?
>>
>> Here are the commands I used and the output.
>> kafka-topics.sh --create --topic test --replication-factor 2 --config
>> min.insync.replicas=2  --partitions 30 --zookeeper 127.0.0.1
>> kafka-topics.sh --describe --topic test  --zookeeper 127.0.0.1
>>
>> Topic: test    PartitionCount: 30    ReplicationFactor: 2    Configs: min.insync.replicas=2
>>     Topic: highload    Partition: 0     Leader: 2    Replicas: 2,3    Isr: 2,3
>>     Topic: highload    Partition: 1     Leader: 3    Replicas: 3,1    Isr: 3,1
>>     Topic: highload    Partition: 2     Leader: 1    Replicas: 1,2    Isr: 1,2
>>     Topic: highload    Partition: 3     Leader: 2    Replicas: 2,1    Isr: 2,1
>>     Topic: highload    Partition: 4     Leader: 3    Replicas: 3,2    Isr: 3,2
>>     Topic: highload    Partition: 5     Leader: 1    Replicas: 1,3    Isr: 1
>>     Topic: highload    Partition: 6     Leader: 2    Replicas: 2,3    Isr: 2,3
>>     Topic: highload    Partition: 7     Leader: 3    Replicas: 3,1    Isr: 3,1
>>     Topic: highload    Partition: 8     Leader: 1    Replicas: 1,2    Isr: 1,2
>>     Topic: highload    Partition: 9     Leader: 2    Replicas: 2,1    Isr: 2,1
>>     Topic: highload    Partition: 10    Leader: 3    Replicas: 3,2    Isr: 3,2
>>     Topic: highload    Partition: 11    Leader: 1    Replicas: 1,3    Isr: 1
>>     Topic: highload    Partition: 12    Leader: 2    Replicas: 2,3    Isr: 2,3
>>     Topic: highload    Partition: 13    Leader: 3    Replicas: 3,1    Isr: 3,1
>>     Topic: highload    Partition: 14    Leader: 1    Replicas: 1,2    Isr: 1,2
>>     Topic: highload    Partition: 15    Leader: 2    Replicas: 2,1    Isr: 2,1
>>     Topic: highload    Partition: 16    Leader: 3    Replicas: 3,2    Isr: 3,2
>>     Topic: highload    Partition: 17    Leader: 1    Replicas: 1,3    Isr: 1
>>     Topic: highload    Partition: 18    Leader: 2    Replicas: 2,3    Isr: 2,3
>>     Topic: highload    Partition: 19    Leader: 3    Replicas: 3,1    Isr: 3,1
>>     Topic: highload    Partition: 20    Leader: 1    Replicas: 1,2    Isr: 1,2
>>     Topic: highload    Partition: 21    Leader: 2    Replicas: 2,1    Isr: 2,1
>>     Topic: highload    Partition: 22    Leader: 3    Replicas: 3,2    Isr: 3,2
>>     Topic: highload    Partition: 23    Leader: 1    Replicas: 1,3    Isr: 1
>>     Topic: highload    Partition: 24    Leader: 2    Replicas: 2,3    Isr: 2,3
>>     Topic: highload    Partition: 25    Leader: 3    Replicas: 3,1    Isr: 3,1
>>     Topic: highload    Partition: 26    Leader: 1    Replicas: 1,2    Isr: 1,2
>>     Topic: highload    Partition: 27    Leader: 2    Replicas: 2,1    Isr: 2,1
>>     Topic: highload    Partition: 28    Leader: 3    Replicas: 3,2    Isr: 3,2
>>     Topic: highload    Partition: 29    Leader: 1    Replicas: 1,3    Isr: 1
>>


Listeners and reference/docs

2017-11-10 Thread Thomas Stringer
I've been working with Kafka broker listeners and I'm curious whether there
is any documentation that explains what all of them apply to, such as
CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the
documentation, but is it just inferred what these listeners apply to?

Thank you in advance!


Re: why is the number of in-sync replicas less than min.insync.replicas?

2017-11-10 Thread Stas Chizhov
Hi, it looks like https://issues.apache.org/jira/browse/KAFKA-5970. Try
restarting broker 1.

Best regards,
Stanislav.

2017-11-10 14:00 GMT+01:00 Vitaliy Semochkin :

> Hi,
>
> I have a cluster with 3 brokers (0.11).
> When I create a topic with min.insync.replicas=2 and replication-factor 2,
> I see the number of in-sync replicas in the created topic is less than
> min.insync.replicas.
> Why do some partitions have fewer than 2 in-sync replicas? How do I prevent it?
>
> Here are the commands I used and the output.
> kafka-topics.sh --create --topic test --replication-factor 2 --config
> min.insync.replicas=2  --partitions 30 --zookeeper 127.0.0.1
> kafka-topics.sh --describe --topic test  --zookeeper 127.0.0.1
>
> Topic: test    PartitionCount: 30    ReplicationFactor: 2    Configs: min.insync.replicas=2
>     Topic: highload    Partition: 0     Leader: 2    Replicas: 2,3    Isr: 2,3
>     Topic: highload    Partition: 1     Leader: 3    Replicas: 3,1    Isr: 3,1
>     Topic: highload    Partition: 2     Leader: 1    Replicas: 1,2    Isr: 1,2
>     Topic: highload    Partition: 3     Leader: 2    Replicas: 2,1    Isr: 2,1
>     Topic: highload    Partition: 4     Leader: 3    Replicas: 3,2    Isr: 3,2
>     Topic: highload    Partition: 5     Leader: 1    Replicas: 1,3    Isr: 1
>     Topic: highload    Partition: 6     Leader: 2    Replicas: 2,3    Isr: 2,3
>     Topic: highload    Partition: 7     Leader: 3    Replicas: 3,1    Isr: 3,1
>     Topic: highload    Partition: 8     Leader: 1    Replicas: 1,2    Isr: 1,2
>     Topic: highload    Partition: 9     Leader: 2    Replicas: 2,1    Isr: 2,1
>     Topic: highload    Partition: 10    Leader: 3    Replicas: 3,2    Isr: 3,2
>     Topic: highload    Partition: 11    Leader: 1    Replicas: 1,3    Isr: 1
>     Topic: highload    Partition: 12    Leader: 2    Replicas: 2,3    Isr: 2,3
>     Topic: highload    Partition: 13    Leader: 3    Replicas: 3,1    Isr: 3,1
>     Topic: highload    Partition: 14    Leader: 1    Replicas: 1,2    Isr: 1,2
>     Topic: highload    Partition: 15    Leader: 2    Replicas: 2,1    Isr: 2,1
>     Topic: highload    Partition: 16    Leader: 3    Replicas: 3,2    Isr: 3,2
>     Topic: highload    Partition: 17    Leader: 1    Replicas: 1,3    Isr: 1
>     Topic: highload    Partition: 18    Leader: 2    Replicas: 2,3    Isr: 2,3
>     Topic: highload    Partition: 19    Leader: 3    Replicas: 3,1    Isr: 3,1
>     Topic: highload    Partition: 20    Leader: 1    Replicas: 1,2    Isr: 1,2
>     Topic: highload    Partition: 21    Leader: 2    Replicas: 2,1    Isr: 2,1
>     Topic: highload    Partition: 22    Leader: 3    Replicas: 3,2    Isr: 3,2
>     Topic: highload    Partition: 23    Leader: 1    Replicas: 1,3    Isr: 1
>     Topic: highload    Partition: 24    Leader: 2    Replicas: 2,3    Isr: 2,3
>     Topic: highload    Partition: 25    Leader: 3    Replicas: 3,1    Isr: 3,1
>     Topic: highload    Partition: 26    Leader: 1    Replicas: 1,2    Isr: 1,2
>     Topic: highload    Partition: 27    Leader: 2    Replicas: 2,1    Isr: 2,1
>     Topic: highload    Partition: 28    Leader: 3    Replicas: 3,2    Isr: 3,2
>     Topic: highload    Partition: 29    Leader: 1    Replicas: 1,3    Isr: 1
>


why is the number of in-sync replicas less than min.insync.replicas?

2017-11-10 Thread Vitaliy Semochkin
Hi,

I have a cluster with 3 brokers (0.11).
When I create a topic with min.insync.replicas=2 and replication-factor 2,
I see the number of in-sync replicas in the created topic is less than
min.insync.replicas.
Why do some partitions have fewer than 2 in-sync replicas? How do I prevent it?

Here are the commands I used and the output.
kafka-topics.sh --create --topic test --replication-factor 2 --config
min.insync.replicas=2  --partitions 30 --zookeeper 127.0.0.1
kafka-topics.sh --describe --topic test  --zookeeper 127.0.0.1

Topic: test    PartitionCount: 30    ReplicationFactor: 2    Configs: min.insync.replicas=2
    Topic: highload    Partition: 0     Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 1     Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 2     Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 3     Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 4     Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 5     Leader: 1    Replicas: 1,3    Isr: 1
    Topic: highload    Partition: 6     Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 7     Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 8     Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 9     Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 10    Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 11    Leader: 1    Replicas: 1,3    Isr: 1
    Topic: highload    Partition: 12    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 13    Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 14    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 15    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 16    Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 17    Leader: 1    Replicas: 1,3    Isr: 1
    Topic: highload    Partition: 18    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 19    Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 20    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 21    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 22    Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 23    Leader: 1    Replicas: 1,3    Isr: 1
    Topic: highload    Partition: 24    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 25    Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 26    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 27    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 28    Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 29    Leader: 1    Replicas: 1,3    Isr: 1


Re: kafka streams with multiple threads and state store

2017-11-10 Thread Damian Guy
Hi Ranjit, it sounds like you might want to use a global table for this.
You can use StreamsBuilder#globalTable(String, Materialized) to create the
global table. You could do something like:

KeyValueBytesStoreSupplier supplier =
    Stores.inMemoryKeyValueStore("global-store");
Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized =
    Materialized.as(supplier);
builder.globalTable("topic",
    materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
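
Once the application is running, the table's contents can be read from
any thread via interactive queries, along these lines (a sketch, assuming
a KafkaStreams instance named "streams" built from this builder):

ReadOnlyKeyValueStore<String, String> view =
    streams.store("global-store", QueryableStoreTypes.keyValueStore());
String value = view.get("some-key");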


On Fri, 10 Nov 2017 at 09:24 Ranjit Kumar  wrote:

> Hi Guozhang,
>
> Thanks for the information.
>
> My requirement is something like this.
>
> 1. I want to read the data from one topic (which is continuously fed),
> so I thought of using Kafka Streams with threads
> 2. I want to store the data in one in-memory database (not a local data
> store per thread)
>
> If I have to write my own StateStore logic with synchronization handling,
> is that equivalent to having my own global data structure in all threads?
>
> Will there be any performance impact with our own sync? Can you please
> share any sample programs or links describing this.
>
> Thanks & Regards,
> Ranjit
>
> On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang  wrote:
>
> > Ranjit,
> >
> > Note that the "testStore" instance you are passing is a StateStoreSupplier
> > which will generate a new StateStore instance for each thread's task.
> >
> > If you really want to have all the threads share the same state store,
> > you should implement your own StateStoreSupplier that only returns the
> > same StateStore instance in its "get()" call; however, keep in mind that
> > in this case the state store could be concurrently accessed by multiple
> > threads, which is not protected by the library itself (by default,
> > single-thread access is guaranteed on the state stores).
> >
> >
> > Guozhang
> >
> > On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar 
> wrote:
> >
> > > Hi All,
> > >
> > > I want to use one state store in all my Kafka Streams threads in my
> > > application. How can I do it?
> > >
> > > 1. I created one topic (name: test2) with 3 partitions.
> > > 2. I wrote a Kafka Streams application with num.stream.threads = 3 in Java.
> > > 3. I am using a state store (name: count2) in my application.
> > >
> > > But the state store (count2) is acting like it is local to a thread; it
> > > should be unique to the entire application, with the same value reflected
> > > everywhere. How can I do that?
> > >
> > > Do I need to take care of any synchronization also?
> > >
> > > Code:
> > > 
> > > package com.javatpoint;
> > > import org.apache.kafka.common.serialization.Serdes;
> > > import org.apache.kafka.streams.KafkaStreams;
> > > import org.apache.kafka.streams.StreamsConfig;
> > > import org.apache.kafka.streams.processor.Processor;
> > > import org.apache.kafka.streams.processor.ProcessorContext;
> > > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > > import org.apache.kafka.streams.processor.TopologyBuilder;
> > > import org.apache.kafka.streams.state.Stores;
> > >
> > > import org.apache.kafka.streams.kstream.KStreamBuilder;
> > > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > > import org.apache.kafka.streams.state.KeyValueStore;
> > >
> > > import java.util.Properties;
> > > import java.lang.*;
> > >
> > > /**
> > >  * Hello world!
> > >  *
> > >  */
> > > public class App
> > > {
> > > public static void main( String[] args )
> > > {
> > > /*StateStoreSupplier testStore = Stores.create("count2")
> > > .withKeys(Serdes.String())
> > > .withValues(Serdes.Long())
> > > .persistent()
> > > .build();*/
> > > StateStoreSupplier testStore = Stores.create("count2")
> > > .withStringKeys()
> > > .withLongValues()
> > > .persistent()
> > > .build();
> > >
> > > //TopologyBuilder builder = new TopologyBuilder();
> > > final KStreamBuilder builder = new KStreamBuilder();
> > >
> > > builder.addSource("source", "test2").addProcessor("process",
> > > TestProcessor::new, "source").addStateStore(testStore, "process");
> > >
> > > Properties props = new Properties();
> > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
> > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > "localhost:9092");
> > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass());
> > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.String().getClass());
> > > //props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > Serdes.ByteArray().getClass().getName());
> > > //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > Serdes.ByteArray().getClass().getName());
> > >
> > > props.put("auto.offset.reset", "latest");
> > > 

Kafka Data Directory

2017-11-10 Thread Rishikesh Gawade
Hello there!
I am a newbie in the world of Apache Kafka and I have just set up a
standalone Kafka cluster. I also tested it by running one producer
process and 2 consumer processes, and no issues were found.
However, I would just like to know in which directory of the local
filesystem the data exchanged between consumers and producers is
stored. Is that data readable in a normal text editor?
Also, how do I read all the logs in the directory pointed to by log.dir
in the server.properties configuration file?
Thank you.
Regards,
Rishikesh Gawade


Re: kafka streams with multiple threads and state store

2017-11-10 Thread Ranjit Kumar
Hi Guozhang,

Thanks for the information.

My requirement is something like this.

1. I want to read the data from one topic (which is continuously fed),
so I thought of using Kafka Streams with threads
2. I want to store the data in one in-memory database (not a local data
store per thread)

If I have to write my own StateStore logic with synchronization handling,
is that equivalent to having my own global data structure in all threads?

Will there be any performance impact with our own sync? Can you please
share any sample programs or links describing this.

Thanks & Regards,
Ranjit

On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang  wrote:

> Ranjit,
>
> Note that the "testStore" instance you are passing is a StateStoreSupplier
> which will generate a new StateStore instance for each thread's task.
>
> If you really want to have all the threads share the same state store, you
> should implement your own StateStoreSupplier that only returns the same
> StateStore instance in its "get()" call; however, keep in mind that in this
> case the state store could be concurrently accessed by multiple threads,
> which is not protected by the library itself (by default, single-thread
> access is guaranteed on the state stores).
>
>
> Guozhang
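
A minimal sketch of such a supplier (illustrative only, written against
the old StateStoreSupplier API used above; the shared store itself must
then be made thread-safe):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;

public class SharedStoreSupplier implements StateStoreSupplier<StateStore> {

    private final StateStore instance;

    public SharedStoreSupplier(final StateStore instance) {
        this.instance = instance;
    }

    @Override
    public String name() {
        return instance.name();
    }

    @Override
    public StateStore get() {
        return instance;  // the same instance for every task/thread
    }

    @Override
    public Map<String, String> logConfig() {
        return Collections.emptyMap();
    }

    @Override
    public boolean loggingEnabled() {
        return false;
    }
}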
>
> On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar  wrote:
>
> > Hi All,
> >
> > I want to use one state store in all my Kafka Streams threads in my
> > application. How can I do it?
> >
> > 1. I created one topic (name: test2) with 3 partitions.
> > 2. I wrote a Kafka Streams application with num.stream.threads = 3 in Java.
> > 3. I am using a state store (name: count2) in my application.
> >
> > But the state store (count2) is acting like it is local to a thread; it
> > should be unique to the entire application, with the same value reflected
> > everywhere. How can I do that?
> >
> > Do I need to take care of any synchronization also?
> >
> > Code:
> > 
> > package com.javatpoint;
> > import org.apache.kafka.common.serialization.Serdes;
> > import org.apache.kafka.streams.KafkaStreams;
> > import org.apache.kafka.streams.StreamsConfig;
> > import org.apache.kafka.streams.processor.Processor;
> > import org.apache.kafka.streams.processor.ProcessorContext;
> > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > import org.apache.kafka.streams.processor.TopologyBuilder;
> > import org.apache.kafka.streams.state.Stores;
> >
> > import org.apache.kafka.streams.kstream.KStreamBuilder;
> > import org.apache.kafka.streams.processor.StateStoreSupplier;
> > import org.apache.kafka.streams.state.KeyValueStore;
> >
> > import java.util.Properties;
> > import java.lang.*;
> >
> > /**
> >  * Hello world!
> >  *
> >  */
> > public class App
> > {
> > public static void main( String[] args )
> > {
> > /*StateStoreSupplier testStore = Stores.create("count2")
> > .withKeys(Serdes.String())
> > .withValues(Serdes.Long())
> > .persistent()
> > .build();*/
> > StateStoreSupplier testStore = Stores.create("count2")
> > .withStringKeys()
> > .withLongValues()
> > .persistent()
> > .build();
> >
> > //TopologyBuilder builder = new TopologyBuilder();
> > final KStreamBuilder builder = new KStreamBuilder();
> >
> > builder.addSource("source", "test2").addProcessor("process",
> > TestProcessor::new, "source").addStateStore(testStore, "process");
> >
> > Properties props = new Properties();
> > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1");
> > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
> > "localhost:9092");
> > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass());
> > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.String().getClass());
> > //props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > Serdes.ByteArray().getClass().getName());
> > //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > Serdes.ByteArray().getClass().getName());
> >
> > props.put("auto.offset.reset", "latest");
> > props.put("num.stream.threads", 3);
> >
> > System.out.println("test1");
> > KafkaStreams streams = new KafkaStreams(builder, props);
> > System.out.println("test2");
> > streams.start();
> > }
> >
> > //public static class TestProcessor implements Processor<byte[], byte[]> {
> > public static class TestProcessor implements Processor<String, String>
> > {
> > private KeyValueStore<String, Long> kvStore;
> > private ProcessorContext context;
> >
> > @Override
> > public void init(ProcessorContext context) {
> > this.context = context;
> > //context.getStateStore("count2");
> > System.out.println("Initialized");
> > this.kvStore =