[GitHub] flink pull request #6071: [FLINK-3952][runtime] Upgrade to Netty 4.1
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6071 ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194770093 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java --- @@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } + @Override + public long length() { + return length; + } + + @Override + public long progress() { + return length - buf.readableBytes(); --- End diff -- changed to `return buf.readerIndex()` since progress is not well defined. ---
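The resolution above reports progress as the reader index rather than as `length - readableBytes`, since the total length is unknown. A minimal pure-Java sketch of that contract, using `java.nio.ByteBuffer` as a stand-in for Netty's `ByteBuf` (the class and method names here are illustrative, not Flink's actual `ChunkedByteBuf`):

```java
import java.nio.ByteBuffer;

public class ChunkedReaderSketch {
    private final ByteBuffer buf;

    public ChunkedReaderSketch(ByteBuffer buf) {
        this.buf = buf;
    }

    /** Netty's ChunkedInput contract uses a negative value for "length unknown". */
    public long length() {
        return -1;
    }

    /** Progress as "bytes consumed so far": the reader position, analogous to ByteBuf#readerIndex(). */
    public long progress() {
        return buf.position();
    }

    public byte readChunkByte() {
        return buf.get();
    }

    public static void main(String[] args) {
        ChunkedReaderSketch reader = new ChunkedReaderSketch(ByteBuffer.wrap(new byte[]{10, 20, 30}));
        System.out.println(reader.progress()); // 0 before any read
        reader.readChunkByte();
        reader.readChunkByte();
        System.out.println(reader.progress()); // 2 after consuming two bytes
        System.out.println(reader.length());   // -1: total length unknown
    }
}
```

With an unknown length, the reader index is the only monotonic measure available, which is why it makes a safer `progress()` than a formula involving `length`.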
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194769995 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java --- @@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } + @Override + public long length() { + return length; --- End diff -- changed to `-1` ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194763826 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java --- @@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length, int m @Override public ByteBuf unwrap() { - return super.unwrap().unwrap(); + return super.unwrap(); --- End diff -- As discussed offline, this is because of a change in the implementation of `SlicedByteBuf`. ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194718801 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.runtime; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +import static org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE; +import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; + +/** + * Test network stack with taskmanager.network.netty.transport set to epoll. This test car only run + * on linux. On other platforms it's basically a NO-OP. See + * https://github.com/apache/flink-shaded/issues/30 + */ +public class NettyEpollITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(NettyEpollITCase.class); + + private static final int TASK_MANAGERES = 2; + + @Test + public void testNettyEpoll() throws Exception { + Optional cluster = trySetUp(); + if (!cluster.isPresent()) { + return; + } + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(TASK_MANAGERES); + env.getConfig().disableSysoutLogging(); + + DataStream input = env.fromElements(1, 2, 3, 4, 1, 2, 3, 42); + input.keyBy(new KeySelector() { + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }) + .sum(0) + .print(); + + env.execute(); --- End diff -- The same question would remain: whether such an artificial setup sets up the network stack in the same way as Flink does. ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194713008 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java --- @@ -97,11 +98,17 @@ public ReferenceCounted retain(int arg0) { @Override public ReferenceCounted touch() { + if (requestAsReferenceCounted.isPresent()) { + ReferenceCountUtil.touch(requestAsReferenceCounted.get()); + } return this; } @Override public ReferenceCounted touch(Object o) { + if (requestAsReferenceCounted.isPresent()) { + ReferenceCountUtil.touch(requestAsReferenceCounted.get()); --- End diff -- I think we could do that. I'm not entirely sure how this whole `touch` is supposed to work, but passing down the hint shouldn't harm anything and should be more correct. ---
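The "pass the hint down" idea can be sketched in plain Java. This is a hypothetical stand-in for the wrapper pattern under discussion, not Flink's actual `RoutedRequest` or Netty's `ReferenceCounted`:

```java
import java.util.Optional;

public class TouchSketch {
    /** Stand-in for Netty's ReferenceCounted; touch(hint) records an access for leak reports. */
    interface RefCounted {
        RefCounted touch(Object hint);
    }

    /** Wrapper that forwards touch() only when the wrapped request is reference-counted. */
    static class RoutedRequestSketch implements RefCounted {
        private final Optional<RefCounted> requestAsReferenceCounted;

        RoutedRequestSketch(Optional<RefCounted> wrapped) {
            this.requestAsReferenceCounted = wrapped;
        }

        @Override
        public RefCounted touch(Object hint) {
            // Forward the hint instead of dropping it, so a leak report on the
            // inner request is attributed to the original caller's hint.
            requestAsReferenceCounted.ifPresent(r -> r.touch(hint));
            return this;
        }
    }

    public static void main(String[] args) {
        final Object[] seenHint = new Object[1];
        RefCounted inner = hint -> { seenHint[0] = hint; return null; };
        RoutedRequestSketch request = new RoutedRequestSketch(Optional.of(inner));
        request.touch("handler-X");
        System.out.println(seenHint[0]); // handler-X
    }
}
```

The wrapper still returns `this` so callers can chain on the outer object, while the inner request's leak-tracking record sees the same hint.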
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194711763 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java --- @@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } + @Override + public long length() { + return length; --- End diff -- It seems like the content of `buf` is not changing. However, we cannot guard against that programmatically, only via Javadoc. On the other hand, I'm not sure what the side effects of an "unknown" length would be. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194671209 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- @@ -0,0 +1,103 @@ ... + env.execute(); --- End diff -- Is it strictly necessary that this test must be an integration test case? As far as I understand the test case, wouldn't it be enough to set up a `NettyServer` with `epoll` activated and send some data to it via a `NettyClient` with `epoll` activated? Since we don't assert anything other than whether the Flink program executes, it should be basically the same. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r19412 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java --- @@ -64,12 +69,13 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; import static org.junit.Assume.assumeTrue; /** * An abstract test class for channel buffers. * - * Copied from netty 4.0.50 with some changes to fit our netty version 4.0.27. + * Copy from netty 4.1.24.Final. */ public abstract class AbstractByteBufTest { --- End diff -- Let's extend this class from `TestLogger`. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194663296 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java --- @@ -97,11 +98,17 @@ public ReferenceCounted retain(int arg0) { @Override public ReferenceCounted touch() { + if (requestAsReferenceCounted.isPresent()) { + ReferenceCountUtil.touch(requestAsReferenceCounted.get()); + } return this; } @Override public ReferenceCounted touch(Object o) { + if (requestAsReferenceCounted.isPresent()) { + ReferenceCountUtil.touch(requestAsReferenceCounted.get()); --- End diff -- Shall we pass `o` to the `ReferenceCountUtil.touch` call? ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194652733 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java --- @@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } + @Override + public long length() { + return length; + } + + @Override + public long progress() { + return length - buf.readableBytes(); --- End diff -- If the length is not known, then I think it would be better to return something like `buf.readerIndex() - initialReaderIndex`. ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194660815 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java --- @@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length, int m @Override public ByteBuf unwrap() { - return super.unwrap().unwrap(); + return super.unwrap(); --- End diff -- But isn't it a bit counter-intuitive that you instantiate a `ReadOnlySlicedNetworkBuffer` with a `NetworkBuffer` and when you call `ReadOnlySlicedNetworkBuffer.unwrap` you don't get a `NetworkBuffer` back but a `SlicedByteBuf`? Thus, you need to know the internals of this class to know that you have to call `unwrap` twice to obtain the `NetworkBuffer`. This shows, for example, in the `NetworkBufferTest`, where we always call `slice.unwrap().unwrap()`. What exactly changed in the structure of nested classes that makes this change necessary? ---
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r194652495 --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/ChunkedByteBuf.java --- @@ -88,6 +102,16 @@ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { } } + @Override + public long length() { + return length; --- End diff -- Is it always guaranteed that `buf.readableBytes` won't change? If not, then I think we should return `-1` here, because the length is unknown. ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r193345578 --- Diff: pom.xml --- @@ -300,15 +300,7 @@ under the License. org.apache.flink flink-shaded-netty - - 4.0.27.Final-${flink.shaded.version} + 4.1.24.Final-${flink.shaded.version} --- End diff -- I've opened #6128 to bump the remaining version so this PR can stay focused on netty. ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191685431 --- Diff: pom.xml --- @@ -300,15 +300,7 @@ under the License. org.apache.flink flink-shaded-netty - - 4.0.27.Final-${flink.shaded.version} + 4.1.24.Final-${flink.shaded.version} --- End diff -- I hope that bumping from `2.0` to `4.0` will not break other things, but yes, that was my intention once `4.0` is on Maven Central. ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191685731 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java --- @@ -215,16 +242,33 @@ protected void _setMedium(int index, int value) { setByte(index + 2, (byte) value); } + @Override + protected void _setMediumLE(int index, int value){ + setByte(index, (byte) value); --- End diff -- added comment ---
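The `_setMediumLE` hunk above writes a 24-bit "medium" in little-endian byte order, mirroring the big-endian `_setMedium` next to it. A self-contained sketch of that byte layout, with a plain `byte[]` standing in for the `MemorySegment`-backed buffer (names are illustrative, not Flink's actual code):

```java
public class MediumLeSketch {
    /** Write a 24-bit value little-endian: low byte first. */
    static void setMediumLE(byte[] mem, int index, int value) {
        mem[index]     = (byte) value;           // low byte
        mem[index + 1] = (byte) (value >>> 8);   // middle byte
        mem[index + 2] = (byte) (value >>> 16);  // high byte
    }

    /** Read the 24-bit value back, reassembling from low to high byte. */
    static int getMediumLE(byte[] mem, int index) {
        return (mem[index] & 0xFF)
            | (mem[index + 1] & 0xFF) << 8
            | (mem[index + 2] & 0xFF) << 16;
    }

    public static void main(String[] args) {
        byte[] mem = new byte[3];
        setMediumLE(mem, 0, 0x123456);
        System.out.printf("%02x %02x %02x%n", mem[0], mem[1], mem[2]); // 56 34 12
        System.out.println(getMediumLE(mem, 0) == 0x123456);           // true
    }
}
```

The big-endian variant simply writes the same three bytes in the opposite order, which is why the two methods look almost identical in the diff.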
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191700895 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java --- @@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length, int m @Override public ByteBuf unwrap() { - return super.unwrap().unwrap(); + return super.unwrap(); --- End diff -- Necessary for the upgrade, otherwise our tests (including `ITCases`) fail. In the upgraded version there is a different structure of nested classes when using slices of read-only buffers. Possibly this is a fix for a bug that didn't show up in older Netty. ---
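The "different structure of nested classes" can be illustrated without Netty. This is a hypothetical sketch, not Netty's actual class hierarchy: if the slice wrapper keeps an intermediate wrapper around the original buffer, callers need `unwrap().unwrap()`; if it stores the original directly, a single `unwrap()` suffices and a second call would go one level too far.

```java
public class UnwrapSketch {
    static class Buf {
        Buf unwrap() { return null; } // the root buffer wraps nothing
    }

    static class WrapperOf extends Buf {
        final Buf original;
        WrapperOf(Buf original) { this.original = original; }
        @Override Buf unwrap() { return original; }
    }

    /** Old-style: the slice holds an intermediate wrapper, so two unwraps reach the original. */
    static class DoubleWrapped extends Buf {
        final Buf intermediate;
        DoubleWrapped(Buf original) { this.intermediate = new WrapperOf(original); }
        @Override Buf unwrap() { return intermediate; }
    }

    /** New-style: the slice stores the original directly, so one unwrap is enough. */
    static class SingleWrapped extends Buf {
        final Buf original;
        SingleWrapped(Buf original) { this.original = original; }
        @Override Buf unwrap() { return original; }
    }

    public static void main(String[] args) {
        Buf original = new Buf();
        System.out.println(new DoubleWrapped(original).unwrap().unwrap() == original); // true
        System.out.println(new SingleWrapped(original).unwrap() == original);          // true
    }
}
```

This is why `ReadOnlySlicedNetworkBuffer.unwrap()` drops one `unwrap()` call after the upgrade: the nesting depth underneath it changed.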
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191700141 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java --- @@ -424,6 +480,19 @@ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOEx } } + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + // adapted from UnpooledDirectByteBuf: + checkIndex(index, length); + + ByteBuffer tmpBuf = memorySegment.wrap(index, length); + try { + return in.read(tmpBuf); --- End diff -- Oops, good catch. An even better catch is that this was not covered by any test, because I assumed `AbstractByteBufTest` comes from Netty, while in reality it was copied into our code. Fixed this missing-`position` bug and upgraded `AbstractByteBufTest` as well; the newer version correctly failed with this bug. ---
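The bug is easy to reproduce with plain `java.nio`: `FileChannel.read(dst)` reads at the channel's current position and advances it, while `read(dst, position)` reads at the given offset and leaves the channel position untouched. Dropping the `position` argument therefore silently reads the wrong bytes. A small standalone demonstration (file names are illustrative):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class PositionalReadDemo {
    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("positional-read", ".bin");
        Files.write(file, new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});

        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer tmpBuf = ByteBuffer.allocate(3);

            // Positional read: starts at offset 5, channel position stays at 0.
            in.read(tmpBuf, 5);
            System.out.println(tmpBuf.get(0)); // 5
            System.out.println(in.position()); // 0

            // Non-positional read: starts at the channel position (0) instead.
            tmpBuf.clear();
            in.read(tmpBuf);
            System.out.println(tmpBuf.get(0)); // 0
            System.out.println(in.position()); // 3
        } finally {
            Files.deleteIfExists(file);
        }
    }
}
```

The corrected `setBytes` would call `in.read(tmpBuf, position)`, so the channel's own position is never consulted or modified.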
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191684561 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- @@ -0,0 +1,103 @@ ... + env.execute(); --- End diff -- Hmmm, it depends on what you would like to test and on the black-box/white-box approach. With white box, where you assume/know that `Netty` loads `native` libraries only during setup, and if you assume that once they are loaded they will work correctly, then no, execution is unnecessary. But I don't want to rely on both of those assumptions. ---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191683459 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- @@ -0,0 +1,103 @@ ... + catch (UnsatisfiedLinkError ex) { + // If we failed to init netty because we are not on Linux platform, abort the test. + if (findThrowableWithMessage(ex, "Only supported on Linux").isPresent()) { + return Optional.empty(); --- End diff -- I didn't know about `AssumptionViolatedException`. Thanks for pointing this out. ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191386855 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java --- @@ -424,6 +480,19 @@ public int setBytes(int index, ScatteringByteChannel in, int length) throws IOEx } } + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + // adapted from UnpooledDirectByteBuf: + checkIndex(index, length); + + ByteBuffer tmpBuf = memorySegment.wrap(index, length); + try { + return in.read(tmpBuf); --- End diff -- are you ignoring the `position` argument intentionally? ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191379939 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- @@ -0,0 +1,103 @@ ... + catch (UnsatisfiedLinkError ex) { + // If we failed to init netty because we are not on Linux platform, abort the test. + if (findThrowableWithMessage(ex, "Only supported on Linux").isPresent()) { + return Optional.empty(); --- End diff -- couldn't you fail here with an `AssumptionViolatedException`? Then we wouldn't have to deal with optionals. ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191379552 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- @@ -0,0 +1,103 @@ ... +/** + * Test network stack with taskmanager.network.netty.transport set to epoll. This test car only run --- End diff -- typo: can only run ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191386142 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java --- @@ -215,16 +242,33 @@ protected void _setMedium(int index, int value) { setByte(index + 2, (byte) value); } + @Override + protected void _setMediumLE(int index, int value){ + setByte(index, (byte) value); --- End diff -- was this also taken from `UnpooledDirectByteBuf`? ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191385215 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java --- @@ -75,12 +75,12 @@ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length, int m @Override public ByteBuf unwrap() { - return super.unwrap().unwrap(); + return super.unwrap(); --- End diff -- Is this change necessary for the upgrade or cleanup? ---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191380426 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.runtime; + +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.test.util.MiniClusterResource.MiniClusterResourceConfiguration; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Optional; + +import static org.apache.flink.runtime.io.network.netty.NettyConfig.TRANSPORT_TYPE; +import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage; + +/** + * Test network stack with taskmanager.network.netty.transport set to epoll. This test car only run + * on linux. On other platforms it's basically a NO-OP. 
See + * https://github.com/apache/flink-shaded/issues/30 + */ +public class NettyEpollITCase extends TestLogger { + + private static final Logger LOG = LoggerFactory.getLogger(NettyEpollITCase.class); + + private static final int TASK_MANAGERES = 2; + + @Test + public void testNettyEpoll() throws Exception { + Optional cluster = trySetUp(); + if (!cluster.isPresent()) { + return; + } + + try { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(TASK_MANAGERES); + env.getConfig().disableSysoutLogging(); + + DataStream input = env.fromElements(1, 2, 3, 4, 1, 2, 3, 42); + input.keyBy(new KeySelector() { + @Override + public Integer getKey(Integer value) throws Exception { + return value; + } + }) + .sum(0) + .print(); + + env.execute(); + } + finally { + cluster.get().after(); + } + } + + private Optional trySetUp() throws Exception { + try { + MiniClusterResource cluster = new MiniClusterResource( + new MiniClusterResourceConfiguration( + getConfiguration(), + TASK_MANAGERES, + 1), + true); + cluster.before(); + return Optional.of(cluster); + } + catch (UnsatisfiedLinkError ex) { + // If we failed to init netty because we are not on Linux platform, abort the test. + if (findThrowableWithMessage(ex, "Only supported on Linux").isPresent()) { + return Optional.empty(); + } + throw ex; + } + } + + private static Configuration getConfiguration() { --- End diff -- I would in-line this method. ---
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191384185 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/router/RoutedRequest.java --- @@ -94,4 +94,14 @@ public ReferenceCounted retain(int arg0) { } return this; } + + @Override + public ReferenceCounted touch() { + return this; + } + + @Override + public ReferenceCounted touch(Object o) { + return this; --- End diff -- We may want to `touch` the contained request as well, like in [this](https://github.com/netty/netty/blob/4.1/codec-redis/src/main/java/io/netty/handler/codec/redis/ArrayRedisMessage.java) class, depending on `requestAsReferenceCounted`. ---
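The forwarding zentol suggests (propagate `touch` to the wrapped message when it is reference counted, as Netty's `ArrayRedisMessage` does) can be sketched with a minimal stand-in interface. The types below are illustrative, not Netty's `ReferenceCounted` or Flink's `RoutedRequest`:

```java
/**
 * Sketch of forwarding touch() to a wrapped message, modeled on Netty's
 * ArrayRedisMessage. Touchable is a minimal stand-in for
 * io.netty.util.ReferenceCounted, not the real Netty type.
 */
public class TouchForwardingSketch {

    interface Touchable {
        Touchable touch(Object hint);
    }

    static class InnerRequest implements Touchable {
        Object lastHint;

        @Override
        public Touchable touch(Object hint) {
            lastHint = hint;  // real Netty records the hint for leak tracking
            return this;
        }
    }

    /** Mirrors the suggestion: forward only when the request is reference counted. */
    static class RoutedRequestSketch implements Touchable {
        final InnerRequest request;
        final boolean requestAsReferenceCounted;

        RoutedRequestSketch(InnerRequest request, boolean requestAsReferenceCounted) {
            this.request = request;
            this.requestAsReferenceCounted = requestAsReferenceCounted;
        }

        @Override
        public Touchable touch(Object hint) {
            if (requestAsReferenceCounted) {
                request.touch(hint);  // propagate the leak-tracking hint downwards
            }
            return this;
        }
    }

    public static void main(String[] args) {
        InnerRequest inner = new InnerRequest();
        new RoutedRequestSketch(inner, true).touch("hint");
        System.out.println(inner.lastHint);
    }
}
```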
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191380808 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- (same new file as quoted above) + env.execute(); --- End diff -- The job execution isn't necessary, is it? ---
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191381531 --- Diff: pom.xml --- @@ -300,15 +300,7 @@ under the License. org.apache.flink flink-shaded-netty - - 4.0.27.Final-${flink.shaded.version} + 4.1.24.Final-${flink.shaded.version} --- End diff -- you (will) have to bump `flink.shaded.version` to 4.0. Doing this will not negatively affect other dependencies. ---
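The bump zentol describes would look roughly like this in the root pom.xml. This is a sketch of the thread's suggestion: the Netty version is the one quoted in the diff, while the exact `flink.shaded.version` value (4.0) is taken from the comment and would depend on the flink-shaded release actually published:

```xml
<!-- root pom.xml: flink.shaded.version must move to 4.0 together with the Netty bump -->
<properties>
    <flink.shaded.version>4.0</flink.shaded.version>
</properties>
```

```xml
<!-- the flink-shaded-netty artifact then resolves to the Netty 4.1 line -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-netty</artifactId>
    <version>4.1.24.Final-${flink.shaded.version}</version>
</dependency>
```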
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191379687 --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NettyEpollITCase.java --- (same new file as quoted above) + private static final int TASK_MANAGERES = 2; --- End diff -- `NUM_TASK_MANAGERS`, also a typo ---
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r191150025 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -247,7 +249,15 @@ public void shutdown(Time timeout) { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof FullHttpResponse) { + // TODO: should this check for status OK (200) and treat all other as errors? --- End diff -- yes I think we can just drop it. ---
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r190884312 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -247,7 +249,15 @@ public void shutdown(Time timeout) { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof FullHttpResponse) { + // TODO: should this check for status OK (200) and treat all other as errors? --- End diff -- Thx for filling this out. In that case should I just drop this `// TODO:`? ---
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r190705206 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -247,7 +249,15 @@ public void shutdown(Time timeout) { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof FullHttpResponse) { + // TODO: should this check for status OK (200) and treat all other as errors? --- End diff -- Message semantics are handled later when parsing the payload, which effectively does what you're suggesting. The else branch is for debugging; our REST servers always return `FullHttpResponse`s. ---
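The dispatch shape this thread settles on (handle `FullHttpResponse`, keep the else branch purely as a debugging aid, and leave status-code semantics to the payload parser) can be sketched in plain Java. The types below are minimal stand-ins, not Netty's classes:

```java
/**
 * Sketch of the channelRead0 dispatch discussed above. FullHttpResponse
 * here is a stand-in for Netty's type, and the String return value stands
 * in for whatever the real handler does with the message.
 */
public class ChannelReadSketch {

    static class FullHttpResponse {
        final int status;

        FullHttpResponse(int status) {
            this.status = status;
        }
    }

    /** Returns a description of how the message was handled. */
    static String channelRead0(Object msg) {
        if (msg instanceof FullHttpResponse) {
            // Status semantics (200 vs. errors) are handled later, when the
            // payload is parsed -- no need to special-case non-OK codes here.
            return "response:" + ((FullHttpResponse) msg).status;
        } else {
            // The REST servers always send FullHttpResponses, so this branch
            // exists only to surface bugs during debugging.
            return "unexpected:" + msg.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(channelRead0(new FullHttpResponse(200)));
        System.out.println(channelRead0("garbage"));
    }
}
```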
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r190554721 --- Diff: pom.xml --- @@ -308,7 +308,7 @@ under the License. errors. [1] https://github.com/netty/netty/issues/3704 --> --- End diff -- Oops, dropped. ---
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/6071#discussion_r190554149 --- Diff: pom.xml --- @@ -308,7 +308,7 @@ under the License. errors. [1] https://github.com/netty/netty/issues/3704 --> --- End diff -- Looks like you can remove this comment now. ---
[GitHub] flink pull request #6071: [FLINK-3952][runtine] Upgrade to Netty 4.1
GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/6071 [FLINK-3952][runtine] Upgrade to Netty 4.1 This PR adjusts our code to work with Netty 4.1. It also includes a possible bug fix to file uploading cleanup in FileUploadHandler and HttpRequestHandler. For more information, look here: https://github.com/netty/netty/issues/7611 The first commit is only there to keep Travis green and will be dropped once the new `flink-shaded-netty` is released. ## Verifying this change This change is covered by a variety of pre-existing tests. Furthermore I have manually verified that the issue mentioned by @uce in the commit message here: https://github.com/apache/flink/commit/d92e422ec7089376583a8f57043274d236c340a4 doesn't happen: - I have reproduced this issue on a test cluster with Flink 1.0-XXX - I have verified that the same job passes without any problems after upgrading to Netty 4.1 I have also run our network benchmark suite and verified that there are no performance changes after this change. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (**yes** / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (**yes** / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f3952 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6071.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6071 commit 7b3be7c9ebac7392136ed85bca4664559710552e Author: Piotr Nowojski Date: 2018-05-14T11:46:08Z [hotfix][tests] Report failure with error level instead of debug commit afaf1d5181c7133a040bf3881723e240145a4b0a Author: Piotr Nowojski Date: 2018-05-16T19:26:36Z [FLINK-9386] Embed netty router This commit replaces the netty-router dependency with our own version of it, which is simplified and adds guarantees about the order of matching router patterns. This is a prerequisite for FLINK-3952. netty-router 1.10 is incompatible with Netty 4.1, while netty-router 2.2.0 breaks compatibility in a way that prevented us from using it. commit 26bc92db0863bf53e60164ab5f6b92ac3b424506 Author: Piotr Nowojski Date: 2018-05-14T10:30:31Z Embed flink-shaded-netty-4 commit 94a4cc2237b5dac0c004ec192eb4d7f1b782e5f2 Author: Piotr Nowojski Date: 2018-05-16T19:27:22Z [FLINK-3952][runtine] Upgrade to Netty 4.1 This commit includes a possible bug fix to file uploading cleanup in FileUploadHandler and HttpRequestHandler. For more information, look here: https://github.com/netty/netty/issues/7611 ---
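The ordering guarantee the embedded router adds (FLINK-9386: patterns are matched in a deterministic, insertion-defined order) can be illustrated with a first-match-wins list. This is a sketch of the idea only, not Flink's actual Router implementation; all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** First-match-wins routing: patterns are tried in insertion order. */
public class OrderedRouterSketch {

    static class Route {
        final String pattern;  // e.g. "/jobs/:jobid" -- a ":x" segment matches any one path segment
        final String handler;  // stand-in for a handler object
        Route(String pattern, String handler) {
            this.pattern = pattern;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    void addRoute(String pattern, String handler) {
        routes.add(new Route(pattern, handler));
    }

    /** Returns the first matching handler, making match order deterministic. */
    Optional<String> route(String path) {
        for (Route r : routes) {
            if (matches(r.pattern, path)) {
                return Optional.of(r.handler);
            }
        }
        return Optional.empty();
    }

    private static boolean matches(String pattern, String path) {
        String[] ps = pattern.split("/");
        String[] xs = path.split("/");
        if (ps.length != xs.length) {
            return false;
        }
        for (int i = 0; i < ps.length; i++) {
            // ":name" segments are wildcards; literal segments must match exactly
            if (!ps[i].startsWith(":") && !ps[i].equals(xs[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        OrderedRouterSketch router = new OrderedRouterSketch();
        // registered first, so it wins over the wildcard pattern below
        router.addRoute("/jobs/overview", "overviewHandler");
        router.addRoute("/jobs/:jobid", "jobHandler");
        System.out.println(router.route("/jobs/overview").get());
        System.out.println(router.route("/jobs/123").get());
    }
}
```

With an unordered router, "/jobs/overview" could just as well be captured by the "/jobs/:jobid" wildcard, which is exactly the ambiguity the ordering guarantee removes.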