Cool. I'm going back to the public list to share the knowledge.

If your trident topology is compiling down to 35 bolts, then it sounds like you 
have a lot of partitioning operations. Is there any way you can reduce that? 
That's going to introduce a decent amount of network transfer.

And I would definitely try to figure out what's making the serialized topology 
so big. On that front, the prepare() method is your friend: initialize heavy 
state there at runtime instead of letting it get serialized into the topology.
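
A rough sketch of what I mean (untested, typed from memory; 
ModelScoringFunction and loadModel() are made-up names, the point is just 
the transient field plus prepare()):

    import java.util.Map;

    import storm.trident.operation.BaseFunction;
    import storm.trident.operation.TridentCollector;
    import storm.trident.operation.TridentOperationContext;
    import storm.trident.tuple.TridentTuple;

    public class ModelScoringFunction extends BaseFunction {

        private final String modelPath;  // small, serializable config
        private transient Object model;  // heavyweight, never serialized

        public ModelScoringFunction(String modelPath) {
            this.modelPath = modelPath;
        }

        @Override
        public void prepare(Map conf, TridentOperationContext context) {
            // Build the big object on the worker, after deserialization,
            // instead of in the constructor (which runs in the submitter
            // and would get serialized into every bolt).
            model = loadModel(modelPath); // made-up loader
        }

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // use `model` here
        }

        private static Object loadModel(String path) {
            return new Object(); // placeholder for the real loading logic
        }
    }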

-Taylor

> On Mar 18, 2014, at 7:37 PM, Adam Lewis <[email protected]> wrote:
> 
> Sure, sharing with the list is a great idea.  I already checked the log 
> excerpts I sent you to ensure they are clean of anything proprietary, so all 
> good there.  
> 
> I did a little more digging and it seems there is definitely 
> something...interesting...happening in the compiling process.  My uncompiled 
> topology is pretty small memory-wise (according to the profiler), but once it 
> gets built into the generated storm bolts, it becomes 35 bolts, each of which 
> serializes to about 200KB. I was struck by how small the variance is amongst 
> the serialized sizes, as if something big-ish is getting duplicated into each 
> bolt.  Some more experimentation revealed that part of the problem may be 
> that I have multiple DRPC spouts in a single topology.  I noticed 
> super-linear growth in serialized topology size for each additional DRPC 
> spout I create.
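> 
> In case it's useful, this is roughly how I dumped the per-bolt sizes (a 
> sketch from memory; buildMyTopology() stands in for my own builder, and 
> the thrift getter names may be slightly off):
> 
>     import java.util.Map;
>     import backtype.storm.generated.Bolt;
>     import backtype.storm.generated.StormTopology;
> 
>     public static void dumpBoltSizes(StormTopology topo) {
>         // Each compiled bolt carries its Java-serialized component object.
>         for (Map.Entry<String, Bolt> e : topo.get_bolts().entrySet()) {
>             byte[] ser = e.getValue().get_bolt_object().get_serialized_java();
>             System.out.println(e.getKey() + ": " + ser.length + " bytes");
>         }
>     }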
> 
> I'm thinking I will break each DRPC spout into its own topology, which I've 
> needed to do anyway: right now they seem to block each other from 
> processing, even though I don't need requests to separate DRPC streams to 
> be totally ordered in the way I imagine trident would enforce within a 
> single topology.
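> 
> Roughly what I have in mind (a sketch; ParseArgs and the function names 
> are placeholders):
> 
>     import backtype.storm.Config;
>     import backtype.storm.StormSubmitter;
>     import backtype.storm.tuple.Fields;
>     import storm.trident.TridentTopology;
> 
>     // One topology per DRPC function, submitted independently, so the
>     // request streams can't block behind each other.
>     Config conf = new Config();
>     TridentTopology topo = new TridentTopology();
>     topo.newDRPCStream("forecastRuntime")
>         .each(new Fields("args"), new ParseArgs(), new Fields("query"));
>     // ... the rest of the processing for just this DRPC function ...
>     StormSubmitter.submitTopology("drpc-forecastRuntime", conf, topo.build());
>     // ...and likewise a separate topology for each remaining DRPC function.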
> 
> 
>> On Tue, Mar 18, 2014 at 7:24 PM, P. Taylor Goetz <[email protected]> wrote:
>> Glad I could help.
>> 
>> Trident does a lot in terms of "compiling" down to storm's primitive 
>> structures. The relevant classes are Stream and TridentTopology in the 
>> storm.trident package (if memory serves correctly; I'm not at a 
>> computer...).
>> 
>> It's not for the faint of heart, but once you start to wrap your head around 
>> it, it's pretty cool.
>> 
>> Do you mind if I share this thread with the user group? Some of the 
>> information could be beneficial to others. At the very least I think we 
>> should document what you found... And the Apache line is always "If it 
>> didn't happen on the list, it didn't happen."
>> 
>> -Taylor
>> 
>>> On Mar 18, 2014, at 6:04 PM, Adam Lewis <[email protected]> wrote:
>>> 
>>> Hi Taylor,
>>> 
>>> The submitter isn't multithreaded or anything funky...it is basically a 
>>> typical submitter from the docs that just happens to call submit multiple 
>>> times.  
>>> 
>>> I did just check, and the Java-serialized topology is indeed the culprit 
>>> (6662137 bytes, about 17KB larger than the thrift message).  I can now 
>>> look for any obvious bugs in my topology components' non-transient state, 
>>> but I'm wondering if there is a more systematic way? These topologies are 
>>> all trident built, so the indirection that occurs between the trident DSL 
>>> API and the final storm topology is a bit opaque to me.  The generated 
>>> thrift classes make this difficult to penetrate from the other direction.  
>>> Do you have any suggestion on where I can look to better understand the 
>>> overall building process?
>>> 
>>> Thanks a lot for your help.
>>> 
>>> Adam
>>> 
>>> 
>>>> On Tue, Mar 18, 2014 at 5:40 PM, P. Taylor Goetz <[email protected]> wrote:
>>>> Just to give you a little background… the thrift max buffer size was 
>>>> introduced to prevent a situation where a wayward connection (SSH, telnet, 
>>>> security port scanner, etc.) to the nimbus thrift port would cause nimbus 
>>>> to hang.
>>>> 
>>>> When you submit a topology, three things get sent over thrift:
>>>> 
>>>> 1. the topology jar file (big — so it gets uploaded in small chunks)
>>>> 2. the serialized Config object (typically a relatively small bit of JSON)
>>>> 3. the serialized Topology itself (this all depends…)
>>>> 
>>>> My initial guess is that #3 is potentially really big (as you mention). 
>>>> You could test this by serializing the topology to disk to see how big it 
>>>> is.
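>>>> 
>>>> Something like this (a rough sketch; buildMyTopology() stands in for 
>>>> your builder, and Utils.serialize is just Storm's helper for plain 
>>>> Java serialization):
>>>> 
>>>>     import java.io.FileOutputStream;
>>>> 
>>>>     import backtype.storm.generated.StormTopology;
>>>>     import backtype.storm.utils.Utils;
>>>> 
>>>>     StormTopology topo = buildMyTopology(); // stand-in for your builder
>>>>     byte[] bytes = Utils.serialize(topo);
>>>>     System.out.println("serialized topology: " + bytes.length + " bytes");
>>>>     FileOutputStream out = new FileOutputStream("/tmp/topo.ser");
>>>>     out.write(bytes);
>>>>     out.close();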
>>>> 
>>>> My second guess is that something about your submitter app might be 
>>>> overflowing thrift's server-side buffer. Is it multi-threaded and 
>>>> submitting topos in parallel?
>>>> 
>>>> If you submit that topology with the storm jar command, do you still get 
>>>> the error?
>>>> 
>>>> - Taylor
>>>> 
>>>> 
>>>>> On Mar 18, 2014, at 5:17 PM, Adam Lewis <[email protected]> wrote:
>>>>> 
>>>>> I noticed that StormSubmitter has a code path for only doing a single 
>>>>> upload of the jar file if it has "already uploaded to master" 
>>>>> (StormSubmitter.java:142), but the only case where that will actually 
>>>>> happen is if you submit multiple topologies during the lifetime of the VM 
>>>>> started with the "storm jar" command (since "submittedJar" is just a 
>>>>> private static field).  My main() class (passed to the "storm jar" 
>>>>> command) simply delegates to a series of classes which build 
>>>>> StormTopology instances, and then calls StormSubmitter.submitTopology(..) 
>>>>> in a loop with each of the topologies I've built.
>>>>> 
>>>>> I catch any exceptions inside the loop, so if some topology is already 
>>>>> running, I log a "java.lang.RuntimeException: Topology with name `...` 
>>>>> already exists on cluster" and continue on in the loop.  This effectively 
>>>>> enforces that all my topologies are running, and if I want to redeploy 
>>>>> one or more, I kill them in the storm UI and re-run my deployer program 
>>>>> (via storm jar).
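>>>>> 
>>>>> In code, the loop looks roughly like this (a sketch; buildConfig, 
>>>>> buildTopologies, and TopologyDef are placeholders for my own classes):
>>>>> 
>>>>>     Config conf = buildConfig(args); // reused across all submits
>>>>>     for (TopologyDef def : buildTopologies(args, conf)) {
>>>>>         try {
>>>>>             StormSubmitter.submitTopology(def.getName(), conf,
>>>>>                                           def.getTopology());
>>>>>         } catch (Exception e) {
>>>>>             // e.g. "Topology with name `...` already exists on cluster"
>>>>>             System.err.println("Skipping " + def.getName() + ": "
>>>>>                                + e.getMessage());
>>>>>         }
>>>>>     }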
>>>>> 
>>>>> So...in effect, after running the program the first time and having three 
>>>>> topologies deploy and one fail, subsequent runs would only try to deploy 
>>>>> that fourth topology not already running.  One potentially important 
>>>>> detail of my submitter program is that I reuse the storm Config object 
>>>>> across multiple calls to submitTopology, as well as an internal 
>>>>> configuration class built from the command line arguments used during 
>>>>> topology building.
>>>>> 
>>>>> This approach has been working without any (apparent) ill effects on 
>>>>> 0.9.0.1, and also seems to be working on 0.9.1 now that I've increased 
>>>>> that buffer to accommodate this 6.6MB thrift message.  But how did that 
>>>>> message get to be 6.6MB?  Does that message contain all the serialized 
>>>>> bolts which are perhaps inadvertently pulling in something big?
>>>>> 
>>>>> 
>>>>>> On Tue, Mar 18, 2014 at 4:48 PM, P. Taylor Goetz <[email protected]> 
>>>>>> wrote:
>>>>>> What happens when you just use the `storm jar` command to submit the 
>>>>>> topologies separately (i.e. same jar, different topo class name)?
>>>>>> 
>>>>>> Can you give me more details on your submitter program and how it works? 
>>>>>> How are you “reusing” the uploaded jar file?
>>>>>> 
>>>>>> - Taylor
>>>>>> 
>>>>>>> On Mar 18, 2014, at 4:25 PM, Adam Lewis <[email protected]> wrote:
>>>>>>> 
>>>>>>> It isn't the jar file, but something about the topology itself; I have 
>>>>>>> a submitter program that submits four topologies all from the same jar. 
>>>>>>>  Upon submitting the first topology, the jar is uploaded and topology 
>>>>>>> starts, then the submitter submits two more topologies whilst "reusing" 
>>>>>>> the uploaded jar.  The broken pipe occurs when trying to submit the 
>>>>>>> fourth (large) topology.  That is why I was assuming the large message 
>>>>>>> was actually the encoded topology itself.  This is reproducible and the 
>>>>>>> errors are as follows:
>>>>>>> 
>>>>>>> nimbus.log:
>>>>>>> 
>>>>>>>> 2014-03-18 18:16:39 o.a.t.s.TNonblockingServer [ERROR] Read a frame 
>>>>>>>> size of 6644632, which is bigger than the maximum allowable buffer 
>>>>>>>> size for ALL connections.
>>>>>>> 
>>>>>>> storm jar console:
>>>>>>> 
>>>>>>>>> 2321 [main] INFO  backtype.storm.StormSubmitter - Uploading topology 
>>>>>>>>> jar 
>>>>>>>>> /Users/adam/git/impl/impl-storm/target/impl-storm-0.0.1-SNAPSHOT.jar 
>>>>>>>>> to assigned location: 
>>>>>>>>> /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar
>>>>>>>>> 97762 [main] INFO  backtype.storm.StormSubmitter - Successfully 
>>>>>>>>> uploaded topology jar to assigned location: 
>>>>>>>>> /mnt/storm/nimbus/inbox/stormjar-04acf27a-a6d1-4a9e-9231-9f4f5f30fd03.jar
>>>>>>>>> 97762 [main] INFO  backtype.storm.StormSubmitter - Submitting 
>>>>>>>>> topology global__topo_forecastRuntime in distributed mode with conf 
>>>>>>>>> {"topology.fall.back.on.java.serialization":false,"topology.workers":2,"drpc.servers":["10.118.57.229"],"topology.debug":false,"topology.kryo.register":[{"org.joda.time.DateTime":"de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer"},{"org.joda.time.Interval":"de.javakaffee.kryoserializers.jodatime.JodaIntervalSerializer"},"com.mycompany.data.Simulated","com.mycompany.data.SomeClass1","com.mycompany.ml.SomeClass2","com.mycompany.model.SomeClass3","com.mycompany.model.SomeClass4",{"com.mycompany.ml.SomeClass4":"com.esotericsoftware.kryo.serializers.DefaultSerializers$EnumSerializer"},{"java.math.BigDecimal":"com.esotericsoftware.kryo.serializers.DefaultSerializers$BigDecimalSerializer"},{"java.sql.Date":"de.javakaffee.kryoserializers.DateSerializer"},{"com.tdunning.math.stats.TDigest":"com.mycompany.trident.tdigest.TDigestSerializer"},{"java.lang.Class":"com.esotericsoftware.kryo.serializers.DefaultSerializers$ClassSerializer"},{"java.util.UUID":"de.javakaffee.kryoserializers.UUIDSerializer"},{"com.google.common.collect.RegularImmutableList":"backtype.storm.serialization.SerializableSerializer"}],"topology.max.spout.pending":16,"topology.message.timeout.secs":900,"drpc.request.timeout.secs":45}
>>>>>>>>> java.lang.RuntimeException: 
>>>>>>>>> org.apache.thrift7.transport.TTransportException: 
>>>>>>>>> java.net.SocketException: Broken pipe
>>>>>>>>>       at 
>>>>>>>>> backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:112)
>>>>>>>>>       at 
>>>>>>>>> backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:58)
>>>>>>>>>       at 
>>>>>>>>> com.mycompany.runtime.DeployStormTopologies.main(DeployStormTopologies.java:92)
>>>>>>>>> Caused by: org.apache.thrift7.transport.TTransportException: 
>>>>>>>>> java.net.SocketException: Broken pipe
>>>>>>>>>       at 
>>>>>>>>> org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:147)
>>>>>>>>>       at 
>>>>>>>>> org.apache.thrift7.transport.TFramedTransport.flush(TFramedTransport.java:157)
>>>>>>>>>       at 
>>>>>>>>> org.apache.thrift7.TServiceClient.sendBase(TServiceClient.java:65)
>>>>>>>>>       at 
>>>>>>>>> backtype.storm.generated.Nimbus$Client.send_submitTopology(Nimbus.java:156)
>>>>>>>>>       at 
>>>>>>>>> backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:145)
>>>>>>>>>       at 
>>>>>>>>> backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:98)
>>>>>>>>>       ... 2 more
>>>>>>>>> Caused by: java.net.SocketException: Broken pipe
>>>>>>>>>       at java.net.SocketOutputStream.socketWrite0(Native Method)
>>>>>>>>>       at 
>>>>>>>>> java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>>>>>>>>>       at 
>>>>>>>>> java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>>>>>>>>>       at 
>>>>>>>>> org.apache.thrift7.transport.TIOStreamTransport.write(TIOStreamTransport.java:145)
>>>>>>>>>       ... 7 more
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>> On Tue, Mar 18, 2014 at 4:12 PM, P. Taylor Goetz <[email protected]> 
>>>>>>>> wrote:
>>>>>>>> It uploads the file in small (1024*5 bytes) chunks.
>>>>>>>> 
>>>>>>>> Does this happen every time (i.e. reproducible)? What is the size of 
>>>>>>>> your topology jar?
>>>>>>>> 
>>>>>>>> Can you post the server-side message? (I want to see the length it 
>>>>>>>> reports.)
>>>>>>>> 
>>>>>>>> - Taylor
>>>>>>>> 
>>>>>>>>> On Mar 18, 2014, at 3:40 PM, Adam Lewis <[email protected]> wrote:
>>>>>>>>> 
>>>>>>>>> Upon upgrading from 0.9.0.1 to 0.9.1-incubating, I'm exceeding the 
>>>>>>>>> new thrift max buffer size (nicely logged on the server side, 
>>>>>>>>> although the client just gets a broken pipe stack trace from thrift) 
>>>>>>>>> with an approx 6 MB message(!).  Increasing the configured limit 
>>>>>>>>> solves the problem, but I would have thought the 1MB default should 
>>>>>>>>> be enough.
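>>>>>>>>> 
>>>>>>>>> For reference, this is the setting I bumped in storm.yaml on the 
>>>>>>>>> nimbus host (if I have the key right; the value is in bytes, 8MB 
>>>>>>>>> here):
>>>>>>>>> 
>>>>>>>>>     nimbus.thrift.max_buffer_size: 8388608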
>>>>>>>>> 
>>>>>>>>> Does the storm submitter encode the entire topology as a single 
>>>>>>>>> thrift message?  I'm really surprised that the message is coming out 
>>>>>>>>> so large; my topology isn't exactly small, but it only has about 20 
>>>>>>>>> bolts.  Does anyone have any suggestions on how to determine why the 
>>>>>>>>> message is so large?  Is this within the realm of what others have 
>>>>>>>>> seen or am I doing something wrong?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Adam
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
> 
