[GitHub] [flink] flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and 
read log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212
 
 
   
   ## CI report:
   
   * 912485a96d61febfa41b7c84631aeae19d819325 Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/150964833) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5718)
 
   * e474761e49927bb30bac6a297e992dc2ec98e01a UNKNOWN
   * b8d51c94a0b93fdbfa4b167e0b4c630f791fba10 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] hequn8128 commented on a change in pull request #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution

2020-02-27 Thread GitBox
hequn8128 commented on a change in pull request #11208: [FLINK-16271][python] 
Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution
URL: https://github.com/apache/flink/pull/11208#discussion_r385552486
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/arrow/ArrowPythonScalarFunctionOperator.java
 ##
 @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.python.scalar.arrow;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.python.PythonFunctionRunner;
+import org.apache.flink.python.env.PythonEnvironmentManager;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.python.PythonFunctionInfo;
+import org.apache.flink.table.runtime.arrow.ArrowReader;
+import org.apache.flink.table.runtime.arrow.ArrowUtils;
+import org.apache.flink.table.runtime.operators.python.scalar.AbstractRowPythonScalarFunctionOperator;
+import org.apache.flink.table.runtime.runners.python.scalar.arrow.ArrowPythonScalarFunctionRunner;
+import org.apache.flink.table.runtime.types.CRow;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+
+import java.io.IOException;
+
+/**
+ * Arrow Python {@link ScalarFunction} operator for the old planner.
+ */
+@Internal
+public class ArrowPythonScalarFunctionOperator extends AbstractRowPythonScalarFunctionOperator {
 
 Review comment:
   ok, let's keep it. 




[jira] [Commented] (FLINK-16245) Use a delegating classloader as the user code classloader to prevent class leaks.

2020-02-27 Thread Guowei Ma (Jira)


[ https://issues.apache.org/jira/browse/FLINK-16245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17047288#comment-17047288 ]

Guowei Ma commented on FLINK-16245:
---

This means that there could still be some object leakage, but the cost would be much
smaller, because only the single delegating ClassLoader might be leaked if the user
does not clean up the context class loader.

> Use a delegating classloader as the user code classloader to prevent class 
> leaks.
> -
>
> Key: FLINK-16245
> URL: https://issues.apache.org/jira/browse/FLINK-16245
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Stephan Ewen
>Priority: Critical
>  Labels: usability
> Fix For: 1.11.0
>
>
> As reported in FLINK-11205, a reference to the user-code ClassLoader can be 
> held by some libraries, causing class leaks.
> One way to circumvent this class leak is to make the ClassLoader that we set as
> the user-code ClassLoader a delegating ClassLoader to the real class loader; when
> closing the user-code ClassLoader, we null out the reference.
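A minimal sketch of the idea, assuming a hypothetical wrapper class (this is not Flink's actual implementation): every lookup is forwarded to the real user-code loader, and `close()` nulls out that reference, so a library that keeps holding the wrapper leaks only a tiny, inert object.

```java
import java.io.Closeable;

public final class DelegatingUserCodeClassLoader extends ClassLoader implements Closeable {

    // The real, heavyweight user-code class loader.
    private volatile ClassLoader delegate;

    public DelegatingUserCodeClassLoader(ClassLoader delegate) {
        super(null); // no parent: all non-bootstrap lookups go through the delegate
        this.delegate = delegate;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        ClassLoader d = delegate;
        if (d == null) {
            throw new ClassNotFoundException(
                name + " (user-code class loader has been closed)");
        }
        return d.loadClass(name);
    }

    @Override
    public void close() {
        // Null out the reference: libraries holding this wrapper can no
        // longer keep the real loader (and all its classes) alive.
        delegate = null;
    }
}
```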





[GitHub] [flink] hequn8128 commented on a change in pull request #11220: [FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams

2020-02-27 Thread GitBox
hequn8128 commented on a change in pull request #11220: 
[FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams
URL: https://github.com/apache/flink/pull/11220#discussion_r385546143
 
 

 ##
 File path: flink-python/setup.py
 ##
 @@ -224,7 +224,7 @@ def remove_if_exists(file_path):
 author_email='d...@flink.apache.org',
 python_requires='>=3.5',
 install_requires=['py4j==0.10.8.1', 'python-dateutil==2.8.0', 
'apache-beam==2.19.0',
-  'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1'],
+  'cloudpickle==1.2.2', 'avro-python3>=1.8.1,<=1.9.1', 
'jsonpickle==1.2'],
 
 Review comment:
   Since we are only introducing jsonpickle now and the dependency is rather small, it
does not hurt non-ML users while bringing convenience to ML users. I see more
advantages here.
   If we add NumPy support for ML, we can move all these dependencies into the
optional extras at that time.
   What do you think?






[GitHub] [flink] zhuzhurk edited a comment on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId

2020-02-27 Thread GitBox
zhuzhurk edited a comment on issue #11148: [FLINK-16180][runtime] Replace the 
nullable vertexExecution in ScheduledUnit with a non-null executionVertexId
URL: https://github.com/apache/flink/pull/11148#issuecomment-592390176
 
 
   >> The problem is that we are unable to manipulate the state of the 
ExecutionVertex easily such that getPreferredLocations() returns the desired 
result
   
   That's true, and subclassing exists precisely to solve this kind of problem. From
the references you listed, I find most people also think so. It's not perfect, but
it's better than having no way at all.
   
   Another problem with `SchedulerTestUtils`, which currently uses Mockito, is that it
only mocks what is needed by the existing tests, so future tests that use these utils
may encounter NPE issues again, e.g. with `getResourceProfile()`.
   
   If you think it's better to have a minimal fix first (i.e. using Mockito to mock
`ExecutionVertex#getID()`) to unblock higher-priority tasks, we can do that and
discuss #11230 later, or check whether there is a better way to solve the
`SchedulerTestUtils` problem.
   WDYT?




[GitHub] [flink] zhuzhurk commented on issue #11148: [FLINK-16180][runtime] Replace the nullable vertexExecution in ScheduledUnit with a non-null executionVertexId

2020-02-27 Thread GitBox
zhuzhurk commented on issue #11148: [FLINK-16180][runtime] Replace the nullable 
vertexExecution in ScheduledUnit with a non-null executionVertexId
URL: https://github.com/apache/flink/pull/11148#issuecomment-592390176
 
 
   >> The problem is that we are unable to manipulate the state of the 
ExecutionVertex easily such that getPreferredLocations() returns the desired 
result
   
   That's true, and subclassing exists precisely to solve this kind of problem. From
the references you listed, I find most people also think so. It's not perfect, but
it's better than having no way at all. And Flink already uses this approach for
testing `SlotPoolImpl`, via `TestingSlotPoolImpl`.
   
   Another problem with `SchedulerTestUtils`, which currently uses Mockito, is that it
only mocks what is needed by the existing tests, so future tests that use these utils
may encounter NPE issues again, e.g. with `getResourceProfile()`.
   
   If you think it's better to have a minimal fix first (i.e. using Mockito to mock
`ExecutionVertex#getID()`) to unblock higher-priority tasks, we can do that and
discuss #11230 later.
   WDYT?
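As a rough illustration of the subclassing approach discussed above (all class names
below are illustrative stand-ins, not Flink's actual `ExecutionVertex` API): override
only the state-dependent getter, so every method that is not overridden keeps its real
behavior instead of returning null as an unstubbed mock would.

```java
import java.util.Arrays;
import java.util.Collection;

class ProductionVertex {
    Collection<String> getPreferredLocations() {
        // In production this is derived from internal state that is hard
        // to set up in a unit test.
        throw new UnsupportedOperationException("needs a full execution graph");
    }

    String getId() {
        return "vertex-1"; // real behavior stays available to future tests
    }
}

final class TestingVertex extends ProductionVertex {
    private final Collection<String> preferredLocations;

    TestingVertex(Collection<String> preferredLocations) {
        this.preferredLocations = preferredLocations;
    }

    @Override
    Collection<String> getPreferredLocations() {
        return preferredLocations; // the desired test state
    }
}

public class SubclassingSketch {
    public static void main(String[] args) {
        ProductionVertex vertex = new TestingVertex(Arrays.asList("tm-1", "tm-2"));
        System.out.println(vertex.getPreferredLocations()); // [tm-1, tm-2]
        System.out.println(vertex.getId());                 // vertex-1, not null
    }
}
```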




[GitHub] [flink] hequn8128 edited a comment on issue #11220: [FLINK-16249][python][ml] Add interfaces for Params, ParamInfo and WithParams

2020-02-27 Thread GitBox
hequn8128 edited a comment on issue #11220: [FLINK-16249][python][ml] Add 
interfaces for Params, ParamInfo and WithParams
URL: https://github.com/apache/flink/pull/11220#issuecomment-591807495
 
 
   > @hequn8128 Thanks for the PR.
   > 
   > 1. I noticed that there are two modules "ml" and "mllib" introduced. I'm 
wondering if one module such as ml is enough?

   @dianfu This sounds like a good idea. We can add api and lib under the `pyflink.ml`
package, i.e., `pyflink.ml.api` and `pyflink.ml.lib`. The package directly under
`pyflink` should be an entire module, somewhat consistent with `pyflink.table`.
   
   @becketqin @walterddr What's your opinion on this?
   If we reach a consensus, we may need an extra vote since this differs from the FLIP
wiki page.




[GitHub] [flink] flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and 
read log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212
 
 
   
   ## CI report:
   
   * 912485a96d61febfa41b7c84631aeae19d819325 Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/150964833) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5718)
 
   * e474761e49927bb30bac6a297e992dc2ec98e01a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11213: [FLINK-16276][tests] Introduce a 
builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#issuecomment-590853060
 
 
   
   ## CI report:
   
   * b31d7fab8da81827ca063075427a3737377bf0ed UNKNOWN
   * cade94bacbaa7fa76c0a0d013eb069406d01b6ad Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150784419) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5667)
 
   * d557df5126a0a4f2404ac08ccea25000e8495e25 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150968498) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5719)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the 
Kubernetes decorator design
URL: https://github.com/apache/flink/pull/11233#issuecomment-591843120
 
 
   
   ## CI report:
   
   * 786a5056c957863c05ad24b00ca1dca032905eb0 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/150962647) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5717)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11236: [FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do not match when generic or POJO type is requested

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11236: 
[FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do 
not match when generic or POJO type is requested
URL: https://github.com/apache/flink/pull/11236#issuecomment-591902160
 
 
   
   ## CI report:
   
   * fd488f5cf0bf428170cb72cf434e01780cac8ef7 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150821616) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5689)
 
   * c2e5a4fb73a7b2aca22e914e57b36bc3128bb966 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385541960
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the methods in {@link ByteBufUtils}.
+ */
+public class ByteBufUtilsTest {
+
+   @Test
+   public void testAccumulateWithoutCopy() {
+   final int sourceLength = 128;
+   final int sourceStartPosition = 32;
+   final int expectedAccumulationSize = 16;
+
+   ByteBuf src = createSourceBuffer(sourceLength, 
sourceStartPosition);
+
+   ByteBuf target = Unpooled.buffer(expectedAccumulationSize);
+
+   // If src has enough data and no data has been copied yet, src 
will be returned without modification.
+   ByteBuf accumulated = ByteBufUtils.accumulate(target, src, 
expectedAccumulationSize, target.readableBytes());
+
+   assertSame(src, accumulated);
+   assertEquals(sourceStartPosition, src.readerIndex());
+
+   verifyBufferContent(src, sourceStartPosition, sourceLength - 
sourceStartPosition, sourceStartPosition);
+   }
+
+   @Test
+   public void testAccumulateWithCopy() {
+   final int firstSourceLength = 128;
+   final int firstSourceStartPosition = 32;
+   final int secondSourceLength = 64;
+   final int secondSourceStartPosition = 0;
+   final int expectedAccumulationSize = 128;
+
+   final int firstCopyLength = firstSourceLength - 
firstSourceStartPosition;
+   final int secondCopyLength = expectedAccumulationSize - 
firstCopyLength;
+
+   ByteBuf firstSource = createSourceBuffer(firstSourceLength, 
firstSourceStartPosition);
+   ByteBuf secondSource = createSourceBuffer(secondSourceLength, 
secondSourceStartPosition);
+
+   ByteBuf target = Unpooled.buffer(expectedAccumulationSize);
+
+   // If src does not have enough data, src will be copied into 
target and null will be returned.
+   ByteBuf accumulated = ByteBufUtils.accumulate(
+   target,
+   firstSource,
+   expectedAccumulationSize,
+   target.readableBytes());
+   assertNull(accumulated);
+   assertEquals(firstSourceLength, firstSource.readerIndex());
+   assertEquals(firstCopyLength, target.readableBytes());
+
+   // The remaining data will be copied from the second buffer, 
and the target buffer will be returned
+   // after all data is accumulated.
+   accumulated = ByteBufUtils.accumulate(
+   target,
+   secondSource,
+   expectedAccumulationSize,
+   target.readableBytes());
+   assertSame(target, accumulated);
+   assertEquals(secondSourceStartPosition + secondCopyLength, 
secondSource.readerIndex());
+   assertEquals(expectedAccumulationSize, target.readableBytes());
+
+   verifyBufferContent(accumulated, 0, firstCopyLength, 
firstSourceStartPosition);
+   verifyBufferContent(accumulated, firstCopyLength, 
secondCopyLength, secondSourceStartPosition);
+   }
+
+   private ByteBuf createSourceBuffer(int size, int readerIndex) {
+   ByteBuf buf = Unpooled.buffer(size);
+   for (int i = 0; i < size; ++i) {
+   buf.writeByte((byte) i);
+   }
+
+   buf.readerIndex(readerIndex);
+
+ 

[GitHub] [flink] flinkbot edited a comment on issue #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11213: [FLINK-16276][tests] Introduce a 
builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#issuecomment-590853060
 
 
   
   ## CI report:
   
   * b31d7fab8da81827ca063075427a3737377bf0ed UNKNOWN
   * cade94bacbaa7fa76c0a0d013eb069406d01b6ad Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150784419) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5667)
 
   * d557df5126a0a4f2404ac08ccea25000e8495e25 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11236: [FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do not match when generic or POJO type is requested

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11236: 
[FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do 
not match when generic or POJO type is requested
URL: https://github.com/apache/flink/pull/11236#issuecomment-591902160
 
 
   
   ## CI report:
   
   * fd488f5cf0bf428170cb72cf434e01780cac8ef7 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150821616) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5689)
 
   * c2e5a4fb73a7b2aca22e914e57b36bc3128bb966 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385540907
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the methods in {@link ByteBufUtils}.
+ */
+public class ByteBufUtilsTest {
+
+   @Test
+   public void testAccumulateWithoutCopy() {
+   final int sourceLength = 128;
+   final int sourceStartPosition = 32;
+   final int expectedAccumulationSize = 16;
+
+   ByteBuf src = createSourceBuffer(sourceLength, 
sourceStartPosition);
+
+   ByteBuf target = Unpooled.buffer(expectedAccumulationSize);
+
+   // If src has enough data and no data has been copied yet, src 
will be returned without modification.
+   ByteBuf accumulated = ByteBufUtils.accumulate(target, src, 
expectedAccumulationSize, target.readableBytes());
+
+   assertSame(src, accumulated);
+   assertEquals(sourceStartPosition, src.readerIndex());
+
+   verifyBufferContent(src, sourceStartPosition, sourceLength - 
sourceStartPosition, sourceStartPosition);
+   }
+
+   @Test
+   public void testAccumulateWithCopy() {
+   final int firstSourceLength = 128;
+   final int firstSourceStartPosition = 32;
+   final int secondSourceLength = 64;
+   final int secondSourceStartPosition = 0;
+   final int expectedAccumulationSize = 128;
+
+   final int firstCopyLength = firstSourceLength - 
firstSourceStartPosition;
+   final int secondCopyLength = expectedAccumulationSize - 
firstCopyLength;
+
+   ByteBuf firstSource = createSourceBuffer(firstSourceLength, 
firstSourceStartPosition);
+   ByteBuf secondSource = createSourceBuffer(secondSourceLength, 
secondSourceStartPosition);
+
+   ByteBuf target = Unpooled.buffer(expectedAccumulationSize);
+
+   // If src does not have enough data, src will be copied into 
target and null will be returned.
+   ByteBuf accumulated = ByteBufUtils.accumulate(
+   target,
+   firstSource,
+   expectedAccumulationSize,
+   target.readableBytes());
+   assertNull(accumulated);
+   assertEquals(firstSourceLength, firstSource.readerIndex());
+   assertEquals(firstCopyLength, target.readableBytes());
+
+   // The remaining data will be copied from the second buffer, 
and the target buffer will be returned
+   // after all data is accumulated.
+   accumulated = ByteBufUtils.accumulate(
+   target,
+   secondSource,
+   expectedAccumulationSize,
+   target.readableBytes());
+   assertSame(target, accumulated);
+   assertEquals(secondSourceStartPosition + secondCopyLength, 
secondSource.readerIndex());
+   assertEquals(expectedAccumulationSize, target.readableBytes());
+
+   verifyBufferContent(accumulated, 0, firstCopyLength, 
firstSourceStartPosition);
 
 Review comment:
   I guess we can remove this intermediate verification and only verify the final
result, which covers it.
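For reference, a sketch of the accumulation contract these tests exercise,
reconstructed from the test's comments and assertions (it is not necessarily the
actual `ByteBufUtils` implementation):

```java
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;

final class ByteBufAccumulationSketch {

    /**
     * Accumulates up to targetAccumulationSize bytes into target.
     * accumulatedSize is the number of bytes already accumulated
     * (the tests pass target.readableBytes()).
     */
    static ByteBuf accumulate(
            ByteBuf target, ByteBuf src, int targetAccumulationSize, int accumulatedSize) {
        // Fast path: nothing copied yet and src already holds enough data,
        // so hand src back unmodified (its readerIndex is untouched).
        if (accumulatedSize == 0 && src.readableBytes() >= targetAccumulationSize) {
            return src;
        }
        // Slow path: copy what is available; this advances src's readerIndex.
        int copyLength = Math.min(src.readableBytes(), targetAccumulationSize - accumulatedSize);
        target.writeBytes(src, copyLength);
        // Return target only once it holds all expected bytes, else null.
        return target.readableBytes() == targetAccumulationSize ? target : null;
    }
}
```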


[GitHub] [flink] flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the 
Kubernetes decorator design
URL: https://github.com/apache/flink/pull/11233#issuecomment-591843120
 
 
   
   ## CI report:
   
   * 786a5056c957863c05ad24b00ca1dca032905eb0 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/150962647) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5717)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385540280
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the methods in {@link ByteBufUtils}.
+ */
+public class ByteBufUtilsTest {
+
+   @Test
+   public void testAccumulateWithoutCopy() {
+   final int sourceLength = 128;
+   final int sourceStartPosition = 32;
+   final int expectedAccumulationSize = 16;
+
+   ByteBuf src = createSourceBuffer(sourceLength, 
sourceStartPosition);
+
+   ByteBuf target = Unpooled.buffer(expectedAccumulationSize);
+
+   // If src has enough data and no data has been copied yet, src 
will be returned without modification.
+   ByteBuf accumulated = ByteBufUtils.accumulate(target, src, 
expectedAccumulationSize, target.readableBytes());
+
+   assertSame(src, accumulated);
+   assertEquals(sourceStartPosition, src.readerIndex());
+
+   verifyBufferContent(src, sourceStartPosition, sourceLength - 
sourceStartPosition, sourceStartPosition);
+   }
+
+   @Test
+   public void testAccumulateWithCopy() {
+   final int firstSourceLength = 128;
+   final int firstSourceStartPosition = 32;
+   final int secondSourceLength = 64;
 
 Review comment:
   We can make the two sources the same length to achieve the same goal and avoid
having too many variables.




[jira] [Created] (FLINK-16326) Eagerly validate strictly required Flink configurations for Stateful Functions

2020-02-27 Thread Tzu-Li (Gordon) Tai (Jira)
Tzu-Li (Gordon) Tai created FLINK-16326:
---

 Summary: Eagerly validate strictly required Flink configurations 
for Stateful Functions
 Key: FLINK-16326
 URL: https://issues.apache.org/jira/browse/FLINK-16326
 Project: Flink
  Issue Type: Improvement
  Components: Stateful Functions
Affects Versions: statefun-1.1
Reporter: Tzu-Li (Gordon) Tai


Currently, when Stateful Functions users want to set their own Flink 
configurations, they are required to build on top of a base template 
{{flink-conf.yaml}} which has some strictly required configurations predefined, 
such as parent-first classloading and state backend settings.

These Flink settings should never (as of now) be changed by the user, but there is
currently no validation of that in place. We should validate them eagerly, before
submission of the translated job, probably in {{StatefulFunctionsConfig}}.
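A hedged sketch of what such an eager check could look like (the helper class and
message wording are illustrative, not the actual `StatefulFunctionsConfig` API;
`CoreOptions.CLASSLOADER_RESOLVE_ORDER` is Flink's existing option for the
classloading order):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

final class StrictConfigValidation {

    /** Fails fast, before job submission, if a required setting was changed. */
    static void validate(Configuration flinkConf) {
        // Stateful Functions requires parent-first classloading.
        String resolveOrder = flinkConf.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
        if (!"parent-first".equals(resolveOrder)) {
            throw new IllegalStateException(
                "Invalid configuration: " + CoreOptions.CLASSLOADER_RESOLVE_ORDER.key()
                    + " must be 'parent-first' for Stateful Functions, but was: "
                    + resolveOrder);
        }
        // ...further strictly required settings (e.g. the state backend)
        // would be checked the same way.
    }
}
```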





[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385538145
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the methods in {@link ByteBufUtils}.
+ */
+public class ByteBufUtilsTest {
+
+   @Test
+   public void testAccumulateWithoutCopy() {
+   final int sourceLength = 128;
 
 Review comment:
   remove `final` to keep consistent with the others.






[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385537626
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the methods in {@link ByteBufUtils}.
+ */
+public class ByteBufUtilsTest {
+
+   @Test
+   public void testAccumulateWithoutCopy() {
+   final int sourceLength = 128;
+   final int sourceStartPosition = 32;
+   final int expectedAccumulationSize = 16;
+
+   ByteBuf src = createSourceBuffer(sourceLength, 
sourceStartPosition);
+
+   ByteBuf target = Unpooled.buffer(expectedAccumulationSize);
+
+   // If src has enough data and no data has been copied yet, src 
will be returned without modification.
+   ByteBuf accumulated = ByteBufUtils.accumulate(target, src, 
expectedAccumulationSize, target.readableBytes());
+
+   assertSame(src, accumulated);
+   assertEquals(sourceStartPosition, src.readerIndex());
+
 
 Review comment:
   nit: remove this empty line




[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385537572
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the methods in {@link ByteBufUtils}.
+ */
+public class ByteBufUtilsTest {
+
+   @Test
+   public void testAccumulateWithoutCopy() {
+   final int sourceLength = 128;
+   final int sourceStartPosition = 32;
+   final int expectedAccumulationSize = 16;
+
+   ByteBuf src = createSourceBuffer(sourceLength, 
sourceStartPosition);
+
 
 Review comment:
   nit: remove this empty line




[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385536593
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the methods in {@link ByteBufUtils}.
+ */
+public class ByteBufUtilsTest {
+
+   @Test
+   public void testAccumulateWithoutCopy() {
+   final int sourceLength = 128;
+   final int sourceStartPosition = 32;
+   final int expectedAccumulationSize = 16;
+
+   ByteBuf src = createSourceBuffer(sourceLength, 
sourceStartPosition);
+
+   ByteBuf target = Unpooled.buffer(expectedAccumulationSize);
+
+   // If src has enough data and no data has been copied yet, src 
will be returned without modification.
+   ByteBuf accumulated = ByteBufUtils.accumulate(target, src, 
expectedAccumulationSize, target.readableBytes());
+
+   assertSame(src, accumulated);
+   assertEquals(sourceStartPosition, src.readerIndex());
+
+   verifyBufferContent(src, sourceStartPosition, sourceLength - 
sourceStartPosition, sourceStartPosition);
+   }
+
+   @Test
+   public void testAccumulateWithCopy() {
+   final int firstSourceLength = 128;
+   final int firstSourceStartPosition = 32;
+   final int secondSourceLength = 64;
+   final int secondSourceStartPosition = 0;
+   final int expectedAccumulationSize = 128;
+
+   final int firstCopyLength = firstSourceLength - 
firstSourceStartPosition;
+   final int secondCopyLength = expectedAccumulationSize - 
firstCopyLength;
+
+   ByteBuf firstSource = createSourceBuffer(firstSourceLength, 
firstSourceStartPosition);
+   ByteBuf secondSource = createSourceBuffer(secondSourceLength, 
secondSourceStartPosition);
+
+   ByteBuf target = Unpooled.buffer(expectedAccumulationSize);
+
+   // If src does not have enough data, src will be copied into 
target and null will be returned.
+   ByteBuf accumulated = ByteBufUtils.accumulate(
+   target,
+   firstSource,
+   expectedAccumulationSize,
+   target.readableBytes());
+   assertNull(accumulated);
+   assertEquals(firstSourceLength, firstSource.readerIndex());
+   assertEquals(firstCopyLength, target.readableBytes());
+
+   // The remaining data will be copied from the second buffer, 
and the target buffer will be returned
+   // after all data is accumulated.
+   accumulated = ByteBufUtils.accumulate(
+   target,
+   secondSource,
+   expectedAccumulationSize,
+   target.readableBytes());
+   assertSame(target, accumulated);
+   assertEquals(secondSourceStartPosition + secondCopyLength, 
secondSource.readerIndex());
+   assertEquals(expectedAccumulationSize, target.readableBytes());
+
+   verifyBufferContent(accumulated, 0, firstCopyLength, 
firstSourceStartPosition);
+   verifyBufferContent(accumulated, firstCopyLength, 
secondCopyLength, secondSourceStartPosition);
+   }
+
+   private ByteBuf createSourceBuffer(int size, int readerIndex) {
+   ByteBuf buf = Unpooled.buffer(size);
+   for (int i = 0; i < size; ++i) {
+   buf.writeByte((byte) i);
 
 Review comment:
   `buf.writeByte(i)`?
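(Context for the nit, as a small hedged sketch: Netty's `ByteBuf.writeByte(int)`
takes an `int` and writes only its low-order byte, so the explicit `(byte)` cast
changes nothing.)

```java
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;

public class WriteByteCastDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(2);
        buf.writeByte((byte) 300); // cast truncates 300 to 0x2C first
        buf.writeByte(300);        // writeByte(int) ignores the 24 high-order bits anyway
        System.out.println(buf.getByte(0) == buf.getByte(1)); // true
    }
}
```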


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385536489
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ByteBufUtilsTest.java
 ##
 @@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Tests the methods in {@link ByteBufUtils}.
+ */
+public class ByteBufUtilsTest {
+
+   @Test
+   public void testAccumulateWithoutCopy() {
+   final int sourceLength = 128;
+   final int sourceStartPosition = 32;
 
 Review comment:
   sourceStartPosition -> sourceReaderIndex




[GitHub] [flink] flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and 
read log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212
 
 
   
   ## CI report:
   
   * 912485a96d61febfa41b7c84631aeae19d819325 Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/150964833) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5718)
 
   * e474761e49927bb30bac6a297e992dc2ec98e01a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] wuchong commented on issue #11236: [FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do not match when generic or POJO type is requested

2020-02-27 Thread GitBox
wuchong commented on issue #11236: 
[FLINK-16269][FLINK-16108][table-planner-blink] Fix schema of query and sink do 
not match when generic or POJO type is requested
URL: https://github.com/apache/flink/pull/11236#issuecomment-592370151
 
 
   @flinkbot run travis




[GitHub] [flink] wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner

2020-02-27 Thread GitBox
wuchong commented on a change in pull request #11188: [FLINK-16202][sql] 
Support JSON_QUERY for blink planner
URL: https://github.com/apache/flink/pull/11188#discussion_r385535789
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 ##
 @@ -4195,4 +4195,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "f55=f57",
   "true")
   }
+
+  //---
+  // JSON functions
+  //---
+  @Test
+  def testJsonQuery(): Unit = {
+// lax test
+testSqlApi("json_query('{\"foo\":100}', 'lax $' null on empty)", 
"{\"foo\":100}")
 
 Review comment:
   Could you add another test without the error behavior clause, e.g.
`json_query('{\"foo\":100}', 'lax $')`?




[GitHub] [flink] wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner

2020-02-27 Thread GitBox
wuchong commented on a change in pull request #11188: [FLINK-16202][sql] 
Support JSON_QUERY for blink planner
URL: https://github.com/apache/flink/pull/11188#discussion_r385535362
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 ##
 @@ -4195,4 +4195,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "f55=f57",
   "true")
   }
+
+  //---
+  // JSON functions
+  //---
+  @Test
+  def testJsonQuery(): Unit = {
 
 Review comment:
   Let's move json tests to a new test class, e.g. `JsonFunctionsTest`. 
`ScalarFunctionsTest` is too large now.




[GitHub] [flink] wuchong commented on a change in pull request #11188: [FLINK-16202][sql] Support JSON_QUERY for blink planner

2020-02-27 Thread GitBox
wuchong commented on a change in pull request #11188: [FLINK-16202][sql] 
Support JSON_QUERY for blink planner
URL: https://github.com/apache/flink/pull/11188#discussion_r385535655
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 ##
 @@ -4195,4 +4195,53 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "f55=f57",
   "true")
   }
+
+  //---
+  // JSON functions
+  //---
+  @Test
+  def testJsonQuery(): Unit = {
+    // lax test
+    testSqlApi("json_query('{\"foo\":100}', 'lax $' null on empty)", "{\"foo\":100}")
+    testSqlApi("json_query('{\"foo\":100}', 'lax $' error on empty)", "{\"foo\":100}")
+    testSqlApi("json_query('{\"foo\":100}', 'lax $' empty array on empty)", "{\"foo\":100}");
+    testSqlApi("json_query('{\"foo\":100}', 'lax $' empty object on empty)", "{\"foo\":100}");
+    testSqlApi("json_query('{\"foo\":100}', 'lax $.foo' null on empty)", "null");
+    testSqlApi("json_query('{\"foo\":100}', 'lax $.foo' empty array on empty)", "[]");
+    testSqlApi("json_query('{\"foo\":100}', 'lax $.foo' empty object on empty)", "{}");
+
+    // path error test
+    testSqlApi("json_query('{\"foo\":100}', 'invalid $.foo' null on error)", "null");
+    testSqlApi("json_query('{\"foo\":100}', 'invalid $.foo' empty array on error)", "[]");
+    testSqlApi("json_query('{\"foo\":100}', 'invalid $.foo' empty object on error)", "{}");
+
+    // strict test
+    testSqlApi("json_query('{\"foo\":100}', 'strict $' null on empty)", "{\"foo\":100}");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $' error on empty)", "{\"foo\":100}");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $' empty array on error)", "{\"foo\":100}");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $' empty object on error)", "{\"foo\":100}");
+
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo1' null on error)", "null");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo1' empty array on error)", "[]");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo1' empty object on error)", "{}");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' null on error)", "null");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' empty array on error)", "[]");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' empty object on error)", "{}");
+
+    // array wrapper test
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' without wrapper)", "null");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' without array wrapper)", "null");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' with wrapper)", "[100]");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' with unconditional wrapper)", "[100]");
+    testSqlApi("json_query('{\"foo\":100}', 'strict $.foo' with conditional wrapper)", "[100]");
+    testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' without wrapper)", "[100]");
+    testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' without array wrapper)", "[100]");
+    testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' with wrapper)", "[[100]]");
+    testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' with unconditional wrapper)", "[[100]]");
+    testSqlApi("json_query('{\"foo\":[100]}', 'strict $.foo' with conditional wrapper)", "[100]");
+
+    // nulls
+    testSqlApi("json_query(cast(null as varchar), 'lax $')", "null")
 
 Review comment:
   Could you also add some tests on a column reference? Currently, all the test 
data consists of character literals.
   Please also test non-string columns.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

2020-02-27 Thread GitBox
zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] 
Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r385534676
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -280,23 +281,8 @@ private ExecutionGraph createExecutionGraph(
failoverStrategy);
}
 
-   /**
-* @deprecated Direct access to the execution graph by scheduler 
implementations is discouraged
-* because currently the execution graph has various features and 
responsibilities that a
-* scheduler should not be concerned about. The following specialized 
abstractions to the
-* execution graph and accessors should be preferred over direct access:
-* 
-* {@link #getSchedulingTopology()}
-* {@link #getFailoverTopology()}
-* {@link #getInputsLocationsRetriever()}
-* {@link #getExecutionVertex(ExecutionVertexID)}
-* {@link #getExecutionVertexId(ExecutionAttemptID)}
-* {@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
-* 
-* Currently, only {@link LegacyScheduler} requires direct access to 
the execution graph.
-*/
-   @Deprecated
-   protected ExecutionGraph getExecutionGraph() {
+   @VisibleForTesting
+   public ExecutionGraph getExecutionGraph() {
 
 Review comment:
   I will think again about whether we must expose `getExecutionGraph()` to the 
tests.
   I will drop this commit for now and may reintroduce it when reworking the 
tests. As you mentioned, it should then be a non-hotfix commit if we do add it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

2020-02-27 Thread GitBox
zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] 
Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r385534676
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -280,23 +281,8 @@ private ExecutionGraph createExecutionGraph(
failoverStrategy);
}
 
-   /**
-* @deprecated Direct access to the execution graph by scheduler 
implementations is discouraged
-* because currently the execution graph has various features and 
responsibilities that a
-* scheduler should not be concerned about. The following specialized 
abstractions to the
-* execution graph and accessors should be preferred over direct access:
-* 
-* {@link #getSchedulingTopology()}
-* {@link #getFailoverTopology()}
-* {@link #getInputsLocationsRetriever()}
-* {@link #getExecutionVertex(ExecutionVertexID)}
-* {@link #getExecutionVertexId(ExecutionAttemptID)}
-* {@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
-* 
-* Currently, only {@link LegacyScheduler} requires direct access to 
the execution graph.
-*/
-   @Deprecated
-   protected ExecutionGraph getExecutionGraph() {
+   @VisibleForTesting
+   public ExecutionGraph getExecutionGraph() {
 
 Review comment:
   I will think again about whether we must expose `getExecutionGraph()` to the 
tests.
   I will drop this commit for now and may reintroduce it when reworking the 
tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16274) Add typed builder methods for setting dynamic configuration on StatefulFunctionsAppContainers

2020-02-27 Thread Tzu-Li (Gordon) Tai (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047259#comment-17047259
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-16274:
-

[~igal] thanks for the comment.

{quote}
a) Having users (statefun core devs) supplying the flink-conf.yaml as part of 
the e2e test makes the e2e test
more likely to catch property renaming in Flink.
{quote}

I'm not sure I'm following your thoughts on this.
If a config key happens to be renamed in Flink / Statefun, the e2e should still 
catch them all the same, because in the end _some_ flink-conf.yaml is being 
used.

{quote}
b) It requires pulling in a dependency on flink-statefun-core into the test 
driving code, that is otherwise independent of the runtime which is a very nice 
property for an end-to-end black box test.
{quote}

The only reason we have that extra dependency on {{statefun-flink-core}} right 
now is this: 
https://github.com/apache/flink-statefun/pull/35/files#diff-663c1410bf2a21982e28d4512b1de018R161
I feel like this can be resolved by perhaps moving the configuration-related 
classes to a common module, instead of residing in core.

> Add typed builder methods for setting dynamic configuration on 
> StatefulFunctionsAppContainers
> -
>
> Key: FLINK-16274
> URL: https://issues.apache.org/jira/browse/FLINK-16274
> Project: Flink
>  Issue Type: New Feature
>  Components: Stateful Functions, Test Infrastructure
>Affects Versions: statefun-1.1
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Excerpt from: 
> https://github.com/apache/flink-statefun/pull/32#discussion_r383644382
> Currently, you'd need to provide a complete {{Configuration}} as dynamic 
> properties when constructing a {{StatefulFunctionsAppContainers}}.
> It'll be nicer if this is built like this:
> {code}
> public StatefulFunctionsAppContainers verificationApp =
> new StatefulFunctionsAppContainers("sanity-verification", 2)
> .withModuleGlobalConfiguration("kafka-broker", 
> kafka.getBootstrapServers())
> .withConfiguration(ConfigOption option, configValue)
> {code}
> And by default the {{StatefulFunctionsAppContainers}} just only has the 
> configs in the base template {{flink-conf.yaml}}.
> This would require lazy construction of the containers on {{beforeTest}}.
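
For illustration, a minimal sketch of how the lazy construction could look. The class and method names follow the snippet above; the JUnit {{ExternalResource}} base class and everything inside {{before()}} are assumptions, not the actual implementation:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.junit.rules.ExternalResource;

public class StatefulFunctionsAppContainers extends ExternalResource {

    private final Configuration dynamicProperties = new Configuration();

    // Only record the option here; the containers are not built yet.
    public <T> StatefulFunctionsAppContainers withConfiguration(ConfigOption<T> option, T value) {
        dynamicProperties.set(option, value);
        return this;
    }

    @Override
    protected void before() {
        // Build the actual containers here, merging dynamicProperties into the
        // base flink-conf.yaml template, so that all withXxx(...) calls made
        // before the test runs are taken into account.
    }
}
{code}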



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

2020-02-27 Thread GitBox
zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] 
Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r385532166
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -280,23 +281,8 @@ private ExecutionGraph createExecutionGraph(
failoverStrategy);
}
 
-   /**
-* @deprecated Direct access to the execution graph by scheduler 
implementations is discouraged
-* because currently the execution graph has various features and 
responsibilities that a
-* scheduler should not be concerned about. The following specialized 
abstractions to the
-* execution graph and accessors should be preferred over direct access:
-* 
-* {@link #getSchedulingTopology()}
-* {@link #getFailoverTopology()}
-* {@link #getInputsLocationsRetriever()}
-* {@link #getExecutionVertex(ExecutionVertexID)}
-* {@link #getExecutionVertexId(ExecutionAttemptID)}
-* {@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
-* 
-* Currently, only {@link LegacyScheduler} requires direct access to 
the execution graph.
-*/
-   @Deprecated
-   protected ExecutionGraph getExecutionGraph() {
+   @VisibleForTesting
+   public ExecutionGraph getExecutionGraph() {
 
 Review comment:
   There would be quite a few tests to be reworked. 
   - ExecutionGraphDeploymentTest
   - ExecutionGraphPartitionReleaseTest
   - ExecutionGraphVariousFailuesTest
   - ExecutionTest
   - ExecutionVertexCancelTest
   - ExecutionVertexInputConstraintTest
   - ExecutionVertexTest
   - ExecutionGraphNotEnoughResourceTest
   - ExecutionGraphCheckpointCoordinatorTest
   - ExecutionGraphColocationRestartTest
   - ExecutionGraphSuspendTest
   - FinalizeOnMasterTest
   
   Most of them do not need much effort to be based on the new scheduler if 
we expose ExecutionGraph. It would make the work much easier, since we only 
need to force scheduling-related actions to be conducted via the scheduler.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
KarmaGYZ commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385533680
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -176,9 +179,37 @@ public void teardown() throws Exception {
}
}
 
+	static class TestingNMClientAsync extends NMClientAsync {
+
+		public List<ContainerStatus> containerStatuses = new ArrayList<>();
+
+		protected TestingNMClientAsync(CallbackHandler callbackHandler) {
+			super(callbackHandler);
+		}
+
+		@Override
+		public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
+			// Do nothing.
+		}
+
+		@Override
+		public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+			// Do nothing.
+		}
+
+		@Override
+		public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
+			for (ContainerStatus containerStatus : containerStatuses) {
+				if (containerStatus.getContainerId().equals(containerId)) {
+					callbackHandler.onContainerStatusReceived(containerId, containerStatus);
+				}
+			}
+		}
+	}
 
 Review comment:
   Good point!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] JingsongLi commented on a change in pull request #11223: [FLINK-16281][Table SQL / Ecosystem] parameter 'maxRetryTimes' can not work in JDBCUpsertTableSink.

2020-02-27 Thread GitBox
JingsongLi commented on a change in pull request #11223: [FLINK-16281][Table 
SQL / Ecosystem] parameter 'maxRetryTimes' can not work in JDBCUpsertTableSink.
URL: https://github.com/apache/flink/pull/11223#discussion_r385532707
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/writer/AppendOnlyWriter.java
 ##
 @@ -38,6 +40,7 @@
private final String insertSQL;
private final int[] fieldTypes;
 
+	private transient List<Row> cachedRows;
private transient PreparedStatement statement;
 
 Review comment:
   I mean you can add a unit test for `AppendOnlyWriter`.
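   For example, a minimal sketch of such a test (assumes Mockito and JUnit 4; the constructor and the `open`/`addRecord`/`executeBatch` names are read off the diff context above and may need adjusting to the actual API):
   ```java
   import static org.mockito.Mockito.*;

   import java.sql.Connection;
   import java.sql.PreparedStatement;
   import java.sql.SQLException;
   import java.sql.Types;

   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.types.Row;
   import org.junit.Test;

   public class AppendOnlyWriterTest {

       @Test
       public void testCachedRowsAreReplayedAfterFailure() throws Exception {
           PreparedStatement statement = mock(PreparedStatement.class);
           Connection connection = mock(Connection.class);
           when(connection.prepareStatement(anyString())).thenReturn(statement);
           // The first flush fails, the second one succeeds.
           when(statement.executeBatch())
               .thenThrow(new SQLException("transient failure"))
               .thenReturn(new int[]{1});

           AppendOnlyWriter writer = new AppendOnlyWriter("INSERT INTO t VALUES (?)", new int[]{Types.INTEGER});
           writer.open(connection);
           writer.addRecord(Tuple2.of(true, Row.of(1)));

           try {
               writer.executeBatch();
           } catch (SQLException expected) {
               // the surrounding output format owns the retry loop
           }
           // On the retry, the cached row must be added to the batch again.
           writer.executeBatch();
           verify(statement, times(2)).addBatch();
       }
   }
   ```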


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-15509) Use sql cilents create view occur Unexpected exception

2020-02-27 Thread Jingsong Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15509?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee reassigned FLINK-15509:


Assignee: Leonard Xu

> Use sql cilents create view occur Unexpected exception
> --
>
> Key: FLINK-15509
> URL: https://issues.apache.org/jira/browse/FLINK-15509
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / Client
>Affects Versions: 1.10.0
>Reporter: Xianxun Ye
>Assignee: Leonard Xu
>Priority: Major
> Fix For: 1.11.0
>
>
> version:master.
> First, I created a table successfully via the SQL client, and then an 
> unexpected exception was thrown when creating a view.
> My steps:
> Flink SQL> create table myTable (id int);
> *[INFO] Table has been created.*
> Flink SQL> show tables ;
> myTable
> Flink SQL> describe myTable ;
> root
>  |-- id: INT
> Flink SQL> create view myView as select * from myTable;
>  
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: 
> Unexpected exception. This is a bug. Please consider filing an issue.
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:190)
> Caused by: org.apache.flink.table.api.ValidationException: SQL validation 
> failed. findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validateInternal(FlinkPlannerImpl.scala:130)
>  at 
> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:105)
>  at 
> org.apache.flink.table.sqlexec.SqlToOperationConverter.convert(SqlToOperationConverter.java:124)
>  at org.apache.flink.table.planner.ParserImpl.parse(ParserImpl.java:66)
>  at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
>  at 
> org.apache.flink.table.client.gateway.local.LocalExecutor.addView(LocalExecutor.java:300)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCreateView(CliClient.java:579)
>  at 
> org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:308)
>  at java.util.Optional.ifPresent(Optional.java:159)
>  at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:200)
>  at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
>  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:104)
>  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:178)
> Caused by: org.apache.flink.table.api.TableException: 
> findAndCreateTableSource failed.
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
>  at 
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:138)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:97)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:86)
>  at java.util.Optional.map(Optional.java:215)
>  at 
> org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:76)
>  at 
> org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83)
>  at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289)
>  at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143)
>  at 
> org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
>  at 
> org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
>  at 
> org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3125)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3107)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3379)
>  at 
> org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>  at 
> org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1005)
>  at 
> org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:965)
>  at 

[GitHub] [flink] zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] Introduce a builder and factory methods to create DefaultScheduler for testing

2020-02-27 Thread GitBox
zhuzhurk commented on a change in pull request #11213: [FLINK-16276][tests] 
Introduce a builder and factory methods to create DefaultScheduler for testing
URL: https://github.com/apache/flink/pull/11213#discussion_r385532166
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 ##
 @@ -280,23 +281,8 @@ private ExecutionGraph createExecutionGraph(
failoverStrategy);
}
 
-   /**
-* @deprecated Direct access to the execution graph by scheduler 
implementations is discouraged
-* because currently the execution graph has various features and 
responsibilities that a
-* scheduler should not be concerned about. The following specialized 
abstractions to the
-* execution graph and accessors should be preferred over direct access:
-* 
-* {@link #getSchedulingTopology()}
-* {@link #getFailoverTopology()}
-* {@link #getInputsLocationsRetriever()}
-* {@link #getExecutionVertex(ExecutionVertexID)}
-* {@link #getExecutionVertexId(ExecutionAttemptID)}
-* {@link #getExecutionVertexIdOrThrow(ExecutionAttemptID)}
-* 
-* Currently, only {@link LegacyScheduler} requires direct access to 
the execution graph.
-*/
-   @Deprecated
-   protected ExecutionGraph getExecutionGraph() {
+   @VisibleForTesting
+   public ExecutionGraph getExecutionGraph() {
 
 Review comment:
   There would be quite a few tests to be reworked. 
   - ExecutionGraphDeploymentTest
   - ExecutionGraphPartitionReleaseTest
   - ExecutionGraphVariousFailuesTest
   - ExecutionTest
   - ExecutionVertexCancelTest
   - ExecutionVertexInputConstraintTest
   - ExecutionVertexTest
   - ExecutionGraphNotEnoughResourceTest
   - ExecutionGraphCheckpointCoordinatorTest
   - ExecutionGraphColocationRestartTest
   - ExecutionGraphSuspendTest
   - FinalizeOnMasterTest
   
   Most of them do not need much effort to be based on the new scheduler. 
Exposing ExecutionGraph would make the work much easier, since we only need to 
force scheduling-related actions to be conducted via the scheduler.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11250: [FLINK-16302][rest]add log list and 
read log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212
 
 
   
   ## CI report:
   
   * 912485a96d61febfa41b7c84631aeae19d819325 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150964833) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5718)
 
   * e474761e49927bb30bac6a297e992dc2ec98e01a UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385530896
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -528,6 +558,31 @@ public void testOnStartContainerError() throws Exception {
}};
}
 
+	@Test
+	public void testStartWithContainerFromPreviousAttempt() throws Exception {
+		new Context() {{
+			runTest(() -> {
+				Container runningContainer = mockContainer("container", 1234, 1, Resource.newInstance(1024, 1));
+				Container newContainer = mockContainer("container", 1234, 2, Resource.newInstance(1024, 1));
+				resourceManager.getWorkerNodeMap().put(new ResourceID(runningContainer.getId().toString()), new YarnWorkerNode(runningContainer));
+				resourceManager.getWorkerNodeMap().put(new ResourceID(newContainer.getId().toString()), new YarnWorkerNode(newContainer));
+				testingNMClientAsync.containerStatuses.add(
+					ContainerStatus.newInstance(runningContainer.getId(), ContainerState.RUNNING, "", 0));
+				testingNMClientAsync.containerStatuses.add(
+					ContainerStatus.newInstance(newContainer.getId(), ContainerState.NEW, "", 0));
+
+				CompletableFuture<Void> requestContainerStatusFuture = resourceManager.runInMainThread(() -> {
+					testingNMClientAsync.getContainerStatusAsync(runningContainer.getId(), runningContainer.getNodeId());
+					testingNMClientAsync.getContainerStatusAsync(newContainer.getId(), newContainer.getNodeId());
+					return null;
+				});
 
 Review comment:
   I think we can make `getContainersFromPreviousAttempts` visible for testing, 
and call it on the main thread with a custom 
`RegisterApplicationMasterResponse`.
   
   The purpose of this test case is to verify that `YarnResourceManager` 
properly handles recovered containers, including querying their status from the 
YARN NM and handling them according to the received status. Currently, it only 
covers the second part of the workflow. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385527439
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -454,6 +456,11 @@ public void onError(Throwable error) {
onFatalError(error);
}
 
+   @VisibleForTesting
+	Map<ResourceID, YarnWorkerNode> getWorkerNodeMap() {
+		return workerNodeMap;
 
 Review comment:
   ```suggestion
return Collections.unmodifiableMap(workerNodeMap);
   ```
We should always be careful when exposing non-primitive fields such as `Map` and 
`List`. Even if they are declared `final`, their contents can still be modified.
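   For illustration (not from the PR): the unmodifiable wrapper rejects mutation through the returned reference, while still reflecting changes made through the internal map.
   ```java
   Map<String, Integer> internal = new HashMap<>();
   Map<String, Integer> view = Collections.unmodifiableMap(internal);

   internal.put("k", 1); // visible through the view: view.get("k") == 1
   view.put("x", 2);     // throws UnsupportedOperationException
   ```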


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385519116
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -464,7 +471,15 @@ public void onContainerStarted(ContainerId containerId, 
Map
 
@Override
public void onContainerStatusReceived(ContainerId containerId, 
ContainerStatus containerStatus) {
-   // We are not interested in getting container status
+		// We fetch the status of the containers from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
+			// If the status is "NEW", it means that the container is allocated but has not been started yet.
+			// We need to release it.
+			log.warn("The container {} from the previous attempt did not start. Released.", containerId);
 
 Review comment:
   ```suggestion
			log.info("Releasing container {} from the previous attempt. No TaskExecutor started inside.", containerId);
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385519373
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -464,7 +471,15 @@ public void onContainerStarted(ContainerId containerId, 
Map
 
@Override
public void onContainerStatusReceived(ContainerId containerId, 
ContainerStatus containerStatus) {
-   // We are not interested in getting container status
+		// We fetch the status of the containers from the previous attempts.
+		if (containerStatus.getState() == ContainerState.NEW) {
+			// If the status is "NEW", it means that the container is allocated but has not been started yet.
+			// We need to release it.
+			log.warn("The container {} from the previous attempt did not start. Released.", containerId);
 
 Review comment:
   I think an INFO-level log message should be enough. This is not causing any 
problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
xintongsong commented on a change in pull request #11248: [FLINK-16299] Release 
containers recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#discussion_r385525529
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -176,9 +179,37 @@ public void teardown() throws Exception {
}
}
 
+	static class TestingNMClientAsync extends NMClientAsync {
+
+		public List<ContainerStatus> containerStatuses = new ArrayList<>();
+
+		protected TestingNMClientAsync(CallbackHandler callbackHandler) {
+			super(callbackHandler);
+		}
+
+		@Override
+		public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
+			// Do nothing.
+		}
+
+		@Override
+		public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
+			// Do nothing.
+		}
+
+		@Override
+		public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
+			for (ContainerStatus containerStatus : containerStatuses) {
+				if (containerStatus.getContainerId().equals(containerId)) {
+					callbackHandler.onContainerStatusReceived(containerId, containerStatus);
+				}
+			}
+		}
+	}
 
 Review comment:
   Let's make this class more general by using `Function` and `Consumer` to 
define its behaviors. Something like the following, taking 
`getContainerStatusAsync` as an example.
   ```
   static class TestingNMClientAsync extends NMClientAsync {
       // ...

       private final BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncFunction;

       // ...

       @Override
       public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
           callbackHandler.onContainerStatusReceived(containerId, getContainerStatusAsyncFunction.apply(containerId, nodeId));
       }

       // ...
   }
   ```
   
   We can use a builder class to create this class, allowing custom `Function` 
and `Consumer` instances to be set (a rough sketch of such a builder follows 
after this list). If the code for this class grows too much, we can also put it 
in a separate file.
   
   There are several benefits to doing this:
   - It allows defining per-test-case behavior, which makes it easier to reuse 
this class in the future.
   - It avoids using `mock`, `spy` and `verify`.
   - It avoids having a publicly accessible `containerStatuses`.
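   For example (all names here are illustrative, not from the PR):
   ```java
   static class TestingNMClientAsyncBuilder {

       // Default behavior: report no status for any container.
       private BiFunction<ContainerId, NodeId, ContainerStatus> getContainerStatusAsyncFunction =
           (containerId, nodeId) -> null;

       TestingNMClientAsyncBuilder setGetContainerStatusAsyncFunction(
               BiFunction<ContainerId, NodeId, ContainerStatus> function) {
           this.getContainerStatusAsyncFunction = function;
           return this;
       }

       TestingNMClientAsync build(NMClientAsync.CallbackHandler callbackHandler) {
           // Assumes a TestingNMClientAsync constructor that takes the callback
           // handler plus the per-test-case behavior functions.
           return new TestingNMClientAsync(callbackHandler, getContainerStatusAsyncFunction);
       }
   }
   ```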


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-15786) Load connector code with separate classloader

2020-02-27 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047229#comment-17047229
 ] 

Guowei Ma edited comment on FLINK-15786 at 2/28/20 6:40 AM:


Thanks [~pnowojski] for your detailed explanation.

I agree with you that we could change the DataStream API to postpone the 
construction of the source/sink operator. This would avoid requiring the user to 
use the Plugin mechanism directly to create a source/sink object on the client 
side in order to separate the connector classloader from the user-code 
classloader.

What I am concerned about is the relationship between the FlinkUserClassLoader 
and the connector classloader. In theory, there might be interdependent 
scenarios such as FlinkKafkaPartitioner/KafkaConnector: this interface is 
provided by the Kafka connector, the user can implement it, and the implemented 
class is then passed back to the Kafka connector and constructed at runtime. One 
option for this scenario is to provide a classloader that knows both the 
user-implemented FlinkKafkaPartitioner class and the Kafka connector classes. We 
could call it XClassloader for now.

Do you think we need to resolve this scenario? Do you have any concerns about 
the XClassloader? A related open question: if we choose the XClassloader 
solution, DataStream users could also enjoy the benefits of classloader 
isolation without changing any code.
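
For illustration, a minimal sketch of one possible shape for such an XClassloader; the name, the constructor, and the simple first-match delegation are all assumptions, and a real implementation would also need resource lookup plus ordering/isolation rules:

{code:java}
public class XClassLoader extends ClassLoader {

    private final ClassLoader[] delegates;

    public XClassLoader(ClassLoader... delegates) {
        super(null); // no single parent; resolution goes through the delegates
        this.delegates = delegates;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        // Try each delegate (e.g. user-code loader, connector loader) in order.
        for (ClassLoader delegate : delegates) {
            try {
                return delegate.loadClass(name);
            } catch (ClassNotFoundException ignored) {
                // fall through to the next delegate
            }
        }
        throw new ClassNotFoundException(name);
    }
}
{code}

Usage would be roughly {{new XClassLoader(userCodeClassLoader, connectorClassLoader)}}, so that a user-implemented FlinkKafkaPartitioner and the Kafka connector classes are both visible.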


was (Author: maguowei):
Thanks [~pnowojski] for your detailed explanation.

I agree with you that we could change the DataStream API to postpone the 
construction of the source/sink operator. This could avoid the user to use the 
Plugin mechanism directly to create a source/sink object at the client-side for 
separating the connector class loader from the user code classloader.

What I concern about is that what the relationship between the 
FlinkUserClassLoader and the connector Classloader. In theory, there might be 
interdependent scenarios such as FlinkKafkaPartitioner/KafkaConnector. This 
interface is provided by the Kafka connector. And the user could implement it. 
The implemented class is also passed to the Kafka connector and would be 
constructed at runtime. One option for this scenario is to provide a 
classloader that knows both the user implemented FlinkKafkaPartitioner class 
and the Kafka connector class. We could call it XClassloader at first. 

Do you think that we need to resolve this scenario?  A related open question is 
that we might not need to change(or add) the DataStream API for separating the 
connector classloader if we choose to use the XClassloader.

> Load connector code with separate classloader
> -
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Guowei Ma
>Priority: Major
>  Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users 
> only need to add the corresponding connector as a dependency and package it 
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high possibility of class 
> conflicts between different connectors, or with the user code of the same job. 
> For example, every one or two weeks we receive issue reports about connector 
> class conflicts from our users. The problem gets worse when users want to 
> analyze data from different sources and write output to different sinks.
> Using a separate classloader to load each connector's code could resolve 
> the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner

2020-02-27 Thread GitBox
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] 
Support IS JSON predicate for blink planner
URL: https://github.com/apache/flink/pull/11174#discussion_r385528122
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/NegativeCallGen.scala
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+
+import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
GeneratedExpression}
+import org.apache.flink.table.types.logical.{BooleanType, LogicalType}
+
+/**
+ * Inverts the boolean value of a [[CallGenerator]] result.
+ */
+class NegativeCallGen(callGenerator: CallGenerator) extends CallGenerator {
+
+  override def generate(
+ctx: CodeGeneratorContext,
+operands: Seq[GeneratedExpression],
+returnType: LogicalType
+  ): GeneratedExpression = {
+assert(returnType.isInstanceOf[BooleanType])
+
+    val expr = callGenerator.generate(ctx, operands, returnType)
+    generateCallIfArgsNotNull(ctx, returnType, Seq(expr), returnType.isNullable) {
+      originalTerms =>
+        assert(originalTerms.size == 1)
+
+        s"!${originalTerms.head}"
+    }
 
 Review comment:
   Can simplify to 
   ```scala
   ScalarOperatorGens.generateNot(ctx, callGenerator.generate(ctx, operands, returnType))
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner

2020-02-27 Thread GitBox
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] 
Support IS JSON predicate for blink planner
URL: https://github.com/apache/flink/pull/11174#discussion_r385526426
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 ##
 @@ -4195,4 +4195,24 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "f55=f57",
   "true")
   }
+
+  @Test
+  def testIsJSONPredicates(): Unit = {
 
 Review comment:
   Let's move json tests to a new test class, e.g. `JsonFunctionsTest`. 
`ScalarFunctionsTest` is too large now. 
   Btw, let's use the lower-case form, `testIsJsonPredicates`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-16142) Memory Leak causes Metaspace OOM error on repeated job submission

2020-02-27 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047253#comment-17047253
 ] 

Arvid Heise commented on FLINK-16142:
-

[~sewen], are we using 
[URLClassLoader#close|https://docs.oracle.com/javase/8/docs/technotes/guides/net/ClassLoader.html]?

> Memory Leak causes Metaspace OOM error on repeated job submission
> -
>
> Key: FLINK-16142
> URL: https://issues.apache.org/jira/browse/FLINK-16142
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: Thomas Wozniakowski
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
> Attachments: Leak-GC-root.png, java_pid1.hprof, java_pid1.hprof
>
>
> Hi Guys,
> We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our 
> use-case exactly (RocksDB state backend running in a containerised cluster). 
> Unfortunately, it seems like there is a memory leak somewhere in the job 
> submission logic. We are getting this error:
> {code:java}
> 2020-02-18 10:22:10,020 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME 
> switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: Metaspace
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.(AwsSdkMetrics.java:359)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279)
> at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686)
> at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> {code}
> (The only change in the above text is the OPERATOR_NAME text where I removed 
> some of the internal specifics of our system).
> This will reliably happen on a fresh cluster after submitting and cancelling 
> our job 3 times.
> We are using the presto-s3 plugin, the CEP library and the Kinesis connector.
> Please let me know what other diagnostics would be useful.
> Tom



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (FLINK-16142) Memory Leak causes Metaspace OOM error on repeated job submission

2020-02-27 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047253#comment-17047253
 ] 

Arvid Heise edited comment on FLINK-16142 at 2/28/20 6:39 AM:
--

[~sewen], are we using 
[URLClassLoader#close|https://docs.oracle.com/javase/8/docs/technotes/guides/net/ClassLoader.html]?
 Not a guarantee, but probably better than nothing.
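
For reference, a sketch of the usage pattern being suggested; {{jarUrls}} and {{parentClassLoader}} stand in for whatever the runtime actually passes:

{code:java}
URLClassLoader userCodeClassLoader = new URLClassLoader(jarUrls, parentClassLoader);
try {
    // ... run the user code ...
} finally {
    // URLClassLoader implements Closeable since Java 7. Closing releases the
    // open jar file handles, which helps the loaded classes become collectable
    // once they are otherwise unreachable.
    userCodeClassLoader.close();
}
{code}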


was (Author: arvid heise):
[~sewen], are we using 
[URLClassLoader#close|https://docs.oracle.com/javase/8/docs/technotes/guides/net/ClassLoader.html]?

> Memory Leak causes Metaspace OOM error on repeated job submission
> -
>
> Key: FLINK-16142
> URL: https://issues.apache.org/jira/browse/FLINK-16142
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.10.0
>Reporter: Thomas Wozniakowski
>Priority: Blocker
> Fix For: 1.10.1, 1.11.0
>
> Attachments: Leak-GC-root.png, java_pid1.hprof, java_pid1.hprof
>
>
> Hi Guys,
> We've just tried deploying 1.10.0 as it has lots of shiny stuff that fits our 
> use-case exactly (RocksDB state backend running in a containerised cluster). 
> Unfortunately, it seems like there is a memory leak somewhere in the job 
> submission logic. We are getting this error:
> {code:java}
> 2020-02-18 10:22:10,020 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph - OPERATOR_NAME 
> switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: Metaspace
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:757)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
> at 
> org.apache.flink.util.ChildFirstClassLoader.loadClass(ChildFirstClassLoader.java:60)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:398)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.metrics.AwsSdkMetrics.(AwsSdkMetrics.java:359)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:728)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:660)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:652)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:611)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:606)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeListShards(AmazonKinesisClient.java:1534)
> at 
> org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.listShards(AmazonKinesisClient.java:1528)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.listShards(KinesisProxy.java:439)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardsOfStream(KinesisProxy.java:389)
> at 
> org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getShardList(KinesisProxy.java:279)
> at 
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.discoverNewShardsToSubscribe(KinesisDataFetcher.java:686)
> at 
> org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:287)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> {code}
> (The only change in the above text is the OPERATOR_NAME text where I removed 
> some of the internal specifics of our system).
> This will reliably happen on a fresh cluster after submitting and cancelling 
> our job 3 times.
> We are using the presto-s3 plugin, the CEP library and the Kinesis connector.
> Please let me know what other diagnostics would be useful.
> Tom



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner

2020-02-27 Thread GitBox
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] 
Support IS JSON predicate for blink planner
URL: https://github.com/apache/flink/pull/11174#discussion_r385527809
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/NegativeCallGen.scala
 ##
 @@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.codegen.calls
+
+import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateCallIfArgsNotNull
+import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
GeneratedExpression}
+import org.apache.flink.table.types.logical.{BooleanType, LogicalType}
+
+/**
+ * Inverts the boolean value of a [[CallGenerator]] result.
+ */
+class NegativeCallGen(callGenerator: CallGenerator) extends CallGenerator {
 
 Review comment:
   In SQL, it is called `NOT`. We can call it `NotCallGen`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner

2020-02-27 Thread GitBox
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] 
Support IS JSON predicate for blink planner
URL: https://github.com/apache/flink/pull/11174#discussion_r385529676
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/FunctionGenerator.scala
 ##
 @@ -753,6 +753,54 @@ object FunctionGenerator {
 Seq(FLOAT, INTEGER),
 BuiltInMethods.TRUNCATE_FLOAT)
 
+  addSqlFunctionMethod(
+IS_JSON_VALUE,
+Seq(CHAR),
 
 Review comment:
   I think `VARCHAR` will be safer than `CHAR` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] wuchong commented on a change in pull request #11174: [FLINK-16199][sql] Support IS JSON predicate for blink planner

2020-02-27 Thread GitBox
wuchong commented on a change in pull request #11174: [FLINK-16199][sql] 
Support IS JSON predicate for blink planner
URL: https://github.com/apache/flink/pull/11174#discussion_r385529970
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 ##
 @@ -4195,4 +4195,24 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
   "f55=f57",
   "true")
   }
+
+  @Test
+  def testIsJSONPredicates(): Unit = {
+testSqlApi("'{}' is json value", "true")
+testSqlApi("'{]' is json value", "false")
+testSqlApi("'{}' is json object", "true")
+testSqlApi("'[]' is json object", "false")
+testSqlApi("'{}' is json array", "false")
+testSqlApi("'[]' is json array", "true")
+testSqlApi("'100' is json scalar", "true")
+testSqlApi("'[]' is json scalar", "false")
+testSqlApi("'{}' is not json value", "false")
+testSqlApi("'{]' is not json value", "true")
+testSqlApi("'{}' is not json object", "false")
+testSqlApi("'[]' is not json object", "true")
+testSqlApi("'{}' is not json array", "true")
+testSqlApi("'[]' is not json array", "false")
+testSqlApi("'100' is not json scalar", "false")
+testSqlApi("'[]' is not json scalar", "true")
 
 Review comment:
   Could you add some tests on a column reference? Currently, all the test data 
consists of character literals.
   Please also test non-string columns.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-02-27 Thread GitBox
flinkbot commented on issue #11250: [FLINK-16302][rest]add log list and read 
log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#issuecomment-592336212
 
 
   
   ## CI report:
   
   * 912485a96d61febfa41b7c84631aeae19d819325 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-02-27 Thread GitBox
flinkbot commented on issue #11250: [FLINK-16302][rest]add log list and read 
log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#issuecomment-592334762
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 912485a96d61febfa41b7c84631aeae19d819325 (Fri Feb 28 
06:25:53 UTC 2020)
   
   **Warnings:**
* No documentation files were touched! Remember to keep the Flink docs up 
to date!
* **This pull request references an unassigned [Jira 
ticket](https://issues.apache.org/jira/browse/FLINK-16302).** According to the 
[code contribution 
guide](https://flink.apache.org/contributing/contribute-code.html), tickets 
need to be assigned before starting with the implementation work.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] jinglining commented on issue #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-02-27 Thread GitBox
jinglining commented on issue #11250: [FLINK-16302][rest]add log list and read 
log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250#issuecomment-592334929
 
 
   @flinkbot run travis


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16302) add log list and read log by name for taskmanager

2020-02-27 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-16302:
---
Labels: pull-request-available  (was: )

> add log list and read log by name for taskmanager
> -
>
> Key: FLINK-16302
> URL: https://issues.apache.org/jira/browse/FLINK-16302
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: lining
>Priority: Major
>  Labels: pull-request-available
>
> *  list all log files of a taskmanager
>  ** /taskmanagers/taskmanagerid/logs
>  ** 
> {code:java}
> {
>   "logs": [
> {
>   "name": "taskmanager.log",
>   "size": 12529
> }
>   ]
> } {code}
> * read a taskmanager log file by name
> **  /taskmanagers/log/[filename]
> ** response: same as the taskmanager’s log
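> 
> For illustration, the two endpoints could be exercised like this (a sketch 
> using Java 11's built-in HTTP client; host, port and the taskmanager id 
> "tm-1" are made-up placeholders, and the read path is taken literally from 
> the list above):
> {code:java}
> import java.net.URI;
> import java.net.http.HttpClient;
> import java.net.http.HttpRequest;
> import java.net.http.HttpResponse;
> 
> // Hypothetical client for the endpoints described above; localhost:8081
> // and "tm-1" are placeholders, not part of the proposal.
> public class TaskManagerLogsClient {
>     public static void main(String[] args) throws Exception {
>         HttpClient client = HttpClient.newHttpClient();
> 
>         // list all log files of one taskmanager
>         HttpRequest list = HttpRequest.newBuilder(
>             URI.create("http://localhost:8081/taskmanagers/tm-1/logs")).GET().build();
>         System.out.println(client.send(list, HttpResponse.BodyHandlers.ofString()).body());
> 
>         // read one log file by name (path as written in the description)
>         HttpRequest read = HttpRequest.newBuilder(
>             URI.create("http://localhost:8081/taskmanagers/log/taskmanager.log")).GET().build();
>         System.out.println(client.send(read, HttpResponse.BodyHandlers.ofString()).body());
>     }
> }
> {code}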



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] jinglining opened a new pull request #11250: [FLINK-16302][rest]add log list and read log by name for taskmanager

2020-02-27 Thread GitBox
jinglining opened a new pull request #11250: [FLINK-16302][rest]add log list 
and read log by name for taskmanager
URL: https://github.com/apache/flink/pull/11250
 
 
   
   ## What is the purpose of the change
   
   This pull request extends the REST API so that it can list the log files of 
a taskmanager and read a log by name.
   
   
   ## Brief change log
 - get log list
 - get log by name
   
   ## Verifying this change
   
   
   
   This change added tests and can be verified as follows:
-  Added TaskManagerLogsHandlerTest, which verifies TaskManagerLogsHandler.
-  Updated AbstractTaskManagerFileHandlerTest so that it can verify 
TaskManagerCustomFileHandler.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (docs)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-15786) Load connector code with separate classloader

2020-02-27 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047229#comment-17047229
 ] 

Guowei Ma edited comment on FLINK-15786 at 2/28/20 6:15 AM:


Thanks [~pnowojski] for your detailed explanation.

I agree with you that we could change the DataStream API to postpone the 
construction of the source/sink operator. This would avoid the user having to 
use the Plugin mechanism directly to create a source/sink object on the client 
side in order to separate the connector classloader from the user-code 
classloader.

What I am concerned about is the relationship between the FlinkUserClassLoader 
and the connector classloader. In theory, there might be interdependent 
scenarios such as FlinkKafkaPartitioner/KafkaConnector. This interface is 
provided by the Kafka connector, and the user can implement it. The 
implemented class is also passed to the Kafka connector and would be 
constructed at runtime. One option for this scenario is to provide a 
classloader that knows both the user-implemented FlinkKafkaPartitioner class 
and the Kafka connector classes. We could call it XClassloader for now.

Do you think we need to resolve this scenario? A related open question is that 
we might not need to change (or add to) the DataStream API for separating the 
connector classloader if we choose to use the XClassloader.
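
To make the XClassloader idea concrete, here is a minimal, purely hypothetical 
sketch of such a composite classloader: it asks the connector classloader 
first and falls back to the user-code classloader, so a user-implemented 
FlinkKafkaPartitioner and the Kafka connector classes are both visible. The 
name, the delegation order and everything omitted (resources, caching, 
parent-first patterns) are assumptions, not Flink code.
{code:java}
// Hypothetical sketch only; not part of Flink.
public class XClassLoader extends ClassLoader {
    private final ClassLoader connectorLoader; // sees the connector classes
    private final ClassLoader userCodeLoader;  // sees user code, e.g. a FlinkKafkaPartitioner impl

    public XClassLoader(ClassLoader connectorLoader, ClassLoader userCodeLoader) {
        super(null); // no single parent; delegation is done manually below
        this.connectorLoader = connectorLoader;
        this.userCodeLoader = userCodeLoader;
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        try {
            return connectorLoader.loadClass(name); // connector classes first
        } catch (ClassNotFoundException e) {
            return userCodeLoader.loadClass(name);  // then fall back to user code
        }
    }
}
{code}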


was (Author: maguowei):
Thanks [~pnowojski] for your detailed explanation.

I agree with you that we could change the DataStream API to postpone the 
construction of the source/sink operator. This would avoid the user having to 
use the Plugin mechanism directly to create a source/sink object on the client 
side in order to separate the connector classloader from the user-code 
classloader.

What I am concerned about is the relationship between the FlinkUserClassLoader 
and the connector classloader. In theory, there might be interdependent 
scenarios such as FlinkKafkaPartitioner/KafkaConnector. This interface, 
provided by the Kafka connector, can be implemented by the user. The 
implemented class is passed to the Kafka connector and would be constructed at 
runtime. One option for this scenario is to provide a classloader that knows 
both the user-implemented FlinkKafkaPartitioner class and the Kafka connector 
classes. We could call it XClassloader for now.

Do you think we need to resolve this scenario? A related open question is that 
we might not need to change (or add to) the DataStream API for separating the 
connector classloader if we choose to use the XClassloader.

> Load connector code with separate classloader
> -
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Guowei Ma
>Priority: Major
>  Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users 
> only need to add the corresponding connector as a dependency and package it 
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high possibility of a class 
> conflict between different connectors or with the user code of the same job. 
> For example, every one or two weeks, we receive issue reports related to 
> connector class conflicts from our users. The problem can get worse when 
> users want to analyze data from different sources and write output to 
> different sinks.
> Using a separate classloader to load the different connector code could 
> resolve the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the 
Kubernetes decorator design
URL: https://github.com/apache/flink/pull/11233#issuecomment-591843120
 
 
   
   ## CI report:
   
   * fb57917227853d0477aa1383d399a619146d7170 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150793256) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5676)
 
   * 786a5056c957863c05ad24b00ca1dca032905eb0 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150962647) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5717)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce 
ArrowPythonScalarFunctionOperator for vectorized Python UDF execution
URL: https://github.com/apache/flink/pull/11208#issuecomment-590746668
 
 
   
   ## CI report:
   
   * 31226c452d50eb216796a3c1b963f047bb2c5698 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/150957094) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5716)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-15786) Load connector code with separate classloader

2020-02-27 Thread Guowei Ma (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047229#comment-17047229
 ] 

Guowei Ma commented on FLINK-15786:
---

Thanks [~pnowojski] for your detailed explanation.

I agree with you that we could change the DataStream API to postpone the 
construction of the source/sink operator. This would avoid the user having to 
use the Plugin mechanism directly to create a source/sink object on the client 
side in order to separate the connector classloader from the user-code 
classloader.

What I am concerned about is the relationship between the FlinkUserClassLoader 
and the connector classloader. In theory, there might be interdependent 
scenarios such as FlinkKafkaPartitioner/KafkaConnector. This interface, 
provided by the Kafka connector, can be implemented by the user. The 
implemented class is passed to the Kafka connector and would be constructed at 
runtime. One option for this scenario is to provide a classloader that knows 
both the user-implemented FlinkKafkaPartitioner class and the Kafka connector 
classes. We could call it XClassloader for now.

Do you think we need to resolve this scenario? A related open question is that 
we might not need to change (or add to) the DataStream API for separating the 
connector classloader if we choose to use the XClassloader.

> Load connector code with separate classloader
> -
>
> Key: FLINK-15786
> URL: https://issues.apache.org/jira/browse/FLINK-15786
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Task
>Reporter: Guowei Ma
>Priority: Major
>  Labels: usability
>
> Currently, connector code can be seen as part of user code. Usually, users 
> only need to add the corresponding connector as a dependency and package it 
> in the user jar. This is convenient enough.
> However, connectors usually need to interact with external systems and often 
> introduce heavy dependencies, so there is a high possibility of a class 
> conflict between different connectors or with the user code of the same job. 
> For example, every one or two weeks, we receive issue reports related to 
> connector class conflicts from our users. The problem can get worse when 
> users want to analyze data from different sources and write output to 
> different sinks.
> Using a separate classloader to load the different connector code could 
> resolve the problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16325) A connection check is required, and it needs to be reopened when the JDBC connection is interrupted

2020-02-27 Thread renjianxu (Jira)
renjianxu created FLINK-16325:
-

 Summary:  A connection check is required, and it needs to be 
reopened when the JDBC connection is interrupted
 Key: FLINK-16325
 URL: https://issues.apache.org/jira/browse/FLINK-16325
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / JDBC
Affects Versions: 1.10.0
Reporter: renjianxu


JDBCOutputFormat#writeRecord.

When writing data, if the JDBC connection has been disconnected, the data will 
be lost. Therefore, a connectivity check is required in the writeRecord 
method, and the connection needs to be reopened when it has been interrupted.
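
For illustration, a minimal sketch of what such a check could look like. 
Connection#isValid(timeout) is standard JDBC, but the class, field and method 
names below are made up for the example and are not the actual 
JDBCOutputFormat code:
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical sketch of a connectivity check before writing a record.
public class ReconnectingJdbcWriter {
    private final String jdbcUrl;
    private Connection connection;

    public ReconnectingJdbcWriter(String jdbcUrl) throws SQLException {
        this.jdbcUrl = jdbcUrl;
        this.connection = DriverManager.getConnection(jdbcUrl);
    }

    private void ensureConnectionOpen() throws SQLException {
        // isValid(timeoutSeconds) pings the server without running user SQL.
        if (connection == null || !connection.isValid(5)) {
            connection = DriverManager.getConnection(jdbcUrl); // reopen
        }
    }

    public void writeRecord(String insertSql) throws SQLException {
        ensureConnectionOpen();
        try (Statement stmt = connection.createStatement()) {
            stmt.execute(insertSql);
        }
    }
}
{code}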

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385521240
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
 ##
 @@ -81,11 +95,47 @@
});
}
 
-   public NettyMessageSerializationTest(boolean testReadOnlyBuffer, 
boolean testCompressedBuffer) {
+   public NettyMessageClientSideSerializationTest(boolean 
testReadOnlyBuffer, boolean testCompressedBuffer) {
this.testReadOnlyBuffer = testReadOnlyBuffer;
this.testCompressedBuffer = testCompressedBuffer;
}
 
+   @Before
+   public void setup() throws IOException, InterruptedException {
+   networkBufferPool = new NetworkBufferPool(10, 1024, 2);
+   BufferPool bufferPool = networkBufferPool.createBufferPool(8, 
8);
+
+   inputGate = new SingleInputGateBuilder()
+   .setNumberOfChannels(1)
+   .setBufferPoolFactory(bufferPool)
+   .build();
+   inputChannel = createRemoteInputChannel(
+   inputGate,
+   mock(PartitionRequestClient.class),
+   networkBufferPool);
+   inputGate.assignExclusiveSegments();
+   inputChannel.requestSubpartition(0);
+
+   CreditBasedPartitionRequestClientHandler handler = new 
CreditBasedPartitionRequestClientHandler();
 
 Review comment:
   CreditBasedPartitionRequestClientHandler -> NetworkClientHandler


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385521292
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
 ##
 @@ -81,11 +95,47 @@
});
}
 
-   public NettyMessageSerializationTest(boolean testReadOnlyBuffer, 
boolean testCompressedBuffer) {
+   public NettyMessageClientSideSerializationTest(boolean 
testReadOnlyBuffer, boolean testCompressedBuffer) {
this.testReadOnlyBuffer = testReadOnlyBuffer;
this.testCompressedBuffer = testCompressedBuffer;
}
 
+   @Before
+   public void setup() throws IOException, InterruptedException {
+   networkBufferPool = new NetworkBufferPool(10, 1024, 2);
+   BufferPool bufferPool = networkBufferPool.createBufferPool(8, 
8);
+
+   inputGate = new SingleInputGateBuilder()
+   .setNumberOfChannels(1)
+   .setBufferPoolFactory(bufferPool)
+   .build();
+   inputChannel = createRemoteInputChannel(
+   inputGate,
+   mock(PartitionRequestClient.class),
+   networkBufferPool);
+   inputGate.assignExclusiveSegments();
+   inputChannel.requestSubpartition(0);
+
+   CreditBasedPartitionRequestClientHandler handler = new 
CreditBasedPartitionRequestClientHandler();
+   handler.addInputChannel(inputChannel);
+
+   channel = new EmbeddedChannel(
+   new NettyMessage.NettyMessageEncoder(), // outbound 
messages
 
 Review comment:
   for outbound messages


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385521186
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
 ##
 @@ -81,11 +95,47 @@
});
}
 
-   public NettyMessageSerializationTest(boolean testReadOnlyBuffer, 
boolean testCompressedBuffer) {
+   public NettyMessageClientSideSerializationTest(boolean 
testReadOnlyBuffer, boolean testCompressedBuffer) {
this.testReadOnlyBuffer = testReadOnlyBuffer;
this.testCompressedBuffer = testCompressedBuffer;
}
 
+   @Before
+   public void setup() throws IOException, InterruptedException {
+   networkBufferPool = new NetworkBufferPool(10, 1024, 2);
+   BufferPool bufferPool = networkBufferPool.createBufferPool(8, 
8);
+
+   inputGate = new SingleInputGateBuilder()
+   .setNumberOfChannels(1)
+   .setBufferPoolFactory(bufferPool)
+   .build();
+   inputChannel = createRemoteInputChannel(
+   inputGate,
+   mock(PartitionRequestClient.class),
+   networkBufferPool);
+   inputGate.assignExclusiveSegments();
+   inputChannel.requestSubpartition(0);
 
 Review comment:
   I guess this is not necessary because the input channel would be added into 
the handler explicitly below.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385521035
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
 ##
 @@ -81,11 +95,47 @@
});
}
 
-   public NettyMessageSerializationTest(boolean testReadOnlyBuffer, 
boolean testCompressedBuffer) {
+   public NettyMessageClientSideSerializationTest(boolean 
testReadOnlyBuffer, boolean testCompressedBuffer) {
this.testReadOnlyBuffer = testReadOnlyBuffer;
this.testCompressedBuffer = testCompressedBuffer;
}
 
+   @Before
+   public void setup() throws IOException, InterruptedException {
+   networkBufferPool = new NetworkBufferPool(10, 1024, 2);
+   BufferPool bufferPool = networkBufferPool.createBufferPool(8, 
8);
+
+   inputGate = new SingleInputGateBuilder()
+   .setNumberOfChannels(1)
+   .setBufferPoolFactory(bufferPool)
+   .build();
+   inputChannel = createRemoteInputChannel(
+   inputGate,
+   mock(PartitionRequestClient.class),
 
 Review comment:
   Mocking is not suggested, so we can bypass this component, which is actually 
not used in the tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385520841
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
 ##
 @@ -81,11 +95,47 @@
});
}
 
-   public NettyMessageSerializationTest(boolean testReadOnlyBuffer, 
boolean testCompressedBuffer) {
+   public NettyMessageClientSideSerializationTest(boolean 
testReadOnlyBuffer, boolean testCompressedBuffer) {
this.testReadOnlyBuffer = testReadOnlyBuffer;
this.testCompressedBuffer = testCompressedBuffer;
}
 
+   @Before
+   public void setup() throws IOException, InterruptedException {
+   networkBufferPool = new NetworkBufferPool(10, 1024, 2);
+   BufferPool bufferPool = networkBufferPool.createBufferPool(8, 
8);
+
+   inputGate = new SingleInputGateBuilder()
+   .setNumberOfChannels(1)
 
 Review comment:
   This can be removed; the default is 1.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the 
Kubernetes decorator design
URL: https://github.com/apache/flink/pull/11233#issuecomment-591843120
 
 
   
   ## CI report:
   
   * fb57917227853d0477aa1383d399a619146d7170 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150793256) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5676)
 
   * 786a5056c957863c05ad24b00ca1dca032905eb0 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11249: [FLINK-16297] Remove unnecessary indents and blank lines in code blocks

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11249: [FLINK-16297] Remove unnecessary 
indents and blank lines in code blocks
URL: https://github.com/apache/flink/pull/11249#issuecomment-592302102
 
 
   
   ## CI report:
   
   * fe25cef13c65c9f584b53aa7216347ee1b3e8f16 Travis: 
[SUCCESS](https://travis-ci.com/flink-ci/flink/builds/150955168) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5715)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385518757
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
 ##
 @@ -183,15 +192,18 @@ private void testEncodeDecodeBuffer(boolean 
testReadOnlyBuffer, boolean testComp
}
 
NettyMessage.BufferResponse expected = new 
NettyMessage.BufferResponse(
-   testBuffer, random.nextInt(), new InputChannelID(), 
random.nextInt());
-   NettyMessage.BufferResponse actual = encodeAndDecode(expected);
+   testBuffer, random.nextInt(), 
inputChannel.getInputChannelId(), random.nextInt());
+   NettyMessage.BufferResponse actual = encodeAndDecode(expected, 
channel);
 
// Netty 4.1 is not copying the messages, but retaining slices 
of them. BufferResponse actual is in this case
// holding a reference to the buffer. Buffer will be recycled 
only once "actual" will be released.
-   assertFalse(buffer.isRecycled());
-   assertFalse(testBuffer.isRecycled());
+   assertTrue(buffer.isRecycled());
 
 Review comment:
   Why is this assert changed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385518513
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
 ##
 @@ -183,15 +192,18 @@ private void testEncodeDecodeBuffer(boolean 
testReadOnlyBuffer, boolean testComp
}
 
NettyMessage.BufferResponse expected = new 
NettyMessage.BufferResponse(
-   testBuffer, random.nextInt(), new InputChannelID(), 
random.nextInt());
-   NettyMessage.BufferResponse actual = encodeAndDecode(expected);
+   testBuffer, random.nextInt(), 
inputChannel.getInputChannelId(), random.nextInt());
+   NettyMessage.BufferResponse actual = encodeAndDecode(expected, 
channel);
 
// Netty 4.1 is not copying the messages, but retaining slices 
of them. BufferResponse actual is in this case
 
 Review comment:
   Is this comment invalid now, since we no longer use the previous 
`LengthFieldBasedFrameDecoder`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhengcanbin edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design

2020-02-27 Thread GitBox
zhengcanbin edited a comment on issue #11233: [FLINK-16194][k8s] Refactor the 
Kubernetes decorator design
URL: https://github.com/apache/flink/pull/11233#issuecomment-592323880
 
 
   > 
[0001-hotfix-Refactor-subclass-isXXX-to-inherit-methods.txt](https://github.com/apache/flink/files/4263005/0001-hotfix-Refactor-subclass-isXXX-to-inherit-methods.txt)
   > 
   > @zhengcanbin here is a patch for code quality advice that you can take a 
look at. As we discussed previously, it is often better to use overridden 
methods instead of controlling a superclass's logic via subclasses' isXXX 
methods.
   > 
   > Generally changes looks good. I will double check the client side code and 
wait for response from @wangyang0918 and possibly @tillrohrmann .
   
   Hi @TisonKun, thanks for the comment; a new commit 
[786a505](https://github.com/apache/flink/pull/11233/commits/786a5056c957863c05ad24b00ca1dca032905eb0)
 resolves the problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhengcanbin commented on issue #11233: [FLINK-16194][k8s] Refactor the Kubernetes decorator design

2020-02-27 Thread GitBox
zhengcanbin commented on issue #11233: [FLINK-16194][k8s] Refactor the 
Kubernetes decorator design
URL: https://github.com/apache/flink/pull/11233#issuecomment-592323880
 
 
   > 
[0001-hotfix-Refactor-subclass-isXXX-to-inherit-methods.txt](https://github.com/apache/flink/files/4263005/0001-hotfix-Refactor-subclass-isXXX-to-inherit-methods.txt)
   > 
   > @zhengcanbin here is a patch for code quality advice that you can take a 
look at. As we discussed previously, it is often better to use overridden 
methods instead of controlling a superclass's logic via subclasses' isXXX 
methods.
   > 
   > Generally changes looks good. I will double check the client side code and 
wait for response from @wangyang0918 and possibly @tillrohrmann .
   
   Hi tison, thanks for the comment; a new commit 
[786a505](https://github.com/apache/flink/pull/11233/commits/786a5056c957863c05ad24b00ca1dca032905eb0)
 resolves the problem.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-16324) Checkpoint tasks progress would display 100% in web UI even not all tasks finished

2020-02-27 Thread Yun Tang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Tang updated FLINK-16324:
-
Description: The checkpoint progress details would display 100% in the web UI 
even when not all tasks have finished, e.g. {{996/1000}} would display as 
{{100%}}.  (was: The checkpoint progress details would display 100% in the 
web UI even when not all tasks have finished, e.g. {{96/100}} would display 
as {{100%}}.)
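
For illustration only, a plain-Java guess at the arithmetic behind the bug: 
rounding the ratio to a whole percent turns 996/1000 into 100%, while flooring 
keeps it at 99% until everything has truly finished. This is not the actual 
web frontend code; it only demonstrates the rounding effect.
{code:java}
public class ProgressRoundingDemo {
    public static void main(String[] args) {
        int acknowledged = 996;
        int total = 1000;
        double percent = acknowledged * 100.0 / total; // 99.6
        System.out.println(Math.round(percent));       // 100 -- misleading
        System.out.println((int) Math.floor(percent)); // 99  -- what users expect
    }
}
{code}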

> Checkpoint tasks progress would display 100% in web UI even not all tasks 
> finished
> --
>
> Key: FLINK-16324
> URL: https://issues.apache.org/jira/browse/FLINK-16324
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Reporter: Yun Tang
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> The checkpoint progress details would display 100% in the web UI even when 
> not all tasks have finished, e.g. {{996/1000}} would display as {{100%}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11249: [FLINK-16297] Remove unnecessary indents and blank lines in code blocks

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11249: [FLINK-16297] Remove unnecessary 
indents and blank lines in code blocks
URL: https://github.com/apache/flink/pull/11249#issuecomment-592302102
 
 
   
   ## CI report:
   
   * fe25cef13c65c9f584b53aa7216347ee1b3e8f16 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150955168) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5715)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385514691
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.task.IntegerTaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the serialization and deserialization of the various {@link 
NettyMessage} sub-classes.
+ */
+public class NettyMessageServerSideSerializationTest {
+
+   private final EmbeddedChannel channel = new EmbeddedChannel(
 
 Review comment:
   I'm not sure whether we should close the channel at the end.
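
   If the channel should indeed be released at the end, a teardown along these 
lines would do it; a minimal sketch, assuming plain JUnit 4 and Netty 4.1's 
`EmbeddedChannel#finishAndReleaseAll()` (the test body is a placeholder):
   ```java
   import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;

   import org.junit.After;
   import org.junit.Test;

   // Sketch of releasing the EmbeddedChannel after each test.
   public class ChannelTearDownSketch {
       private final EmbeddedChannel channel = new EmbeddedChannel();

       @Test
       public void dummy() {
           // real tests would run encode/decode round-trips through the channel
       }

       @After
       public void tearDown() {
           channel.finishAndReleaseAll(); // finishes the channel and releases buffers
       }
   }
   ```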


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385514533
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.task.IntegerTaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the serialization and deserialization of the various {@link 
NettyMessage} sub-classes.
+ */
+public class NettyMessageServerSideSerializationTest {
+
+   private final EmbeddedChannel channel = new EmbeddedChannel(
+   new NettyMessage.NettyMessageEncoder(), // outbound 
messages
+   new NettyMessage.NettyMessageDecoder()); // inbound 
messages
+
+   private final Random random = new Random();
+
+   @Test
+   public void testEncodeDecode() {
+   {
+   NettyMessage.PartitionRequest expected = new 
NettyMessage.PartitionRequest(new ResultPartitionID(), random.nextInt(), new 
InputChannelID(), random.nextInt());
 
 Review comment:
   nit: we should put every argument on a separate line.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385514390
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.task.IntegerTaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the serialization and deserialization of the various {@link 
NettyMessage} sub-classes.
+ */
+public class NettyMessageServerSideSerializationTest {
+
+   private final EmbeddedChannel channel = new EmbeddedChannel(
+   new NettyMessage.NettyMessageEncoder(), // outbound 
messages
+   new NettyMessage.NettyMessageDecoder()); // inbound 
messages
+
+   private final Random random = new Random();
+
+   @Test
+   public void testEncodeDecode() {
+   {
+   NettyMessage.PartitionRequest expected = new 
NettyMessage.PartitionRequest(new ResultPartitionID(), random.nextInt(), new 
InputChannelID(), random.nextInt());
 
 Review comment:
   I prefer making every following message a separate test; then every test is 
short and easy to maintain.
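
   A sketch of the layout this suggests, with the message-specific logic 
elided into comments (the class name here is hypothetical; the real tests 
would go through `encodeAndDecode` as in the PR):
   ```java
   import org.junit.Test;

   // One NettyMessage type per @Test method instead of several block-scoped
   // cases inside a single testEncodeDecode().
   public class ServerSideMessageTestsSketch {
       @Test
       public void testPartitionRequest() {
           // build a PartitionRequest, run it through encodeAndDecode, assert the fields
       }

       @Test
       public void testTaskEventRequest() {
           // build a TaskEventRequest, run it through encodeAndDecode, assert the fields
       }
   }
   ```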


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385513879
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageServerSideSerializationTest.java
 ##
 @@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.event.task.IntegerTaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.encodeAndDecode;
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the serialization and deserialization of the various {@link 
NettyMessage} sub-classes.
+ */
+public class NettyMessageServerSideSerializationTest {
+
+   private final EmbeddedChannel channel = new EmbeddedChannel(
+   new NettyMessage.NettyMessageEncoder(), // outbound 
messages
 
 Review comment:
   outbound messages -> for outbound messages


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-16324) Checkpoint tasks progress would display 100% in web UI even not all tasks finished

2020-02-27 Thread Yun Tang (Jira)
Yun Tang created FLINK-16324:


 Summary: Checkpoint tasks progress would display 100% in web UI 
even not all tasks finished
 Key: FLINK-16324
 URL: https://issues.apache.org/jira/browse/FLINK-16324
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Reporter: Yun Tang
 Fix For: 1.10.1, 1.11.0


The checkpoint progress details would display 100% in the web UI even when not 
all tasks have finished, e.g. {{96/100}} would display as {{100%}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-16324) Checkpoint tasks progress would display 100% in web UI even not all tasks finished

2020-02-27 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047216#comment-17047216
 ] 

Yun Tang commented on FLINK-16324:
--

CC [~vthinkxie]

> Checkpoint tasks progress would display 100% in web UI even not all tasks 
> finished
> --
>
> Key: FLINK-16324
> URL: https://issues.apache.org/jira/browse/FLINK-16324
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Web Frontend
>Reporter: Yun Tang
>Priority: Major
> Fix For: 1.10.1, 1.11.0
>
>
> The checkpoint progress details would display 100% in the web UI even when 
> not all tasks have finished, e.g. {{96/100}} would display as {{100%}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-15400) elasticsearch sink support dynamic index.

2020-02-27 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-15400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17047214#comment-17047214
 ] 

Jark Wu commented on FLINK-15400:
-

Assigned to [~Leonard Xu]. 

Hi [~ouyangwuli], could you help review the PR opened by Leonard to see 
whether it meets your requirements, if you have time?

> elasticsearch sink support dynamic index.
> -
>
> Key: FLINK-15400
> URL: https://issues.apache.org/jira/browse/FLINK-15400
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / ElasticSearch, Table SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: ouyangwulin
>Assignee: Leonard Xu
>Priority: Major
>  Labels: usability
> Fix For: 1.11.0
>
>
> From 
> user...@flink.apache.org([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]),
>   Because ES 6/7 does not support TTL, users need to clean up indices by 
> timestamp, so a dynamic index is a useful function. Add the property 
> 'dynamicIndex' as a switch to enable the dynamic index, the property 
> 'indexField' for the time field to extract, and the property 
> 'indexInterval' to change the cycle mode.
>  
> ||Property||Description||Default||Required||
> |dynamicIndex|dynamic or not|false (true/false)|false|
> |indexField|the field to extract the index from|none|if dynamicIndex is 
> true, then indexField is required; only the types "timestamp", "date" and 
> "long" are supported|
> |indexInterval|cycle mode|d|if dynamicIndex is true, this field is required; 
> optional values are: d (day), m (month), w (week)|
>  
> After discussion, the final design looks as follows:
> {code:java}
> CREATE TABLE es_sink_table (
>   log_source varchar ,
>   log_content varchar ,
>   log_level bigint ,
>   log_ts timestamp,
> ) WITH (
> 'connector.type' = 'elasticsearch',
> 'connector.version' = '6',
> 'connector.index'='my-log-{log_ts|yyyy-MM-dd}',
> # elasticsearch index name. Flink supports creating the index based on a
> # field dynamically at runtime; the index value comes from the dynamicIndex
> # pattern when the field type is varchar, e.g. 'my-log-{log_source}'. The
> # dynamicIndex pattern supports formatting and parsing dates via Java
> # SimpleDateFormat when the field type is timestamp/date,
> # e.g. 'my-log-{log_ts|yyyy-MM-dd}'.
> 'connector.index-alias'='my-log',
> # index alias name; the alias maps to all indices that are
> # created from 'connector.index'.
> …
> )
> {code}
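> 
> To make the pattern concrete, a tiny, purely illustrative sketch of 
> resolving 'my-log-{log_ts|yyyy-MM-dd}' for a timestamp field with 
> java.text.SimpleDateFormat; the class and method names are made up and are 
> not the connector implementation:
> {code:java}
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat;
> 
> // Illustration only: derive the concrete index name from the pattern
> // "my-log-{log_ts|yyyy-MM-dd}" and a timestamp field value.
> public class DynamicIndexDemo {
>     static String resolveIndex(String prefix, String dateFormat, Timestamp ts) {
>         return prefix + new SimpleDateFormat(dateFormat).format(ts);
>     }
> 
>     public static void main(String[] args) {
>         Timestamp logTs = Timestamp.valueOf("2020-02-28 06:25:53");
>         System.out.println(resolveIndex("my-log-", "yyyy-MM-dd", logTs)); // my-log-2020-02-28
>     }
> }
> {code}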
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (FLINK-16170) SearchTemplateRequest ClassNotFoundException when use flink-sql-connector-elasticsearch7

2020-02-27 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-16170:
---

Assignee: Leonard Xu

> SearchTemplateRequest ClassNotFoundException when use 
> flink-sql-connector-elasticsearch7
> 
>
> Key: FLINK-16170
> URL: https://issues.apache.org/jira/browse/FLINK-16170
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / ElasticSearch
>Reporter: Jark Wu
>Assignee: Leonard Xu
>Priority: Blocker
> Fix For: 1.10.1
>
>
> When running the SQL CLI with elasticsearch7 and executing a query that 
> inserts into elasticsearch, a 
> SearchTemplateRequest ClassNotFoundException is thrown.
> {code:java}
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>   at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>   at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/elasticsearch7/shaded/org/elasticsearch/script/mustache/SearchTemplateRequest
>   at 
> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:76)
>   at 
> org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7ApiCallBridge.createClient(Elasticsearch7ApiCallBridge.java:48)
>   at 
> org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:299)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>   at 
> 
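
A minimal way to check the classpath problem outside the cluster, assuming 
the eventual fix is to bundle the Elasticsearch mustache module (which 
provides SearchTemplateRequest) into the fat sql-connector jar; the relocated 
class name below is copied verbatim from the stack trace, while the helper 
class itself is illustrative:

{code:java}
// Run with the packaged flink-sql-connector-elasticsearch7 jar on the
// classpath. If the mustache module is missing from the shaded jar, this
// fails with the same NoClassDefFoundError/ClassNotFoundException as the
// SQL CLI job above.
public class ShadedClassCheck {
	public static void main(String[] args) throws ClassNotFoundException {
		Class.forName("org.apache.flink.elasticsearch7.shaded."
			+ "org.elasticsearch.script.mustache.SearchTemplateRequest");
		System.out.println("SearchTemplateRequest found in shaded jar");
	}
}
{code}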

[jira] [Assigned] (FLINK-15400) elasticsearch sink support dynamic index.

2020-02-27 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu reassigned FLINK-15400:
---

Assignee: Leonard Xu

> elasticsearch sink support dynamic index.
> -
>
> Key: FLINK-15400
> URL: https://issues.apache.org/jira/browse/FLINK-15400
> Project: Flink
>  Issue Type: New Feature
>  Components: Connectors / ElasticSearch, Table SQL / Ecosystem
>Affects Versions: 1.11.0
>Reporter: ouyangwulin
>Assignee: Leonard Xu
>Priority: Major
>  Labels: usability
> Fix For: 1.11.0
>
>
> From 
> user...@flink.apache.org([https://lists.apache.org/thread.html/ac4e0c068baeb3b070f0213a2e1314e6552b226b8132a4c49d667ecd%40%3Cuser-zh.flink.apache.org%3E]),
> because ES 6/7 does not support TTL, users need to clean up indices by 
> timestamp, so a dynamic index is a useful feature. Add the property 
> 'dynamicIndex' as a switch to enable the dynamic index, the property 
> 'indexField' for the time field to extract, and the property 
> 'indexInterval' to change the cycle mode.
>  
> ||Property||Description||Default||Required||
> |dynamicIndex|whether the index is dynamic|false (true/false)|false|
> |indexField|the field to extract the index value from|none|required when 
> dynamicIndex is true; only the types "timestamp", "date" and "long" are 
> supported|
> |indexInterval|the cycle mode|d|required when dynamicIndex is true; the 
> optional values are: d (day), m (month), w (week)|
>  
> After discussion, the final design looks as follows:
> {code:java}
> CREATE TABLE es_sink_table (
>   log_source varchar,
>   log_content varchar,
>   log_level bigint,
>   log_ts timestamp
> ) WITH (
> 'connector.type' = 'elasticsearch',
> 'connector.version' = '6',
> 'connector.index'='my-log-{log_ts|yyyy-MM-dd}', 
> # elasticsearch index name. Flink supports creating the index dynamically
> # based on a field at runtime: the index value comes from the dynamicIndex
> # pattern when the field type is varchar, e.g. 'my-log-{log_source}'; the
> # dynamicIndex pattern supports formatting and parsing dates via Java
> # SimpleDateFormat when the field type is timestamp/date,
> # e.g. 'my-log-{log_ts|yyyy-MM-dd}'.
> 'connector.index-alias'='my-log',       
> # index alias name; the alias maps to all indices created from
> # 'connector.index'. 
> …
> )
> {code}
>  
>  
>  
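
A minimal sketch of how a pattern like 'my-log-{log_ts|yyyy-MM-dd}' could be 
resolved per record; the helper class, regex, and type handling below are 
illustrative assumptions, not the actual connector implementation:

{code:java}
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DynamicIndexSketch {

	// Matches "{field}" or "{field|dateFormat}" inside the index pattern.
	private static final Pattern PLACEHOLDER =
		Pattern.compile("\\{([^}|]+)(?:\\|([^}]+))?}");

	static String resolveIndex(String pattern, String field, Object value) {
		Matcher m = PLACEHOLDER.matcher(pattern);
		StringBuffer sb = new StringBuffer();
		while (m.find()) {
			if (!m.group(1).equals(field)) {
				continue; // leave unrelated placeholders untouched
			}
			String replacement = (m.group(2) != null && value instanceof LocalDateTime)
				// timestamp/date field: format with the user-supplied pattern
				? ((LocalDateTime) value).format(DateTimeFormatter.ofPattern(m.group(2)))
				// varchar field: substitute the value directly
				: String.valueOf(value);
			m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
		}
		m.appendTail(sb);
		return sb.toString();
	}

	public static void main(String[] args) {
		// Prints: my-log-2020-02-27
		System.out.println(resolveIndex(
			"my-log-{log_ts|yyyy-MM-dd}", "log_ts",
			LocalDateTime.of(2020, 2, 27, 12, 0)));
	}
}
{code}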



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (FLINK-16323) Support to join a static table in streaming mode

2020-02-27 Thread Jark Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-16323:

Fix Version/s: 1.11.0

> Support to join a static table in streaming mode
> 
>
> Key: FLINK-16323
> URL: https://issues.apache.org/jira/browse/FLINK-16323
> Project: Flink
>  Issue Type: New Feature
>  Components: Table SQL / API, Table SQL / Planner
>Reporter: Jark Wu
>Priority: Major
> Fix For: 1.11.0
>
>
> Currently, we already support joining a stream and a bounded stream using a 
> regular join. However, this is translated into a stream-stream join, which 
> is not efficient because it outputs early results that may be retracted 
> afterwards. 
> A better and more native approach would be a special temporal join operator 
> which blocks the streaming side until all the static table data is 
> loaded. 
> This would help users join a huge table, e.g. a MySQL table with billions 
> of records that changes little.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-16323) Support to join a static table in streaming mode

2020-02-27 Thread Jark Wu (Jira)
Jark Wu created FLINK-16323:
---

 Summary: Support to join a static table in streaming mode
 Key: FLINK-16323
 URL: https://issues.apache.org/jira/browse/FLINK-16323
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / API, Table SQL / Planner
Reporter: Jark Wu


Currently, we already support joining a stream and a bounded stream using a 
regular join. However, this is translated into a stream-stream join, which is 
not efficient because it outputs early results that may be retracted 
afterwards. 

A better and more native approach would be a special temporal join operator 
which blocks the streaming side until all the static table data is loaded. 

This would help users join a huge table, e.g. a MySQL table with billions of 
records that changes little.
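
A minimal sketch of what such a query could look like from the user's 
perspective, assuming a bounded 'user_dim' table and an unbounded 'orders' 
stream; all table names, fields and the surrounding setup are illustrative, 
not part of the proposal:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class StaticTableJoinSketch {
	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

		// 'orders' is an unbounded stream; 'user_dim' is a large, rarely
		// changing bounded table (e.g. backed by MySQL). Both are assumed
		// to be registered in the catalog already.
		// With the proposed operator, 'user_dim' would be fully loaded
		// before any streaming rows are joined, instead of running a
		// stream-stream join that emits and later retracts early results.
		Table result = tEnv.sqlQuery(
			"SELECT o.order_id, u.user_name " +
			"FROM orders AS o " +
			"JOIN user_dim AS u ON o.user_id = u.user_id");
	}
}
{code}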



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers 
recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385507440
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
 ##
 @@ -120,10 +120,14 @@
 * @return channel handlers
 */
public ChannelHandler[] getClientChannelHandlers() {
-   return new ChannelHandler[] {
-   messageEncoder,
-   new NettyMessage.NettyMessageDecoder(),
-   new CreditBasedPartitionRequestClientHandler()};
+   CreditBasedPartitionRequestClientHandler 
networkClientHandler = new CreditBasedPartitionRequestClientHandler();
+   NettyMessageClientDecoderDelegate 
nettyMessageClientDecoderDelegate
+   = new 
NettyMessageClientDecoderDelegate(networkClientHandler);
+
+   return new ChannelHandler[] {
+   messageEncoder,
+   nettyMessageClientDecoderDelegate,
 
 Review comment:
   nit: use `new NettyMessageClientDecoderDelegate(networkClientHandler)` inline 
instead, to avoid the overly long local variable definition above.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385507196
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
 ##
 @@ -120,10 +120,14 @@
 * @return channel handlers
 */
public ChannelHandler[] getClientChannelHandlers() {
-   return new ChannelHandler[] {
-   messageEncoder,
-   new NettyMessage.NettyMessageDecoder(),
-   new CreditBasedPartitionRequestClientHandler()};
+   CreditBasedPartitionRequestClientHandler 
networkClientHandler = new CreditBasedPartitionRequestClientHandler();
 
 Review comment:
   `CreditBasedPartitionRequestClientHandler` -> `NetworkClientHandler`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385507240
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyProtocol.java
 ##
 @@ -120,10 +120,14 @@
 * @return channel handlers
 */
public ChannelHandler[] getClientChannelHandlers() {
-   return new ChannelHandler[] {
-   messageEncoder,
-   new NettyMessage.NettyMessageDecoder(),
-   new CreditBasedPartitionRequestClientHandler()};
+   CreditBasedPartitionRequestClientHandler 
networkClientHandler = new CreditBasedPartitionRequestClientHandler();
+   NettyMessageClientDecoderDelegate 
nettyMessageClientDecoderDelegate
+   = new 
NettyMessageClientDecoderDelegate(networkClientHandler);
+
+   return new ChannelHandler[] {
+   messageEncoder,
 
 Review comment:
   nit: unify the indentation of the array elements.
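
Taking the three review suggestions above together (inline the decoder 
delegate, declare the handler as `NetworkClientHandler`, unify the 
indentation), a rough sketch of the resulting method; the final pipeline 
entry is assumed from the pre-change code, since the quoted diff is 
truncated:

{code:java}
public ChannelHandler[] getClientChannelHandlers() {
	NetworkClientHandler networkClientHandler = new CreditBasedPartitionRequestClientHandler();

	return new ChannelHandler[] {
		messageEncoder,
		new NettyMessageClientDecoderDelegate(networkClientHandler),
		networkClientHandler};
}
{code}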


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce 
ArrowPythonScalarFunctionOperator for vectorized Python UDF execution
URL: https://github.com/apache/flink/pull/11208#issuecomment-590746668
 
 
   
   ## CI report:
   
   * 2457edcc5d060582b0033741320daa63dff68b0e Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/150947684) 
   * 31226c452d50eb216796a3c1b963f047bb2c5698 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150957094) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce ArrowPythonScalarFunctionOperator for vectorized Python UDF execution

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11208: [FLINK-16271][python] Introduce 
ArrowPythonScalarFunctionOperator for vectorized Python UDF execution
URL: https://github.com/apache/flink/pull/11208#issuecomment-590746668
 
 
   
   ## CI report:
   
   * 2457edcc5d060582b0033741320daa63dff68b0e Travis: 
[FAILURE](https://travis-ci.com/flink-ci/flink/builds/150947684) 
   * 31226c452d50eb216796a3c1b963f047bb2c5698 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11247: [FLINK-16262][Connectors] Set the context classloader for parallel stream in FlinkKafkaProducer

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11247: [FLINK-16262][Connectors] Set the 
context classloader for parallel stream in FlinkKafkaProducer
URL: https://github.com/apache/flink/pull/11247#issuecomment-592026056
 
 
   
   ## CI report:
   
   * ff5293a8f39aaef7a457e9720f7174e1982a928d UNKNOWN
   * 4fd63bfdc08cba777c9f476de7b0f7cbeddf294f Travis: 
[CANCELED](https://travis-ci.com/flink-ci/flink/builds/150948725) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5712)
 
   * 2843e7da6dacf7869b570049a769826e4a4673be UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers 
recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385502888
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 ##
 @@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate 
inputGate, NetworkBufferPool
/**
 * Returns a deserialized buffer message as it would be received during 
runtime.
 */
-   private static BufferResponse createBufferResponse(
+   private BufferResponse createBufferResponse(
Buffer buffer,
int sequenceNumber,
-   InputChannelID receivingChannelId,
-   int backlog) throws IOException {
+   RemoteInputChannel receivingChannel,
+   int backlog,
+   CreditBasedPartitionRequestClientHandler clientHandler) 
throws IOException {
+
// Mock buffer to serialize
-   BufferResponse resp = new BufferResponse(buffer, 
sequenceNumber, receivingChannelId, backlog);
+   BufferResponse resp = new BufferResponse(
+   buffer,
+   sequenceNumber,
+   receivingChannel.getInputChannelId(),
+   backlog);
 
ByteBuf serialized = 
resp.write(UnpooledByteBufAllocator.DEFAULT);
 
// Skip general header bytes
serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH);
 
+
// Deserialize the bytes again. We have to go this way, because 
we only partly deserialize
// the header of the response and wait for a buffer from the 
buffer pool to copy the payload
// data into.
-   BufferResponse deserialized = 
BufferResponse.readFrom(serialized);
+   NetworkBufferAllocator allocator = new 
NetworkBufferAllocator(clientHandler);
 
 Review comment:
   It seems better to pass the `NetworkBufferAllocator` in as an argument. If 
we create multiple `BufferResponse`s in one test, it is not necessary to 
create a new allocator for every `BufferResponse`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385502524
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 ##
 @@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate 
inputGate, NetworkBufferPool
/**
 * Returns a deserialized buffer message as it would be received during 
runtime.
 */
-   private static BufferResponse createBufferResponse(
+   private BufferResponse createBufferResponse(
Buffer buffer,
int sequenceNumber,
-   InputChannelID receivingChannelId,
-   int backlog) throws IOException {
+   RemoteInputChannel receivingChannel,
+   int backlog,
+   CreditBasedPartitionRequestClientHandler clientHandler) 
throws IOException {
+
// Mock buffer to serialize
-   BufferResponse resp = new BufferResponse(buffer, 
sequenceNumber, receivingChannelId, backlog);
+   BufferResponse resp = new BufferResponse(
+   buffer,
+   sequenceNumber,
+   receivingChannel.getInputChannelId(),
+   backlog);
 
ByteBuf serialized = 
resp.write(UnpooledByteBufAllocator.DEFAULT);
 
// Skip general header bytes
serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH);
 
+
// Deserialize the bytes again. We have to go this way, because 
we only partly deserialize
// the header of the response and wait for a buffer from the 
buffer pool to copy the payload
// data into.
-   BufferResponse deserialized = 
BufferResponse.readFrom(serialized);
+   NetworkBufferAllocator allocator = new 
NetworkBufferAllocator(clientHandler);
+   BufferResponse deserialized = 
BufferResponse.readFrom(serialized, allocator);
+
+   if (deserialized.getBuffer() != null) {
 
 Review comment:
   I guess it is not necessary to write the bytes into the buffer, because we 
never read and verify the respective buffer in the tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385501156
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 ##
 @@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate 
inputGate, NetworkBufferPool
/**
 * Returns a deserialized buffer message as it would be received during 
runtime.
 */
-   private static BufferResponse createBufferResponse(
+   private BufferResponse createBufferResponse(
Buffer buffer,
int sequenceNumber,
-   InputChannelID receivingChannelId,
-   int backlog) throws IOException {
+   RemoteInputChannel receivingChannel,
+   int backlog,
+   CreditBasedPartitionRequestClientHandler clientHandler) 
throws IOException {
+
// Mock buffer to serialize
-   BufferResponse resp = new BufferResponse(buffer, 
sequenceNumber, receivingChannelId, backlog);
+   BufferResponse resp = new BufferResponse(
 
 Review comment:
   This change is not necessary if we keep using the previous 
`receivingChannelId`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] Let Netty use Flink's buffers directly in credit-based mode

2020-02-27 Thread GitBox
zhijiangW commented on a change in pull request #7368: [FLINK-10742][network] 
Let Netty use Flink's buffers directly in credit-based mode
URL: https://github.com/apache/flink/pull/7368#discussion_r385500959
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 ##
 @@ -430,23 +431,35 @@ private static void releaseResource(SingleInputGate 
inputGate, NetworkBufferPool
/**
 * Returns a deserialized buffer message as it would be received during 
runtime.
 */
-   private static BufferResponse createBufferResponse(
+   private BufferResponse createBufferResponse(
Buffer buffer,
int sequenceNumber,
-   InputChannelID receivingChannelId,
-   int backlog) throws IOException {
+   RemoteInputChannel receivingChannel,
 
 Review comment:
   It is better to keep using the previous `receivingChannelId` parameter 
instead, because that is all we actually need. 
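
Putting the comments on this helper together (keep the `static` modifier and 
the `receivingChannelId` parameter, pass the allocator in, skip writing the 
payload bytes), a sketch of the suggested shape; everything beyond the quoted 
diff is an assumption:

{code:java}
private static BufferResponse createBufferResponse(
		Buffer buffer,
		int sequenceNumber,
		InputChannelID receivingChannelId,
		int backlog,
		NetworkBufferAllocator allocator) throws IOException {

	// Serialize the message as it would be sent over the wire.
	BufferResponse resp = new BufferResponse(buffer, sequenceNumber, receivingChannelId, backlog);
	ByteBuf serialized = resp.write(UnpooledByteBufAllocator.DEFAULT);

	// Skip the general frame header bytes before deserializing.
	serialized.readBytes(NettyMessage.FRAME_HEADER_LENGTH);

	// Deserialize with the allocator that is shared across the test, so a
	// new one is not created for every BufferResponse.
	return BufferResponse.readFrom(serialized, allocator);
}
{code}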


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers recovered from previous attempt in w…

2020-02-27 Thread GitBox
flinkbot edited a comment on issue #11248: [FLINK-16299] Release containers 
recovered from previous attempt in w…
URL: https://github.com/apache/flink/pull/11248#issuecomment-592302068
 
 
   
   ## CI report:
   
   * 401e60a289f63177b3c59e0dd9b5b600a929adb7 Travis: 
[PENDING](https://travis-ci.com/flink-ci/flink/builds/150955151) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=5714)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

