Thanks for the great update. Which version of schema-registry resolves the issue?
- Jungtaek Lim (HeartSaVioR)

On Thu, Oct 13, 2016 at 11:13 PM, Kristopher Kane <kkane.l...@gmail.com> wrote:

> To follow up on this, our specific problem was with a custom
> CachedSchemaRegistry.
>
> The external/storm-hdfs master branch uses CachedSchemaRegistry from a
> Confluent dep on version 1, which also has this bug. It is fixed in later
> Confluent versions of confluentinc/schema-registry, for example this cache
> entry that is missing in 1.x:
> https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L152
>
> I don't have time right now to see if storm-hdfs can bump to something
> newer than
> https://github.com/apache/storm/blob/master/external/storm-hdfs/pom.xml#L205
> but will report if I do.
>
> Kris
>
> On Sun, Sep 11, 2016 at 1:31 PM, Kristopher Kane <kkane.l...@gmail.com> wrote:
>
> > This took a while as I could not get INFO logging to come out of the
> > serializer in Kryo. The CachedSchemaRegistry and
> > EnhancedCachedSchemaRegistry in our raw scheme deserializer (with
> > serialization into Avro) are 100% cache hits after the initial load. As
> > you said, the serializer with the IdentityHashMap in
> > org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74)
> > looks to be the problem, although I couldn't get it to log the cache
> > misses for me in the serializer itself.
> >
> > Here is the evidence:
> >
> > This thread dump is from after the topology has been running for a few
> > minutes. No new schemas are being introduced, but it is still opening
> > 'HttpURLConnection'. The deserializer bolt says all cache requests are a
> > hit and are not going to the SchemaRegistry.
> >
> > "Thread-3-disruptor-executor[8 8]-send-queue" - Thread t@49
> >    java.lang.Thread.State: RUNNABLE
> >     at java.net.SocketInputStream.socketRead0(Native Method)
> >     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
> >     at java.net.SocketInputStream.read(SocketInputStream.java:170)
> >     at java.net.SocketInputStream.read(SocketInputStream.java:141)
> >     at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
> >     at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
> >     at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
> >     - locked <670c88ea> (a java.io.BufferedInputStream)
> >     at sun.net.www.MeteredStream.read(MeteredStream.java:134)
> >     - locked <2a8ccb96> (a sun.net.www.http.KeepAliveStream)
> >     at java.io.FilterInputStream.read(FilterInputStream.java:133)
> >     at sun.net.www.protocol.http.HttpURLConnection$HttpInputStream.read(HttpURLConnection.java:3336)
> >     at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:503)
> >     at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:129)
> >     at com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:224)
> >     at com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1244)
> >     at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:755)
> >     at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2199)
> >     at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:137)
> >     at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.lookUpSubjectVersion(RestUtils.java:164)
> >     at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersionFromRegistry(EnhancedCachedSchemaRegistry.java:80)
> >     at org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry.getVersion(EnhancedCachedSchemaRegistry.java:176)
> >     - locked <6b561b0f> (a org.apache.storm.hdfs.avro.EnhancedCachedSchemaRegistry)
> >     at org.apache.storm.hdfs.avro.ConfluentAvroSerializer.getFingerprint(ConfluentAvroSerializer.java:74)
> >     at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:50)
> >     at org.apache.storm.hdfs.avro.AbstractAvroSerializer.write(AbstractAvroSerializer.java:45)
> >     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> >     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:75)
> >     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
> >     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:486)
> >     at backtype.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:44)
> >     at backtype.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44)
> >     at backtype.storm.daemon.worker$mk_transfer_fn$transfer_fn__5543.invoke(worker.clj:142)
> >     at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3539.invoke(executor.clj:274)
> >     at backtype.storm.disruptor$clojure_handler$reify__3196.onEvent(disruptor.clj:58)
> >     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
> >     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> >     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> >     at backtype.storm.disruptor$consume_loop_STAR_$fn__3209.invoke(disruptor.clj:94)
> >     at backtype.storm.util$async_loop$fn__544.invoke(util.clj:475)
> >     at clojure.lang.AFn.run(AFn.java:22)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > I've attached two screenshots of the JVisualVM CPU profiler. The one
> > titled 'multiple-jvm' represents a topology with two workers and high CPU
> > usage on RestUtils. The file titled 'single-jvm' represents a topology
> > with one worker and no CPU usage for RestUtils.
> >
> > I think this is pretty good evidence, but I would love to know how to log
> > from the serializer running in Kryo, as this would give me 100% proof.
> > Can anyone help me understand what is going on there?
> >
> > Thanks,
> >
> > Kris
> >
> > On Wed, Sep 7, 2016 at 12:11 PM, Aaron Niskodé-Dossett <doss...@gmail.com> wrote:
> >
> > > Let us know what you find, especially if the serializer needs to be
> > > more defensive to ensure proper caching.
> > >
> > > On Tue, Sep 6, 2016 at 8:45 AM Kristopher Kane <kkane.l...@gmail.com> wrote:
> > >
> > > > Come to think of it, I did see RestUtils rank somewhat higher in the
> > > > visualvm CPU profiler but did not give it the attention it deserved.
> > > >
> > > > On Tue, Sep 6, 2016 at 9:39 AM, Aaron Niskodé-Dossett <doss...@gmail.com> wrote:
> > > >
> > > > > Hi Kris,
> > > > >
> > > > > One possibility is that the Serializer isn't actually caching the
> > > > > schema <-> id mappings and is hitting the schema registry every
> > > > > time. The call to register() in getFingerprint() [1] in particular
> > > > > can be finicky, since the cache is ultimately in an IDENTITY hash
> > > > > map, not a regular old hashmap [2]. I'm familiar with the Avro
> > > > > deserializer you're using and thought it accounted for this, but
> > > > > perhaps not.
> > > > >
> > > > > You could add timing information to the getFingerprint() and
> > > > > getSchema() calls in ConfluentAvroSerializer. If the results
> > > > > indicate cache misses, that's probably your culprit.
> > > > >
> > > > > Best, Aaron
> > > > >
> > > > > [1] https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java#L66
> > > > > [2] https://github.com/confluentinc/schema-registry/blob/v1.0/client/src/main/java/io/confluent/kafka/schemaregistry/client/CachedSchemaRegistryClient.java#L79
> > > > >
> > > > > On Tue, Sep 6, 2016 at 7:40 AM Kristopher Kane <kkane.l...@gmail.com> wrote:
> > > > >
> > > > > > Hi everyone.
> > > > > >
> > > > > > I have a simple topology that uses the Avro serializer
> > > > > > (https://github.com/apache/storm/blob/master/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java)
> > > > > > and writes to Elasticsearch.
> > > > > >
> > > > > > The topology is like this:
> > > > > >
> > > > > > Kafka (raw scheme) -> Avro deserializer -> Elasticsearch
> > > > > >
> > > > > > This topology runs well with one worker; however, once I add one
> > > > > > more worker (a total of two) and change nothing else, the
> > > > > > topology throughput drops and tuples start timing out.
> > > > > >
> > > > > > I've attached visualvm/jstatd to the workers when in multi-worker
> > > > > > mode, and added some JMX configs to the worker opts, but I am
> > > > > > unable to see anything glaring.
> > > > > >
> > > > > > I've never seen Storm act this way, but I have also never worked
> > > > > > with a custom serializer, so I assume that it is the culprit,
> > > > > > though I cannot explain why.
> > > > > >
> > > > > > Any pointers would be appreciated.
> > > > > >
> > > > > > Kris
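
For readers of the archive, the identity-map behavior discussed in this thread can be illustrated with a small standalone sketch. It is not the Confluent or storm-hdfs code: the class IdentityCacheDemo, the lookup() helper, and the use of String keys in place of Avro Schema objects are all assumptions made for illustration. It only shows why a cache keyed by object identity misses on every lookup when each tuple carries a freshly built (for example, freshly deserialized) key object, even though the keys are equal.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityCacheDemo {
    // Counts simulated trips to the remote registry (hypothetical stand-in for an HTTP call).
    static int remoteCalls = 0;

    // Returns the cached id for a schema, "fetching" it on a cache miss.
    static int lookup(Map<String, Integer> cache, String schema) {
        Integer id = cache.get(schema);
        if (id == null) {
            remoteCalls++;              // cache miss: this is where a real client would go over HTTP
            id = schema.hashCode();     // arbitrary stand-in for the id the registry would return
            cache.put(schema, id);
        }
        return id;
    }

    // Looks up the same logical schema once per tuple and reports how many lookups missed.
    static int countRemoteCalls(Map<String, Integer> cache, int tuples) {
        remoteCalls = 0;
        String json = "{\"type\":\"string\"}";
        for (int i = 0; i < tuples; i++) {
            // new String(...) yields an equal but distinct object each time,
            // mimicking a schema object rebuilt for every tuple.
            lookup(cache, new String(json));
        }
        return remoteCalls;
    }

    public static void main(String[] args) {
        // Keys compared with equals(): only the first lookup misses.
        System.out.println("HashMap remote calls: " + countRemoteCalls(new HashMap<>(), 5));
        // Keys compared with ==: every lookup misses.
        System.out.println("IdentityHashMap remote calls: " + countRemoteCalls(new IdentityHashMap<>(), 5));
    }
}
```

With five tuples, the HashMap variant reports 1 remote call and the IdentityHashMap variant reports 5. This would be consistent with the single-worker case staying fast if the same schema instance is reused in-process, while the two-worker case keeps opening HTTP connections, though that link is an inference from the thread rather than something the sketch proves.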