Cool. I'm going back to the public list to share the knowledge. If your Trident topology is compiling down to 35 bolts, then it sounds like you have a lot of partitioning operations. Is there any way you can reduce that? That's going to introduce a decent amount of network transfer.

And I would definitely try to figure out what's making the serialized topology so big. In that department, the prepare() method is your friend for initializing state.

-Taylor
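A minimal sketch of the pattern Taylor is recommending, shown as a plain Storm bolt on the 0.9.x API (Trident operations expose an analogous prepare()). The Model class and all names here are hypothetical; the point is that heavy state is rebuilt on the worker in prepare() rather than serialized into the topology:

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class ScoringBolt extends BaseRichBolt {

        // Serialized into the topology: keep this small (a path or key, not the resource itself).
        private final String modelPath;

        // transient: NOT serialized with the topology; rebuilt per worker in prepare().
        private transient Model model;
        private transient OutputCollector collector;

        public ScoringBolt(String modelPath) {
            this.modelPath = modelPath;
        }

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.model = Model.load(modelPath); // heavy initialization happens here, not at submit time
        }

        @Override
        public void execute(Tuple input) {
            collector.emit(new Values(model.score(input.getString(0))));
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("score"));
        }

        // Hypothetical stand-in for whatever heavy resource gets loaded.
        static class Model {
            static Model load(String path) { return new Model(); }
            double score(String s) { return s.length(); }
        }
    }

If a field like model were not transient, Java serialization would drag it, and everything reachable from it, into every bolt that references it.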
> On Mar 18, 2014, at 7:37 PM, Adam Lewis <[email protected]> wrote:
>
> Sure, sharing with the list is a great idea. I already checked the log
> excerpts I sent you to ensure they are clean of anything proprietary, so all
> good there.
>
> I did a little more digging and it seems there is definitely
> something...interesting...happening in the compiling process. My uncompiled
> topology is pretty small memory-wise (according to the profiler), but once
> it gets built into the generated Storm bolts, it becomes 35 bolts, each of
> which serializes to about 200 KB. I was struck by how small the variance is
> amongst the serialized sizes, as if something big-ish is getting duplicated
> into each bolt. Some more experimentation revealed that part of the problem
> may be that I have multiple DRPC spouts in a single topology. I noticed
> super-linear growth in serialized topology size for each additional DRPC
> spout I create.
>
> I'm thinking I will break each DRPC spout into its own topology, which I've
> needed to do anyway, since right now they seem to block each other from
> processing even though I don't need requests to separate DRPCs to be wholly
> ordered in the way I imagine Trident would try to do.
>
>> On Tue, Mar 18, 2014 at 7:24 PM, P. Taylor Goetz <[email protected]> wrote:
>> Glad I could help.
>>
>> Trident does a lot in terms of "compiling" down to Storm's primitive
>> structures. The relevant classes are Stream and TridentTopology in the
>> storm.trident package (if my memory serves me correctly, I'm not at a
>> computer...).
>>
>> It's not for the faint of heart, but once you start to wrap your head
>> around it, it's pretty cool.
>>
>> Do you mind if I share this thread with the user group? Some of the
>> information could be beneficial to others. At the very least I think we
>> should document what you found... And the Apache line is always "If it
>> didn't happen on the list, it didn't happen."
>>
>> -Taylor
>>
>>> On Mar 18, 2014, at 6:04 PM, Adam Lewis <[email protected]> wrote:
>>>
>>> Hi Taylor,
>>>
>>> The submitter isn't multithreaded or anything funky; it is basically a
>>> typical submitter from the docs that just happens to call submit multiple
>>> times.
>>>
>>> I just checked the Java-serialized topology, and it is indeed the
>>> culprit (6662137 bytes, 17 KB larger than the thrift message). I can now
>>> look for any obvious bugs in my topology components' non-transient state,
>>> but I'm wondering if there is a more systematic way? These topologies are
>>> all Trident-built, so the indirection that occurs between the Trident DSL
>>> API and the final Storm topology is a bit opaque to me. The generated
>>> thrift classes make this difficult to penetrate from the other direction.
>>> Do you have any suggestions on where I can look to better understand the
>>> overall building process?
>>>
>>> Thanks a lot for your help.
>>>
>>> Adam
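A sketch of one way to take the measurements Adam describes, assuming a Trident-built topology on Storm 0.9.x. buildTridentTopology() is a hypothetical placeholder for your own topology definition; the get_bolts() accessor is the thrift-generated one on StormTopology:

    import java.util.Map;

    import org.apache.thrift7.TException;
    import org.apache.thrift7.TSerializer;

    import backtype.storm.generated.Bolt;
    import backtype.storm.generated.StormTopology;
    import backtype.storm.utils.Utils;
    import storm.trident.TridentTopology;

    public class TopologySizeCheck {
        public static void main(String[] args) throws TException {
            StormTopology topology = buildTridentTopology().build();

            byte[] javaBytes = Utils.serialize(topology);               // Java serialization (what a profiler sees)
            byte[] thriftBytes = new TSerializer().serialize(topology); // roughly what submitTopology sends

            System.out.println("java: " + javaBytes.length + " bytes, thrift: "
                    + thriftBytes.length + " bytes, bolts: " + topology.get_bolts_size());

            // Per-bolt thrift sizes: a near-constant, large size across all
            // bolts suggests something big is duplicated into each one.
            TSerializer ser = new TSerializer();
            for (Map.Entry<String, Bolt> e : topology.get_bolts().entrySet()) {
                System.out.println(e.getKey() + ": " + ser.serialize(e.getValue()).length + " bytes");
            }
        }

        private static TridentTopology buildTridentTopology() {
            throw new UnsupportedOperationException("replace with your topology definition");
        }
    }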
>>>> On Tue, Mar 18, 2014 at 5:40 PM, P. Taylor Goetz <[email protected]> wrote:
>>>> Just to give you a little background… the thrift max buffer size was
>>>> introduced to prevent a situation where a wayward connection (SSH,
>>>> telnet, security port scanner, etc.) to the nimbus thrift port would
>>>> cause nimbus to hang.
>>>>
>>>> When you submit a topology, three things get sent over thrift:
>>>>
>>>> 1. the topology jar file (big — so it gets uploaded in small chunks)
>>>> 2. the serialized Config object (typically a relatively small bit of JSON)
>>>> 3. the serialized topology itself (this all depends…)
>>>>
>>>> My initial guess is that #3 is potentially really big (as you mention).
>>>> You could test this by serializing the topology to disk to see how big
>>>> it is.
>>>>
>>>> My second guess is that something about your submitter app might be
>>>> overflowing thrift's server-side buffer. Is it multi-threaded and
>>>> submitting topos in parallel?
>>>>
>>>> If you submit that topology with the storm jar command, do you still
>>>> get the error?
>>>>
>>>> - Taylor
>>>>
>>>>> On Mar 18, 2014, at 5:17 PM, Adam Lewis <[email protected]> wrote:
>>>>>
>>>>> I noticed that StormSubmitter has a code path for only doing a single
>>>>> upload of the jar file if it has "already uploaded to master"
>>>>> (StormSubmitter.java:142), but the only case where that will actually
>>>>> happen is if you submit multiple topologies during the lifetime of the
>>>>> VM started with the "storm jar" command (since "submittedJar" is just a
>>>>> private static field). My main() class (passed to the "storm jar"
>>>>> command) simply delegates to a series of classes which build
>>>>> StormTopology instances, and then calls StormSubmitter.submitTopology(..)
>>>>> in a loop with each of the topologies I've built.
>>>>>
>>>>> I catch any exceptions inside the loop, so if some topology is already
>>>>> running, I log a "java.lang.RuntimeException: Topology with name `...`
>>>>> already exists on cluster" and continue on in the loop. This
>>>>> effectively enforces that all my topologies are running, and if I want
>>>>> to redeploy one or more, I kill them in the storm UI and re-run my
>>>>> deployer program (via storm jar).
>>>>>
>>>>> So...in effect, after running the program the first time and having
>>>>> three topologies deploy and one fail, subsequent runs would only try to
>>>>> deploy that fourth topology not already running. One potentially
>>>>> important detail of my submitter program is that I reuse the storm
>>>>> Config object across multiple calls to submitTopology, as well as an
>>>>> internal configuration class built from the command-line arguments used
>>>>> during topology building.
>>>>>
>>>>> This approach has been working without any (apparent) ill effects on
>>>>> 0.9.0.1, and also seems to be working on 0.9.1 now that I've increased
>>>>> that buffer to accommodate this 6.6 MB thrift message. But how did that
>>>>> message get to be 6.6 MB? Does that message contain all the serialized
>>>>> bolts, which are perhaps inadvertently pulling in something big?
>>>>>
>>>>>> On Tue, Mar 18, 2014 at 4:48 PM, P. Taylor Goetz <[email protected]> wrote:
>>>>>> What happens when you just use the `storm jar` command to submit the
>>>>>> topologies separately (i.e. same jar, different topo class name)?
>>>>>>
>>>>>> Can you give me more details on your submitter program and how it
>>>>>> works? How are you "reusing" the uploaded jar file?
>>>>>>
>>>>>> - Taylor
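A condensed sketch of the deployer loop Adam describes above: one VM started by storm jar, submitting several topologies in sequence, reusing one Config, and treating "already exists" failures as skips. The topologies map and its builder method are hypothetical stand-ins for his builder classes:

    import java.util.Map;

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.StormTopology;

    public class DeployAllTopologies {
        public static void main(String[] args) {
            Config conf = new Config(); // reused across all submissions
            conf.setNumWorkers(2);

            // hypothetical: name -> topology, produced by your own builder classes
            Map<String, StormTopology> topologies = buildAllTopologies(args);

            for (Map.Entry<String, StormTopology> e : topologies.entrySet()) {
                try {
                    StormSubmitter.submitTopology(e.getKey(), conf, e.getValue());
                } catch (Exception ex) {
                    // e.g. RuntimeException: Topology with name `...` already exists on cluster
                    System.err.println("Skipping " + e.getKey() + ": " + ex.getMessage());
                }
            }
        }

        private static Map<String, StormTopology> buildAllTopologies(String[] args) {
            throw new UnsupportedOperationException("replace with your topology builders");
        }
    }

Note this submits sequentially on one thread, which matters for Taylor's question below about parallel submissions overflowing the server-side buffer.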
>>>>>>> On Mar 18, 2014, at 4:25 PM, Adam Lewis <[email protected]> wrote:
>>>>>>>
>>>>>>> It isn't the jar file, but something about the topology itself; I
>>>>>>> have a submitter program that submits four topologies, all from the
>>>>>>> same jar. Upon submitting the first topology, the jar is uploaded and
>>>>>>> the topology starts; then the submitter submits two more topologies
>>>>>>> whilst "reusing" the uploaded jar. The broken pipe occurs when trying
>>>>>>> to submit the fourth (large) topology. That is why I was assuming the
>>>>>>> large message was actually the encoded topology itself. This is
>>>>>>> reproducible and the errors are as follows:
>>>>>>>
>>>>>>> nimbus.log:
>>>>>>>
>>>>>>>> 2014-03-18 18:16:39 o.a.t.s.TNonblockingServer [ERROR] Read a frame
>>>>>>>> size of 6644632, which is bigger than the maximum allowable buffer
>>>>>>>> size for ALL connections.
>>>>>>>
>>>>>>> storm jar console:
>>>>>>>
>>>>>>>>> 2321 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar /Users/adam/git/impl/impl-storm/target/impl-storm-0.0.1-SNAPSHOT.jar to assigned location: /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar
>>>>>>>>> 97762 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar
>>>>>>>>> 97762 [main] INFO backtype.storm.StormSubmitter - Submitting topology global__topo_forecastRuntime in distributed mode with conf {"topology.fall.back.on.java.serialization":false,"topology.workers":2,"drpc.servers":["10.118.57.229"],"topology.debug":false,"topology.kryo.register":[{"org.joda.time.DateTime":"de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer"},{"org.joda.time.Interval":"de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer"},"com.mycompany.data.Simulated","com.mycompany.data.SomeClass1","com.mycompany.ml.SomeClass2","com.mycompany.model.SomeClass3","com.mycompany.model.SomeClass4",{"com.mycompany.ml.SomeClass4":"com.esotericsoftware.kryo.serializers.DefaultSerializers$EnumSerializer"},{"java.math.BigDecimal":"com.esotericsoftware.kryo.serializers.DefaultSerializers$BigDecimalSerializer"},{"java.sql.Date":"de.javakaffee.kryoserializers.DateSerializer"},{"com.tdunning.math.stats.TDigest":"com.mycompany.trident.tdigest.TDigestSerializer"},{"java.lang.Class":"com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer"},{"java.util.UUID":"de.javakaffee.kryoserializers.UUIDSerializer"},{"com.google.common.collect.RegularImmutableList":"backtype.storm.serialization.SerializableSerializer"}],"topology.max.spout.pending":16,"topology.message.timeout.secs":900,"drpc.request.timeout.secs":45}
>>>>>>>>> java.lang.RuntimeException: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Broken pipe
>>>>>>>>>     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:112)
>>>>>>>>>     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:58)
>>>>>>>>>     at com.mycompany.runtime.DeployStormTopologies.main(DeployStormTopologies.java:92)
>>>>>>>>> Caused by: org.apache.thrift7.transport.TTransportException: java.net.SocketException: Broken pipe
>>>>>>>>>     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
>>>>>>>>>     at org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
>>>>>>>>>     at org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
>>>>>>>>>     at backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:156)
>>>>>>>>>     at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:145)
>>>>>>>>>     at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:98)
>>>>>>>>>     ... 2 more
>>>>>>>>> Caused by: java.net.SocketException: Broken pipe
>>>>>>>>>     at java.net.SocketOutputStream.socketWrite0(Native Method)
>>>>>>>>>     at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>>>>>>>>>     at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>>>>>>>>>     at org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
>>>>>>>>>     ... 7 more
>>>>>>>
>>>>>>>> On Tue, Mar 18, 2014 at 4:12 PM, P. Taylor Goetz <[email protected]> wrote:
>>>>>>>> It uploads the file in small (1024*5 bytes) chunks.
>>>>>>>>
>>>>>>>> Does this happen every time (i.e. is it reproducible)? What is the
>>>>>>>> size of your topology jar?
>>>>>>>>
>>>>>>>> Can you post the server-side message? (I want to see the length it
>>>>>>>> output.)
>>>>>>>>
>>>>>>>> - Taylor
>>>>>>>>
>>>>>>>>> On Mar 18, 2014, at 3:40 PM, Adam Lewis <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the
>>>>>>>>> new thrift max buffer size (nicely logged on the server side,
>>>>>>>>> although the client just gets a broken pipe stack trace from thrift)
>>>>>>>>> with an approx 6 MB message(!). Increasing the configured limit
>>>>>>>>> solves the problem, but I would have thought the 1 MB default should
>>>>>>>>> be enough.
>>>>>>>>>
>>>>>>>>> Does the storm submitter encode the entire topology as a single
>>>>>>>>> thrift message? I'm really surprised that the message is coming out
>>>>>>>>> so large; my topology isn't exactly small, but it only has about 20
>>>>>>>>> bolts. Does anyone have any suggestions on how to determine why the
>>>>>>>>> message is so large? Is this within the realm of what others have
>>>>>>>>> seen, or am I doing something wrong?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Adam
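For reference, the limit being raised in this thread is nimbus's thrift frame size. In 0.9.1-incubating it is controlled by nimbus.thrift.max_buffer_size in the nimbus host's storm.yaml; the value below is an assumption, sized to clear the ~6.6 MB topology message discussed above:

    # storm.yaml on the nimbus host
    nimbus.thrift.max_buffer_size: 8388608  # bytes; the default (1048576) is 1 MB

Raising the limit works around the symptom; shrinking what each bolt serializes (see the prepare() sketch at the top) addresses the cause.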
