The fix appears to be in 2.x and beyond.

On Thu, Oct 13, 2016 at 10:29 AM, Jungtaek Lim <[email protected]> wrote:
> Thanks for the great update. Which version of schema-registry resolves the issue?
>
> - Jungtaek Lim (HeartSaVioR)
>
> On Thu, Oct 13, 2016 at 11:13 PM, Kristopher Kane <[email protected]> wrote:
>
>> To follow up on this, our specific problem was with a custom CachedSchemaRegistry.
>> The external/storm-hdfs master branch uses CachedSchemaRegistry from a Confluent dependency on version 1, which also has this bug. It is fixed in later versions of confluentinc/schema-registry; compare this cache entry, which is missing in 1.x: https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L152
>>
>> I don't have time right now to see if storm-hdfs can bump to something newer than https://github.com/apache/storm/blob/master/external/storm-hdfs/pom.xml#L205 but will report back if I do.
>>
>> Kris
>>
>> On Sun, Sep 11, 2016 at 1:31 PM, Kristopher Kane <[email protected]> wrote:
>>
>> This took a while, as I could not get INFO logging to come out of the serializer in Kryo. The CachedSchemaRegistry and EnhancedCachedSchemaRegistry in our raw-scheme deserializer (which serializes into Avro) hit the cache 100% of the time after the initial load. As you said, the serializer with the IdentityHashMap in org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74) looks to be the problem, although I couldn't get it to log the cache misses in the serializer itself.
>>
>> Here is the evidence:
>>
>> This thread dump was taken after the topology had been running for a few minutes. No new schemas are being introduced, but it is still opening an 'HttpURLConnection'.
>> The deserializer bolt says all cache requests are hits and are not going to the SchemaRegistry.
>>
>> "Thread-3-disruptor-executor[8 8]-send-queue" - Thread t@49
>>    java.lang.Thread.State: RUNNABLE
>>         at java.net.SocketInputStream.socketRead0(Native Method)
>>         at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>>         at java.net.SocketInputStream.read(SocketInputStream.java:170)
>>         at java.net.SocketInputStream.read(SocketInputStream.java:141)
>>         at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
>>         at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
>>         at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
>>         - locked <670c88ea> (a java.io.BufferedInputStream)
>>         at sun.net.www.MeteredStream.read(MeteredStream.java:134)
>>         - locked <2a8ccb96> (a sun.net.www.http.KeepAliveStream)
>>         at java.io.FilterInputStream.read(FilterInputStream.java:133)
>>         at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336)
>>         at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:503)
>>         at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:129)
>>         at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:224)
>>         at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1244)
>>         at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:755)
>>         at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2199)
>>         at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:137)
>>         at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.lookUpSubjectVersion(RestUtils.java:164)
>>         at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersionFromRegistry(EnhancedCachedSchemaRegistry.java:80)
>>         at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersion(EnhancedCachedSchemaRegistry.java:176)
>>         - locked <6b561b0f> (a org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry)
>>         at org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74)
>>         at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:50)
>>         at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:45)
>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
>>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486)
>>         at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44)
>>         at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44)
>>         at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5543.invoke(worker.clj:142)
>>         at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274)
>>         at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58)
>>         at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>>         at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>         at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>         at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94)
>>         at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475)
>>         at clojure.lang.AFn.run(AFn.java:22)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> I've attached two screenshots of the JVisualVM CPU profiler. The one titled 'multiple-jvm' represents a topology with two workers and high CPU usage in RestUtils. The one titled 'single-jvm' represents a topology with one worker and no CPU usage in RestUtils.
>>
>> I think this is pretty good evidence, but I would love to know how to log from the serializer running in Kryo, as this would give me 100% proof. Can anyone help me understand what is going on there?
>>
>> Thanks,
>>
>> Kris
>>
>> On Wed, Sep 7, 2016 at 12:11 PM, Aaron Niskodé-Dossett <[email protected]> wrote:
>>
>> Let us know what you find, especially if the serializer needs to be more defensive to ensure proper caching.
>>
>> On Tue, Sep 6, 2016 at 8:45 AM Kristopher Kane <[email protected]> wrote:
>>
>> Come to think of it, I did see RestUtils rank somewhat higher in the VisualVM CPU profiler but did not give it the attention it deserved.
>>
>> On Tue, Sep 6, 2016 at 9:39 AM, Aaron Niskodé-Dossett <[email protected]> wrote:
>>
>> Hi Kris,
>>
>> One possibility is that the serializer isn't actually caching the schema <-> ID mappings and is hitting the schema registry every time. The call to register() in getFingerprint() [1] in particular can be finicky, since the cache is ultimately an IDENTITY hash map, not a regular old HashMap [2]. I'm familiar with the Avro deserializer you're using and thought it accounted for this, but perhaps not.
>>
>> You could add timing information to the getFingerprint() and getSchema() calls in ConfluentAvroSerializer. If the results indicate cache misses, that's probably your culprit.
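A minimal, self-contained Java sketch of the identity-caching pitfall Aaron describes. It does not use the real Confluent or Storm classes: `FakeRegistry`, `CachingClient`, and `registryCalls` are hypothetical names, and a `String` stands in for the schema object. Each iteration passes a logically equal but distinct instance (as would happen if, for example, a schema were re-parsed for every record), so the identity-keyed cache misses every time, while an equality-keyed `HashMap` hits after the first call.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityCacheDemo {

    // Hypothetical stand-in for a schema registry: counts how often it is
    // actually contacted (each call represents one HTTP round trip).
    static final class FakeRegistry {
        int lookUps = 0;

        int register(String schema) {
            lookUps++;
            return 1; // every schema gets ID 1 in this toy example
        }
    }

    // Hypothetical caching client; the caller chooses the map implementation.
    static final class CachingClient {
        private final Map<String, Integer> cache;
        private final FakeRegistry registry;

        CachingClient(Map<String, Integer> cache, FakeRegistry registry) {
            this.cache = cache;
            this.registry = registry;
        }

        int register(String schema) {
            Integer id = cache.get(schema);
            if (id == null) { // cache miss: go to the registry and remember the answer
                id = registry.register(schema);
                cache.put(schema, id);
            }
            return id;
        }
    }

    // Registers the "same" schema n times, each time as a new String instance,
    // and returns how many times the registry itself was contacted.
    public static int registryCalls(Map<String, Integer> cache, int n) {
        FakeRegistry registry = new FakeRegistry();
        CachingClient client = new CachingClient(cache, registry);
        for (int i = 0; i < n; i++) {
            String schema = new String("{\"type\":\"string\"}"); // equal, but never identical
            client.register(schema);
        }
        return registry.lookUps;
    }

    public static void main(String[] args) {
        System.out.println("IdentityHashMap: " + registryCalls(new IdentityHashMap<>(), 1000));
        System.out.println("HashMap:         " + registryCalls(new HashMap<>(), 1000));
    }
}
```

Running `main` reports 1000 registry calls for the `IdentityHashMap` cache and 1 for the `HashMap` cache. In this scenario the identity-keyed cache also keeps every distinct instance it has seen, so it grows without bound.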
>>
>> Best, Aaron
>>
>> [1] https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java#L66
>> [2] https://github.com/confluentinc/schema-registry/blob/v1.0/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L79
>>
>> On Tue, Sep 6, 2016 at 7:40 AM Kristopher Kane <[email protected]> wrote:
>>
>> Hi everyone.
>>
>> I have a simple topology that uses the Avro serializer (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java) and writes to Elasticsearch.
>>
>> The topology is like this:
>>
>> Kafka (raw scheme) -> Avro deserializer -> Elasticsearch
>>
>> This topology runs well with one worker; however, once I add one more worker (two in total) and change nothing else, topology throughput drops and tuples start timing out.
>>
>> I've attached VisualVM/jstatd to the workers in multi-worker mode (and added some JMX configs to the worker opts), but I am unable to see anything glaring.
>>
>> I've never seen Storm act this way, but I have also never worked with a custom serializer, so I assume it is the culprit, though I cannot explain why.
>>
>> Any pointers would be appreciated.
>>
>> Kris
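Aaron's suggestion to add timing around the getFingerprint() and getSchema() calls can be sketched as a small wrapper. `LookupTimer` is a hypothetical helper, not part of Storm, Kryo, or the Confluent client, and the 1 ms threshold in `main` is an assumption to tune: an in-memory cache hit should be far faster than an HTTP round trip to the registry, so calls above the threshold are counted as probable cache misses. The counts live in the object itself, so they can be inspected even when log output from code running inside Kryo is hard to get at.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: times a lookup and counts calls slower than a threshold,
// treating them as probable cache misses (i.e. calls that went over HTTP).
public final class LookupTimer {
    private static final Logger LOG = Logger.getLogger(LookupTimer.class.getName());

    private final String name;
    private final long slowThresholdNanos;
    private final AtomicLong calls = new AtomicLong();
    private final AtomicLong slowCalls = new AtomicLong();

    public LookupTimer(String name, long slowThresholdNanos) {
        this.name = name;
        this.slowThresholdNanos = slowThresholdNanos;
    }

    // Runs the lookup, records its duration, and passes through its result or exception.
    public <T> T time(Callable<T> lookup) throws Exception {
        long start = System.nanoTime();
        try {
            return lookup.call();
        } finally {
            long elapsedNanos = System.nanoTime() - start;
            calls.incrementAndGet();
            if (elapsedNanos >= slowThresholdNanos) {
                long slow = slowCalls.incrementAndGet();
                LOG.log(Level.FINE, "{0}: slow call #{1} took {2} us",
                        new Object[] {name, slow, elapsedNanos / 1_000});
            }
        }
    }

    public long calls() {
        return calls.get();
    }

    public long slowCalls() {
        return slowCalls.get();
    }

    @Override
    public String toString() {
        return name + ": " + slowCalls.get() + " slow of " + calls.get() + " calls";
    }

    public static void main(String[] args) throws Exception {
        // Assumed threshold of 1 ms; tune it against real cache-hit and registry timings.
        LookupTimer timer = new LookupTimer("getFingerprint", 1_000_000L);
        for (int i = 0; i < 5; i++) {
            timer.time(() -> 42); // stands in for a cached, in-memory lookup
        }
        timer.time(() -> { Thread.sleep(5); return 42; }); // stands in for a registry round trip
        System.out.println(timer);
    }
}
```

Inside a serializer, a call such as `registry.register(subject, schema)` would be wrapped as `timer.time(() -> registry.register(subject, schema))`. Because `time` declares `throws Exception`, the caller has to handle or rethrow it.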
