[GitHub] [flink] flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212 ## CI report: * 912485a96d61febfa41b7c84631aeae19d819325 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/150964833) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5718) * e474761e49927bb30bac6a297e992dc2ec98e01a UNKNOWN * b8d51c94a0b93fdbfa4b167e0b4c630f791fba10 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] hequn8128 commented on a change in pull request #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution
hequn8128 commented on a change in pull request #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution URL: https://github.com/apache/flink/pull/11208#discussion_r385552486 ## File path: flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java ## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.python.scalar.arrow; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.python.PythonFunctionRunner; +import org.apache.flink.python.env.PythonEnvironmentManager; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.python.PythonFunctionInfo; +import org.apache.flink.table.runtime.arrow.ArrowReader; +import org.apache.flink.table.runtime.arrow.ArrowUtils; +import org.apache.flink.table.runtime.operators.python.scalar.AbstractRowPythonScalarFunctionOperator; +import org.apache.flink.table.runtime.runners.python.scalar.arrow.ArrowPythonScalarFunctionRunner; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowStreamReader; +import org.apache.beam.sdk.fn.data.FnDataReceiver; + +import java.io.IOException; + +/** + * Arrow Python {@link ScalarFunction} operator for the old planner. + */ +@Internal +public class ArrowPythonScalarFunctionOperator extends AbstractRowPythonScalarFunctionOperator { Review comment: ok, let's keep it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16245) Use a delegating classloader as the user code classloader to prevent class leaks.
[ https://issues.apache.org/jira/browse/FLINK-16245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047288#comment-17047288 ] Guowei Ma commented on FLINK-16245: --- This means that there would still be some object leak, but the cost will be much smaller because only one delegate ClassLoader might be leaked if the user does not clean up the context class loader. > Use a delegating classloader as the user code classloader to prevent class > leaks. > - > > Key: FLINK-16245 > URL: https://issues.apache.org/jira/browse/FLINK-16245 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Stephan Ewen >Priority: Critical > Labels: usability > Fix For: 1.11.0 > > > As reported in FLINK-11205, a reference to the user-code ClassLoader can be > held by some libraries, causing class leaks. > One way to circumvent this class leak is if the ClassLoader that we set as > the user-code ClassLoader is a delegating ClassLoader to the real class > loader, and when closing the user code ClassLoader we null out the reference. -- This message was sent by Atlassian Jira (v8.3.4#803005)
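The delegation idea described in this issue can be sketched roughly as follows. This is a minimal illustration, not Flink's actual implementation: the class names `DelegatingLoaderSketch` and `DelegatingClassLoader` and the helper `canLoad` are hypothetical. A library that keeps a reference to the wrapper after `close()` retains only the small wrapper object, not the real user-code class loader and its classes.

```java
import java.io.Closeable;

/** Hypothetical sketch; the names here are illustrative and not taken from Flink. */
final class DelegatingLoaderSketch {

    /** Forwards class loading to a real loader until closed, then drops the reference. */
    static final class DelegatingClassLoader extends ClassLoader implements Closeable {
        private volatile ClassLoader delegate;

        DelegatingClassLoader(ClassLoader delegate) {
            super(null); // parent is unused: every lookup goes through the delegate
            this.delegate = delegate;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            ClassLoader current = delegate;
            if (current == null) {
                throw new ClassNotFoundException(name + " (class loader already closed)");
            }
            // Linking (the 'resolve' flag) is left to the delegate in this sketch.
            return current.loadClass(name);
        }

        @Override
        public void close() {
            // Null out the reference so a leaked wrapper no longer pins the real loader.
            delegate = null;
        }
    }

    /** Returns true if the loader can currently load the named class. */
    static boolean canLoad(ClassLoader loader, String name) {
        try {
            loader.loadClass(name);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        DelegatingClassLoader loader = new DelegatingClassLoader(ClassLoader.getSystemClassLoader());
        System.out.println("before close: " + canLoad(loader, "java.lang.String"));
        loader.close();
        System.out.println("after close: " + canLoad(loader, "java.lang.String"));
    }
}
```

As the comment above notes, the wrapper itself can still leak if the context class loader is not cleaned up; the sketch only reduces what such a leak keeps alive.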
[GitHub] [flink] hequn8128 commented on a change in pull request #11220: [FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams
hequn8128 commented on a change in pull request #11220: [FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams URL: https://github.com/apache/flink/pull/11220#discussion_r385546143 ## File path: flink-python/setup.py ## @@ -224,7 +224,7 @@ def remove_if_exists(file_path): author_email='d...@flink.apache.org', python_requires='>=3.5', install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.19.0', - 'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1'], + 'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 'jsonpickle==1.2'], Review comment: Since we only introduce jsonpickle for now and the dependency is rather small, it does not hurt non-ML users while bringing convenience to ML users. I see more advantages than drawbacks here. If we add NumPy support for ML later, we can move all of these dependencies into the optional dependencies at that time. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk edited a comment on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId
zhuzhurk edited a comment on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId URL: https://github.com/apache/flink/pull/11148#issuecomment-592390176 >> The problem is that we are unable to manipulate the state of the ExecutionVertex easily such that getPreferredLocations() returns the desired result That's true, and subclassing is meant to solve exactly this kind of problem. From the references you listed, it seems most people think so too. It's not perfect, but it is better than having no way at all. Another problem with `SchedulerTestUtils`, which currently uses Mockito, is that it only mocks what existing tests need. So future tests that use these utils may run into NPEs again, e.g. from `getResourceProfile()`. If you think it's better to have a minimal fix first (i.e. using Mockito to mock `ExecutionVertex#getID()`) to unblock higher-priority tasks, we can do that first and discuss #11230 later, or check whether there is a better way to solve the problem with `SchedulerTestUtils`. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId
zhuzhurk commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId URL: https://github.com/apache/flink/pull/11148#issuecomment-592390176 >> The problem is that we are unable to manipulate the state of the ExecutionVertex easily such that getPreferredLocations() returns the desired result That's true, and subclassing is meant to solve exactly this kind of problem. From the references you listed, it seems most people think so too. It's not perfect, but it is better than having no way at all. Flink also already uses it for testing `SlotPoolImpl` with `TestingSlotPoolImpl`. Another problem with `SchedulerTestUtils`, which currently uses Mockito, is that it only mocks what existing tests need. So future tests that use these utils may run into NPEs again, e.g. from `getResourceProfile()`. If you think it's better to have a minimal fix first (i.e. using Mockito to mock `ExecutionVertex#getID()`) to unblock higher-priority tasks, we can do that first and discuss #11230 later. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
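The subclassing approach argued for in this comment can be sketched roughly as follows. Everything here is a hypothetical stand-in: `Vertex` is not Flink's `ExecutionVertex`, and `TestingVertex` is not an existing Flink class. The sketch only illustrates the general point that a testing subclass overrides the one piece of state a test needs to control, while every other method keeps its real, non-null behavior. By contrast, an unstubbed Mockito mock returns null for most object return types by default, which is where the NPEs mentioned above tend to come from.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch; none of these classes exist in Flink under these names. */
final class TestingSubclassSketch {

    /** Stand-in for a production class whose state is hard to manipulate from tests. */
    static class Vertex {
        String getId() {
            return "vertex-0";
        }

        List<String> getPreferredLocations() {
            return Collections.emptyList();
        }

        String getResourceProfile() {
            return "UNKNOWN";
        }
    }

    /** Testing subclass: only the state a test needs to control is made settable. */
    static final class TestingVertex extends Vertex {
        private List<String> preferredLocations = Collections.emptyList();

        void setPreferredLocations(List<String> locations) {
            this.preferredLocations = locations;
        }

        @Override
        List<String> getPreferredLocations() {
            return preferredLocations;
        }
    }

    public static void main(String[] args) {
        TestingVertex vertex = new TestingVertex();
        vertex.setPreferredLocations(Arrays.asList("host-1", "host-2"));
        // The overridden method returns what the test configured ...
        System.out.println(vertex.getPreferredLocations());
        // ... while methods the test never touched still return real values.
        System.out.println(vertex.getResourceProfile());
    }
}
```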
[GitHub] [flink] hequn8128 edited a comment on issue #11220: [FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams
hequn8128 edited a comment on issue #11220: [FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams URL: https://github.com/apache/flink/pull/11220#issuecomment-591807495 > @hequn8128 Thanks for the PR. > > 1. I noticed that there are two modules "ml" and "mllib" introduced. I'm wondering if one module such as ml is enough? @dianfu This sounds like a good idea. We can add api and lib under the `pyflink.ml` package, i.e., `pyflink.ml.api` and `pyflink.ml.lib`. The package directly under `pyflink` should be a complete module, consistent with `pyflink.table`. @becketqin @walterddr What's your opinion on this? If we reach a consensus, we may need an extra vote since it differs from the FLIP wiki page. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212 ## CI report: * 912485a96d61febfa41b7c84631aeae19d819325 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/150964833) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5718) * e474761e49927bb30bac6a297e992dc2ec98e01a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
flinkbot edited a comment on issue #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing URL: https://github.com/apache/flink/pull/11213#issuecomment-590853060 ## CI report: * b31d7fab8da81827ca063075427a3737377bf0ed UNKNOWN * cade94bacbaa7fa76c0a0d013eb069406d01b6ad Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150784419) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5667) * d557df5126a0a4f2404ac08ccea25000e8495e25 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150968498) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5719) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] hequn8128 commented on a change in pull request #11220: [FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams
hequn8128 commented on a change in pull request #11220: [FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams URL: https://github.com/apache/flink/pull/11220#discussion_r385546143 ## File path: flink-python/setup.py ## @@ -224,7 +224,7 @@ def remove_if_exists(file_path): author_email='d...@flink.apache.org', python_requires='>=3.5', install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 'apache-beam==2.19.0', - 'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1'], + 'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 'jsonpickle==1.2'], Review comment: Since we only introduce jsonpickle for now and the dependency is rather small, it does not hurt non-ML users while bringing convenience to ML users. I see more advantages than drawbacks here. If we add NumPy support for ML later, we can move all of these dependencies into the optional dependencies at that time. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design
flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design URL: https://github.com/apache/flink/pull/11233#issuecomment-591843120 ## CI report: * 786a5056c957863c05ad24b00ca1dca032905eb0 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150962647) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5717) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11236: [FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do not match when generic or POJO type is requested
flinkbot edited a comment on issue #11236: [FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do not match when generic or POJO type is requested URL: https://github.com/apache/flink/pull/11236#issuecomment-591902160 ## CI report: * fd488f5cf0bf428170cb72cf434e01780cac8ef7 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150821616) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5689) * c2e5a4fb73a7b2aca22e914e57b36bc3128bb966 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385541960 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + + verifyBufferContent(src, sourceStartPosition, sourceLength - sourceStartPosition, sourceStartPosition); + } + + @Test + public void testAccumulateWithCopy() { + final int firstSourceLength = 128; + final int firstSourceStartPosition = 32; + final int secondSourceLength = 64; + final int secondSourceStartPosition = 0; + final int expectedAccumulationSize = 128; + + final int firstCopyLength = firstSourceLength - firstSourceStartPosition; + final int secondCopyLength = expectedAccumulationSize - firstCopyLength; + + ByteBuf firstSource = createSourceBuffer(firstSourceLength, firstSourceStartPosition); + ByteBuf secondSource = createSourceBuffer(secondSourceLength, secondSourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src does not have enough data, src will be copied into target and null will be returned. + ByteBuf accumulated = ByteBufUtils.accumulate( + target, + firstSource, + expectedAccumulationSize, + target.readableBytes()); + assertNull(accumulated); + assertEquals(firstSourceLength, firstSource.readerIndex()); + assertEquals(firstCopyLength, target.readableBytes()); + + // The remaining data will be copied from the second buffer, and the target buffer will be returned + // after all data is accumulated. 
+ accumulated = ByteBufUtils.accumulate( + target, + secondSource, + expectedAccumulationSize, + target.readableBytes()); + assertSame(target, accumulated); + assertEquals(secondSourceStartPosition + secondCopyLength, secondSource.readerIndex()); + assertEquals(expectedAccumulationSize, target.readableBytes()); + + verifyBufferContent(accumulated, 0, firstCopyLength, firstSourceStartPosition); + verifyBufferContent(accumulated, firstCopyLength, secondCopyLength, secondSourceStartPosition); + } + + private ByteBuf createSourceBuffer(int size, int readerIndex) { + ByteBuf buf = Unpooled.buffer(size); + for (int i = 0; i < size; ++i) { + buf.writeByte((byte) i); + } + + buf.readerIndex(readerIndex); + +
[GitHub] [flink] flinkbot edited a comment on issue #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
flinkbot edited a comment on issue #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing URL: https://github.com/apache/flink/pull/11213#issuecomment-590853060 ## CI report: * b31d7fab8da81827ca063075427a3737377bf0ed UNKNOWN * cade94bacbaa7fa76c0a0d013eb069406d01b6ad Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150784419) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5667) * d557df5126a0a4f2404ac08ccea25000e8495e25 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385540907 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + + verifyBufferContent(src, sourceStartPosition, sourceLength - sourceStartPosition, sourceStartPosition); + } + + @Test + public void testAccumulateWithCopy() { + final int firstSourceLength = 128; + final int firstSourceStartPosition = 32; + final int secondSourceLength = 64; + final int secondSourceStartPosition = 0; + final int expectedAccumulationSize = 128; + + final int firstCopyLength = firstSourceLength - firstSourceStartPosition; + final int secondCopyLength = expectedAccumulationSize - firstCopyLength; + + ByteBuf firstSource = createSourceBuffer(firstSourceLength, firstSourceStartPosition); + ByteBuf secondSource = createSourceBuffer(secondSourceLength, secondSourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src does not have enough data, src will be copied into target and null will be returned. + ByteBuf accumulated = ByteBufUtils.accumulate( + target, + firstSource, + expectedAccumulationSize, + target.readableBytes()); + assertNull(accumulated); + assertEquals(firstSourceLength, firstSource.readerIndex()); + assertEquals(firstCopyLength, target.readableBytes()); + + // The remaining data will be copied from the second buffer, and the target buffer will be returned + // after all data is accumulated. 
+ accumulated = ByteBufUtils.accumulate( + target, + secondSource, + expectedAccumulationSize, + target.readableBytes()); + assertSame(target, accumulated); + assertEquals(secondSourceStartPosition + secondCopyLength, secondSource.readerIndex()); + assertEquals(expectedAccumulationSize, target.readableBytes()); + + verifyBufferContent(accumulated, 0, firstCopyLength, firstSourceStartPosition); Review comment: I guess we can remove this intermediate result, and only verify the final result which can cover it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at:
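The accumulate behavior that the quoted test exercises can be sketched roughly as follows. This is not the `ByteBufUtils` code from the pull request: it swaps Netty's `ByteBuf` for the JDK's `java.nio.ByteBuffer` so that the example is self-contained, and the class name `AccumulateSketch` and helper `createSource` are hypothetical. The contract is inferred from the test comments only: return the source untouched when it alone holds enough data and nothing has been copied yet, otherwise copy into the target and return null until the target is full.

```java
import java.nio.ByteBuffer;

/** Hypothetical sketch using java.nio.ByteBuffer instead of Netty's ByteBuf. */
final class AccumulateSketch {

    /**
     * @param target      buffer being filled; its position is the number of bytes written so far
     * @param source      buffer to read from; its remaining bytes are the readable data
     * @param targetSize  total number of bytes wanted
     * @param accumulated number of bytes already copied into target
     * @return source (no copy needed), target (now complete), or null (more data required)
     */
    static ByteBuffer accumulate(ByteBuffer target, ByteBuffer source, int targetSize, int accumulated) {
        if (accumulated == 0 && source.remaining() >= targetSize) {
            // Fast path: the source alone is enough, so hand it back unmodified.
            return source;
        }
        int copyLength = Math.min(source.remaining(), targetSize - accumulated);
        if (copyLength > 0) {
            ByteBuffer view = source.duplicate();
            view.limit(view.position() + copyLength);
            target.put(view); // advances target's position by copyLength
            source.position(source.position() + copyLength);
        }
        return accumulated + copyLength == targetSize ? target : null;
    }

    /** Creates a buffer holding bytes 0..size-1 with the read position moved to readerIndex. */
    static ByteBuffer createSource(int size, int readerIndex) {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (int i = 0; i < size; i++) {
            buffer.put(i, (byte) i);
        }
        buffer.position(readerIndex);
        return buffer;
    }

    public static void main(String[] args) {
        ByteBuffer target = ByteBuffer.allocate(128);
        ByteBuffer first = createSource(128, 32);
        ByteBuffer second = createSource(64, 0);
        System.out.println(accumulate(target, first, 128, target.position()) == null);
        System.out.println(accumulate(target, second, 128, target.position()) == target);
    }
}
```

The `main` method mirrors the sizes used in `testAccumulateWithCopy`: the first call copies the 96 readable bytes of the first source, and the second call copies the missing 32 bytes from the second source.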
[GitHub] [flink] flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design
flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design URL: https://github.com/apache/flink/pull/11233#issuecomment-591843120 ## CI report: * 786a5056c957863c05ad24b00ca1dca032905eb0 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150962647) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5717) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385540280 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + + verifyBufferContent(src, sourceStartPosition, sourceLength - sourceStartPosition, sourceStartPosition); + } + + @Test + public void testAccumulateWithCopy() { + final int firstSourceLength = 128; + final int firstSourceStartPosition = 32; + final int secondSourceLength = 64; Review comment: We can give the two sources the same length to achieve the same goal and avoid too many variables. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (FLINK-16326) Eagerly validate strictly required Flink configurations for Stateful Functions
Tzu-Li (Gordon) Tai created FLINK-16326: --- Summary: Eagerly validate strictly required Flink configurations for Stateful Functions Key: FLINK-16326 URL: https://issues.apache.org/jira/browse/FLINK-16326 Project: Flink Issue Type: Improvement Components: Stateful Functions Affects Versions: statefun-1.1 Reporter: Tzu-Li (Gordon) Tai Currently, when Stateful Functions users want to set their own Flink configurations, they are required to build on top of a base template {{flink-conf.yaml}} which has some strictly required configurations predefined, such as parent-first classloading and state backend settings. These Flink settings should never (as of now) be changed by the user, but there is no validation of that in place. We should do that eagerly pre-submission of the translated job, probably in {{StatefulFunctionsConfig}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
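The eager validation proposed in FLINK-16326 could look roughly like the following. This is only a minimal sketch under stated assumptions: the class `RequiredConfigValidator`, its methods, and the concrete required values are hypothetical and not taken from `StatefulFunctionsConfig`; the configuration is modelled as a plain `Map` so that the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of eager validation; not the actual Stateful Functions code. */
public final class RequiredConfigValidator {

    // Assumed required settings. The authoritative keys and values are those
    // predefined in the base template flink-conf.yaml.
    private static final Map<String, String> REQUIRED = new LinkedHashMap<>();

    static {
        REQUIRED.put("classloader.resolve-order", "parent-first");
        REQUIRED.put("state.backend", "rocksdb");
    }

    private RequiredConfigValidator() {}

    /** Returns one message per violated setting; an empty list means the configuration is acceptable. */
    public static List<String> findViolations(Map<String, String> userConfig) {
        List<String> violations = new ArrayList<>();
        for (Map.Entry<String, String> required : REQUIRED.entrySet()) {
            String actual = userConfig.get(required.getKey());
            if (!required.getValue().equals(actual)) {
                violations.add(required.getKey() + " must be '" + required.getValue()
                        + "' but was '" + actual + "'");
            }
        }
        return violations;
    }

    /** Fails fast, before the translated job is submitted, if a required setting is missing or changed. */
    public static void validate(Map<String, String> userConfig) {
        List<String> violations = findViolations(userConfig);
        if (!violations.isEmpty()) {
            throw new IllegalArgumentException(
                    "Invalid Flink configuration: " + String.join("; ", violations));
        }
    }
}
```

Calling `validate` at the point where the configuration is first read would surface a changed or missing setting as an immediate exception rather than as a failure after submission.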
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385538145 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; Review comment: Remove `final` to keep this consistent with the others. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385537626 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + Review comment: nit: remove this empty line This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385537572 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + Review comment: nit: remove this empty line This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385536593 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; + final int expectedAccumulationSize = 16; + + ByteBuf src = createSourceBuffer(sourceLength, sourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src has enough data and no data has been copied yet, src will be returned without modification. 
+ ByteBuf accumulated = ByteBufUtils.accumulate(target, src, expectedAccumulationSize, target.readableBytes()); + + assertSame(src, accumulated); + assertEquals(sourceStartPosition, src.readerIndex()); + + verifyBufferContent(src, sourceStartPosition, sourceLength - sourceStartPosition, sourceStartPosition); + } + + @Test + public void testAccumulateWithCopy() { + final int firstSourceLength = 128; + final int firstSourceStartPosition = 32; + final int secondSourceLength = 64; + final int secondSourceStartPosition = 0; + final int expectedAccumulationSize = 128; + + final int firstCopyLength = firstSourceLength - firstSourceStartPosition; + final int secondCopyLength = expectedAccumulationSize - firstCopyLength; + + ByteBuf firstSource = createSourceBuffer(firstSourceLength, firstSourceStartPosition); + ByteBuf secondSource = createSourceBuffer(secondSourceLength, secondSourceStartPosition); + + ByteBuf target = Unpooled.buffer(expectedAccumulationSize); + + // If src does not have enough data, src will be copied into target and null will be returned. + ByteBuf accumulated = ByteBufUtils.accumulate( + target, + firstSource, + expectedAccumulationSize, + target.readableBytes()); + assertNull(accumulated); + assertEquals(firstSourceLength, firstSource.readerIndex()); + assertEquals(firstCopyLength, target.readableBytes()); + + // The remaining data will be copied from the second buffer, and the target buffer will be returned + // after all data is accumulated. 
+ accumulated = ByteBufUtils.accumulate( + target, + secondSource, + expectedAccumulationSize, + target.readableBytes()); + assertSame(target, accumulated); + assertEquals(secondSourceStartPosition + secondCopyLength, secondSource.readerIndex()); + assertEquals(expectedAccumulationSize, target.readableBytes()); + + verifyBufferContent(accumulated, 0, firstCopyLength, firstSourceStartPosition); + verifyBufferContent(accumulated, firstCopyLength, secondCopyLength, secondSourceStartPosition); + } + + private ByteBuf createSourceBuffer(int size, int readerIndex) { + ByteBuf buf = Unpooled.buffer(size); + for (int i = 0; i < size; ++i) { + buf.writeByte((byte) i); Review comment: `buf.writeByte(i)`?
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385536489 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; + +/** + * Tests the methods in {@link ByteBufUtils}. + */ +public class ByteBufUtilsTest { + + @Test + public void testAccumulateWithoutCopy() { + final int sourceLength = 128; + final int sourceStartPosition = 32; Review comment: sourceStartPosition -> sourceReaderIndex This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212 ## CI report: * 912485a96d61febfa41b7c84631aeae19d819325 Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/150964833) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5718) * e474761e49927bb30bac6a297e992dc2ec98e01a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on issue #11236: [FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do not match when generic or POJO type is requested
wuchong commented on issue #11236: [FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do not match when generic or POJO type is requested URL: https://github.com/apache/flink/pull/11236#issuecomment-592370151 @flinkbot run travis This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner
wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner URL: https://github.com/apache/flink/pull/11188#discussion_r385535789 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala ## @@ -4195,4 +4195,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f55=f57", "true") } + + //--- + // JSON functions + //--- + @Test + def testJsonQuery(): Unit = { +// lax test +testSqlApi("json_query('{\"foo\":100}', 'lax $' null on empty)", "{\"foo\":100}") Review comment: Could you add another test without an error behavior clause, e.g. `json_query('{\"foo\":100}', 'lax $')`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner
wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner URL: https://github.com/apache/flink/pull/11188#discussion_r385535362 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala ## @@ -4195,4 +4195,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f55=f57", "true") } + + //--- + // JSON functions + //--- + @Test + def testJsonQuery(): Unit = { Review comment: Let's move json tests to a new test class, e.g. `JsonFunctionsTest`. `ScalarFunctionsTest` is too large now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner
wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner URL: https://github.com/apache/flink/pull/11188#discussion_r385535655 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala ## @@ -4195,4 +4195,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f55=f57", "true") } + + //--- + // JSON functions + //--- + @Test + def testJsonQuery(): Unit = { +// lax test +testSqlApi("json_query('{\"foo\":100}', 'lax $' null on empty)", "{\"foo\":100}") +testSqlApi("json_query('{\"foo\":100}', 'lax $' error on empty)", "{\"foo\":100}") +testSqlApi("json_query('{\"foo\":100}', 'lax $' empty array on empty)", "{\"foo\":100}"); +testSqlApi("json_query('{\"foo\":100}', 'lax $' empty object on empty)", "{\"foo\":100}"); +testSqlApi("json_query('{\"foo\":100}', 'lax $.foo' null on empty)", "null"); +testSqlApi("json_query('{\"foo\":100}', 'lax $.foo' empty array on empty)", "[]"); +testSqlApi("json_query('{\"foo\":100}', 'lax $.foo' empty object on empty)", "{}"); + +// path error test +testSqlApi("json_query('{\"foo\":100}', 'invalid $.foo' null on error)", "null"); +testSqlApi("json_query('{\"foo\":100}', 'invalid $.foo' empty array on error)", "[]"); +testSqlApi("json_query('{\"foo\":100}', 'invalid $.foo' empty object on error)", "{}"); + +// strict test +testSqlApi("json_query('{\"foo\":100}', 'strict $' null on empty)", "{\"foo\":100}"); +testSqlApi("json_query('{\"foo\":100}', 'strict $' error on empty)", "{\"foo\":100}"); +testSqlApi("json_query('{\"foo\":100}', 'strict $' empty array on error)", "{\"foo\":100}"); +testSqlApi("json_query('{\"foo\":100}', 'strict $' empty object on error)", "{\"foo\":100}"); + +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo1' null on error)", "null"); +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo1' empty array on error)", "[]"); +testSqlApi("json_query('{\"foo\":100}', 'strict 
$.foo1' empty object on error)", "{}"); +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' null on error)", "null"); +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' empty array on error)", "[]"); +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' empty object on error)", "{}"); + +// array wrapper test +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' without wrapper)", "null"); +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' without array wrapper)", "null"); +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' with wrapper)", "[100]"); +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' with unconditional wrapper)", "[100]"); +testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' with conditional wrapper)", "[100]"); +testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' without wrapper)", "[100]"); +testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' without array wrapper)", "[100]"); +testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' with wrapper)", "[[100]]"); +testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' with unconditional wrapper)", + "[[100]]"); +testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' with conditional wrapper)", "[100]"); + +// nulls +testSqlApi("json_query(cast(null as varchar), 'lax $')", "null") Review comment: Please also add some tests on column references. Currently, all the test data consists of char literals. Please also test non-string columns. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing URL: https://github.com/apache/flink/pull/11213#discussion_r385534676 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ## @@ -280,23 +281,8 @@ private ExecutionGraph createExecutionGraph( failoverStrategy); } - /** -* @deprecated Direct access to the execution graph by scheduler implementations is discouraged -* because currently the execution graph has various features and responsibilities that a -* scheduler should not be concerned about. The following specialized abstractions to the -* execution graph and accessors should be preferred over direct access: -* -* {@link #getSchedulingTopology()} -* {@link #getFailoverTopology()} -* {@link #getInputsLocationsRetriever()} -* {@link #getExecutionVertex(ExecutionVertexID)} -* {@link #getExecutionVertexId(ExecutionAttemptID)} -* {@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)} -* -* Currently, only {@link LegacyScheduler} requires direct access to the execution graph. -*/ - @Deprecated - protected ExecutionGraph getExecutionGraph() { + @VisibleForTesting + public ExecutionGraph getExecutionGraph() { Review comment: I will think again about whether we must expose `getExecutionGraph()` to the tests. Will drop this commit and may do it when reworking the tests. And as you mentioned, it should be a non-hotfix commit then if we would have it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16274) Add typed builder methods for setting dynamic configuration on StatefulFunctionsAppContainers
[ https://issues.apache.org/jira/browse/FLINK-16274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047259#comment-17047259 ] Tzu-Li (Gordon) Tai commented on FLINK-16274: - [~igal] thanks for the comment. {quote} a) Having users (statefun core devs) supplying the flink-conf.yaml as part of the e2e test makes the e2e test more likely to catch property renaming in Flink. {quote} I'm not sure I'm following your thoughts on this. If a config key happens to be renamed in Flink / Statefun, the e2e should still catch them all the same, because in the end _some_ flink-conf.yaml is being used. {quote} b) It requires pulling in a dependency on flink-statefun-core into the test driving code, that is otherwise independent of the runtime which is a very nice property for an end-to-end black box test. {quote} The only reason we have that extra dependency on {{statefun-flink-core}} right now is only due to this: https://github.com/apache/flink-statefun/pull/35/files#diff-663c1410bf2a21982e28d4512b1de018R161 I feel like this can be resolved by perhaps moving the configuration-related classes to a common module, instead of residing in core. > Add typed builder methods for setting dynamic configuration on > StatefulFunctionsAppContainers > - > > Key: FLINK-16274 > URL: https://issues.apache.org/jira/browse/FLINK-16274 > Project: Flink > Issue Type: New Feature > Components: Stateful Functions, Test Infrastructure >Affects Versions: statefun-1.1 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai >Priority: Major > Labels: pull-request-available > Time Spent: 10m > Remaining Estimate: 0h > > Excerpt from: > https://github.com/apache/flink-statefun/pull/32#discussion_r383644382 > Currently, you'd need to provide a complete {{Configuration}} as dynamic > properties when constructing a {{StatefulFunctionsAppContainers}}. 
> It'll be nicer if this is built like this: > {code} > public StatefulFunctionsAppContainers verificationApp = > new StatefulFunctionsAppContainers("sanity-verification", 2) > .withModuleGlobalConfiguration("kafka-broker", > kafka.getBootstrapServers()) > .withConfiguration(ConfigOption option, configValue) > {code} > And by default the {{StatefulFunctionsAppContainers}} just only has the > configs in the base template {{flink-conf.yaml}}. > This would require lazy construction of the containers on {{beforeTest}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
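The typed builder methods requested in FLINK-16274 might be shaped as in the sketch below. It is an illustration under assumptions, not the real `StatefulFunctionsAppContainers`: `TypedBuilderSketch`, `Option<T>` (a stand-in for Flink's `ConfigOption<T>`), `AppContainersBuilder`, and the key prefix used for module global configuration are all hypothetical. The builder only collects dynamic properties; constructing the containers lazily in `beforeTest` is left out.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of typed builder methods; names are illustrative only. */
public final class TypedBuilderSketch {

    private TypedBuilderSketch() {}

    /** Stand-in for Flink's ConfigOption<T>: a key that carries its value type. */
    public static final class Option<T> {
        private final String key;

        private Option(String key) {
            this.key = key;
        }

        public static <T> Option<T> of(String key) {
            return new Option<>(key);
        }

        public String key() {
            return key;
        }
    }

    /** Collects dynamic properties on top of the base template configuration. */
    public static final class AppContainersBuilder {
        // Assumed prefix for module global configuration keys.
        private static final String MODULE_PREFIX = "statefun.module.global-config.";

        private final String appName;
        private final int numWorkers;
        private final Map<String, String> dynamicProperties = new LinkedHashMap<>();

        public AppContainersBuilder(String appName, int numWorkers) {
            this.appName = appName;
            this.numWorkers = numWorkers;
        }

        /** The compiler rejects a value whose type does not match the option's type. */
        public <T> AppContainersBuilder withConfiguration(Option<T> option, T value) {
            dynamicProperties.put(option.key(), String.valueOf(value));
            return this;
        }

        public AppContainersBuilder withModuleGlobalConfiguration(String key, String value) {
            dynamicProperties.put(MODULE_PREFIX + key, value);
            return this;
        }

        public String appName() {
            return appName;
        }

        public int numWorkers() {
            return numWorkers;
        }

        /** Read-only view of the properties collected so far, in insertion order. */
        public Map<String, String> dynamicProperties() {
            return Collections.unmodifiableMap(dynamicProperties);
        }
    }
}
```

With this shape, a test that sets nothing ends up with an empty property map, which corresponds to running with only the settings from the base template `flink-conf.yaml`.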
[GitHub] [flink] zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r385532166

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ##

@@ -280,23 +281,8 @@ private ExecutionGraph createExecutionGraph(
             failoverStrategy);
     }
 
-    /**
-     * @deprecated Direct access to the execution graph by scheduler implementations is discouraged
-     * because currently the execution graph has various features and responsibilities that a
-     * scheduler should not be concerned about. The following specialized abstractions to the
-     * execution graph and accessors should be preferred over direct access:
-     *
-     * {@link #getSchedulingTopology()}
-     * {@link #getFailoverTopology()}
-     * {@link #getInputsLocationsRetriever()}
-     * {@link #getExecutionVertex(ExecutionVertexID)}
-     * {@link #getExecutionVertexId(ExecutionAttemptID)}
-     * {@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
-     *
-     * Currently, only {@link LegacyScheduler} requires direct access to the execution graph.
-     */
-    @Deprecated
-    protected ExecutionGraph getExecutionGraph() {
+    @VisibleForTesting
+    public ExecutionGraph getExecutionGraph() {

Review comment:
There would be quite a few tests to rework:
- ExecutionGraphDeploymentTest
- ExecutionGraphPartitionReleaseTest
- ExecutionGraphVariousFailuesTest
- ExecutionTest
- ExecutionVertexCancelTest
- ExecutionVertexInputConstraintTest
- ExecutionVertexTest
- ExecutionGraphNotEnoughResourceTest
- ExecutionGraphCheckpointCoordinatorTest
- ExecutionGraphColocationRestartTest
- ExecutionGraphSuspendTest
- FinalizeOnMasterTest

Most of them would not need much effort to be based on the new scheduler if we expose ExecutionGraph. It would make the work much easier, since we would only need to force scheduling-related actions to be conducted via the scheduler.
[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w… URL: https://github.com/apache/flink/pull/11248#discussion_r385533680 ## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ## @@ -176,9 +179,37 @@ public void teardown() throws Exception { } } + static class TestingNMClientAsync extends NMClientAsync { + + public List containerStatuses = new ArrayList<>(); + + protected TestingNMClientAsync(CallbackHandler callbackHandler) { + super(callbackHandler); + } + + @Override + public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) { + // Do nothing. + } + + @Override + public void stopContainerAsync(ContainerId containerId, NodeId nodeId) { + // Do nothing. + } + + @Override + public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) { + for (ContainerStatus containerStatus: containerStatuses) { + if (containerStatus.getContainerId().equals(containerId)) { + callbackHandler.onContainerStatusReceived(containerId, containerStatus); + } + } + } + } Review comment: Good point! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] JingsongLi commented on a change in pull request #11223: [FLINK-16281][Table SQL / Ecosystem] parameter 'maxRetryTimes' can not work in JDBCUpsertTableSink.
JingsongLi commented on a change in pull request #11223: [FLINK-16281][Table SQL / Ecosystem] parameter 'maxRetryTimes' can not work in JDBCUpsertTableSink.
URL: https://github.com/apache/flink/pull/11223#discussion_r385532707

## File path: flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/AppendOnlyWriter.java ##

@@ -38,6 +40,7 @@
     private final String insertSQL;
     private final int[] fieldTypes;
 
+    private transient List cachedRows;
     private transient PreparedStatement statement;

Review comment:
I mean you can add a unit test for `AppendOnlyWriter`.
[jira] [Assigned] (FLINK-15509) Use sql cilents create view occur Unexpected exception
[ https://issues.apache.org/jira/browse/FLINK-15509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jingsong Lee reassigned FLINK-15509:

    Assignee: Leonard Xu

> Use sql cilents create view occur Unexpected exception
> ------------------------------------------------------
>
>                 Key: FLINK-15509
>                 URL: https://issues.apache.org/jira/browse/FLINK-15509
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Client
>    Affects Versions: 1.10.0
>            Reporter: Xianxun Ye
>            Assignee: Leonard Xu
>            Priority: Major
>             Fix For: 1.11.0
>
>
> Version: master.
> First I created a table successfully with the SQL client, and then an
> unexpected exception was thrown when I created a view.
> My steps:
> Flink SQL> create table myTable (id int);
> *[INFO] Table has been created.*
> Flink SQL> show tables ;
> myTable
> Flink SQL> describe myTable ;
> root
> |-- id: INT
> Flink SQL> create view myView as select * from myTable;
>
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
> at > org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130) > at > org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105) > at > org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124) > at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.addView(LocalExecutor.java:300) > at > org.apache.flink.table.client.cli.CliClient.callCreateView(CliClient.java:579) > at > org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:308) > at java.util.Optional.ifPresent(Optional.java:159) > at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200) > at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178) > Caused by: org.apache.flink.table.api.TableException: > findAndCreateTableSource failed. 
> at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55) > at > org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86) > at java.util.Optional.map(Optional.java:215) > at > org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76) > at > org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) > at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) > at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) > at > org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) > at > org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) > at > org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105) > at > org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177) > at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3125) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3107) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3379) > at > org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) > 
at > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965) > at
[GitHub] [flink] zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r385532166

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java ##

@@ -280,23 +281,8 @@ private ExecutionGraph createExecutionGraph(
             failoverStrategy);
     }
 
-    /**
-     * @deprecated Direct access to the execution graph by scheduler implementations is discouraged
-     * because currently the execution graph has various features and responsibilities that a
-     * scheduler should not be concerned about. The following specialized abstractions to the
-     * execution graph and accessors should be preferred over direct access:
-     *
-     * {@link #getSchedulingTopology()}
-     * {@link #getFailoverTopology()}
-     * {@link #getInputsLocationsRetriever()}
-     * {@link #getExecutionVertex(ExecutionVertexID)}
-     * {@link #getExecutionVertexId(ExecutionAttemptID)}
-     * {@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
-     *
-     * Currently, only {@link LegacyScheduler} requires direct access to the execution graph.
-     */
-    @Deprecated
-    protected ExecutionGraph getExecutionGraph() {
+    @VisibleForTesting
+    public ExecutionGraph getExecutionGraph() {

Review comment:
There would be quite a few tests to rework:
- ExecutionGraphDeploymentTest
- ExecutionGraphPartitionReleaseTest
- ExecutionGraphVariousFailuesTest
- ExecutionTest
- ExecutionVertexCancelTest
- ExecutionVertexInputConstraintTest
- ExecutionVertexTest
- ExecutionGraphNotEnoughResourceTest
- ExecutionGraphCheckpointCoordinatorTest
- ExecutionGraphColocationRestartTest
- ExecutionGraphSuspendTest
- FinalizeOnMasterTest

Most of them do not need much effort to be based on the new scheduler. Exposing ExecutionGraph would make the work much easier, since we only need to force scheduling-related actions to be conducted via the scheduler.
[GitHub] [flink] flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212 ## CI report: * 912485a96d61febfa41b7c84631aeae19d819325 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150964833) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5718) * e474761e49927bb30bac6a297e992dc2ec98e01a UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385530896

## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ##

@@ -528,6 +558,31 @@ public void testOnStartContainerError() throws Exception {
         }};
     }
 
+    @Test
+    public void testStartWithContainerFromPreviousAttempt() throws Exception {
+        new Context() {{
+            runTest(() -> {
+                Container runningCountainer = mockContainer("container", 1234, 1, Resource.newInstance(1024, 1));
+                Container newContainer = mockContainer("container", 1234, 2, Resource.newInstance(1024, 1));
+                resourceManager.getWorkerNodeMap().put(new ResourceID(runningCountainer.getId().toString()), new YarnWorkerNode(runningCountainer));
+                resourceManager.getWorkerNodeMap().put(new ResourceID(newContainer.getId().toString()), new YarnWorkerNode(newContainer));
+                testingNMClientAsync.containerStatuses.add(
+                    ContainerStatus.newInstance(runningCountainer.getId(), ContainerState.RUNNING, "", 0));
+                testingNMClientAsync.containerStatuses.add(
+                    ContainerStatus.newInstance(newContainer.getId(), ContainerState.NEW, "", 0));
+
+                CompletableFuture requestContainerStatusFuture = resourceManager.runInMainThread(() -> {
+                    testingNMClientAsync.getContainerStatusAsync(runningCountainer.getId(), runningCountainer.getNodeId());
+                    testingNMClientAsync.getContainerStatusAsync(newContainer.getId(), newContainer.getNodeId());
+                    return null;
+                });

Review comment:
I think we can make `getContainersFromPreviousAttempts` visible for testing, and call it on the main thread with a custom `RegisterApplicationMasterResponse`.

The purpose of this test case is to verify that `YarnResourceManager` properly handles recovered containers, which includes querying their status from the Yarn NM and handling them according to the received status. Currently, it only covers the second part of the workflow.
[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385527439

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ##

@@ -454,6 +456,11 @@ public void onError(Throwable error) {
         onFatalError(error);
     }
 
+    @VisibleForTesting
+    Map getWorkerNodeMap() {
+        return workerNodeMap;

Review comment:
```suggestion
        return Collections.unmodifiableMap(workerNodeMap);
```
We should always be careful when exposing non-primitive fields such as `Map` and `List`. Despite being declared `final`, their contents can still be changed.
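The point about `final` collections can be checked with a small stand-alone sketch. `WorkerRegistrySketch` and its methods are hypothetical names used only for illustration and are not part of `YarnResourceManager`; the only real API involved is `java.util.Collections.unmodifiableMap`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a class that exposes an internal map to tests. */
final class WorkerRegistrySketch {
    // `final` fixes the reference, not the contents of the map.
    private final Map<String, String> workerNodeMap = new HashMap<>();

    void register(String resourceId, String node) {
        workerNodeMap.put(resourceId, node);
    }

    /** Returns the internal map itself, so callers can mutate internal state. */
    Map<String, String> leakyView() {
        return workerNodeMap;
    }

    /** Returns a read-only view; writes throw UnsupportedOperationException. */
    Map<String, String> readOnlyView() {
        return Collections.unmodifiableMap(workerNodeMap);
    }

    /** Tries to write through the given view and reports whether it was rejected. */
    static boolean rejectsWrites(Map<String, String> view) {
        try {
            view.put("intruder", "node-x");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        WorkerRegistrySketch registry = new WorkerRegistrySketch();
        registry.register("container_1", "node-a");
        System.out.println(rejectsWrites(registry.readOnlyView()));
        System.out.println(rejectsWrites(registry.leakyView()));
        System.out.println(registry.readOnlyView().size());
    }
}
```

Note that the unmodifiable wrapper is a live view: entries registered later through the owning class remain visible through it, but callers cannot write through it.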
[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w… URL: https://github.com/apache/flink/pull/11248#discussion_r385519116 ## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ## @@ -464,7 +471,15 @@ public void onContainerStarted(ContainerId containerId, Map @Override public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) { - // We are not interested in getting container status + // We fetch the status of the container from the previous attempts. + if (containerStatus.getState() == ContainerState.NEW) { + // If the status is "NEW", it means that the container is allocated but not be started yet. + // We need to release it. + log.warn("The container {} from the previous attempt did not start. Released.", containerId); Review comment: ```suggestion log.info("Releasing container {} from the previous attempt. No TaskExecutor started inside.", containerId); ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385519373

## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ##

@@ -464,7 +471,15 @@ public void onContainerStarted(ContainerId containerId, Map
     @Override
     public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
-        // We are not interested in getting container status
+        // We fetch the status of the container from the previous attempts.
+        if (containerStatus.getState() == ContainerState.NEW) {
+            // If the status is "NEW", it means that the container is allocated but not be started yet.
+            // We need to release it.
+            log.warn("The container {} from the previous attempt did not start. Released.", containerId);

Review comment:
I think an INFO-level log message should be enough. This is not causing any problem.
[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385525529

## File path: flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java ##

@@ -176,9 +179,37 @@ public void teardown() throws Exception {
         }
     }
 
+    static class TestingNMClientAsync extends NMClientAsync {
+
+        public List containerStatuses = new ArrayList<>();
+
+        protected TestingNMClientAsync(CallbackHandler callbackHandler) {
+            super(callbackHandler);
+        }
+
+        @Override
+        public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
+            // Do nothing.
+        }
+
+        @Override
+        public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+            // Do nothing.
+        }
+
+        @Override
+        public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
+            for (ContainerStatus containerStatus: containerStatuses) {
+                if (containerStatus.getContainerId().equals(containerId)) {
+                    callbackHandler.onContainerStatusReceived(containerId, containerStatus);
+                }
+            }
+        }
+    }

Review comment:
Let's make this class more general by using `Function` and `Consumer` to define its behaviors. Something like the following, taking `getContainerStatusAsync` as an example.
```
static class TestingNMClientAsync extends NMClientAsync {
    // ...
    private final BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncFunction;
    // ...
    public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
        callbackHandler.onContainerStatusReceived(containerId, getContainerStatusAsyncFunction.apply(containerId, nodeId));
    }
    // ...
}
```
We can use a builder class to create this class, allowing custom `Function`s and `Consumer`s to be set. If the code for this class grows too much, we can also put it in a separate file.

There are several benefits to doing this.
- It allows defining per-test-case behavior, which makes it easier to reuse this class in the future.
- It avoids using `mock`, `spy` and `verify`.
- It avoids having a publicly accessible `containerStatuses`.
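The builder-plus-functions pattern described in the review comment can be sketched without any Hadoop dependency. The sketch below is an assumption-laden illustration, not the class from the pull request: `TestingStatusClientSketch`, `StatusCallback` and the setter names are hypothetical, and plain strings stand in for `ContainerId`, `NodeId` and `ContainerStatus`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/** Hypothetical test double whose behavior is injected per test case. */
final class TestingStatusClientSketch {

    /** Stand-in for the callback handler that receives container statuses. */
    interface StatusCallback {
        void onStatusReceived(String containerId, String status);
    }

    private final StatusCallback callback;
    private final BiFunction<String, String, String> getStatusFunction;
    private final BiConsumer<String, String> stopContainerConsumer;

    private TestingStatusClientSketch(Builder builder) {
        this.callback = builder.callback;
        this.getStatusFunction = builder.getStatusFunction;
        this.stopContainerConsumer = builder.stopContainerConsumer;
    }

    /** Computes the status with the injected function and forwards it to the callback. */
    void getContainerStatusAsync(String containerId, String nodeId) {
        callback.onStatusReceived(containerId, getStatusFunction.apply(containerId, nodeId));
    }

    void stopContainerAsync(String containerId, String nodeId) {
        stopContainerConsumer.accept(containerId, nodeId);
    }

    static Builder builder(StatusCallback callback) {
        return new Builder(callback);
    }

    static final class Builder {
        private final StatusCallback callback;
        // Harmless defaults, so a test only overrides what it cares about.
        private BiFunction<String, String, String> getStatusFunction = (id, node) -> "UNKNOWN";
        private BiConsumer<String, String> stopContainerConsumer = (id, node) -> { };

        private Builder(StatusCallback callback) {
            this.callback = callback;
        }

        Builder setGetStatusFunction(BiFunction<String, String, String> function) {
            this.getStatusFunction = function;
            return this;
        }

        Builder setStopContainerConsumer(BiConsumer<String, String> consumer) {
            this.stopContainerConsumer = consumer;
            return this;
        }

        TestingStatusClientSketch build() {
            return new TestingStatusClientSketch(this);
        }
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        TestingStatusClientSketch client = builder((id, status) -> received.add(id + "=" + status))
            .setGetStatusFunction((id, node) -> "container_2".equals(id) ? "NEW" : "RUNNING")
            .build();
        client.getContainerStatusAsync("container_1", "node-a");
        client.getContainerStatusAsync("container_2", "node-a");
        System.out.println(received);
    }
}
```

Because each test supplies its own functions, no shared mutable list of statuses has to be exposed, which is the third benefit listed in the comment.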
[jira] [Comment Edited] (FLINK-15786) Load connector code with separate classloader
[ https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047229#comment-17047229 ]

Guowei Ma edited comment on FLINK-15786 at 2/28/20 6:40 AM:

Thanks [~pnowojski] for your detailed explanation. I agree with you that we could change the DataStream API to postpone the construction of the source/sink operator. This would spare users from having to use the Plugin mechanism directly to create a source/sink object on the client side in order to separate the connector classloader from the user code classloader.

What concerns me is the relationship between the FlinkUserClassLoader and the connector classloader. In theory, there might be interdependent scenarios such as FlinkKafkaPartitioner/KafkaConnector. This interface is provided by the Kafka connector, and the user can implement it. The implementing class is also passed to the Kafka connector and is constructed at runtime. One option for this scenario is to provide a classloader that knows both the user-implemented FlinkKafkaPartitioner class and the Kafka connector class. We could call it XClassloader for now.

Do you think that we need to resolve this scenario? Do you have any concerns about the XClassloader?

A related open question is whether DataStream users could also enjoy the benefits of classloader isolation without changing any code if we choose the XClassLoader solution.

was (Author: maguowei):
Thanks [~pnowojski] for your detailed explanation. I agree with you that we could change the DataStream API to postpone the construction of the source/sink operator. This would spare users from having to use the Plugin mechanism directly to create a source/sink object on the client side in order to separate the connector classloader from the user code classloader.

What concerns me is the relationship between the FlinkUserClassLoader and the connector classloader. In theory, there might be interdependent scenarios such as FlinkKafkaPartitioner/KafkaConnector. This interface is provided by the Kafka connector, and the user can implement it. The implementing class is also passed to the Kafka connector and is constructed at runtime. One option for this scenario is to provide a classloader that knows both the user-implemented FlinkKafkaPartitioner class and the Kafka connector class. We could call it XClassloader for now.

Do you think that we need to resolve this scenario?

A related open question is that we might not need to change (or add to) the DataStream API to separate the connector classloader if we choose to use the XClassloader.

> Load connector code with separate classloader
> ---------------------------------------------
>
>                 Key: FLINK-15786
>                 URL: https://issues.apache.org/jira/browse/FLINK-15786
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Task
>            Reporter: Guowei Ma
>            Priority: Major
>              Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users
> only need to add the corresponding connector as a dependency and package it
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often
> introduce heavy dependencies, so there is a high possibility of class conflicts
> between different connectors, or between a connector and the user code of the
> same job. For example, every one or two weeks we receive issue reports related
> to connector class conflicts from our users. The problem can get worse when
> users want to analyze data from different sources and write output to
> different sinks.
> Using a separate classloader to load each connector's code could resolve
> the problem.
[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner URL: https://github.com/apache/flink/pull/11174#discussion_r385528122 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/NegativeCallGen.scala ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.codegen.calls + +import org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull +import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression} +import org.apache.flink.table.types.logical.{BooleanType, LogicalType} + +/** + * Inverts the boolean value of a [[CallGenerator]] result. 
+ */ +class NegativeCallGen(callGenerator: CallGenerator) extends CallGenerator { + + override def generate( +ctx: CodeGeneratorContext, +operands: Seq[GeneratedExpression], +returnType: LogicalType + ): GeneratedExpression = { +assert(returnType.isInstanceOf[BooleanType]) + +val expr = callGenerator.generate(ctx, operands, returnType) +generateCallIfArgsNotNull(ctx, returnType, Seq(expr), returnType.isNullable) { + originalTerms => +assert(originalTerms.size == 1) + +s"!${originalTerms.head}" +} Review comment: Can simplify to ```scala ScalarOperatorGens.generateNot(ctx, callGenerator.generate(ctx, operands, returnType)) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner URL: https://github.com/apache/flink/pull/11174#discussion_r385526426 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala ## @@ -4195,4 +4195,24 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f55=f57", "true") } + + @Test + def testIsJSONPredicates(): Unit = { Review comment: Let's move json tests to a new test class, e.g. `JsonFunctionsTest`. `ScalarFunctionsTest` is too large now. Btw, let's use the lower case, `testIsJsonPredicates`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-16142) Memory Leak causes Metaspace OOM error on repeated job submission
[ https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047253#comment-17047253 ] Arvid Heise commented on FLINK-16142: - [~sewen], are we using [URLClassLoader#close|https://docs.oracle.com/javase/8/docs/technotes/guides/net/ClassLoader.html]? > Memory Leak causes Metaspace OOM error on repeated job submission > - > > Key: FLINK-16142 > URL: https://issues.apache.org/jira/browse/FLINK-16142 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.10.0 >Reporter: Thomas Wozniakowski >Priority: Blocker > Fix For: 1.10.1, 1.11.0 > > Attachments: Leak-GC-root.png, java_pid1.hprof, java_pid1.hprof > > > Hi Guys, > We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our > use-case exactly (RocksDB state backend running in a containerised cluster). > Unfortunately, it seems like there is a memory leak somewhere in the job > submission logic. We are getting this error: > {code:java} > 2020-02-18 10:22:10,020 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME > switched from RUNNING to FAILED. 
> java.lang.OutOfMemoryError: Metaspace > at java.lang.ClassLoader.defineClass1(Native Method) > at java.lang.ClassLoader.defineClass(ClassLoader.java:757) > at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) > at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) > at java.net.URLClassLoader.access$100(URLClassLoader.java:74) > at java.net.URLClassLoader$1.run(URLClassLoader.java:369) > at java.net.URLClassLoader$1.run(URLClassLoader.java:363) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:362) > at java.lang.ClassLoader.loadClass(ClassLoader.java:419) > at > org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60) > at java.lang.ClassLoader.loadClass(ClassLoader.java:352) > at > org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27) > at > org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398) > at > org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.(AwsSdkMetrics.java:359) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534) > at > 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279) > at > org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > {code} > (The only change in the above text is the OPERATOR_NAME text where I removed > some of the internal specifics of our system). > This will reliably happen on a fresh cluster after submitting and cancelling > our job 3 times. > We are using the presto-s3 plugin, the CEP library and the Kinesis connector. > Please let me know what other diagnostics would be useful. > Tom -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (FLINK-16142) Memory Leak causes Metaspace OOM error on repeated job submission
[ https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047253#comment-17047253 ] Arvid Heise edited comment on FLINK-16142 at 2/28/20 6:39 AM: -- [~sewen], are we using [URLClassLoader#close|https://docs.oracle.com/javase/8/docs/technotes/guides/net/ClassLoader.html]? Not a guarantee, but probably better than nothing. was (Author: arvid heise): [~sewen], are we using [URLClassLoader#close|https://docs.oracle.com/javase/8/docs/technotes/guides/net/ClassLoader.html]? > Memory Leak causes Metaspace OOM error on repeated job submission > - > > Key: FLINK-16142 > URL: https://issues.apache.org/jira/browse/FLINK-16142 > Project: Flink > Issue Type: Bug > Components: Client / Job Submission >Affects Versions: 1.10.0 >Reporter: Thomas Wozniakowski >Priority: Blocker > Fix For: 1.10.1, 1.11.0 > > Attachments: Leak-GC-root.png, java_pid1.hprof, java_pid1.hprof > > > Hi Guys, > We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our > use-case exactly (RocksDB state backend running in a containerised cluster). > Unfortunately, it seems like there is a memory leak somewhere in the job > submission logic. We are getting this error: > {code:java} > 2020-02-18 10:22:10,020 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME > switched from RUNNING to FAILED. 
> java.lang.OutOfMemoryError: Metaspace > at java.lang.ClassLoader.defineClass1(Native Method) > at java.lang.ClassLoader.defineClass(ClassLoader.java:757) > at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) > at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) > at java.net.URLClassLoader.access$100(URLClassLoader.java:74) > at java.net.URLClassLoader$1.run(URLClassLoader.java:369) > at java.net.URLClassLoader$1.run(URLClassLoader.java:363) > at java.security.AccessController.doPrivileged(Native Method) > at java.net.URLClassLoader.findClass(URLClassLoader.java:362) > at java.lang.ClassLoader.loadClass(ClassLoader.java:419) > at > org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60) > at java.lang.ClassLoader.loadClass(ClassLoader.java:352) > at > org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27) > at > org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398) > at > org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.(AwsSdkMetrics.java:359) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611) > at > org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606) > at > org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534) > at > 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389) > at > org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279) > at > org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686) > at > org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > at > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > {code} > (The only change in the above text is the OPERATOR_NAME text where I removed > some of the internal specifics of our system). > This will reliably happen on a fresh cluster after submitting and cancelling > our job 3 times. > We are using the presto-s3 plugin, the CEP library and the Kinesis connector. > Please let me know what other diagnostics would be useful. > Tom -- This message was sent by Atlassian Jira (v8.3.4#803005)
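Regarding the `URLClassLoader#close` question in the comment above, a minimal sketch of what closing a loader looks like in plain Java. The class and method names (`ClosingClassLoaderExample`, `loadClassName`) are hypothetical and are not Flink code. `URLClassLoader` has implemented `Closeable` since Java 7, so try-with-resources can be used. Closing releases the jar files the loader holds open; it does not by itself unload classes, and Metaspace is only reclaimed once the loader and all of its classes become unreachable, which is in line with the "not a guarantee" remark.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical helper for illustration only; not part of Flink.
final class ClosingClassLoaderExample {

    /**
     * Loads a class through a short-lived URLClassLoader and returns only its name,
     * so that no reference to the Class object (and thereby to the loader) escapes.
     */
    static String loadClassName(URL[] jarUrls, String className)
            throws IOException, ClassNotFoundException {
        ClassLoader parent = ClosingClassLoaderExample.class.getClassLoader();
        // try-with-resources calls URLClassLoader#close() when the block exits
        try (URLClassLoader loader = new URLClassLoader(jarUrls, parent)) {
            Class<?> clazz = Class.forName(className, false, loader);
            return clazz.getName();
        }
    }
}
```

Whether this helps with the leak reported here depends on what still references the loader (for example the JMX registration visible in the stack trace), so it should be read as an illustration of the API rather than a fix.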
[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner URL: https://github.com/apache/flink/pull/11174#discussion_r385527809 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/NegativeCallGen.scala ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.codegen.calls + +import org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull +import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression} +import org.apache.flink.table.types.logical.{BooleanType, LogicalType} + +/** + * Inverts the boolean value of a [[CallGenerator]] result. + */ +class NegativeCallGen(callGenerator: CallGenerator) extends CallGenerator { Review comment: In SQL, it is called `NOT`. We can call it `NotCallGen`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner URL: https://github.com/apache/flink/pull/11174#discussion_r385529676 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala ## @@ -753,6 +753,54 @@ object FunctionGenerator { Seq(FLOAT, INTEGER), BuiltInMethods.TRUNCATE_FLOAT) + addSqlFunctionMethod( +IS_JSON_VALUE, +Seq(CHAR), Review comment: I think `VARCHAR` will be safer than `CHAR` here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner URL: https://github.com/apache/flink/pull/11174#discussion_r385529970 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala ## @@ -4195,4 +4195,24 @@ class ScalarFunctionsTest extends ScalarTypesTestBase { "f55=f57", "true") } + + @Test + def testIsJSONPredicates(): Unit = { +testSqlApi("'{}' is json value", "true") +testSqlApi("'{]' is json value", "false") +testSqlApi("'{}' is json object", "true") +testSqlApi("'[]' is json object", "false") +testSqlApi("'{}' is json array", "false") +testSqlApi("'[]' is json array", "true") +testSqlApi("'100' is json scalar", "true") +testSqlApi("'[]' is json scalar", "false") +testSqlApi("'{}' is not json value", "false") +testSqlApi("'{]' is not json value", "true") +testSqlApi("'{}' is not json object", "false") +testSqlApi("'[]' is not json object", "true") +testSqlApi("'{}' is not json array", "true") +testSqlApi("'[]' is not json array", "false") +testSqlApi("'100' is not json scalar", "false") +testSqlApi("'[]' is not json scalar", "true") Review comment: Could you add some tests on a column reference? Currently, all the testing data is char. Please also test on non-string columns. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
flinkbot commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212 ## CI report: * 912485a96d61febfa41b7c84631aeae19d819325 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build
[GitHub] [flink] flinkbot commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
flinkbot commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#issuecomment-592334762 Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review. ## Automated Checks Last check on commit 912485a96d61febfa41b7c84631aeae19d819325 (Fri Feb 28 06:25:53 UTC 2020) **Warnings:** * No documentation files were touched! Remember to keep the Flink docs up to date! * **This pull request references an unassigned [Jira ticket](https://issues.apache.org/jira/browse/FLINK-16302).** According to the [code contribution guide](https://flink.apache.org/contributing/contribute-code.html), tickets need to be assigned before starting with the implementation work. Mention the bot in a comment to re-run the automated checks. ## Review Progress * ❓ 1. The [description] looks good. * ❓ 2. There is [consensus] that the contribution should go into to Flink. * ❓ 3. Needs [attention] from. * ❓ 4. The change fits into the overall [architecture]. * ❓ 5. Overall code [quality] is good. Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. 
For consensus, approval by a Flink committer or PMC member is required. Bot commands The @flinkbot bot supports the following commands: - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`) - `@flinkbot approve all` to approve all aspects - `@flinkbot approve-until architecture` to approve everything until `architecture` - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention - `@flinkbot disapprove architecture` to remove an approval you gave earlier
[GitHub] [flink] jinglining commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
jinglining commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250#issuecomment-592334929 @flinkbot run travis
[jira] [Updated] (FLINK-16302) add log list and read log by name for taskmanager
[ https://issues.apache.org/jira/browse/FLINK-16302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-16302: --- Labels: pull-request-available (was: ) > add log list and read log by name for taskmanager > - > > Key: FLINK-16302 > URL: https://issues.apache.org/jira/browse/FLINK-16302 > Project: Flink > Issue Type: Sub-task > Components: Runtime / REST >Reporter: lining >Priority: Major > Labels: pull-request-available > > * list taskmanager all log file > ** /taskmanagers/taskmanagerid/logs > ** > {code:java} > { > "logs": [ > { > "name": "taskmanager.log", > "size": 12529 > } > ] > } {code} > * read taskmanager log file > ** /taskmanagers/log/[filename] > ** response: same as taskmanager’s log -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] jinglining opened a new pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager
jinglining opened a new pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager URL: https://github.com/apache/flink/pull/11250 ## What is the purpose of the change This pull request lets the REST API list the log files of a taskmanager and read a taskmanager log file by name. ## Brief change log - get log list - get log by name ## Verifying this change This change added tests and can be verified as follows: - Added TaskManagerLogsHandlerTest, which verifies TaskManagerLogsHandler. - Updated AbstractTaskManagerFileHandlerTest so that it also verifies TaskManagerCustomFileHandler. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (docs)
[jira] [Comment Edited] (FLINK-15786) Load connector code with separate classloader
[ https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047229#comment-17047229 ] Guowei Ma edited comment on FLINK-15786 at 2/28/20 6:15 AM: Thanks [~pnowojski] for your detailed explanation. I agree with you that we could change the DataStream API to postpone the construction of the source/sink operator. This could avoid the user to use the Plugin mechanism directly to create a source/sink object at the client-side for separating the connector class loader from the user code classloader. What I concern about is that what the relationship between the FlinkUserClassLoader and the connector Classloader. In theory, there might be interdependent scenarios such as FlinkKafkaPartitioner/KafkaConnector. This interface is provided by the Kafka connector. And the user could implement it. The implemented class is also passed to the Kafka connector and would be constructed at runtime. One option for this scenario is to provide a classloader that knows both the user implemented FlinkKafkaPartitioner class and the Kafka connector class. We could call it XClassloader at first. Do you think that we need to resolve this scenario? A related open question is that we might not need to change(or add) the DataStream API for separating the connector classloader if we choose to use the XClassloader. was (Author: maguowei): Thanks [~pnowojski] for your detailed explanation. I agree with you that we could change the DataStream API to postpone the construction of the source/sink operator. This could avoid the user to use the Plugin mechanism directly to create a source/sink object at the client-side for separating the connector class loader from the user code classloader. What I concern about is that what the relationship between the FlinkUserClassLoader and the connector Classloader. In theory, there might be interdependent scenarios such as FlinkKafkaPartitioner/KafkaConnector. 
This interface provided by the Kafka connector that the user could implement it. The implemented class is passed to the Kafka connector and would be constructed at runtime. One option for this scenario is to provide a classloader that knows both the user implemented FlinkKafkaPartitioner class and the Kafka connector class. We could call it XClassloader at first. Do you think that we need to resolve this scenario? A related open question is that we might not need to change(or add) the DataStream API for separating the connector classloader if we choose to use the XClassloader. > Load connector code with separate classloader > - > > Key: FLINK-15786 > URL: https://issues.apache.org/jira/browse/FLINK-15786 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Guowei Ma >Priority: Major > Labels: usability > > Currently, connector code can be seen as part of user code. Usually, users > only need to add the corresponding connector as a dependency and package it > in the user jar. This is convenient enough. > However, connectors usually need to interact with external systems and often > introduce heavy dependencies, there is a high possibility of a class conflict > of different connectors or the user code of the same job. For example, every > one or two weeks, we will receive issue reports relevant with connector class > conflict from our users. The problem can get worse when users want to analyze > data from different sources and write output to different sinks. > Using separate classloader to load the different connector code could resolve > the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [flink] flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design
flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design URL: https://github.com/apache/flink/pull/11233#issuecomment-591843120 ## CI report: * fb57917227853d0477aa1383d399a619146d7170 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150793256) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5676) * 786a5056c957863c05ad24b00ca1dca032905eb0 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150962647) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5717) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution
flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution URL: https://github.com/apache/flink/pull/11208#issuecomment-590746668 ## CI report: * 31226c452d50eb216796a3c1b963f047bb2c5698 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150957094) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5716) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-15786) Load connector code with separate classloader
[ https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047229#comment-17047229 ] Guowei Ma commented on FLINK-15786: --- Thanks [~pnowojski] for your detailed explanation. I agree that we could change the DataStream API to postpone the construction of the source/sink operator. That would spare users from calling the Plugin mechanism directly on the client side to create a source/sink object just to separate the connector classloader from the user code classloader. My concern is the relationship between the FlinkUserClassLoader and the connector classloader. In theory, there might be interdependent scenarios such as FlinkKafkaPartitioner/KafkaConnector. This interface is provided by the Kafka connector, and the user can implement it. The implementing class is then passed to the Kafka connector and constructed at runtime. One option for this scenario is to provide a classloader that knows both the user-implemented FlinkKafkaPartitioner class and the Kafka connector class. We could call it XClassloader for now. Do you think we need to resolve this scenario? A related open question: if we choose to use the XClassloader, we might not need to change (or add to) the DataStream API to separate the connector classloader. > Load connector code with separate classloader > - > > Key: FLINK-15786 > URL: https://issues.apache.org/jira/browse/FLINK-15786 > Project: Flink > Issue Type: Improvement > Components: Runtime / Task >Reporter: Guowei Ma >Priority: Major > Labels: usability > > Currently, connector code can be seen as part of user code. Usually, users > only need to add the corresponding connector as a dependency and package it > in the user jar. This is convenient enough. 
> However, connectors usually need to interact with external systems and often > introduce heavy dependencies, so there is a high possibility of class conflicts > between different connectors or with the user code of the same job. For example, every > one or two weeks, we receive issue reports about connector class > conflicts from our users. The problem can get worse when users want to analyze > data from different sources and write output to different sinks. > Using a separate classloader to load each connector's code could resolve > the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
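The "XClassloader" idea from the comment above, a loader that can resolve both the user-implemented partitioner and the connector classes, could be sketched roughly as follows. This is an assumption-laden illustration and not a Flink design: the class name `DualSourceClassLoader` is hypothetical, and the lookup order (user code first, connector second) is an arbitrary choice.

```java
// Hypothetical sketch of a loader that can see two independent class sources.
final class DualSourceClassLoader extends ClassLoader {

    private final ClassLoader connectorLoader;

    DualSourceClassLoader(ClassLoader userCodeLoader, ClassLoader connectorLoader) {
        // The user code loader becomes the regular parent and is consulted first.
        super(userCodeLoader);
        this.connectorLoader = connectorLoader;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        // ClassLoader#loadClass calls findClass only after the parent failed,
        // so at this point the user code loader does not know the class.
        return connectorLoader.loadClass(name);
    }
}
```

Note that such a loader only offers a single lookup point. A class still resolves its own dependencies through the loader that defined it, so connector code that instantiates a user class by name would have to use the combined loader explicitly, for example via `Class.forName(name, true, combinedLoader)`.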
[jira] [Created] (FLINK-16325) A connection check is required, and it needs to be reopened when the JDBC connection is interrupted
renjianxu created FLINK-16325: - Summary: A connection check is required, and it needs to be reopened when the JDBC connection is interrupted Key: FLINK-16325 URL: https://issues.apache.org/jira/browse/FLINK-16325 Project: Flink Issue Type: Improvement Components: Connectors / JDBC Affects Versions: 1.10.0 Reporter: renjianxu In JDBCOutputFormat#writeRecord, if the JDBC connection has been disconnected by the time data is written, the data will be lost. Therefore, the writeRecord method needs to check connectivity and reopen the connection when necessary. -- This message was sent by Atlassian Jira (v8.3.4#803005)
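The connectivity check requested in this issue could look roughly like the sketch below. It is not the JDBCOutputFormat implementation: `ConnectionGuard`, `ConnectionFactory` and the timeout parameter are hypothetical names introduced for illustration. The only JDBC call relied on is the standard `java.sql.Connection#isValid(int)`, which returns false for a closed or broken connection.

```java
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper: hands out a connection that was validated just before use.
final class ConnectionGuard {

    /** Opens a fresh connection, e.g. via DriverManager.getConnection(url). */
    interface ConnectionFactory {
        Connection open() throws SQLException;
    }

    private final ConnectionFactory factory;
    private final int validationTimeoutSeconds;
    private Connection connection;

    ConnectionGuard(ConnectionFactory factory, int validationTimeoutSeconds) {
        this.factory = factory;
        this.validationTimeoutSeconds = validationTimeoutSeconds;
    }

    /** Returns the current connection, reopening it first if it is no longer valid. */
    Connection getValidConnection() throws SQLException {
        if (connection == null || !connection.isValid(validationTimeoutSeconds)) {
            closeQuietly();
            connection = factory.open();
        }
        return connection;
    }

    private void closeQuietly() {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ignored) {
                // The old connection is already broken; nothing more to do.
            }
        }
    }
}
```

A write path would call `getValidConnection()` before preparing or executing a statement. Prepared statements belong to the connection that created them, so they would need to be re-created after a reconnect, and a record that failed mid-write would still have to be retried by the caller.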
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385521240 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) + .setBufferPoolFactory(bufferPool) + .build(); + inputChannel = createRemoteInputChannel( + inputGate, + mock(PartitionRequestClient.class), + networkBufferPool); + inputGate.assignExclusiveSegments(); + inputChannel.requestSubpartition(0); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); Review comment: CreditBasedPartitionRequestClientHandler -> NetworkClientHandler This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385521292 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) + .setBufferPoolFactory(bufferPool) + .build(); + inputChannel = createRemoteInputChannel( + inputGate, + mock(PartitionRequestClient.class), + networkBufferPool); + inputGate.assignExclusiveSegments(); + inputChannel.requestSubpartition(0); + + CreditBasedPartitionRequestClientHandler handler = new CreditBasedPartitionRequestClientHandler(); + handler.addInputChannel(inputChannel); + + channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // outbound messages Review comment: for outbound messages This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385521186 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) + .setBufferPoolFactory(bufferPool) + .build(); + inputChannel = createRemoteInputChannel( + inputGate, + mock(PartitionRequestClient.class), + networkBufferPool); + inputGate.assignExclusiveSegments(); + inputChannel.requestSubpartition(0); Review comment: I guess this is not necessary because the input channel would be added into handler explicitly below This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385521035 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) + .setBufferPoolFactory(bufferPool) + .build(); + inputChannel = createRemoteInputChannel( + inputGate, + mock(PartitionRequestClient.class), Review comment: Mock is not suggested, so we can bypass this component which is actually not used in tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385520841 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -81,11 +95,47 @@ }); } - public NettyMessageSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { + public NettyMessageClientSideSerializationTest(boolean testReadOnlyBuffer, boolean testCompressedBuffer) { this.testReadOnlyBuffer = testReadOnlyBuffer; this.testCompressedBuffer = testCompressedBuffer; } + @Before + public void setup() throws IOException, InterruptedException { + networkBufferPool = new NetworkBufferPool(10, 1024, 2); + BufferPool bufferPool = networkBufferPool.createBufferPool(8, 8); + + inputGate = new SingleInputGateBuilder() + .setNumberOfChannels(1) Review comment: this can be removed, default is 1. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design
flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design URL: https://github.com/apache/flink/pull/11233#issuecomment-591843120 ## CI report: * fb57917227853d0477aa1383d399a619146d7170 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150793256) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5676) * 786a5056c957863c05ad24b00ca1dca032905eb0 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] flinkbot edited a comment on issue #11249: [FLINK-16297] Remove unnecessary indents and blank lines in code blocks
flinkbot edited a comment on issue #11249: [FLINK-16297] Remove unnecessary indents and blank lines in code blocks URL: https://github.com/apache/flink/pull/11249#issuecomment-592302102 ## CI report: * fe25cef13c65c9f584b53aa7216347ee1b3e8f16 Travis: [SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150955168) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5715) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385518757 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -183,15 +192,18 @@ private void testEncodeDecodeBuffer(boolean testReadOnlyBuffer, boolean testComp } NettyMessage.BufferResponse expected = new NettyMessage.BufferResponse( - testBuffer, random.nextInt(), new InputChannelID(), random.nextInt()); - NettyMessage.BufferResponse actual = encodeAndDecode(expected); + testBuffer, random.nextInt(), inputChannel.getInputChannelId(), random.nextInt()); + NettyMessage.BufferResponse actual = encodeAndDecode(expected, channel); // Netty 4.1 is not copying the messages, but retaining slices of them. BufferResponse actual is in this case // holding a reference to the buffer. Buffer will be recycled only once "actual" will be released. - assertFalse(buffer.isRecycled()); - assertFalse(testBuffer.isRecycled()); + assertTrue(buffer.isRecycled()); Review comment: Why is this assert changed?
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385518513 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java ## @@ -183,15 +192,18 @@ private void testEncodeDecodeBuffer(boolean testReadOnlyBuffer, boolean testComp } NettyMessage.BufferResponse expected = new NettyMessage.BufferResponse( - testBuffer, random.nextInt(), new InputChannelID(), random.nextInt()); - NettyMessage.BufferResponse actual = encodeAndDecode(expected); + testBuffer, random.nextInt(), inputChannel.getInputChannelId(), random.nextInt()); + NettyMessage.BufferResponse actual = encodeAndDecode(expected, channel); // Netty 4.1 is not copying the messages, but retaining slices of them. BufferResponse actual is in this case Review comment: Is this comment still valid now, given that we no longer use the previous `LengthFieldBasedFrameDecoder`?
[GitHub] [flink] zhengcanbin edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design
zhengcanbin edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design URL: https://github.com/apache/flink/pull/11233#issuecomment-592323880 > [0001-hotfix-Refactor-subclass-isXXX-to-inherit-methods.txt](https://github.com/apache/flink/files/4263005/0001-hotfix-Refactor-subclass-isXXX-to-inherit-methods.txt) > > @zhengcanbin here is a patch for code quality advice that you can take a look at. As we discuss previously it is often better to use override methods instead of control super classes' logic by subclasses' isXXX method. > > Generally changes looks good. I will double check the client side code and wait for response from @wangyang0918 and possibly @tillrohrmann . Hi, @TisonKun , thanks for the comment, a new commit [786a505](https://github.com/apache/flink/pull/11233/commits/786a5056c957863c05ad24b00ca1dca032905eb0) resolves the problem. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhengcanbin commented on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design
zhengcanbin commented on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design URL: https://github.com/apache/flink/pull/11233#issuecomment-592323880 > [0001-hotfix-Refactor-subclass-isXXX-to-inherit-methods.txt](https://github.com/apache/flink/files/4263005/0001-hotfix-Refactor-subclass-isXXX-to-inherit-methods.txt) > > @zhengcanbin here is a patch for code quality advice that you can take a look at. As we discuss previously it is often better to use override methods instead of control super classes' logic by subclasses' isXXX method. > > Generally changes looks good. I will double check the client side code and wait for response from @wangyang0918 and possibly @tillrohrmann . Hi, tison, thanks for the comment, a new commit [786a505](https://github.com/apache/flink/pull/11233/commits/786a5056c957863c05ad24b00ca1dca032905eb0) resolves the problem. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (FLINK-16324) Checkpoint tasks progress would display 100% in web UI even not all tasks finished
[ https://issues.apache.org/jira/browse/FLINK-16324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Tang updated FLINK-16324: - Description: The checkpoint progress details would display 100% even not all tasks finished in web UI. e.g {{996/1000}} would display as {{100%}} . (was: The checkpoint progress details would display 100% even not all tasks finished in web UI. e.g {{96/100}} would display as {{100%}} .) > Checkpoint tasks progress would display 100% in web UI even not all tasks > finished > -- > > Key: FLINK-16324 > URL: https://issues.apache.org/jira/browse/FLINK-16324 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Reporter: Yun Tang >Priority: Major > Fix For: 1.10.1, 1.11.0 > > > The checkpoint progress details would display 100% even not all tasks > finished in web UI. e.g {{996/1000}} would display as {{100%}} . -- This message was sent by Atlassian Jira (v8.3.4#803005)
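The symptom described in FLINK-16324 ({{996/1000}} shown as {{100%}}) is what nearest-integer rounding would produce. The issue does not state the actual cause, so the following is only a minimal sketch under that assumption; the class and method names are hypothetical and are not taken from the Flink web frontend.

```java
public class CheckpointProgressSketch {

    // Hypothetical helper: rounds to the nearest integer, so 99.6 becomes 100.
    static long roundedPercent(long acknowledged, long total) {
        if (total <= 0) {
            return 0; // assumption: report 0 when there is nothing to acknowledge
        }
        return Math.round(100.0 * acknowledged / total);
    }

    // Hypothetical alternative: integer division truncates toward zero,
    // so 100 is reported only when every task has acknowledged.
    static long flooredPercent(long acknowledged, long total) {
        if (total <= 0) {
            return 0;
        }
        return (100 * acknowledged) / total;
    }

    public static void main(String[] args) {
        System.out.println(roundedPercent(996, 1000));  // prints 100
        System.out.println(flooredPercent(996, 1000));  // prints 99
        System.out.println(flooredPercent(1000, 1000)); // prints 100
    }
}
```

Truncating instead of rounding is one possible way to keep the display below 100% until all tasks have finished; whether the eventual fix takes this route is not stated in the issue.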
[GitHub] [flink] flinkbot edited a comment on issue #11249: [FLINK-16297] Remove unnecessary indents and blank lines in code blocks
flinkbot edited a comment on issue #11249: [FLINK-16297] Remove unnecessary indents and blank lines in code blocks URL: https://github.com/apache/flink/pull/11249#issuecomment-592302102 ## CI report: * fe25cef13c65c9f584b53aa7216347ee1b3e8f16 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150955168) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5715) Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run travis` re-run the last Travis build - `@flinkbot run azure` re-run the last Azure build This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385514691 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes. + */ +public class NettyMessageServerSideSerializationTest { + + private final EmbeddedChannel channel = new EmbeddedChannel( Review comment: I am not sure whether we should close the channel at the end.
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385514533 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes. 
+ */ +public class NettyMessageServerSideSerializationTest { + + private final EmbeddedChannel channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // outbound messages + new NettyMessage.NettyMessageDecoder()); // inbound messages + + private final Random random = new Random(); + + @Test + public void testEncodeDecode() { + { + NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(), random.nextInt(), new InputChannelID(), random.nextInt()); Review comment: nit: we should put every argument on a separate line.
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385514390 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes. 
+ */ +public class NettyMessageServerSideSerializationTest { + + private final EmbeddedChannel channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // outbound messages + new NettyMessage.NettyMessageDecoder()); // inbound messages + + private final Random random = new Random(); + + @Test + public void testEncodeDecode() { + { + NettyMessage.PartitionRequest expected = new NettyMessage.PartitionRequest(new ResultPartitionID(), random.nextInt(), new InputChannelID(), random.nextInt()); Review comment: I would prefer making each of the following messages a separate test, so that every test is short and easy to maintain.
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode URL: https://github.com/apache/flink/pull/7368#discussion_r385513879 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java ## @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.event.task.IntegerTaskEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferCompressor; +import org.apache.flink.runtime.io.network.buffer.BufferDecompressor; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Random; + +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode; +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; + +/** + * Tests for the serialization and deserialization of the various {@link NettyMessage} sub-classes. + */ +public class NettyMessageServerSideSerializationTest { + + private final EmbeddedChannel channel = new EmbeddedChannel( + new NettyMessage.NettyMessageEncoder(), // outbound messages Review comment: outbound messages -> for outbound messages This is an automated message from the Apache Git Service. 
[jira] [Created] (FLINK-16324) Checkpoint tasks progress would display 100% in web UI even not all tasks finished
Yun Tang created FLINK-16324: Summary: Checkpoint tasks progress would display 100% in web UI even not all tasks finished Key: FLINK-16324 URL: https://issues.apache.org/jira/browse/FLINK-16324 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Reporter: Yun Tang Fix For: 1.10.1, 1.11.0 The checkpoint progress details would display 100% even not all tasks finished in web UI. e.g {{96/100}} would display as {{100%}} . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-16324) Checkpoint tasks progress would display 100% in web UI even not all tasks finished
[ https://issues.apache.org/jira/browse/FLINK-16324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047216#comment-17047216 ] Yun Tang commented on FLINK-16324: -- CC [~vthinkxie] > Checkpoint tasks progress would display 100% in web UI even not all tasks > finished > -- > > Key: FLINK-16324 > URL: https://issues.apache.org/jira/browse/FLINK-16324 > Project: Flink > Issue Type: Bug > Components: Runtime / Web Frontend >Reporter: Yun Tang >Priority: Major > Fix For: 1.10.1, 1.11.0 > > > The checkpoint progress details would display 100% even not all tasks > finished in web UI. e.g {{96/100}} would display as {{100%}} . -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-15400) elasticsearch sink support dynamic index.
[ https://issues.apache.org/jira/browse/FLINK-15400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047214#comment-17047214 ]

Jark Wu commented on FLINK-15400:
---------------------------------

Assigned to [~Leonard Xu]. Hi [~ouyangwuli], if you have time, you can help review the PR opened by Leonard to see whether it meets your requirements.

> elasticsearch sink support dynamic index.
> -----------------------------------------
>
> Key: FLINK-15400
> URL: https://issues.apache.org/jira/browse/FLINK-15400
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / ElasticSearch, Table SQL / Ecosystem
> Affects Versions: 1.11.0
> Reporter: ouyangwulin
> Assignee: Leonard Xu
> Priority: Major
> Labels: usability
> Fix For: 1.11.0
>
> From user...@flink.apache.org ([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]):
> Because ES 6/7 does not support TTL, users need to clean up indices by timestamp, so a dynamic index is a useful feature. Add a 'dynamicIndex' property as a switch to enable dynamic indexing, an 'indexField' property for the time field to extract, and an 'indexInterval' property to set the cycle mode.
>
> ||With property||Description||Default||Required||
> |dynamicIndex|Dynamic or not|false (true/false)|false|
> |indexField|Field to extract the index from|none|Required if dynamicIndex is true; only the types "timestamp", "date" and "long" are supported|
> |indexInterval|Cycle mode|d|Required if dynamicIndex is true; the possible values are d: day, m: month, w: week|
>
> After discussion, the final design looks as follows:
> {code:java}
> CREATE TABLE es_sink_table (
>   log_source varchar,
>   log_content varchar,
>   log_level bigint,
>   log_ts timestamp
> ) WITH (
>   'connector.type' = 'elasticsearch',
>   'connector.version' = '6',
>   'connector.index'='my-log-{log_ts|-MM-dd}',
>   # Elasticsearch index name. Flink supports creating the index dynamically
>   # at runtime based on a field. When the field type is varchar, the index
>   # value comes from the dynamicIndex pattern, e.g. 'my-log-{log_source}'.
>   # When the field type is timestamp/date, the dynamicIndex pattern supports
>   # formatting and parsing dates with Java SimpleDateFormat,
>   # e.g. 'my-log-{log_ts|-MM-dd}'.
>   'connector.index-alias'='my-log',
>   # Index alias name. The alias maps to all indices that are
>   # created from 'connector.index'.
>   …
> )
> {code}
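The `connector.index` pattern quoted in FLINK-15400 can be illustrated with a small resolver. This is only a sketch under stated assumptions, not the connector's implementation: the class `DynamicIndexSketch`, the method `resolve`, the sample date format `yyyy-MM-dd`, and the choice of UTC are all hypothetical, and the sketch takes the field value directly instead of looking it up in a row.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DynamicIndexSketch {

    // Matches "{field}" or "{field|dateFormat}" inside an index pattern.
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\{([^|}]+)(?:\\|([^}]+))?\\}");

    // Hypothetical resolver: substitutes the single placeholder with the given value.
    static String resolve(String indexPattern, Object fieldValue) {
        Matcher m = PLACEHOLDER.matcher(indexPattern);
        if (!m.find()) {
            return indexPattern; // no placeholder: a static index name
        }
        String dateFormat = m.group(2);
        String rendered;
        if (dateFormat != null && fieldValue instanceof Date) {
            // Timestamp/date fields are formatted with SimpleDateFormat.
            SimpleDateFormat sdf = new SimpleDateFormat(dateFormat);
            sdf.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC
            rendered = sdf.format((Date) fieldValue);
        } else {
            // varchar fields are inserted as-is.
            rendered = String.valueOf(fieldValue);
        }
        return indexPattern.substring(0, m.start())
                + rendered
                + indexPattern.substring(m.end());
    }

    public static void main(String[] args) {
        System.out.println(resolve("my-log-{log_source}", "nginx"));
        // prints my-log-nginx
        System.out.println(resolve("my-log-{log_ts|yyyy-MM-dd}", new Date(0L)));
        // prints my-log-1970-01-01
    }
}
```

With a date-based pattern, each day's records land in a separate index, which is what makes cleaning up old indices by timestamp possible without TTL support.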
[jira] [Assigned] (FLINK-16170) SearchTemplateRequest ClassNotFoundException when use flink-sql-connector-elasticsearch7
[ https://issues.apache.org/jira/browse/FLINK-16170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-16170:
---
Assignee: Leonard Xu

> SearchTemplateRequest ClassNotFoundException when use flink-sql-connector-elasticsearch7
>
> Key: FLINK-16170
> URL: https://issues.apache.org/jira/browse/FLINK-16170
> Project: Flink
> Issue Type: Bug
> Components: Connectors / ElasticSearch
> Reporter: Jark Wu
> Assignee: Leonard Xu
> Priority: Blocker
> Fix For: 1.10.1
>
> When running the SQL CLI with elasticsearch7, a query that inserts into Elasticsearch throws a SearchTemplateRequest ClassNotFoundException.
> {code:java}
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NoClassDefFoundError: org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
>     at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
>     at org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
>     at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>     at
[jira] [Assigned] (FLINK-15400) elasticsearch sink support dynamic index.
[ https://issues.apache.org/jira/browse/FLINK-15400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-15400:
---
Assignee: Leonard Xu

> elasticsearch sink support dynamic index.
> -
>
> Key: FLINK-15400
> URL: https://issues.apache.org/jira/browse/FLINK-15400
> Project: Flink
> Issue Type: New Feature
> Components: Connectors / ElasticSearch, Table SQL / Ecosystem
> Affects Versions: 1.11.0
> Reporter: ouyangwulin
> Assignee: Leonard Xu
> Priority: Major
> Labels: usability
> Fix For: 1.11.0
>
> From user...@flink.apache.org ([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]):
> Because ES 6/7 does not support TTL, users need to clean up indices by timestamp, so a dynamic index is a useful feature. Add the property 'dynamicIndex' as a switch to enable dynamic indexing, the property 'indexField' for the field the time is extracted from, and the property 'indexInterval' for the index cycle mode.
>
> ||With property||Description||Default||Required||
> |dynamicIndex|Dynamic or not|false (true/false)|false|
> |indexField|Field to extract the index from|none|Required if dynamicIndex is true; only the types "timestamp", "date", "long" are supported|
> |indexInterval|Cycle mode|d|Required if dynamicIndex is true; allowed values: d (day), m (month), w (week)|
>
> After discussion, the final design looks as follows:
> {code:java}
> CREATE TABLE es_sink_table (
>   log_source varchar,
>   log_content varchar,
>   log_level bigint,
>   log_ts timestamp
> ) WITH (
>   'connector.type' = 'elasticsearch',
>   'connector.version' = '6',
>   'connector.index'='my-log-{log_ts|-MM-dd}',
>   # Elasticsearch index name. Flink supports creating the index dynamically at
>   # runtime based on a field. When the field type is varchar, the index value
>   # comes from the dynamicIndex pattern, e.g. 'my-log-{log_source}'. When the
>   # field type is timestamp/date, the dynamicIndex pattern supports formatting
>   # and parsing dates with Java SimpleDateFormat, e.g. 'my-log-{log_ts|-MM-dd}'.
>   'connector.index-alias'='my-log',
>   # Index alias name; the alias maps to all indices created from
>   # 'connector.index'.
>   …
> )
> {code}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
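For readers following the design, here is a minimal sketch of how a varchar placeholder such as 'my-log-{log_source}' could be resolved against a row at runtime. It is not the connector's implementation: the class `DynamicIndexSketch`, the method `resolveIndex`, and the use of a plain `Map` as the row are all assumptions made for illustration, and the date-formatted variant is left out because the date pattern in the quoted design is incomplete.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: resolves {field} placeholders
// in an index pattern using values taken from a row.
public class DynamicIndexSketch {

    // Matches a placeholder of the form {fieldName} without a format suffix.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{([^{}|]+)\\}");

    static String resolveIndex(String indexPattern, Map<String, String> row) {
        Matcher matcher = PLACEHOLDER.matcher(indexPattern);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String field = matcher.group(1);
            String value = row.get(field);
            if (value == null) {
                throw new IllegalArgumentException("No value for index field: " + field);
            }
            // quoteReplacement keeps '$' and '\' in the value literal.
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("log_source", "nginx");
        System.out.println(resolveIndex("my-log-{log_source}", row)); // prints "my-log-nginx"
    }
}
```

A pattern without placeholders is returned unchanged, which corresponds to the static index case.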
[jira] [Updated] (FLINK-16323) Support to join a static table in streaming mode
[ https://issues.apache.org/jira/browse/FLINK-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu updated FLINK-16323:
Fix Version/s: 1.11.0

> Support to join a static table in streaming mode
>
> Key: FLINK-16323
> URL: https://issues.apache.org/jira/browse/FLINK-16323
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API, Table SQL / Planner
> Reporter: Jark Wu
> Priority: Major
> Fix For: 1.11.0
>
> Currently, we already support joining a stream with a bounded stream using a regular join. However, this is translated into a stream-stream join, which is inefficient because it emits early results that may be retracted afterwards.
> A better, native solution would be a special temporal join operator that blocks the streaming side until all the static table data is loaded.
> This helps users join a huge table, e.g. a MySQL table with a billion records that rarely changes.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16323) Support to join a static table in streaming mode
Jark Wu created FLINK-16323:
---
Summary: Support to join a static table in streaming mode
Key: FLINK-16323
URL: https://issues.apache.org/jira/browse/FLINK-16323
Project: Flink
Issue Type: New Feature
Components: Table SQL / API, Table SQL / Planner
Reporter: Jark Wu

Currently, we already support joining a stream with a bounded stream using a regular join. However, this is translated into a stream-stream join, which is inefficient because it emits early results that may be retracted afterwards.
A better, native solution would be a special temporal join operator that blocks the streaming side until all the static table data is loaded.
This helps users join a huge table, e.g. a MySQL table with a billion records that rarely changes.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
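The idea of holding back the streaming side until the static table is fully loaded can be sketched in plain Java. This is only an illustration under stated assumptions, not the proposed operator: `StaticTableJoinSketch` and its methods are hypothetical, it approximates "blocking" by buffering stream records, and it uses inner-join semantics on a string key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not a Flink operator: stream records are buffered
// until the static side is complete, so no early result is ever emitted
// and nothing needs to be retracted later.
public class StaticTableJoinSketch {

    private final Map<String, String> staticSide = new HashMap<>();
    private final List<String[]> pendingStreamRecords = new ArrayList<>();
    private boolean staticSideLoaded = false;

    // Called for each row of the static (bounded) table.
    void addStaticRow(String key, String value) {
        if (staticSideLoaded) {
            throw new IllegalStateException("Static side is already complete");
        }
        staticSide.put(key, value);
    }

    // Called once when the static table has been read completely;
    // flushes the buffered stream records in arrival order.
    List<String> finishStaticSide() {
        staticSideLoaded = true;
        List<String> results = new ArrayList<>();
        for (String[] record : pendingStreamRecords) {
            joinInto(results, record[0], record[1]);
        }
        pendingStreamRecords.clear();
        return results;
    }

    // Called for each record of the unbounded stream.
    List<String> onStreamRecord(String key, String payload) {
        List<String> results = new ArrayList<>();
        if (!staticSideLoaded) {
            pendingStreamRecords.add(new String[] {key, payload});
            return results;
        }
        joinInto(results, key, payload);
        return results;
    }

    private void joinInto(List<String> results, String key, String payload) {
        String match = staticSide.get(key);
        if (match != null) {
            results.add(key + ":" + payload + ":" + match);
        }
    }

    public static void main(String[] args) {
        StaticTableJoinSketch join = new StaticTableJoinSketch();
        System.out.println(join.onStreamRecord("k1", "click")); // prints "[]"
        join.addStaticRow("k1", "alice");
        System.out.println(join.finishStaticSide());            // prints "[k1:click:alice]"
        System.out.println(join.onStreamRecord("k1", "view"));  // prints "[k1:view:alice]"
    }
}
```

Once the static side is complete, every stream record is joined exactly once against a fixed lookup map, which is why this shape avoids the early results and retractions of a stream-stream join.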
[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w… URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068 ## CI report: * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714)
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385507440

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java ##

@@ -120,10 +120,14 @@
      * @return channel handlers
      */
     public ChannelHandler[] getClientChannelHandlers() {
-        return new ChannelHandler[] {
-            messageEncoder,
-            new NettyMessage.NettyMessageDecoder(),
-            new CreditBasedPartitionRequestClientHandler()};
+        CreditBasedPartitionRequestClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler();
+        NettyMessageClientDecoderDelegate nettyMessageClientDecoderDelegate
+            = new NettyMessageClientDecoderDelegate(networkClientHandler);
+
+        return new ChannelHandler[] {
+            messageEncoder,
+            nettyMessageClientDecoderDelegate,

Review comment:
nit: use `new NettyMessageClientDecoderDelegate(networkClientHandler)` inline here instead, to avoid the overly long definition above.
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385507196

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java ##

@@ -120,10 +120,14 @@
      * @return channel handlers
      */
     public ChannelHandler[] getClientChannelHandlers() {
-        return new ChannelHandler[] {
-            messageEncoder,
-            new NettyMessage.NettyMessageDecoder(),
-            new CreditBasedPartitionRequestClientHandler()};
+        CreditBasedPartitionRequestClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler();

Review comment:
`CreditBasedPartitionRequestClientHandler` -> `NetworkClientHandler`
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385507240

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java ##

@@ -120,10 +120,14 @@
      * @return channel handlers
      */
     public ChannelHandler[] getClientChannelHandlers() {
-        return new ChannelHandler[] {
-            messageEncoder,
-            new NettyMessage.NettyMessageDecoder(),
-            new CreditBasedPartitionRequestClientHandler()};
+        CreditBasedPartitionRequestClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler();
+        NettyMessageClientDecoderDelegate nettyMessageClientDecoderDelegate
+            = new NettyMessageClientDecoderDelegate(networkClientHandler);
+
+        return new ChannelHandler[] {
+            messageEncoder,

Review comment:
unify indentation
[GitHub] [flink] flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution
flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution URL: https://github.com/apache/flink/pull/11208#issuecomment-590746668 ## CI report: * 2457edcc5d060582b0033741320daa63dff68b0e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150947684) * 31226c452d50eb216796a3c1b963f047bb2c5698 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150957094)
[GitHub] [flink] flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution
flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution URL: https://github.com/apache/flink/pull/11208#issuecomment-590746668 ## CI report: * 2457edcc5d060582b0033741320daa63dff68b0e Travis: [FAILURE](https://travis-ci.com/flink-ci/flink/builds/150947684) * 31226c452d50eb216796a3c1b963f047bb2c5698 UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11247: [FLINK-16262][Connectors] Set the context classloader for parallel stream in FlinkKafkaProducer
flinkbot edited a comment on issue #11247: [FLINK-16262][Connectors] Set the context classloader for parallel stream in FlinkKafkaProducer URL: https://github.com/apache/flink/pull/11247#issuecomment-592026056 ## CI report: * ff5293a8f39aaef7a457e9720f7174e1982a928d UNKNOWN * 4fd63bfdc08cba777c9f476de7b0f7cbeddf294f Travis: [CANCELED](https://travis-ci.com/flink-ci/flink/builds/150948725) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5712) * 2843e7da6dacf7869b570049a769826e4a4673be UNKNOWN
[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w… URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068 ## CI report: * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714)
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385502888

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ##

@@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate inputGate, NetworkBufferPool
     /**
      * Returns a deserialized buffer message as it would be received during runtime.
      */
-    private static BufferResponse createBufferResponse(
+    private BufferResponse createBufferResponse(
             Buffer buffer,
             int sequenceNumber,
-            InputChannelID receivingChannelId,
-            int backlog) throws IOException {
+            RemoteInputChannel receivingChannel,
+            int backlog,
+            CreditBasedPartitionRequestClientHandler clientHandler) throws IOException {
+
         // Mock buffer to serialize
-        BufferResponse resp = new BufferResponse(buffer, sequenceNumber, receivingChannelId, backlog);
+        BufferResponse resp = new BufferResponse(
+            buffer,
+            sequenceNumber,
+            receivingChannel.getInputChannelId(),
+            backlog);
         ByteBuf serialized = resp.write(UnpooledByteBufAllocator.DEFAULT);
         // Skip general header bytes
         serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH);
+
         // Deserialize the bytes again. We have to go this way, because we only partly deserialize
         // the header of the response and wait for a buffer from the buffer pool to copy the payload
         // data into.
-        BufferResponse deserialized = BufferResponse.readFrom(serialized);
+        NetworkBufferAllocator allocator = new NetworkBufferAllocator(clientHandler);

Review comment:
It seems better to pass the `NetworkBufferAllocator` in as an argument. If we create multiple `BufferResponse` instances in one test, there is no need to create a new allocator for every `BufferResponse`.
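The shape of that suggestion can be sketched with stand-in types. Everything here is hypothetical: `Allocator` is a placeholder and not Flink's `NetworkBufferAllocator`, and `createResponse` only mimics a test helper that receives the allocator from its caller instead of constructing one on every call.

```java
// Hypothetical sketch of passing a shared allocator into a test helper.
public class AllocatorReuseSketch {

    // Stand-in for an allocator that is costly or stateful to construct.
    static final class Allocator {
        private int allocations = 0;

        byte[] allocate(int size) {
            allocations++;
            return new byte[size];
        }

        int allocations() {
            return allocations;
        }
    }

    // The helper takes the allocator as an argument, so one instance can
    // serve every response created within a single test.
    static byte[] createResponse(int payloadSize, Allocator allocator) {
        return allocator.allocate(payloadSize);
    }

    public static void main(String[] args) {
        Allocator shared = new Allocator();
        createResponse(4, shared);
        createResponse(8, shared);
        System.out.println(shared.allocations()); // prints "2"
    }
}
```

The caller decides the allocator's lifetime, which keeps the helper free of construction details.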
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385502524

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ##

@@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate inputGate, NetworkBufferPool
     /**
      * Returns a deserialized buffer message as it would be received during runtime.
      */
-    private static BufferResponse createBufferResponse(
+    private BufferResponse createBufferResponse(
             Buffer buffer,
             int sequenceNumber,
-            InputChannelID receivingChannelId,
-            int backlog) throws IOException {
+            RemoteInputChannel receivingChannel,
+            int backlog,
+            CreditBasedPartitionRequestClientHandler clientHandler) throws IOException {
+
         // Mock buffer to serialize
-        BufferResponse resp = new BufferResponse(buffer, sequenceNumber, receivingChannelId, backlog);
+        BufferResponse resp = new BufferResponse(
+            buffer,
+            sequenceNumber,
+            receivingChannel.getInputChannelId(),
+            backlog);
         ByteBuf serialized = resp.write(UnpooledByteBufAllocator.DEFAULT);
         // Skip general header bytes
         serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH);
+
         // Deserialize the bytes again. We have to go this way, because we only partly deserialize
         // the header of the response and wait for a buffer from the buffer pool to copy the payload
         // data into.
-        BufferResponse deserialized = BufferResponse.readFrom(serialized);
+        NetworkBufferAllocator allocator = new NetworkBufferAllocator(clientHandler);
+        BufferResponse deserialized = BufferResponse.readFrom(serialized, allocator);
+
+        if (deserialized.getBuffer() != null) {

Review comment:
I guess it is not necessary to write bytes into the buffer, because we never read or verify the respective buffer in the tests.
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385501156

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ##

@@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate inputGate, NetworkBufferPool
     /**
      * Returns a deserialized buffer message as it would be received during runtime.
      */
-    private static BufferResponse createBufferResponse(
+    private BufferResponse createBufferResponse(
             Buffer buffer,
             int sequenceNumber,
-            InputChannelID receivingChannelId,
-            int backlog) throws IOException {
+            RemoteInputChannel receivingChannel,
+            int backlog,
+            CreditBasedPartitionRequestClientHandler clientHandler) throws IOException {
+
         // Mock buffer to serialize
-        BufferResponse resp = new BufferResponse(buffer, sequenceNumber, receivingChannelId, backlog);
+        BufferResponse resp = new BufferResponse(

Review comment:
This change is unnecessary if we keep using the previous `receivingChannelId`.
[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385500959

## File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java ##

@@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate inputGate, NetworkBufferPool
     /**
      * Returns a deserialized buffer message as it would be received during runtime.
      */
-    private static BufferResponse createBufferResponse(
+    private BufferResponse createBufferResponse(
             Buffer buffer,
             int sequenceNumber,
-            InputChannelID receivingChannelId,
-            int backlog) throws IOException {
+            RemoteInputChannel receivingChannel,

Review comment:
It is better to keep the previous `receivingChannelId`, because that is all we actually need.
[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w… URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068 ## CI report: * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: [PENDING](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714)