Thanks for the update Pushkar! I'd have to say it is indeed very very
misleading error message and we should fix it asap. Will follow-up on the
ticket.

Guozhang

On Thu, May 28, 2020 at 9:17 AM John Roesler <vvcep...@apache.org> wrote:

> Woah, that's a nasty bug. I've just pinged the Jira ticket. Please feel
> free to
> do the same.
>
> Thanks,
> -John
>
> On Thu, May 28, 2020, at 02:55, Pushkar Deole wrote:
> > Thanks for the help Guozhang!
> > however i realized that the exception and actual problem is totally
> > different. The problem was the client was not set with SSL truststore
> while
> > server is SSLenabled.
> > I also found this open bug on kafka
> > https://issues.apache.org/jira/browse/KAFKA-4493
> > After setting the SSL properties on stream, I am able to get it up and
> > running.
> >
> > @kafka developers, I think the problem is very misleading and should be
> > fixed as soon as possible, or a proper exception should be thrown.
> >
> > On Thu, May 28, 2020 at 9:46 AM Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > > Hello Pushkar,
> > >
> > > I think the memory pressure may not come from the topic data
> consumption,
> > > but from rocksDB used for materializing the global table. Note rocksDB
> > > allocates large chunk of memory beforehand in mem-table / page cache /
> > > reader cache with default configs. You can get some detailed
> information
> > > from this KIP:
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Record+the+Memory+Used+by+RocksDB
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Wed, May 27, 2020 at 8:44 PM Pushkar Deole <pdeole2...@gmail.com>
> > > wrote:
> > >
> > > > Hello All,
> > > >
> > > > I am using Stream DSL API just to create a GlobalKTable backed by a
> > > topic.
> > > > The topology is simple, just create a global table from a topic and
> > > that's
> > > > it (pasted below code snippet), when I run this service on K8S
> cluster
> > > > (container in a pod), the service gets OutOfMemoryError during
> > > > kafkaStreams.start() method call (exception trace pasted below). Note
> > > that
> > > > the topic is newly created so there is no data in the topic. POD
> memory
> > > was
> > > > set initially to 500MiB which I doubled to 1000MiB but no luck.
> > > > kafka-streams and kafka-clients jar at 2.3.1 version. Broker might
> be a
> > > > version ahead I think 2.4 but that should not be an issue. Any help
> would
> > > > be appreciated since I am blocked at this point.
> > > >
> > > > Properties props = new Properties();
> > > > props.put(StreamsConfig.APPLICATION_ID_CONFIG,
> DEFAULT_APPLICATION_ID);
> > > > props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
> > > > StreamsBuilder streamsBuilder = new StreamsBuilder();
> > > > GlobalKTable<String, Map<String, String>> groupCacheTable =
> > > >     streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
> > > > Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
> > > > Materialized.as(GROUP_CACHE_STORE_NAME));
> > > > Topology groupCacheTopology = streamsBuilder.build();
> > > > kafkaStreams = new KafkaStreams(groupCacheTopology, props);
> > > > kafkaStreams.start();
> > > >
> > > > Runtime.getRuntime().addShutdownHook(new Thread(() -> {
> > > > LOG.info("Stopping the stream");
> > > > kafkaStreams.close();
> > > > }));
> > > >
> > > >
> > > >
> > >
> {"@timestamp":"2020-05-28T03:11:39.719+00:00","@version":"1","message":"stream-client
> > > > [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9] State
> transition
> > > from
> > > > CREATED to
> > > >
> > > >
> > >
> REBALANCING","logger_name":"org.apache.kafka.streams.KafkaStreams","thread_name":"main","level":"INFO","level_value":20000}
> > > >
> > > >
> > >
> {"@timestamp":"2020-05-28T03:11:43.532+00:00","@version":"1","message":"Uncaught
> > > > exception in thread 'kafka-admin-client-thread |
> > > >
> > > >
> > >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-admin-client-thread
> > > > |
> > > >
> > > >
> > >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-admin","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError:
> > > > Java heap space\n\tat
> java.base/java.nio.HeapByteBuffer.<init>(Unknown
> > > > Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown
> > > Source)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat
> > > >
> org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat
> > > >
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)\n\tat
> > > > java.base/java.lang.Thread.run(Unknown Source)\n"}
> > > >
> > > >
> > >
> {"@timestamp":"2020-05-28T03:11:44.641+00:00","@version":"1","message":"Uncaught
> > > > exception in thread 'kafka-producer-network-thread |
> > > >
> > > >
> > >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer':","logger_name":"org.apache.kafka.common.utils.KafkaThread","thread_name":"kafka-producer-network-thread
> > > > |
> > > >
> > > >
> > >
> DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-StreamThread-1-producer","level":"ERROR","level_value":40000,"stack_trace":"java.lang.OutOfMemoryError:
> > > > Java heap space\n\tat
> java.base/java.nio.HeapByteBuffer.<init>(Unknown
> > > > Source)\n\tat java.base/java.nio.ByteBuffer.allocate(Unknown
> > > Source)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)\n\tat
> > > >
> org.apache.kafka.common.network.Selector.poll(Selector.java:483)\n\tat
> > > >
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)\n\tat
> > > >
> > > >
> > >
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)\n\tat
> > > > java.base/java.lang.Thread.run(Unknown Source)\n"}
> > > >
> > > >
> > >
> {"@timestamp":"2020-05-28T03:11:45.017+00:00","@version":"1","message":"Opening
> > > > store group-cache-store in regular
> > > >
> > > >
> > >
> mode","logger_name":"org.apache.kafka.streams.state.internals.RocksDBTimestampedStore","thread_name":"DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread","level":"INFO","level_value":20000}
> > > >
> > > >
> > >
> {"@timestamp":"2020-05-28T03:11:45.020+00:00","@version":"1","message":"global-stream-thread
> > > >
> [DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread]
> > > > Restoring state for global store
> > > >
> > > >
> > >
> group-cache-store","logger_name":"org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl","thread_name":"DsiApplication-3db7d0ab-c84b-4e39-9038-61fa235786b9-GlobalStreamThread","level":"INFO","level_value":20000}
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>


-- 
-- Guozhang

Reply via email to