Matthias,

I realized that the exception and the actual problem are totally different.
The problem was that the client was not configured with an SSL truststore
while the server is SSL-enabled.
I also found this open bug in Kafka:
https://issues.apache.org/jira/browse/KAFKA-4493
After setting the SSL properties on the Streams client, I am able to get it
up and running.

Because the exception is misleading, it is very difficult to debug this kind
of issue. The above bug should be fixed as soon as possible, or at least a
proper exception should be thrown.
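
For anyone who hits the same symptom, here is a minimal sketch of the kind of settings involved. The keys are the standard Kafka client config names; the application id, broker address, truststore path and password are placeholders, and the class and method names are made up for illustration. The exact set of properties needed depends on how the broker is configured, so treat this as an assumption rather than the definitive fix.

```java
import java.util.Properties;

public class SslStreamsProps {

    // Builds the properties for a Streams client that talks to an SSL-enabled broker.
    // All values passed in or hard-coded here are illustrative placeholders.
    public static Properties build(String bootstrapServers,
                                   String truststorePath,
                                   String truststorePassword) {
        Properties props = new Properties();
        props.put("application.id", "example-app"); // hypothetical application id
        props.put("bootstrap.servers", bootstrapServers);
        // Without these, the client speaks plaintext to an SSL listener.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststorePath);
        props.put("ssl.truststore.password", truststorePassword);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("broker:9093", "/path/to/truststore.jks", "changeit");
        // These props would then be passed to new KafkaStreams(topology, props).
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same Properties object that carries the application id and bootstrap servers is the one handed to KafkaStreams, so the SSL entries apply to the consumers it creates internally, including the one that loads the global store.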

On Wed, May 27, 2020 at 10:59 PM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Thanks... I will try increasing the memory in case you don't spot anything
> wrong with the code. Other services also have streams and a GlobalKTable, but
> they use spring-kafka. I think that should not matter, and it should
> work with plain kafka-streams code unless I am missing some
> configuration/setting here.
>
> On Wed, May 27, 2020 at 10:26 PM Matthias J. Sax <mj...@apache.org> wrote:
>
>> There is no hook. There is only a restore listener, but that one is only
>> used during startup when the global store is loaded. It's not used during
>> regular processing.
>>
>> Depending on your usage, maybe you can switch to a global store instead
>> of GlobalKTable? That way, you can implement a custom `Processor` and
>> add a hook manually?
>>
>> I don't see anything wrong with your setup. Unclear if/why the global
>> store would require a lot of memory...
>>
>>
>> -Matthias
>>
>> On 5/27/20 7:41 AM, Pushkar Deole wrote:
>> > Matthias,
>> > I tried with the default store as well but am getting the same error. Can
>> > you please check if I am initializing the global store in the right way:
>> >
>> > public void setupGlobalCacheTables(String theKafkaServers) {
>> >     Properties props = new Properties();
>> >     props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
>> >     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
>> >     StreamsBuilder streamsBuilder = new StreamsBuilder();
>> >     groupCacheTable = streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
>> >         Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
>> >         Materialized.as(GROUP_CACHE_STORE_NAME));
>> >     Topology groupCacheTopology = streamsBuilder.build();
>> >     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
>> >     kafkaStreams.start();
>> >
>> >     Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>> >         LOG.info("Stopping the stream");
>> >         kafkaStreams.close();
>> >     }));
>> > }
>> >
>> > On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pdeole2...@gmail.com>
>> wrote:
>> >
>> >> Hi Matthias,
>> >>
>> >> By the way, I used the in-memory global store and the service is giving
>> >> an out-of-memory error during startup. Unfortunately I don't have a stack
>> >> trace now, but when I got one the first time, the error was coming from
>> >> somewhere in memorypool.allocate or a similar method. If I get the
>> >> stack trace again, I will share it with you.
>> >> However, the topic that the store reads from is empty, so I am not sure
>> >> why the GlobalKTable is trying to occupy a lot of space. The pod memory
>> >> request and limit are 500 MiB and 750 MiB respectively, so I believe the
>> >> state store should fit into this memory since the topic is empty. Can you
>> >> provide inputs on this?
>> >>
>> >>
>> >> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pdeole2...@gmail.com>
>> >> wrote:
>> >>
>> >>> Ok... got it... is there any hook that I can attach to the GlobalKTable
>> >>> or global store? What I mean is that I want to know when the global
>> >>> store is updated with data from the topic. In that case the hook I
>> >>> specified should be invoked so I can do something like logging. This
>> >>> would tell me how long the global store took to sync up with the topic
>> >>> after the event was put on the topic.
>> >>>
>> >>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
>> >>> wrote:
>> >>>
>> >>>> For example it could be some "static" information, like a mapping
>> from
>> >>>> zip code to city name.
>> >>>>
>> >>>> Something that does usually not change over time.
>> >>>>
>> >>>>
>> >>>> -Matthias
>> >>>>
>> >>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>> >>>>> Matthias,
>> >>>>>
>> >>>>> I am wondering what you mean by "Global store hold "axially" data
>> that
>> >>>> is
>> >>>>> provided from "outside" of the
>> >>>>> app"
>> >>>>>
>> >>>>> will you be able to give some example use case here as to what you
>> >>>> mean by
>> >>>>> axially data provided from outside app?
>> >>>>>
>> >>>>> On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
>> >>>> wrote:
>> >>>>>
>> >>>>>> Both stores serve a different purpose.
>> >>>>>>
>> >>>>>> Regular stores allow you to store state the application computes.
>> >>>>>> Writing into the changelog is a fault-tolerance mechanism.
>> >>>>>>
>> >>>>>> Global store hold "axially" data that is provided from "outside" of
>> >>>> the
>> >>>>>> app. There is no changelog topic, but only the input topic (that is
>> >>>> used
>> >>>>>> to re-create the global state).
>> >>>>>>
>> >>>>>> Local stores are sharded and updates are "sync" as they don't need
>> to
>> >>>> be
>> >>>>>> shared with anybody else.
>> >>>>>>
>> >>>>>> For global stores, as all instances need to be updated, updates are
>> >>>>>> async (we don't know when which instance will update its own global
>> >>>>>> store replica).
>> >>>>>>
>> >>>>>>>> Say one stream thread updates the topic for global store and
>> starts
>> >>>>>>>> processing next event wherein the processor tries to read the
>> global
>> >>>>>> store
>> >>>>>>>> which may not have been synced with the topic?
>> >>>>>>
>> >>>>>> Correct. There is no guarantee when the update to the global store
>> >>>> will
>> >>>>>> be applied. As said, global stores are not designed to hold data
>> the
>> >>>>>> application computes.
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>>
>> >>>>>> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>> >>>>>>> Thanks... will try with GlobalKTable.
>> >>>>>>> As a side question, I didn't really understand the significance of the
>> >>>>>>> global state store, which works in the reverse way to a local state
>> >>>>>>> store: a local state store is updated and then saved to the changelog
>> >>>>>>> topic, whereas for a global state store the topic is updated first and
>> >>>>>>> then synced to the global state store. Do these two, i.e. the update to
>> >>>>>>> the topic and the update to the global state store, work in sync?
>> >>>>>>>
>> >>>>>>> Say one stream thread updates the topic for global store and
>> starts
>> >>>>>>> processing next event wherein the processor tries to read the
>> global
>> >>>>>> store
>> >>>>>>> which may not have been synced with the topic?
>> >>>>>>>
>> >>>>>>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
>> >>>> wrote:
>> >>>>>>>
>> >>>>>>>> Yes.
>> >>>>>>>>
>> >>>>>>>> A `GlobalKTable` uses a global store internally.
>> >>>>>>>>
>> >>>>>>>> You can also use `StreamsBuilder.addGlobalStore()` or
>> >>>>>>>> `Topology.addGlobalStore()` to add a global store "manually".
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>> >>>>>>>>> Thanks Matthias.
>> >>>>>>>>> Can you elaborate on the replicated caching layer part?
>> >>>>>>>>> When you say global stores, do you mean GlobalKTable created
>> from a
>> >>>>>> topic
>> >>>>>>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>> >>>>>>>>>
>> >>>>>>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <
>> mj...@apache.org
>> >>>>>
>> >>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>>> It's not possible to modify state store from "outside".
>> >>>>>>>>>>
>> >>>>>>>>>> If you want to build a "replicated caching layer", you could
>> use
>> >>>>>> global
>> >>>>>>>>>> stores and write into the corresponding topics to update all
>> >>>> stores.
>> >>>>>> Of
>> >>>>>>>>>> course, those updates would be async.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> -Matthias
>> >>>>>>>>>>
>> >>>>>>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>> >>>>>>>>>>> Hi All,
>> >>>>>>>>>>>
>> >>>>>>>>>>> I am wondering if this is possible: I have been asked to use state
>> >>>>>>>>>>> stores as a general replicated cache among multiple instances of a
>> >>>>>>>>>>> service. The state store would be created through StreamsBuilder, but
>> >>>>>>>>>>> it would not be modified through the stream processor topology; it is
>> >>>>>>>>>>> to be modified from outside the topology. So, essentially, the state
>> >>>>>>>>>>> store is just to be created from StreamsBuilder and then used as an
>> >>>>>>>>>>> application-level cache that gets replicated between application
>> >>>>>>>>>>> instances. Is this possible using state stores?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Secondly, if possible, is this a good design approach?
>> >>>>>>>>>>>
>> >>>>>>>>>>> Appreciate your response since I don't know the internals of
>> >>>> state
>> >>>>>>>>>> stores.
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>>
>> >
>>
>>
