HI Jon, It looks like you have the logging level for KafkaStreams set to at least WARN. I can only see ERROR level logs being produced from Streams.
However, i did notice an issue in the logs (not related to your specific error but you will need to fix anyway): There are lots of messages like: task [2_9] Error sending record to topic PRTMinuteAgg-prt_hour_agg_stream-changelog org.apache.kafka.common.errors.RecordTooLargeException: The message is 2381750 bytes when serialized which is larger than the maximum This means you need to add some extra config to your StreamsConfig: config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, expectedMaximumMessageSizeBytes) You will also possible need to adjust the broker properties and increase message.max.bytes - it will need to be at least as large as the setting above. At the moment all of the change-logs for your state-stores are being dropped due to this issue. Thanks, Damian On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yearg...@cedexis.com> wrote: > (am attaching a debug log - note that app terminated with no further > messages) > > topology: kStream -> groupByKey.aggregate(minute) -> foreach > \-> groupByKey.aggregate(hour) -> foreach > > > config: > > Properties config = new Properties(); > config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP); > config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP); > config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg" ); > config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, > AggKey.class.getName()); > config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass().getName()); > config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); > config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2"); > config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg"); > > config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2"); > > > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > Jon, > > To help investigating this issue, could you let me know 1) your topology > sketch and 2) your app configs? For example did you enable caching in your > apps with the cache.max.bytes.buffering config? > > > Guozhang > > > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com> > wrote: > > > I get this one quite a bit. It kills my app after a short time of > running. > > Driving me nuts. > > > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matth...@confluent.io> > > wrote: > > > > > Not sure about this one. > > > > > > Can you describe what you do exactly? Can you reproduce the issue? We > > > definitely want to investigate this. > > > > > > -Matthias > > > > > > On 12/10/16 4:17 PM, Jon Yeargers wrote: > > > > (Am reporting these as have moved to 0.10.1.0-cp2) > > > > > > > > ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for group > > > > MinuteAgg failed on partition assignment > > > > > > > > java.lang.IllegalStateException: task [1_9] Log end offset should not > > > > change while restoring > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager. > > > restoreActiveState(ProcessorStateManager.java:245) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.ProcessorStateManager. > > > register(ProcessorStateManager.java:198) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > ProcessorContextImpl.register(ProcessorContextImpl.java:123) > > > > > > > > at > > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.init( > > > RocksDBWindowStore.java:206) > > > > > > > > at > > > > org.apache.kafka.streams.state.internals.MeteredWindowStore.init( > > > MeteredWindowStore.java:66) > > > > > > > > at > > > > org.apache.kafka.streams.state.internals.CachingWindowStore.init( > > > CachingWindowStore.java:64) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.AbstractTask. > > > initializeStateStores(AbstractTask.java:81) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > StreamTask.<init>(StreamTask.java:120) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > StreamThread.createStreamTask(StreamThread.java:633) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > StreamThread.addStreamTasks(StreamThread.java:660) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread.access$100( > > > StreamThread.java:69) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread$1. > > > onPartitionsAssigned(StreamThread.java:124) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator. > > > onJoinComplete(ConsumerCoordinator.java:228) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > joinGroupIfNeeded(AbstractCoordinator.java:313) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.internals.AbstractCoordinator. > > > ensureActiveGroup(AbstractCoordinator.java:277) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll( > > > ConsumerCoordinator.java:259) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.KafkaConsumer. > > > pollOnce(KafkaConsumer.java:1013) > > > > > > > > at > > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll( > > > KafkaConsumer.java:979) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop( > > > StreamThread.java:407) > > > > > > > > at > > > > org.apache.kafka.streams.processor.internals. > > > StreamThread.run(StreamThread.java:242) > > > > > > > > > > > > > > > > -- > -- Guozhang > > >