To follow up on this: our specific problem was with a custom
CachedSchemaRegistry.
The external/storm-hdfs master branch uses the CachedSchemaRegistry from a
Confluent 1.x dependency, which also has this bug.  It is fixed in later
versions of confluentinc/schema-registry; for example, this cache entry is
missing in 1.x:
https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L152
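
For anyone following along, here is a minimal sketch of why a cache backed by
an IdentityHashMap (as discussed further down this thread) can miss on every
lookup.  It is not the Confluent or storm-hdfs code: the class name
IdentityCacheDemo and the method countMisses are hypothetical, and a plain
String stands in for an Avro Schema.  The assumption is that each lookup
arrives with an equal but distinct key instance.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical demo class; a String stands in for an Avro Schema object.
public class IdentityCacheDemo {

    /** Performs the given number of lookups and counts the cache misses. */
    static int countMisses(Map<String, Integer> cache, int lookups) {
        int misses = 0;
        for (int i = 0; i < lookups; i++) {
            // new String(...) yields an equal but distinct instance each time,
            // similar to a schema object that is re-created for every record.
            String schema = new String("{\"type\":\"string\"}");
            if (!cache.containsKey(schema)) {
                misses++;             // a real client would call the registry here
                cache.put(schema, 1);
            }
        }
        return misses;
    }

    public static void main(String[] args) {
        // HashMap compares keys with equals(); IdentityHashMap compares with ==.
        System.out.println("HashMap misses: "
                + countMisses(new HashMap<String, Integer>(), 1000));
        System.out.println("IdentityHashMap misses: "
                + countMisses(new IdentityHashMap<String, Integer>(), 1000));
    }
}
```

With an equals()-based map only the first lookup misses.  With the identity
map every lookup misses, which in a registry client would mean one HTTP
request per record.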

I don't have time right now to see whether storm-hdfs can bump to something
newer than
https://github.com/apache/storm/blob/master/external/storm-hdfs/pom.xml#L205
but will report back if I do.

Kris

On Sun, Sep 11, 2016 at 1:31 PM, Kristopher Kane <kkane.l...@gmail.com>
wrote:

> This took a while as I could not get INFO logging to come out of the
> serializer in Kryo.  The CachedSchemaRegistry and
> EnhancedCachedSchemaRegistry in our raw scheme deserializer (with
> serialization into Avro) hit the cache 100% of the time after the initial
> load.  As you said, the serializer with the IdentityHashMap in
> org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74)
> looks to be the problem, although I couldn't get it to log the cache
> misses for me in the serializer itself.
>
> Here is the evidence:
>
> This thread dump was taken after the topology had been running for a few
> minutes.  No new schemas are being introduced, but the topology is still
> opening an 'HttpURLConnection'.  The deserializer bolt reports that all
> cache requests are hits and are not going to the SchemaRegistry.
>
> "Thread-3-disruptor-executor[8 8]-send-queue" - Thread t@49
>    java.lang.Thread.State: RUNNABLE
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> at java.net.SocketInputStream.read(SocketInputStream.java:170)
> at java.net.SocketInputStream.read(SocketInputStream.java:141)
> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
> at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> - locked <670c88ea> (a java.io.BufferedInputStream)
> at sun.net.www.MeteredStream.read(MeteredStream.java:134)
> - locked <2a8ccb96> (a sun.net.www.http.KeepAliveStream)
> at java.io.FilterInputStream.read(FilterInputStream.java:133)
> at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:503)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:129)
> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:224)
> at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1244)
> at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:755)
> at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2199)
> at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:137)
> at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.lookUpSubjectVersion(RestUtils.java:164)
> at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersionFromRegistry(EnhancedCachedSchemaRegistry.java:80)
> at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersion(EnhancedCachedSchemaRegistry.java:176)
> - locked <6b561b0f> (a org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry)
> at org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74)
> at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:50)
> at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:45)
> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486)
> at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44)
> at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44)
> at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5543.invoke(worker.clj:142)
> at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274)
> at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58)
> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94)
> at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475)
> at clojure.lang.AFn.run(AFn.java:22)
> at java.lang.Thread.run(Thread.java:745)
>
>
> I've attached two screenshots of the JVisualVM CPU profiler.  The one
> titled 'multiple-jvm' represents a topology with two workers and high CPU
> usage in RestUtils.  The file titled 'single-jvm' represents a topology
> with one worker and no CPU usage in RestUtils.
>
>
> I think this is pretty good evidence but I would love to know how to log
> from the serializer running in Kryo as this would give me 100% proof.  Can
> anyone help me understand what is going on there?
>
> Thanks,
>
> Kris
>
> On Wed, Sep 7, 2016 at 12:11 PM, Aaron Niskodé-Dossett <doss...@gmail.com>
> wrote:
>
>> Let us know what you find, especially if the serializer needs to be more
>> defensive to ensure proper caching.
>>
>> On Tue, Sep 6, 2016 at 8:45 AM Kristopher Kane <kkane.l...@gmail.com>
>> wrote:
>>
>>> Come to think of it, I did see RestUtils rank somewhat higher in the
>>> visualvm CPU profiler but did not give it the attention it deserved.
>>>
>>> On Tue, Sep 6, 2016 at 9:39 AM, Aaron Niskodé-Dossett <doss...@gmail.com
>>> > wrote:
>>>
>>>> Hi Kris,
>>>>
>>>> One possibility is that the Serializer isn't actually caching the
>>>> schema <-> id mappings and is hitting the schema registry every time.  The
>>>> call to register() in getFingerprint() [1] in particular can be finicky
>>>> since the cache is ultimately in an IDENTITY hash map, not a regular old
>>>> hashmap [2].  I'm familiar with the Avro deserializer you're using and
>>>> thought it accounted for this, but perhaps not.
>>>>
>>>> You could add timing information to the getFingerprint() and
>>>> getSchema() calls in ConfluentAvroSerializer.  If the results indicate
>>>> cache misses, that's probably your culprit.
>>>>
>>>> Best, Aaron
>>>>
>>>>
>>>> [1] https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java#L66
>>>> [2] https://github.com/confluentinc/schema-registry/blob/v1.0/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L79
>>>>
>>>> On Tue, Sep 6, 2016 at 7:40 AM Kristopher Kane <kkane.l...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi everyone.
>>>>>
>>>>> I have a simple topology that uses the Avro serializer
>>>>> (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java)
>>>>> and writes to Elasticsearch.
>>>>>
>>>>> The topology is like this:
>>>>>
>>>>> Kafka (raw scheme) -> Avro deserializer -> Elasticsearch
>>>>>
>>>>> This topology runs well with one worker.  However, once I add one more
>>>>> worker (total of two) and change nothing else, the topology throughput
>>>>> drops and tuples start timing out.
>>>>>
>>>>> I've attached visualvm/jstatd to the workers when in multi worker mode
>>>>> - and added some jmx configs to the worker opts - but I am unable to see
>>>>> anything glaring.
>>>>>
>>>>> I've never seen Storm act this way, but I have also never worked with a
>>>>> custom serializer, so I assume that it is the culprit, though I cannot
>>>>> explain why.
>>>>>
>>>>> Any pointers would be appreciated.
>>>>>
>>>>> Kris
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>
