> On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > >
Thanks for the feedback, I've taken them into account in a new patch > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java, line 137 > > <https://reviews.apache.org/r/8633/diff/1/?file=239332#file239332line137> > > > > Should we do the check with type.isAssignableFrom(data.type) instead of > > removing it? This is mostly to provide a meaningful error message, so we can do it in case of a class cast exception, and handle primitive types through wrappers. > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-benchmarks/README.md, line 61 > > <https://reviews.apache.org/r/8633/diff/1/?file=239339#file239339line61> > > > > repeated, remove will do > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java, > > line 105 > > <https://reviews.apache.org/r/8633/diff/1/?file=239361#file239361line105> > > > > Should we return a failed future here? > > > > e.g. return SettableFuture.create().setException(e); OK, makes sense, it will also make sure we don't try to process the event at this point. We might want to keep the interrupt status though: } catch (InterruptedException e) { Thread.currentThread().interrupt(); return Futures.immediateFailedCheckedFuture(e); } > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java, > > line 116 > > <https://reviews.apache.org/r/8633/diff/1/?file=239361#file239361line116> > > > > Same as line 105, a couple more below. yes > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java, > > line 47 > > <https://reviews.apache.org/r/8633/diff/1/?file=239363#file239363line47> > > > > > This can be help > > > > (remove extra be) ok > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java, > > line 43 > > <https://reviews.apache.org/r/8633/diff/1/?file=239364#file239364line43> > > > > Is this called from a single thread? This can be called from multiple threads, this is why we have some code in place to ensure only 1 emitter is used for a given remote topology. However, having a second look made me realize that, in case of concurrent access and creation of remote emitters, we should properly clean the unused emitter. I will update that. > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java, > > line 204 > > <https://reviews.apache.org/r/8633/diff/1/?file=239365#file239365line204> > > > > Maybe we should expose the InterruptedException to make it an > > explicitly blocking operation. yes good suggestion! > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java, > > line 21 > > <https://reviews.apache.org/r/8633/diff/1/?file=239338#file239338line21> > > > > remove yes > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java, > > line 45 > > <https://reviews.apache.org/r/8633/diff/1/?file=239376#file239376line45> > > > > It's better to not catch the exception and add the throw to the method > > declaration. If an exception is thrown the test fails as expected yes > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java, > > line 44 > > <https://reviews.apache.org/r/8633/diff/1/?file=239383#file239383line44> > > > > Same, if you agree. yes > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/MemoryAwareDeserializerExecutorFactory.java, > > line 18 > > <https://reviews.apache.org/r/8633/diff/1/?file=239362#file239362line18> > > > > I don't think it makes sense to have a separate > > OrderedMemoryAwareThreadPoolExecutor for each channel, that's why it has > > maxMemoryPerChannel/PerExecutor parameters. If we have one per channel I > > think we should use a non-memoryAware single thread pool. A major concern in this stage is to avoid overloading the memory, i.e. avoid overloading the deserialization task buffer. We can do it by using a memory aware executor (blocks submission when memory threshold reached), or indeed a single threaded thread pool with a bounded task queue and a blocking implementation (blocking task submission when buffer is full). I had one implemented but for some reason it was not included in the patch. Will add that and use it as default, since it's simpler. > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > build.gradle, line 150 > > <https://reviews.apache.org/r/8633/diff/1/?file=239328#file239328line150> > > > > It's already in the 'compile' configuration, no need to specify again. ok > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java, > > line 53 > > <https://reviews.apache.org/r/8633/diff/1/?file=239394#file239394line53> > > > > Add assertion failure as well. ok > On Dec. 26, 2012, 4:04 p.m., Daniel Gómez Ferro wrote: > > subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java, > > line 792 > > <https://reviews.apache.org/r/8633/diff/1/?file=239401#file239401line792> > > > > Should we change this API to ByteBuffers since they are more flexible? I'd rather leave that for later, if needed - Matthieu ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/8633/#review14909 ----------------------------------------------------------- On Dec. 17, 2012, 11:12 a.m., Matthieu Morel wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/8633/ > ----------------------------------------------------------- > > (Updated Dec. 17, 2012, 11:12 a.m.) > > > Review request for S4. > > > Description > ------- > > Performance improvements (S4-95 ended up being a lot more than just a > benchmarking framework): > - added integration with metrics (that could still be improved though) > - minimized buffer and object copies > - use kryo 2 for serialization > - refactored netty pipeline and provided throttling, blocking and load > shedding senders and receivers > - added a benchmarking framework for measuring communication overhead > > > This addresses bug S4-95. > https://issues.apache.org/jira/browse/S4-95 > > > Diffs > ----- > > build.gradle 60577de > lib/reflectasm-1.07-shaded.jar PRE-CREATION > settings.gradle c2b9b67 > subprojects/s4-base/src/main/java/org/apache/s4/base/Emitter.java c73dd1c > subprojects/s4-base/src/main/java/org/apache/s4/base/Event.java 4e47723 > subprojects/s4-base/src/main/java/org/apache/s4/base/EventMessage.java > e777687 > subprojects/s4-base/src/main/java/org/apache/s4/base/Listener.java 608d628 > subprojects/s4-base/src/main/java/org/apache/s4/base/Receiver.java > PRE-CREATION > subprojects/s4-base/src/main/java/org/apache/s4/base/Sender.java > PRE-CREATION > > subprojects/s4-base/src/main/java/org/apache/s4/base/SerializerDeserializer.java > f7ef101 > subprojects/s4-base/src/main/java/org/apache/s4/base/package-info.java > 5829e0c > subprojects/s4-benchmarks/README.md PRE-CREATION > subprojects/s4-benchmarks/bench-cluster.sh PRE-CREATION > subprojects/s4-benchmarks/config/injector.config PRE-CREATION > subprojects/s4-benchmarks/config/node.config PRE-CREATION > subprojects/s4-benchmarks/s4-benchmarks.gradle PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/DagApp.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/FirstPE.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/LastPE.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/dag/PipePE.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/ProducerConsumerApp.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE1.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/prodcon/SimplePE2.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/InjectionLimiterModule.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Injector.java > PRE-CREATION > > subprojects/s4-benchmarks/src/main/java/org/apache/s4/benchmark/utils/Utils.java > PRE-CREATION > subprojects/s4-benchmarks/src/main/resources/logback.xml PRE-CREATION > subprojects/s4-benchmarks/startInjector.sh PRE-CREATION > subprojects/s4-benchmarks/startNode.sh PRE-CREATION > subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java > d7c8cee > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/DeserializerExecutorFactory.java > PRE-CREATION > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/KryoSerDeser.java > b7fad75 > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/serialize/SerializerDeserializerFactory.java > PRE-CREATION > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java > PRE-CREATION > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/MemoryAwareDeserializerExecutorFactory.java > PRE-CREATION > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/ThrottlingThreadPoolExecutorService.java > PRE-CREATION > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java > 85dc86d > subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPEmitter.java > 44c9247 > subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPListener.java > d495033 > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/TCPRemoteEmitter.java > a22767b > subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPEmitter.java > 2f156b2 > subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPListener.java > 319c7ab > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/udp/UDPRemoteEmitter.java > 050a6ec > > subprojects/s4-comm/src/main/java/org/apache/s4/comm/util/EmitterMetrics.java > PRE-CREATION > subprojects/s4-comm/src/main/resources/default.s4.comm.properties 36417b4 > subprojects/s4-comm/src/test/java/org/apache/s4/comm/DeliveryTestUtil.java > 17340b8 > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/MultiPartitionDeliveryTest.java > 1813f7a > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/SimpleDeliveryTest.java > e46ed47 > subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPBasicTest.java > PRE-CREATION > subprojects/s4-comm/src/test/java/org/apache/s4/comm/tcp/TCPCommTest.java > c9153ce > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java > e3ff947 > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java > 768ffd0 > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ZKBaseTest.java > 2ef95f4 > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/MultiPartitionDeliveryTest.java > 4e06233 > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/SimpleDeliveryTest.java > b1c5d8a > subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPBasicTest.java > PRE-CREATION > subprojects/s4-comm/src/test/java/org/apache/s4/comm/udp/UDPCommTest.java > 0f6004f > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/PartitionInfo.java > 97cc973 > > subprojects/s4-comm/src/test/java/org/apache/s4/comm/util/ProtocolTestUtil.java > c647611 > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java > 7316498 > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiver.java > PRE-CREATION > > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/MockReceiverModule.java > PRE-CREATION > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiver.java > PRE-CREATION > > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/NoOpReceiverModule.java > PRE-CREATION > > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TCPTransportModule.java > PRE-CREATION > > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/UDPTransportModule.java > PRE-CREATION > subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/ZkBasedTest.java > 411c7c6 > subprojects/s4-comm/src/test/resources/udp.s4.comm.properties 7b525bd > subprojects/s4-core/s4-core.gradle 0905ce7 > subprojects/s4-core/src/main/java/org/apache/s4/core/App.java 936d225 > subprojects/s4-core/src/main/java/org/apache/s4/core/AppModule.java > PRE-CREATION > subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultCoreModule.java > 5701640 > subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java fc85219 > subprojects/s4-core/src/main/java/org/apache/s4/core/ProcessingElement.java > b899403 > subprojects/s4-core/src/main/java/org/apache/s4/core/Receiver.java 6c0b19c > subprojects/s4-core/src/main/java/org/apache/s4/core/ReceiverImpl.java > PRE-CREATION > subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSender.java > daccbaa > subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java > 3a111d0 > subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteStream.java > c4a3798 > subprojects/s4-core/src/main/java/org/apache/s4/core/Sender.java 5b0b03d > subprojects/s4-core/src/main/java/org/apache/s4/core/SenderImpl.java > PRE-CREATION > subprojects/s4-core/src/main/java/org/apache/s4/core/Server.java 02e521a > subprojects/s4-core/src/main/java/org/apache/s4/core/Stream.java e5ef775 > > subprojects/s4-core/src/main/java/org/apache/s4/core/ft/FileSystemBackendCheckpointingModule.java > ca23c79 > subprojects/s4-core/src/main/java/org/apache/s4/core/ft/SafeKeeper.java > 3ffc91b > > subprojects/s4-core/src/main/java/org/apache/s4/core/gen/OverloadDispatcherGenerator.java > c0f07a7 > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingRemoteSendersExecutorServiceFactory.java > PRE-CREATION > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingSenderExecutorServiceFactory.java > PRE-CREATION > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/BlockingStreamExecutorServiceFactory.java > PRE-CREATION > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/LoadSheddingStreamExecutorServiceFactory.java > PRE-CREATION > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/RemoteSendersExecutorServiceFactory.java > PRE-CREATION > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/SenderExecutorServiceFactory.java > PRE-CREATION > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/StreamExecutorServiceFactory.java > PRE-CREATION > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingRemoteSendersExecutorServiceFactory.java > PRE-CREATION > > subprojects/s4-core/src/main/java/org/apache/s4/core/staging/ThrottlingSenderExecutorServiceFactory.java > PRE-CREATION > subprojects/s4-core/src/main/java/org/apache/s4/core/util/S4Metrics.java > PRE-CREATION > subprojects/s4-core/src/test/java/org/apache/s4/core/TriggerTest.java > 97b025c > > subprojects/s4-core/src/test/java/org/apache/s4/core/ft/CheckpointingTest.java > 6001595 > > subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java > c3d30b7 > subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java > f2b0297 > > subprojects/s4-core/src/test/java/org/apache/s4/core/timers/MultithreadingTest.java > 903f288 > > subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPE1.java > b1676dc > > subprojects/s4-core/src/test/java/org/apache/s4/core/windowing/WindowingPETest.java > 78d814a > > subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java > 1fbd37a > subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java > 7e88dd8 > > subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCommModule.java > 75143c1 > > subprojects/s4-core/src/test/java/org/apache/s4/fixtures/MockCoreModule.java > 71bae7a > > subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java > db45fcb > subprojects/s4-edsl/src/main/java/org/apache/s4/edsl/AppBuilder.java > 4e007b0 > > subprojects/s4-example/src/main/java/org/apache/s4/example/counter/MyApp.java > 0209933 > > test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java > 5d7855f > > Diff: https://reviews.apache.org/r/8633/diff/ > > > Testing > ------- > > - regression tests > - performance benchmarks (up to >200 k remote msgs/s/stream/node with > blocking sender) > - twitter example + other internal apps > > > Thanks, > > Matthieu Morel > >
