> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> >

Thanks for the feedback. I've taken it into account in a new patch.


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java, line 137
> > <https://reviews.apache.org/r/8633/diff/1/?file=239332#file239332line137>
> >
> >     Should we do the check with type.isAssignableFrom(data.type) instead of 
> > removing it?

The check is mostly there to provide a meaningful error message, so we can 
produce that message when a ClassCastException occurs instead, and handle 
primitive types through their wrapper classes.
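
For illustration only, a minimal sketch of that idea. TypedAttributes, its 
get method and the message wording are hypothetical names, not the actual 
Event.java code, and Map.of assumes Java 9 or later:

```java
import java.util.Map;

// Hypothetical helper, not the actual Event.java code.
final class TypedAttributes {
    // Primitive types mapped to their wrapper classes, since a boxed value
    // is never an instance of a primitive class such as int.class.
    private static final Map<Class<?>, Class<?>> WRAPPERS = Map.of(
            boolean.class, Boolean.class, byte.class, Byte.class,
            char.class, Character.class, short.class, Short.class,
            int.class, Integer.class, long.class, Long.class,
            float.class, Float.class, double.class, Double.class);

    // Casts value to the requested type and turns a ClassCastException
    // into an error that names the attribute and both types involved.
    static <T> T get(String name, Object value, Class<T> type) {
        // Non-primitive types are used as they are.
        Class<?> target = WRAPPERS.getOrDefault(type, type);
        try {
            @SuppressWarnings("unchecked")
            T result = (T) target.cast(value);
            return result;
        } catch (ClassCastException e) {
            throw new IllegalArgumentException("Attribute [" + name + "] holds a "
                    + value.getClass().getName() + ", which cannot be read as "
                    + type.getName(), e);
        }
    }
}
```

With this sketch, reading an Integer value as int.class succeeds, while 
reading a String value as int.class fails with a message that names the 
attribute and both types.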


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-benchmarks/README.md, line 61
> > <https://reviews.apache.org/r/8633/diff/1/?file=239339#file239339line61>
> >
> >     repeated, remove

will do


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java,
> >  line 105
> > <https://reviews.apache.org/r/8633/diff/1/?file=239361#file239361line105>
> >
> >     Should we return a failed future here?
> >     
> >     e.g. return SettableFuture.create().setException(e);

OK, that makes sense. It will also make sure we don't try to process the 
event at this point. We might want to preserve the interrupt status though:
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return Futures.immediateFailedCheckedFuture(e);
    }


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java,
> >  line 116
> > <https://reviews.apache.org/r/8633/diff/1/?file=239361#file239361line116>
> >
> >     Same as line 105, a couple more below.

yes


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java,
> >  line 47
> > <https://reviews.apache.org/r/8633/diff/1/?file=239363#file239363line47>
> >
> >     > This can be help 
> >     
> >     (remove extra be)

ok


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java,
> >  line 43
> > <https://reviews.apache.org/r/8633/diff/1/?file=239364#file239364line43>
> >
> >     Is this called from a single thread?

This can be called from multiple threads, which is why we have code in place 
to ensure that only one emitter is used for a given remote topology.

However, a second look made me realize that when several threads concurrently 
create remote emitters for the same topology, we should properly clean up the 
unused emitter. I will update that.
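
As a rough sketch of that cleanup (EmitterRegistry and StubEmitter are 
hypothetical stand-ins, not the actual RemoteEmitters code): the thread that 
loses the putIfAbsent race closes the emitter it created and returns the one 
already registered.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical registry, not the actual RemoteEmitters class.
final class EmitterRegistry {
    // Hypothetical stand-in for an emitter that holds network resources.
    static final class StubEmitter {
        final String cluster;
        volatile boolean closed;

        StubEmitter(String cluster) {
            this.cluster = cluster;
        }

        void close() {
            closed = true;
        }
    }

    private final ConcurrentMap<String, StubEmitter> emitters =
            new ConcurrentHashMap<>();

    // Returns the single emitter registered for the given remote cluster.
    StubEmitter getEmitter(String cluster) {
        StubEmitter existing = emitters.get(cluster);
        if (existing != null) {
            return existing;
        }
        StubEmitter created = new StubEmitter(cluster);
        StubEmitter winner = emitters.putIfAbsent(cluster, created);
        if (winner == null) {
            // This thread registered its emitter first.
            return created;
        }
        // Another thread registered one concurrently: release ours.
        created.close();
        return winner;
    }
}
```

Creating the emitter outside of any lock keeps lookups cheap. The cost is 
that a losing thread briefly holds an emitter that is then discarded.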


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java, 
> > line 204
> > <https://reviews.apache.org/r/8633/diff/1/?file=239365#file239365line204>
> >
> >     Maybe we should expose the InterruptedException to make it an 
> > explicitly blocking operation.

Yes, good suggestion!


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java, 
> > line 21
> > <https://reviews.apache.org/r/8633/diff/1/?file=239338#file239338line21>
> >
> >     remove

yes


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java, 
> > line 45
> > <https://reviews.apache.org/r/8633/diff/1/?file=239376#file239376line45>
> >
> >     It's better to not catch the exception and add the throw to the method 
> > declaration. If an exception is thrown the test fails as expected

yes


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java, 
> > line 44
> > <https://reviews.apache.org/r/8633/diff/1/?file=239383#file239383line44>
> >
> >     Same, if you agree.

yes


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/MemoryAwareDeserializerExecutorFactory.java,
> >  line 18
> > <https://reviews.apache.org/r/8633/diff/1/?file=239362#file239362line18>
> >
> >     I don't think it makes sense to have a separate 
> > OrderedMemoryAwareThreadPoolExecutor for each channel, that's why it has 
> > maxMemoryPerChannel/PerExecutor parameters. If we have one per channel I 
> > think we should use a non-memoryAware single thread pool.

A major concern at this stage is to avoid exhausting memory, i.e. to keep the 
deserialization task buffer from growing without bound. We can do that with a 
memory-aware executor (which blocks submission once a memory threshold is 
reached) or, indeed, with a single-threaded pool that has a bounded task queue 
and blocks task submission when the queue is full. I had implemented the 
latter, but for some reason it was not included in the patch. I will add it 
and use it as the default, since it's simpler.
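
To make the second option concrete, here is a minimal sketch using only 
java.util.concurrent. It is not the implementation mentioned above; 
BlockingSingleThreadExecutor and shutdownAndWait are hypothetical names. The 
rejection handler blocks the submitting thread until the queue has room:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one worker thread, bounded queue, blocking submission.
final class BlockingSingleThreadExecutor {
    private final ThreadPoolExecutor executor;

    BlockingSingleThreadExecutor(int queueCapacity) {
        executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                (task, pool) -> {
                    // Called when the queue is full or the pool is shut down.
                    if (pool.isShutdown()) {
                        throw new RejectedExecutionException("executor is shut down");
                    }
                    try {
                        // Block the submitter until the worker frees a slot.
                        pool.getQueue().put(task);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status for the caller.
                        Thread.currentThread().interrupt();
                        throw new RejectedExecutionException("interrupted while waiting", e);
                    }
                });
    }

    // Blocks when the queue is full instead of dropping the task.
    void execute(Runnable task) {
        executor.execute(task);
    }

    // Stops accepting tasks and waits for the queued ones to finish.
    boolean shutdownAndWait(long timeoutMillis) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

One caveat of this pattern: a task that the handler enqueues while another 
thread shuts the executor down may never run, so a real implementation would 
need to handle that case.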


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > build.gradle, line 150
> > <https://reviews.apache.org/r/8633/diff/1/?file=239328#file239328line150>
> >
> >     It's already in the 'compile' configuration, no need to specify again.

ok


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java, 
> > line 53
> > <https://reviews.apache.org/r/8633/diff/1/?file=239394#file239394line53>
> >
> >     Add assertion failure as well.

ok


> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote:
> > subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java,
> >  line 792
> > <https://reviews.apache.org/r/8633/diff/1/?file=239401#file239401line792>
> >
> >     Should we change this API to ByteBuffers since they are more flexible?

I'd rather leave that for later, if needed


- Matthieu


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/8633/#review14909
-----------------------------------------------------------


On Dec. 17, 2012, 11:12 a.m., Matthieu Morel wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/8633/
> -----------------------------------------------------------
> 
> (Updated Dec. 17, 2012, 11:12 a.m.)
> 
> 
> Review request for S4.
> 
> 
> Description
> -------
> 
> Performance improvements (S4-95 ended up being a lot more than just a 
> benchmarking framework):
> - added integration with metrics (that could still be improved though)
> - minimized buffer and object copies
> - use kryo 2 for serialization
> - refactored netty pipeline and provided throttling, blocking and load 
> shedding senders and receivers
> - added a benchmarking framework for measuring communication overhead
> 
> 
> This addresses bug S4-95.
>     https://issues.apache.org/jira/browse/S4-95
> 
> 
> Diffs
> -----
> 
>   build.gradle 60577de 
>   lib/reflectasm-1.07-shaded.jar PRE-CREATION 
>   settings.gradle c2b9b67 
>   subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java c73dd1c 
>   subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java 4e47723 
>   subprojects/s4-base/src/main/java/org/apache/s4/base/EventMessage.java 
> e777687 
>   subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java 608d628 
>   subprojects/s4-base/src/main/java/org/apache/s4/base/Receiver.java 
> PRE-CREATION 
>   subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java 
> PRE-CREATION 
>   
> subprojects/s4-base/src/main/java/org/apache/s4/base/SerializerDeserializer.java
>  f7ef101 
>   subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java 
> 5829e0c 
>   subprojects/s4-benchmarks/README.md PRE-CREATION 
>   subprojects/s4-benchmarks/bench-cluster.sh PRE-CREATION 
>   subprojects/s4-benchmarks/config/injector.config PRE-CREATION 
>   subprojects/s4-benchmarks/config/node.config PRE-CREATION 
>   subprojects/s4-benchmarks/s4-benchmarks.gradle PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/DagApp.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/FirstPE.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/LastPE.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/PipePE.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE1.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE2.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/InjectionLimiterModule.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java
>  PRE-CREATION 
>   
> subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java
>  PRE-CREATION 
>   subprojects/s4-benchmarks/src/main/resources/logback.xml PRE-CREATION 
>   subprojects/s4-benchmarks/startInjector.sh PRE-CREATION 
>   subprojects/s4-benchmarks/startNode.sh PRE-CREATION 
>   subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java 
> d7c8cee 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java
>  b7fad75 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/SerializerDeserializerFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java
>  PRE-CREATION 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/MemoryAwareDeserializerExecutorFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java
>  PRE-CREATION 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java 
> 85dc86d 
>   subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java 
> 44c9247 
>   subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java 
> d495033 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java
>  a22767b 
>   subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java 
> 2f156b2 
>   subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java 
> 319c7ab 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java
>  050a6ec 
>   
> subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java 
> PRE-CREATION 
>   subprojects/s4-comm/src/main/resources/default.s4.comm.properties 36417b4 
>   subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java 
> 17340b8 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java
>  1813f7a 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java
>  e46ed47 
>   subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java 
> PRE-CREATION 
>   subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java 
> c9153ce 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
>  e3ff947 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
>  768ffd0 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java 
> 2ef95f4 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java
>  4e06233 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java
>  b1c5d8a 
>   subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java 
> PRE-CREATION 
>   subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java 
> 0f6004f 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java 
> 97cc973 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java
>  c647611 
>   subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java 
> 7316498 
>   subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java 
> PRE-CREATION 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java
>  PRE-CREATION 
>   subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java 
> PRE-CREATION 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java
>  PRE-CREATION 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TCPTransportModule.java
>  PRE-CREATION 
>   
> subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/UDPTransportModule.java
>  PRE-CREATION 
>   subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java 
> 411c7c6 
>   subprojects/s4-comm/src/test/resources/udp.s4.comm.properties 7b525bd 
>   subprojects/s4-core/s4-core.gradle 0905ce7 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/App.java 936d225 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java 
> PRE-CREATION 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java 
> 5701640 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java fc85219 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java 
> b899403 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java 6c0b19c 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java 
> PRE-CREATION 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java 
> daccbaa 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java 
> 3a111d0 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java 
> c4a3798 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java 5b0b03d 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java 
> PRE-CREATION 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java 02e521a 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java e5ef775 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java
>  ca23c79 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java 
> 3ffc91b 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/gen/OverloadDispatcherGenerator.java
>  c0f07a7 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingStreamExecutorServiceFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java
>  PRE-CREATION 
>   
> subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java
>  PRE-CREATION 
>   subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java 
> PRE-CREATION 
>   subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java 
> 97b025c 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java
>  6001595 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java 
> c3d30b7 
>   subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java 
> f2b0297 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java
>  903f288 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java
>  b1676dc 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java
>  78d814a 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
>  1fbd37a 
>   subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java 
> 7e88dd8 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java 
> 75143c1 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java 
> 71bae7a 
>   
> subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java 
> db45fcb 
>   subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java 
> 4e007b0 
>   
> subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java 
> 0209933 
>   
> test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
>  5d7855f 
> 
> Diff: https://reviews.apache.org/r/8633/diff/
> 
> 
> Testing
> -------
> 
> - regression tests
> - performance benchmarks (up to >200 k remote msgs/s/stream/node with 
> blocking sender)
> - twitter example + other internal apps
> 
> 
> Thanks,
> 
> Matthieu Morel
> 
>
