The fix appears to be in 2.x and beyond.

On Thu, Oct 13, 2016 at 10:29 AM, Jungtaek Lim <kabh...@gmail.com> wrote:

> Thanks for the great update. Which version of schema-registry resolves the
> issue?
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Thu, Oct 13, 2016 at 11:13 PM, Kristopher Kane <kkane.l...@gmail.com> wrote:
>
>> To follow up on this, our specific problem was with a custom
>> CachedSchemaRegistry. The external/storm-hdfs master branch uses
>> CachedSchemaRegistry from a Confluent dependency pinned to version 1,
>> which also has this bug. It is fixed in later Confluent versions of
>> confluentinc/schema-registry; compare the cache entry that is missing
>> in 1.x:
>> https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L152
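>>
>> The shape of the fix is an ordinary cache-before-REST lookup. Roughly (a
>> sketch of the pattern, not the Confluent code verbatim; the stub stands in
>> for the real HTTP call):
>>
>> import java.io.IOException;
>> import java.util.HashMap;
>> import java.util.IdentityHashMap;
>> import java.util.Map;
>> import org.apache.avro.Schema;
>>
>> // Sketch of the missing cache entry: consult the per-subject cache first
>> // and only fall back to an HTTP round-trip on a genuine miss.
>> abstract class CachingRegistrySketch {
>>     private final Map<String, Map<Schema, Integer>> versionCache = new HashMap<>();
>>
>>     public synchronized int getVersion(String subject, Schema schema) throws IOException {
>>         Map<Schema, Integer> bySchema =
>>                 versionCache.computeIfAbsent(subject, s -> new IdentityHashMap<>());
>>         Integer version = bySchema.get(schema);
>>         if (version == null) {
>>             version = getVersionFromRegistry(subject, schema); // the REST call
>>             bySchema.put(schema, version);                     // remember the answer
>>         }
>>         return version;
>>     }
>>
>>     protected abstract int getVersionFromRegistry(String subject, Schema schema)
>>             throws IOException;
>> }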
>>
>> I don't have time right now to see if storm-hdfs can bump to something
>> newer than
>> https://github.com/apache/storm/blob/master/external/storm-hdfs/pom.xml#L205
>> but will report if I do.
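>>
>> For whoever gets to it first, the bump in question is the Confluent client
>> dependency, something along these lines (coordinates from memory and the
>> version is a guess; which versions storm-hdfs actually tolerates is exactly
>> what needs testing):
>>
>> <dependency>
>>     <groupId>io.confluent</groupId>
>>     <artifactId>kafka-schema-registry-client</artifactId>
>>     <version>2.0.1</version>
>> </dependency>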
>>
>> Kris
>>
>> On Sun, Sep 11, 2016 at 1:31 PM, Kristopher Kane <kkane.l...@gmail.com>
>> wrote:
>>
>> This took a while, as I could not get INFO logging to come out of the
>> serializer in Kryo. The CachedSchemaRegistry and
>> EnhancedCachedSchemaRegistry in our raw-scheme deserializer bolt (which
>> turns the raw bytes into Avro) are 100% cache hits after the initial load.
>> As you said, the serializer with the IdentityHashMap in
>> org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74)
>> looks to be the problem, although I couldn't get it to log the cache
>> misses for me in the serializer itself.
>>
>> Here is the evidence:
>>
>> This thread dump was taken after the topology had been running for a few
>> minutes. No new schemas are being introduced, yet it is still opening
>> 'HttpURLConnection' connections. The deserializer bolt reports that all
>> cache requests are hits and are not going to the SchemaRegistry.
>>
>> "Thread-3-disruptor-executor[8 8]-send-queue" - Thread t@49
>>    java.lang.Thread.State: RUNNABLE
>> at java.net.SocketInputStream.socketRead0(Native Method)
>> at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>> at java.net.SocketInputStream.read(SocketInputStream.java:170)
>> at java.net.SocketInputStream.read(SocketInputStream.java:141)
>> at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>> at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>> at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>> - locked <670c88ea> (a java.io.BufferedInputStream)
>> at sun.net.www.MeteredStream.read(MeteredStream.java:134)
>> - locked <2a8ccb96> (a sun.net.www.http.KeepAliveStream)
>> at java.io.FilterInputStream.read(FilterInputStream.java:133)
>> at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(
>> HttpURLConnection.java:3336)
>> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.
>> ensureLoaded(ByteSourceJsonBootstrapper.java:503)
>> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.
>> detectEncoding(ByteSourceJsonBootstrapper.java:129)
>> at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.
>> constructParser(ByteSourceJsonBootstrapper.java:224)
>> at com.fasterxml.jackson.core.JsonFactory._createParser(
>> JsonFactory.java:1244)
>> at com.fasterxml.jackson.core.JsonFactory.createParser(
>> JsonFactory.java:755)
>> at com.fasterxml.jackson.databind.ObjectMapper.
>> readValue(ObjectMapper.java:2199)
>> at io.confluent.kafka.schemaregistry.client.rest.
>> utils.RestUtils.httpRequest(RestUtils.java:137)
>> at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.
>> lookUpSubjectVersion(RestUtils.java:164)
>> at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.
>> getVersionFromRegistry(EnhancedCachedSchemaRegistry.java:80)
>> at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersion(
>> EnhancedCachedSchemaRegistry.java:176)
>> - locked <6b561b0f> (a org.apache.storm.hdfs.avro.
>> EnhancedCachedSchemaRegistry)
>> at org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(
>> ConfluentAvroSerializer.java:74)
>> at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(
>> AbstractAvroSerializer.java:50)
>> at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(
>> AbstractAvroSerializer.java:45)
>> at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(
>> CollectionSerializer.java:75)
>> at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(
>> CollectionSerializer.java:18)
>> at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486)
>> at backtype.storm.serialization.KryoValuesSerializer.serializeInto(
>> KryoValuesSerializer.java:44)
>> at backtype.storm.serialization.KryoTupleSerializer.serialize(
>> KryoTupleSerializer.java:44)
>> at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__
>> 5543.invoke(worker.clj:142)
>> at backtype.storm.daemon.executor$start_batch_transfer_
>> _GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274)
>> at backtype.storm.disruptor$clojure_handler$reify__3196.
>> onEvent(disruptor.clj:58)
>> at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(
>> DisruptorQueue.java:125)
>> at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(
>> DisruptorQueue.java:99)
>> at backtype.storm.disruptor$consume_batch_when_available.
>> invoke(disruptor.clj:80)
>> at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.
>> invoke(disruptor.clj:94)
>> at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475)
>> at clojure.lang.AFn.run(AFn.java:22)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> I've attached two screenshots of the JVisualVM CPU profiler. The one
>> titled 'multiple-jvm' represents a topology with two workers and high CPU
>> usage in RestUtils. The one titled 'single-jvm' represents a topology with
>> one worker and no CPU usage in RestUtils.
>>
>>
>> I think this is pretty good evidence but I would love to know how to log
>> from the serializer running in Kryo as this would give me 100% proof.  Can
>> anyone help me understand what is going on there?
>>
>> Thanks,
>>
>> Kris
>>
>> On Wed, Sep 7, 2016 at 12:11 PM, Aaron Niskodé-Dossett <doss...@gmail.com> wrote:
>>
>> Let us know what you find, especially if the serializer needs to be more
>> defensive to ensure proper caching.
>>
>> On Tue, Sep 6, 2016 at 8:45 AM Kristopher Kane <kkane.l...@gmail.com>
>> wrote:
>>
>> Come to think of it, I did see RestUtils rank somewhat higher in the
>> VisualVM CPU profiler but did not give it the attention it deserved.
>>
>> On Tue, Sep 6, 2016 at 9:39 AM, Aaron Niskodé-Dossett <doss...@gmail.com>
>> wrote:
>>
>> Hi Kris,
>>
>> One possibility is that the serializer isn't actually caching the schema
>> <-> id mappings and is hitting the schema registry every time.  The call to
>> register() in getFingerprint() [1] in particular can be finicky, since the
>> cache is ultimately an IDENTITY hash map, not a regular old HashMap [2].
>> I'm familiar with the Avro deserializer you're using and thought it
>> accounted for this, but perhaps not.
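>>
>> To see why the identity semantics matter, here is a minimal self-contained
>> sketch (plain Strings standing in for parsed Avro Schema objects):
>>
>> import java.util.HashMap;
>> import java.util.IdentityHashMap;
>> import java.util.Map;
>>
>> public class IdentityCacheMissDemo {
>>     public static void main(String[] args) {
>>         // Two equal-but-distinct keys, like the same schema parsed twice.
>>         String a = new String("schema");
>>         String b = new String("schema");
>>
>>         Map<String, Integer> regular = new HashMap<>();
>>         Map<String, Integer> identity = new IdentityHashMap<>();
>>         regular.put(a, 1);
>>         identity.put(a, 1);
>>
>>         // HashMap compares keys with equals(): hit.
>>         System.out.println(regular.containsKey(b));  // true
>>         // IdentityHashMap compares keys with ==: miss, so a lookup made
>>         // with a re-parsed schema goes back to the registry every time.
>>         System.out.println(identity.containsKey(b)); // false
>>     }
>> }
>>
>> If the serializer ever hands the cache an equal-but-not-identical Schema
>> instance, every lookup is a miss.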
>>
>> You could add timing information to the getFingerprint() and getSchema()
>> calls in ConfluentAvroSerializer.  If the results indicate cache misses,
>> that's probably your culprit.
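>>
>> A wrapper like this is enough (a sketch; where you wire it in and what you
>> label the calls is up to you):
>>
>> import java.util.function.Supplier;
>>
>> // Minimal timing probe: wrap the body of getFingerprint() or getSchema()
>> // in Timed.call(...) and watch the reported latencies.
>> public final class Timed {
>>     private Timed() {}
>>
>>     public static <T> T call(String label, Supplier<T> body) {
>>         long start = System.nanoTime();
>>         try {
>>             return body.get();
>>         } finally {
>>             long ms = (System.nanoTime() - start) / 1_000_000;
>>             // A cache hit should report ~0 ms; repeated multi-ms calls
>>             // mean a registry round-trip is being paid per tuple.
>>             System.err.println(label + " took " + ms + " ms");
>>         }
>>     }
>> }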
>>
>> Best, Aaron
>>
>>
>> [1] https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java#L66
>> [2] https://github.com/confluentinc/schema-registry/blob/v1.0/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L79
>>
>> On Tue, Sep 6, 2016 at 7:40 AM Kristopher Kane <kkane.l...@gmail.com>
>> wrote:
>>
>> Hi everyone.
>>
>> I have a simple topology that uses the Avro serializer
>> (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java)
>> and writes to Elasticsearch.
>>
>> The topology is like this:
>>
>> Kafka (raw scheme) -> Avro deserializer -> Elasticsearch
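>>
>> For context, the serializer is wired in through Kryo registration, roughly
>> like this (a sketch from memory; package names match the stack traces later
>> in this thread, and the registry URL config key is an assumption, so check
>> the serializer source):
>>
>> import backtype.storm.Config;
>> import org.apache.avro.generic.GenericData;
>> import org.apache.storm.hdfs.avro.ConfluentAvroSerializer;
>>
>> public class TopologySetupSketch {
>>     public static Config buildConfig() {
>>         Config conf = new Config();
>>         // Kryo only serializes tuples that cross a worker (JVM) boundary;
>>         // within a single worker they are handed over in-process.
>>         conf.registerSerialization(GenericData.Record.class,
>>                 ConfluentAvroSerializer.class);
>>         // Config key and URL are placeholders; check ConfluentAvroSerializer
>>         // for the real key it reads the registry location from.
>>         conf.put("avro.schemaregistry.confluent", "http://registry:8081");
>>         conf.setNumWorkers(2);
>>         return conf;
>>     }
>> }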
>>
>> This topology runs well with one worker; however, once I add one more
>> worker (for a total of two) and change nothing else, the topology
>> throughput drops and tuples start timing out.
>>
>> I've attached VisualVM/jstatd to the workers in multi-worker mode, and
>> added some JMX configs to the worker opts, but I am unable to see anything
>> glaring.
>>
>> I've never seen Storm act this way, but I have also never worked with a
>> custom serializer, so I assume it is the culprit; I just cannot explain
>> why.
>>
>> Any pointers would be appreciated.
>>
>> Kris
>>
