Re: question about KIP-150 - Kafka-Streams Cogroup
Matt is right. KIP-150 did not make it to 1.0.0. So it may need to wait for the next minor release (1.1.0). Guozhang On Fri, Nov 10, 2017 at 10:25 AM, Matt Farmer wrote: > The JIRA ticket for its implementation still appears to be open, so I'd > guess it's not in 1.0 > > On Fri, Nov 10, 2017 at 12:28 PM Artur Mrozowski wrote: > > > Hi, > > I have a question about KIP-150. Has that functionality been released in > > version 1.0 or is it planned for version 1.1? > > > > Here it says 1.1 > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams > > > > but in this blog post it seems to be part of 1.0. > > https://www.confluent.io/blog/apache-kafka-goes-1-0/ > > > > Has anyone worked with it in current version 1.0 and could point me to > some > > examples? > > > > Best Regards > > Artur Mrozowski > > > -- Guozhang
Re: [VOTE] 0.11.0.2 RC0
Resending to include kafka-clients. On Sat, Nov 11, 2017 at 12:37 AM, Rajini Sivaram wrote: > Hello Kafka users, developers and client-developers, > > > This is the first candidate for release of Apache Kafka 0.11.0.2. > > > This is a bug fix release and it includes fixes and improvements from 16 > JIRAs, > including a few critical bugs. > > > Release notes for the 0.11.0.2 release: > > http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/RELEASE_NOTES.html > > > *** Please download, test and vote by Wednesday the 15th of November, 8PM > PT > > > Kafka's KEYS file containing PGP keys we use to sign the release: > > http://kafka.apache.org/KEYS > > > * Release artifacts to be voted upon (source and binary): > > http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/ > > > * Maven artifacts to be voted upon: > > https://repository.apache.org/content/groups/staging/ > > > * Javadoc: > > http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/javadoc/ > > > * Tag to be voted upon (off 0.11.0 branch) is the 0.11.0.2 tag: > > https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=25639822d6e23803c599cba35ad3dc1a2817b404 > > > > * Documentation: > > Note the documentation can't be pushed live due to changes that will not > go live until the release. You can manually verify by downloading > > http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/kafka_2.11-0.11.0.2-site-docs.tgz > > > > * Protocol: > > http://kafka.apache.org/0110/protocol.html > > > * Successful Jenkins builds for the 0.11.0 branch: > > Unit/integration tests: https://builds.apache.org/job/kafka-0.11.0-jdk7/333/ > > > > > Thanks, > > > Rajini > >
[VOTE] 0.11.0.2 RC0
Hello Kafka users, developers and client-developers, This is the first candidate for release of Apache Kafka 0.11.0.2. This is a bug fix release and it includes fixes and improvements from 16 JIRAs, including a few critical bugs. Release notes for the 0.11.0.2 release: http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/RELEASE_NOTES.html *** Please download, test and vote by Wednesday the 15th of November, 8PM PT Kafka's KEYS file containing PGP keys we use to sign the release: http://kafka.apache.org/KEYS * Release artifacts to be voted upon (source and binary): http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/ * Maven artifacts to be voted upon: https://repository.apache.org/content/groups/staging/ * Javadoc: http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/javadoc/ * Tag to be voted upon (off 0.11.0 branch) is the 0.11.0.2 tag: https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=25639822d6e23803c599cba35ad3dc1a2817b404 * Documentation: Note the documentation can't be pushed live due to changes that will not go live until the release. You can manually verify by downloading http://home.apache.org/~rsivaram/kafka-0.11.0.2-rc0/kafka_2.11-0.11.0.2-site-docs.tgz * Protocol: http://kafka.apache.org/0110/protocol.html * Successful Jenkins builds for the 0.11.0 branch: Unit/integration tests: https://builds.apache.org/job/kafka-0.11.0-jdk7/333/ Thanks, Rajini
Re: Listeners and reference/docs
Yep I'm familiar with that. Just curious where it's documented that, for instance, the CLIENT listener is for client connections. On Fri, Nov 10, 2017, 12:08 PM Kaufman Ng wrote: > This is related to another config "listener.security.protocol.map" (since > version 0.10.2.0). The CLIENT, PLAINTEXT, etc are defined as a > name-protocol mapping. So what you have in the listeners property (e.g. > CLIENT) must have an entry in the protocol map which determines which > protocol to use (e.g. CLIENT:SASL_PLAINTEXT). > > The idea is that the same protocol can be used in multiple listeners. For > example you might have SASL/SSL in two listeners (different network > interface or port). > > This is part of KIP-103, you can see more details there: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic > > On Fri, Nov 10, 2017 at 10:10 AM, Thomas Stringer > wrote: > > > I've been working with Kafka broker listeners and I'm curious is there > > any documentation that explains what all of them apply to? Such as > > CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the > > documentation, but is it just inferred what these listeners apply to? > > > > Thank you in advance! > > > > > > -- > Kaufman Ng > +1 646 961 8063 > Solutions Architect | Confluent | www.confluent.io >
Re: How do I gracefully handle stream joins where the other side never appears?
Messages that don't find a join partner are dropped. For each incoming message, we do the following: 1. insert it into its window store 2. look up the other window store for matching records a) if matching records are found, compute the join and emit Note that we maintain all records in the window store until retention time passes. Thus, if there is no matching join record, a record will eventually be dropped. There is no API to be notified about this. With regard to left/outer join: your observation is correct -- we need to implement it this way, as it's unclear for how long to delay the computation and wait for a matching record -- note that Streams is able to handle late data; thus, the only "safe" way to avoid a "double" call would be to change the implementation to wait until retention time is over (default is 24h) -- this would imply way too high latency and also result in out-of-order results. To tackle this issue, you could implement a "de-duplication" operator that consumes the join output stream. This stateful `Transformer` could buffer all "early" (msg,null) and (null,msg) records for some time to see if there will be a "proper" join result later. Using punctuation you can emit the (msg,null)/(null,msg) join result if you think(!) there will be no "proper" join result anymore. Note that there always might be a late join result, and thus this approach has its own issues (of course, you could drop a late "proper" join result in case you did emit a (msg,null)/(null,msg) already). Hope this helps. -Matthias On 11/9/17 2:53 PM, Thaler, Michael wrote: > Hi all, > > So let's say I have 2 topics coming in that I want to join using KStream#join. I > set them up like so: > > KStreamBuilder builder = new KStreamBuilder(); > KStream a = builder.stream(TOPIC_A); > KStream b = builder.stream(TOPIC_B); > > a.join(b, (msgA, msgB) -> msgA + msgB, > JoinWindows.of(TimeUnit.HOURS.toMillis(1))) > .print(); > > So this works fine and joins the messages together. 
But what happens to > messages that don't find a join partner in the other topic within the window? > If I get a message in topic A and its partner doesn't occur in B, when and > how does the message get consumed? Is there a way to write my application so > that this is caught somehow and handled? > > I'm aware that I could use a leftJoin instead, but that would call the merge > function twice, once with (msgA, null) and the second time with (msgA, msgB). > I'm trying to find a solution that only calls one or the other. > > Is there a way to do this cleanly? > > Thanks! > --Michael Thaler
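A minimal sketch of the buffering logic Matthias describes, in plain Java rather than the actual Kafka Streams `Transformer` API (class and method names here are hypothetical, for illustration only): one-sided left-join results are held for a grace period and suppressed if a proper two-sided result for the same key arrives first; a punctuation-style flush emits whatever is still pending after the grace period.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical de-duplication buffer (NOT the Kafka Streams API).
// In a real Transformer, onJoinResult would be transform() and
// punctuate() would be driven by context.schedule().
class JoinDedupBuffer {
    private final long graceMs;
    private final Map<String, Long> pendingLeftOnly = new HashMap<>(); // key -> buffered-at timestamp
    private final List<String> emitted = new ArrayList<>();

    JoinDedupBuffer(long graceMs) { this.graceMs = graceMs; }

    // Called for every join result; rightValue == null marks a one-sided (msg,null) result.
    void onJoinResult(String key, String leftValue, String rightValue, long nowMs) {
        if (rightValue == null) {
            pendingLeftOnly.putIfAbsent(key, nowMs);   // buffer it, do not emit yet
        } else {
            pendingLeftOnly.remove(key);               // a proper result supersedes the buffered one
            emitted.add(key + ":" + leftValue + "+" + rightValue);
        }
    }

    // Punctuation: flush buffered one-sided results older than the grace period.
    void punctuate(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = pendingLeftOnly.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() >= graceMs) {
                emitted.add(e.getKey() + ":left-only");
                it.remove();
            }
        }
    }

    List<String> emitted() { return emitted; }
}
```

As Matthias notes, this only shifts the trade-off: a "proper" result arriving after the grace period would still produce a duplicate unless it is dropped as well.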
Re: question about KIP-150 - Kafka-Streams Cogroup
The JIRA ticket for its implementation still appears to be open, so I'd guess it's not in 1.0 On Fri, Nov 10, 2017 at 12:28 PM Artur Mrozowski wrote: > Hi, > I have a question about KIP-150. Has that functionality been released in > version 1.0 or is it planned for version 1.1? > > Here it says 1.1 > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams > > but in this blog post it seems to be part of 1.0. > https://www.confluent.io/blog/apache-kafka-goes-1-0/ > > Has anyone worked with it in current version 1.0 and could point me to some > examples? > > Best Regards > Artur Mrozowski >
question about KIP-150 - Kafka-Streams Cogroup
Hi, I have a question about KIP-150. Has that functionality been released in version 1.0 or is it planned for version 1.1? Here it says 1.1 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams but in this blog post it seems to be part of 1.0. https://www.confluent.io/blog/apache-kafka-goes-1-0/ Has anyone worked with it in current version 1.0 and could point me to some examples? Best Regards Artur Mrozowski
Re: Listeners and reference/docs
This is related to another config "listener.security.protocol.map" (since version 0.10.2.0). The CLIENT, PLAINTEXT, etc are defined as a name-protocol mapping. So what you have in the listeners property (e.g. CLIENT) must have an entry in the protocol map which determines which protocol to use (e.g. CLIENT:SASL_PLAINTEXT). The idea is that the same protocol can be used in multiple listeners. For example you might have SASL/SSL in two listeners (different network interface or port). This is part of KIP-103, you can see more details there: https://cwiki.apache.org/confluence/display/KAFKA/KIP-103%3A+Separation+of+Internal+and+External+traffic On Fri, Nov 10, 2017 at 10:10 AM, Thomas Stringer wrote: > I've been working with Kafka broker listeners and I'm curious is there > any documentation that explains what all of them apply to? Such as > CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the > documentation, but is it just inferred what these listeners apply to? > > Thank you in advance! > -- Kaufman Ng +1 646 961 8063 Solutions Architect | Confluent | www.confluent.io
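To make the name-to-protocol mapping concrete, here is a hypothetical broker config fragment in the spirit of KIP-103 (the hostnames, ports, and the INTERNAL label are made up for illustration): the labels in `listeners` are arbitrary, and `listener.security.protocol.map` resolves each label to an actual security protocol.

```properties
# Listener names are arbitrary labels, one per endpoint.
listeners=CLIENT://0.0.0.0:9092,INTERNAL://0.0.0.0:9093
# Each label must resolve to a real security protocol here.
listener.security.protocol.map=CLIENT:SASL_PLAINTEXT,INTERNAL:PLAINTEXT
# Brokers talk to each other over the INTERNAL listener.
inter.broker.listener.name=INTERNAL
advertised.listeners=CLIENT://broker1.example.com:9092,INTERNAL://broker1.internal:9093
```

So "CLIENT" means "client connections" only by convention of whoever wrote the config; the semantics come entirely from the protocol the map assigns to it.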
Re: why number of in-sync replicas is less than min.insync.replicas?
Thank you very much, Stanislav. We managed to fix it by deleting everything from the Kafka data dir and the ZooKeeper data dir. Regards, Vitaliy. On Fri, Nov 10, 2017 at 5:41 PM, Stas Chizhov wrote: > Hi, it looks like https://issues.apache.org/jira/browse/KAFKA-5970. Try > restarting broker 1. > > Best regards, > Stanislav. > > 2017-11-10 14:00 GMT+01:00 Vitaliy Semochkin: > >> Hi, >> >> I have a cluster with 3 brokers (0.11). >> When I create a topic with min.insync.replicas=2 and replication-factor 2, >> the number of in-sync replicas in the created topic is less than >> min.insync.replicas. >> Why do some partitions have fewer than 2 in-sync replicas? How can I prevent it? >> >> [commands and topic description snipped; see the original message in this thread] >>
Listeners and reference/docs
I've been working with Kafka broker listeners and I'm curious: is there any documentation that explains what each of them applies to? Such as CLIENT, PLAINTEXT, SASL/SSL, etc. I see the encryption part of the documentation, but is it just inferred what these listeners apply to? Thank you in advance!
Re: why number of in-sync replicas is less than min.insync.replicas?
Hi, it looks like https://issues.apache.org/jira/browse/KAFKA-5970. Try restarting broker 1. Best regards, Stanislav. 2017-11-10 14:00 GMT+01:00 Vitaliy Semochkin: > Hi, > > I have a cluster with 3 brokers (0.11). > When I create a topic with min.insync.replicas=2 and replication-factor 2, > the number of in-sync replicas in the created topic is less than > min.insync.replicas. > Why do some partitions have fewer than 2 in-sync replicas? How can I prevent it? > > [commands and topic description snipped; see the original message in this thread] >
why number of in-sync replicas is less than min.insync.replicas?
Hi, I have a cluster with 3 brokers (0.11). When I create a topic with min.insync.replicas=2 and replication-factor 2, the number of in-sync replicas in the created topic is less than min.insync.replicas. Why do some partitions have fewer than 2 in-sync replicas? How can I prevent it? Here are the commands I used and the output.

kafka-topics.sh --create --topic test --replication-factor 2 --config min.insync.replicas=2 --partitions 30 --zookeeper 127.0.0.1
kafka-topics.sh --describe --topic test --zookeeper 127.0.0.1

Topic: test    PartitionCount: 30    ReplicationFactor: 2    Configs: min.insync.replicas=2
    Topic: highload    Partition: 0     Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 1     Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 2     Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 3     Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 4     Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 5     Leader: 1    Replicas: 1,3    Isr: 1
    Topic: highload    Partition: 6     Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 7     Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 8     Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 9     Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 10    Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 11    Leader: 1    Replicas: 1,3    Isr: 1
    Topic: highload    Partition: 12    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 13    Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 14    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 15    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 16    Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 17    Leader: 1    Replicas: 1,3    Isr: 1
    Topic: highload    Partition: 18    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 19    Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 20    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 21    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 22    Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 23    Leader: 1    Replicas: 1,3    Isr: 1
    Topic: highload    Partition: 24    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: highload    Partition: 25    Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: highload    Partition: 26    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: highload    Partition: 27    Leader: 2    Replicas: 2,1    Isr: 2,1
    Topic: highload    Partition: 28    Leader: 3    Replicas: 3,2    Isr: 3,2
    Topic: highload    Partition: 29    Leader: 1    Replicas: 1,3    Isr: 1
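As a side note, the affected partitions can be listed directly instead of scanning the full describe output; kafka-topics.sh has a filter for exactly this condition (run against the same cluster):

```shell
# Show only partitions whose ISR is smaller than the full replica set
# (here: partitions 5, 11, 17, 23, 29, where broker 1's follower copies lag).
kafka-topics.sh --describe --zookeeper 127.0.0.1 --under-replicated-partitions
```

Note that min.insync.replicas does not force partitions to stay in sync; it only makes producers with acks=all fail when fewer than that many replicas are in sync.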
Re: kafka streams with multiple threads and state store
Hi Ranjit, it sounds like you might want to use a global table for this. You can use StreamsBuilder#globalTable(String, Materialized) to create the global table. You could do something like: KeyValueBytesStoreSupplier supplier = Stores.inMemoryKeyValueStore("global-store"); Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as(supplier); builder.globalTable("topic", materialized.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())); On Fri, 10 Nov 2017 at 09:24 Ranjit Kumar wrote: > Hi Guozhang, > > Thanks for the information. > > My requirement is something like this. > > 1. I want to read the data from one topic (which is continuously feeding), > so I thought of using Kafka Streams with threads > 2. I want to store the data in one in-memory database (not the local data > store per thread) > > If I have to write my own StateStore logic with synchronization handling, > is it equivalent to having my own global data structure in all threads? > > Will there be any performance impact with our own sync? Can you please > share if you have any sample programs or links describing this. > > Thanks & Regards, > Ranjit > > On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang wrote: > > > Ranjit, > > > > Note that the "testStore" instance you are passing is a > StateStoreSupplier > > which will generate a new StateStore instance for each thread's task. > > > > If you really want to have all the threads share the same state store > you > > should implement your own StateStoreSupplier that only returns the same > > StateStore instance in its "get()" call; however, keep in mind that in > this > > case the state store could be concurrently accessed by multiple threads, > which > > is not protected by the library itself (by default, single-threaded access > is > > guaranteed on the state stores). 
> > > > > > Guozhang > > > > On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar wrote: > > > [original message and code snipped]
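The pattern Guozhang describes — a supplier whose get() always hands back one shared instance — can be sketched in plain Java. This is not the actual Kafka Streams StateStoreSupplier interface; the class name and the use of java.util.function.Supplier are illustrative only, and as Guozhang warns, callers must then synchronize access to the shared store themselves.

```java
import java.util.function.Supplier;

// Illustrative shared-instance supplier (NOT the Kafka Streams API).
// A normal supplier creates one store per task/thread; this one caches
// the first instance and returns it to every caller, so all threads
// share a single store. Concurrent access is then the caller's problem.
class SharedStoreSupplier<T> implements Supplier<T> {
    private final Supplier<T> factory;
    private volatile T instance;

    SharedStoreSupplier(Supplier<T> factory) { this.factory = factory; }

    @Override
    public synchronized T get() {
        if (instance == null) {
            instance = factory.get();   // create once, hand out everywhere
        }
        return instance;
    }
}
```

The global-table approach in Damian's reply is usually the better fit, since Streams then manages the shared state and its consistency for you.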
Kafka Data Directory
Hello there! I am a newbie in the world of Apache Kafka and I have just implemented a standalone cluster of Kafka. I also tested it by running one producer process and 2 consumer processes, and no issues were found. However, I would just like to know in which directory on the local filesystem the data exchanged between consumers and producers is stored. Is that data readable through a normal text editor? Also, how do I read all the logs in the directory pointed to by log.dir in the server.properties configuration file? Thank you. Regards, Rishikesh Gawade
Re: kafka streams with multiple threads and state store
Hi Guozhang, Thanks for the information. My requirement is something like this. 1. I want to read the data from one topic (which is continuously fed), so I thought of using Kafka Streams with threads. 2. I want to store the data in one in-memory database (not the local data store per thread). If I have to write my own StateStore logic with synchronization handling, is it equivalent to having my own global data structure in all threads? Will there be any performance impact with our own sync? Can you please share any sample programs or links describing this. Thanks & Regards, Ranjit On Fri, Nov 10, 2017 at 4:38 AM, Guozhang Wang wrote: > Ranjit, > > Note that the "testStore" instance you are passing is a StateStoreSupplier > which will generate a new StateStore instance for each thread's task. > > If you really want to have all the threads share the same state store you > should implement your own StateStoreSupplier that only returns the same > StateStore instance in its "get()" call; however, keep in mind that in this > case the state store could be concurrently accessed by multiple threads, which > is not protected by the library itself (by default, single-threaded access is > guaranteed on the state stores). > > > Guozhang > > On Thu, Nov 9, 2017 at 2:51 AM, Ranjit Kumar wrote: > > > Hi All, > > > > I want to use one state store in all my Kafka Streams threads in my > > application; how can I do it? > > > > 1. I created one topic (name: test2) with 3 partitions. > > 2. I wrote a Kafka Streams app with num.stream.threads = 3 in Java code. > > 3. I am using a state store (name: count2) in my application. > > > > But the state store (count2) is acting like it is local to each thread; it should be > > unique to the entire application with the same value reflected everywhere. > > How can I do it? > > > > Do I need to take care of any synchronization also? 
> > > > Code: > > > > package com.javatpoint; > > import org.apache.kafka.common.serialization.Serdes; > > import org.apache.kafka.streams.KafkaStreams; > > import org.apache.kafka.streams.StreamsConfig; > > import org.apache.kafka.streams.processor.Processor; > > import org.apache.kafka.streams.processor.ProcessorContext; > > import org.apache.kafka.streams.processor.StateStoreSupplier; > > import org.apache.kafka.streams.processor.TopologyBuilder; > > import org.apache.kafka.streams.state.Stores; > > > > import org.apache.kafka.streams.kstream.KStreamBuilder; > > import org.apache.kafka.streams.processor.StateStoreSupplier; > > import org.apache.kafka.streams.state.KeyValueStore; > > > > import java.util.Properties; > > import java.lang.*; > > > > /** > > * Hello world! > > * > > */ > > public class App > > { > > public static void main( String[] args ) > > { > > /*StateStoreSupplier testStore = Stores.create("count2") > > .withKeys(Serdes.String()) > > .withValues(Serdes.Long()) > > .persistent() > > .build();*/ > > StateStoreSupplier testStore = Stores.create("count2") > > .withStringKeys() > > .withLongValues() > > .persistent() > > .build(); > > > > //TopologyBuilder builder = new TopologyBuilder(); > > final KStreamBuilder builder = new KStreamBuilder(); > > > > builder.addSource("source", "test2").addProcessor("process", > > TestProcessor::new, "source").addStateStore(testStore, "process"); > > > > Properties props = new Properties(); > > props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app1"); > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, > > "localhost:9092"); > > props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > Serdes.String().getClass()); > > props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > > Serdes.String().getClass()); > > //props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > > Serdes.ByteArray().getClass().getName()); > > //props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > > Serdes.ByteArray().getClass().getName()); > > > > 
props.put("auto.offset.reset", "latest"); > > props.put("num.stream.threads", 3); > > > > System.out.println("test1"); > > KafkaStreams streams = new KafkaStreams(builder, props); > > System.out.println("test2"); > > streams.start(); > > } > > > > //public static class TestProcessor implements Processor<byte[], byte[]> { > > public static class TestProcessor implements Processor<String, String> > > { > > private KeyValueStore<String, Long> kvStore; > > private ProcessorContext context; > > > > @Override > > public void init(ProcessorContext context) { > > this.context = context; > > //context.getStateStore("count2"); > > System.out.println("Initialized"); > > this.kvStore =