Hi Jon,

I haven't had a chance to look at the logs in detail yet, but I have
noticed that your app seems to be rebalancing frequently. It usually
happens around the 300 second mark, which would normally mean that poll()
hasn't been called for at least that long. You might want to try setting
the config ConsumerConfig.MAX_POLL_INTERVAL_CONFIG to something higher
than 300000 (which is the default).
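
For example, something along these lines (a sketch - the 600000 ms value
is only an illustration; choose a value comfortably above the longest gap
between poll() calls that your processing can cause):

        Properties config = new Properties();
        // allow up to 10 minutes between poll() calls before the consumer
        // is considered failed and the group rebalances (default: 300000 ms)
        config.put(ConsumerConfig.MAX_POLL_INTERVAL_CONFIG, "600000");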

I'll continue to look at your logs and get back to you.
Thanks,
Damian

On Tue, 13 Dec 2016 at 15:02 Jon Yeargers <jon.yearg...@cedexis.com> wrote:

> attached is a log with lots of disconnections and a small amount of
> actual, useful activity.
>
>
>
> On Tue, Dec 13, 2016 at 4:57 AM, Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> n/m - I understand the logging issue now. Am generating a new one. Will
> send shortly.
>
> On Tue, Dec 13, 2016 at 4:55 AM, Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> Yes - saw that one. There were plenty of smaller records available though.
>
> I sent another log this morning with the level set to DEBUG. Hopefully you
> rec'd it.
>
> On Tue, Dec 13, 2016 at 3:58 AM, Damian Guy <damian....@gmail.com> wrote:
>
> Hi Jon,
>
> It looks like you have the logging level for KafkaStreams set to WARN or
> higher - I can only see ERROR-level logs being produced from Streams.
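>
> If you are using the stock log4j setup, DEBUG output from Streams can be
> enabled with something like this (a sketch, assuming a log4j.properties
> file on the classpath):
>
>     log4j.logger.org.apache.kafka.streams=DEBUG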
>
> However, I did notice an issue in the logs (not related to your specific
> error, but you will need to fix it anyway):
>
> There are lots of messages like:
> task [2_9] Error sending record to topic
> PRTMinuteAgg-prt_hour_agg_stream-changelog
> org.apache.kafka.common.errors.RecordTooLargeException: The message is
> 2381750 bytes when serialized which is larger than the maximum
>
> This means you need to add some extra config to your StreamsConfig:
>
>     config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, expectedMaximumMessageSizeBytes);
>
> You will also possibly need to adjust the broker properties and increase
> message.max.bytes - it will need to be at least as large as the setting
> above.
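>
> As a sketch (3000000 bytes is only an example figure - size it above your
> largest expected record, e.g. the 2381750-byte one from your logs):
>
>     // producer side, added to your StreamsConfig properties
>     config.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 3000000);
>
> and on each broker, in server.properties:
>
>     # must be at least as large as max.request.size above
>     message.max.bytes=3000000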
>
> At the moment all of the change-logs for your state-stores are being
> dropped due to this issue.
>
> Thanks,
> Damian
>
>
>
> On Tue, 13 Dec 2016 at 11:32 Jon Yeargers <jon.yearg...@cedexis.com>
> wrote:
>
> > (am attaching a debug log - note that the app terminated with no
> > further messages)
> >
> > topology: kStream -> groupByKey.aggregate(minute) -> foreach
> >                              \-> groupByKey.aggregate(hour) -> foreach
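> >
> > In DSL terms, roughly the following (a sketch - the topic name, serde
> > and aggregator are placeholders; the store names follow the changelog
> > topics seen in the logs):
> >
> >         KStreamBuilder builder = new KStreamBuilder();
> >         // "prt_input_topic" is a hypothetical name
> >         KStream<AggKey, String> stream = builder.stream("prt_input_topic");
> >
> >         // minute-level windowed aggregation, materialized to a state store;
> >         // Agg.add(value) is assumed to return the updated aggregate
> >         stream.groupByKey()
> >               .aggregate(Agg::new, (key, value, agg) -> agg.add(value),
> >                          TimeWindows.of(60 * 1000L), aggSerde, "prt_minute_agg_stream")
> >               .toStream()
> >               .foreach((windowedKey, agg) -> { /* emit minute aggregate */ });
> >
> >         // hour-level aggregation branching off the same stream
> >         stream.groupByKey()
> >               .aggregate(Agg::new, (key, value, agg) -> agg.add(value),
> >                          TimeWindows.of(60 * 60 * 1000L), aggSerde, "prt_hour_agg_stream")
> >               .toStream()
> >               .foreach((windowedKey, agg) -> { /* emit hour aggregate */ });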
> >
> >
> > config:
> >
> >         Properties config = new Properties();
> >         config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_IP);
> >         config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, ZOOKEEPER_IP);
> >         config.put(StreamsConfig.APPLICATION_ID_CONFIG, "PRTMinuteAgg");
> >         config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, AggKey.class.getName());
> >         config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> >         config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
> >         config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "2");
> >         config.put(StreamsConfig.STATE_DIR_CONFIG, "/mnt/PRTMinuteAgg");
> >         config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "2");
> >
> >
> > On Mon, Dec 12, 2016 at 4:45 PM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> > Jon,
> >
> > To help investigate this issue, could you let me know 1) your topology
> > sketch and 2) your app configs? For example, did you enable caching in
> > your apps with the cache.max.bytes.buffering config?
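> >
> > For reference, caching would be enabled along these lines (10 MB is just
> > an example figure; setting it to 0 disables record caching entirely):
> >
> >         config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L);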
> >
> >
> > Guozhang
> >
> >
> > On Sun, Dec 11, 2016 at 3:44 PM, Jon Yeargers <jon.yearg...@cedexis.com>
> > wrote:
> >
> > > I get this one quite a bit. It kills my app after a short time of
> > > running. Driving me nuts.
> > >
> > > On Sun, Dec 11, 2016 at 2:17 PM, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > > > Not sure about this one.
> > > >
> > > > Can you describe what you do exactly? Can you reproduce the issue? We
> > > > definitely want to investigate this.
> > > >
> > > > -Matthias
> > > >
> > > > On 12/10/16 4:17 PM, Jon Yeargers wrote:
> > > > > (Am reporting these as have moved to 0.10.1.0-cp2)
> > > > >
> > > > > ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener
> > > > > org.apache.kafka.streams.processor.internals.StreamThread$1 for
> > > > > group MinuteAgg failed on partition assignment
> > > > >
> > > > > java.lang.IllegalStateException: task [1_9] Log end offset should
> > > > > not change while restoring
> > > > >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
> > > > >         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
> > > > >         at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
> > > > >         at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
> > > > >         at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
> > > > >         at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
> > > > >         at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
> > > > >         at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
> > > > >         at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
> > > > >         at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
> > > > >         at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
> > > > >         at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
> > > > >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
> > > > >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
> > > > >         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
> > > > >         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
> > > > >         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
> > > > >         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
> > > > >         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
> > > > >         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> > > >
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
>
>
