[GitHub] [flink] flinkbot edited a comment on issue #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11541: [FLINK-15416][network] add task 
manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#issuecomment-604812212
 
 
   
   ## CI report:
   
   * b5ef19285000d692d6b1b8b0aa7a37ce35c216f5 UNKNOWN
   * baf2c55f7bb0b9df7bdef94083c947693b75eae2 UNKNOWN
   * e5419f41a18eaae05814837d679f58480a2cb94c UNKNOWN
   * 7346f3729147e2b94cbf145fbea2dcd3c3332e19 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159173715) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7177)
 
   * 8e2da05d8e34fe5f8d7b86e849b0df1275579242 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] HuangZhenQiu commented on issue #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
HuangZhenQiu commented on issue #11541: [FLINK-15416][network] add task manager 
netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#issuecomment-611885676
 
 
   @zhijiangW Appreciate your kind review. It was really helpful for finding 
the bug and the possible improvements.




[GitHub] [flink] HuangZhenQiu commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #11541: 
[FLINK-15416][network] add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406609291
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -120,6 +127,40 @@ public void closeOpenChannelConnections(ConnectionID connectionId) {
}
}
 
+   void connectChannelWithRetry(ConnectingChannel connectingChannel, ConnectionID connectionId)
 
 Review comment:
   Agree. Updated accordingly.




[GitHub] [flink] HuangZhenQiu commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #11541: 
[FLINK-15416][network] add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406609362
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -120,6 +127,40 @@ public void closeOpenChannelConnections(ConnectionID connectionId) {
}
}
 
+   void connectChannelWithRetry(ConnectingChannel connectingChannel, ConnectionID connectionId)
+   throws IOException, InterruptedException {
+   int count = 0;
+   Exception exception = null;
+   do {
+   try {
+   LOG.info("Connecting to {} at {} attempt", connectionId.getAddress(), count);
+   nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
+
+   NettyPartitionRequestClient client = connectingChannel.waitForChannel();
+
+   clients.replace(connectionId, connectingChannel, client);
+   exception = null;
 
 Review comment:
   Yes, it should return directly.
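   The fix under discussion can be sketched as a retry helper that returns the result directly on the first successful attempt, making the `exception = null` bookkeeping unnecessary. This is an illustrative sketch, not the merged Flink code: the `RETRY_LIMIT` constant, the `Callable`-based signature, and the class name are assumptions.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

public class RetryConnect {
    static final int RETRY_LIMIT = 3; // illustrative; the PR makes this configurable

    // Retries the supplied connect action up to RETRY_LIMIT times and
    // returns the result directly on the first success, as suggested in
    // the review: no "exception = null" reset is needed in the loop.
    static <T> T connectWithRetry(Callable<T> connectAction) throws IOException {
        Exception lastFailure = null;
        for (int attempt = 0; attempt < RETRY_LIMIT; attempt++) {
            try {
                return connectAction.call(); // return directly on success
            } catch (Exception e) {
                lastFailure = e;
            }
        }
        throw new IOException(
            "Connecting failed after " + RETRY_LIMIT + " attempts", lastFailure);
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        // Simulated connect that fails twice, then succeeds on the third attempt.
        String result = connectWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new IOException("connection refused");
            }
            return "client";
        });
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Returning inside the `try` also makes the success path obvious to readers, which is the point of the review comment.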




[GitHub] [flink] HuangZhenQiu commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #11541: 
[FLINK-15416][network] add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406609258
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -99,7 +106,7 @@ else if (old instanceof ConnectingChannel) {
 
// Make sure to increment the reference count before handing a client
// out to ensure correct bookkeeping for channel closing.
-   if (!client.incrementReferenceCounter()) {
+   if (client != null && !client.incrementReferenceCounter()) {
 
 Review comment:
   No need. Mistakenly added.




[GitHub] [flink] zhijiangW commented on a change in pull request #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11351: [FLINK-16404][runtime] 
Avoid caching buffers for blocked input channels before barrier alignment
URL: https://github.com/apache/flink/pull/11351#discussion_r406609085
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
 ##
 @@ -90,7 +111,11 @@ else if (obj != null && obj.getClass() == CheckpointOptions.class) {
 
@Override
public String toString() {
-   return "CheckpointOptions: " + checkpointType + " @ " + targetLocation;
+   return "CheckpointOptions{" +
 
 Review comment:
   nit: whitespace for `{` and `=`




[GitHub] [flink] zhijiangW commented on a change in pull request #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11351: [FLINK-16404][runtime] 
Avoid caching buffers for blocked input channels before barrier alignment
URL: https://github.com/apache/flink/pull/11351#discussion_r406609183
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
 ##
 @@ -42,12 +43,23 @@
/** Target location for the checkpoint. */
private final CheckpointStorageLocationReference targetLocation;
 
+   private final boolean isExactlyOnceMode;
+
+   @VisibleForTesting
+   public CheckpointOptions(
+   CheckpointType checkpointType,
 
 Review comment:
   nit: increase indentation for arguments 




[GitHub] [flink] HuangZhenQiu commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #11541: 
[FLINK-15416][network] add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406609157
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -81,16 +92,12 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
Object old = clients.putIfAbsent(connectionId, connectingChannel);
 
if (old == null) {
-   nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
-
-   client = connectingChannel.waitForChannel();
-
-   clients.replace(connectionId, connectingChannel, client);
+   connectChannelWithRetry(connectingChannel, connectionId);
+   client = (NettyPartitionRequestClient) clients.get(connectionId);
}
else if (old instanceof ConnectingChannel) {
-   client = ((ConnectingChannel) old).waitForChannel();
-
-   clients.replace(connectionId, old, client);
+   connectChannelWithRetry((ConnectingChannel) old, connectionId);
 
 Review comment:
   Good catch. It is a bug. There should be a flag parameter to indicate 
whether to initiate the connection or not.
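   The bug being acknowledged can be sketched as follows: a hypothetical `initiateConnect` flag distinguishes the thread that created the `ConnectingChannel` (and must start the connection) from threads that should only wait on it. The class shape and names are illustrative, not the actual Flink code:

```java
import java.util.concurrent.CompletableFuture;

public class ConnectFlagSketch {
    // Stand-in for Flink's ConnectingChannel: one future shared by all waiters.
    static class ConnectingChannel {
        final CompletableFuture<String> clientFuture = new CompletableFuture<>();
        void connect() { clientFuture.complete("client"); } // pretend network connect
        String waitForChannel() { return clientFuture.join(); }
    }

    // Only the caller that created the channel initiates the connect; later
    // callers pass initiateConnect = false and just wait for the shared result.
    static String connectChannel(ConnectingChannel channel, boolean initiateConnect) {
        if (initiateConnect) {
            channel.connect();
        }
        return channel.waitForChannel();
    }

    public static void main(String[] args) {
        ConnectingChannel channel = new ConnectingChannel();
        String first = connectChannel(channel, true);   // creator connects
        String second = connectChannel(channel, false); // later caller only waits
        System.out.println(first + " / " + second);
    }
}
```

Without such a flag, the `old instanceof ConnectingChannel` branch would initiate a second connect on a channel another thread is already connecting, which is the bug the reviewer caught.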




[GitHub] [flink] HuangZhenQiu commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
HuangZhenQiu commented on a change in pull request #11541: 
[FLINK-15416][network] add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406609040
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -81,16 +92,12 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
Object old = clients.putIfAbsent(connectionId, connectingChannel);
 
if (old == null) {
 
 Review comment:
   As the race condition only happens among the subpartitions of the same 
channel, a synchronized lock is probably too heavy. Consider a job with 
256 TMs: 255 connections need to be built, and the lock would introduce a 
longer job initialization time.
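   A common alternative to a single coarse lock, sketched here purely as an assumption (not the approach the PR settled on), is to scope contention per connection with `ConcurrentHashMap.computeIfAbsent`, so independent connections can be established in parallel:

```java
import java.util.concurrent.ConcurrentHashMap;

public class PerConnectionLocking {
    private final ConcurrentHashMap<String, String> clients = new ConcurrentHashMap<>();

    // computeIfAbsent serializes only callers racing on the SAME connectionId;
    // clients for unrelated connections are created concurrently, avoiding the
    // global-lock cost the comment worries about for 255 connections.
    String getOrCreateClient(String connectionId) {
        return clients.computeIfAbsent(connectionId, id -> "client-for-" + id);
    }

    public static void main(String[] args) {
        PerConnectionLocking factory = new PerConnectionLocking();
        System.out.println(factory.getOrCreateClient("tm-1"));
        System.out.println(factory.getOrCreateClient("tm-1")); // cached, not recreated
    }
}
```

The mapping function here is a placeholder for the actual (blocking) connect; in real code a long-running connect inside `computeIfAbsent` would need care, since it blocks other operations on the same map bin.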




[GitHub] [flink] zhijiangW commented on a change in pull request #11351: [FLINK-16404][runtime] Avoid caching buffers for blocked input channels before barrier alignment

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11351: [FLINK-16404][runtime] 
Avoid caching buffers for blocked input channels before barrier alignment
URL: https://github.com/apache/flink/pull/11351#discussion_r406606270
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
 ##
 @@ -114,7 +114,7 @@ private boolean add(BufferConsumer bufferConsumer, boolean finish) {
buffers.add(bufferConsumer);
updateStatistics(bufferConsumer);
increaseBuffersInBacklog(bufferConsumer);
-   notifyDataAvailable = shouldNotifyDataAvailable() || finish;
+   notifyDataAvailable = shouldNotifyDataAvailable();
 
 Review comment:
   If this condition is actually redundant and irrelevant to this PR, we can 
create a separate ticket for removing it.
   I am not sure whether there were any considerations for adding this flag 
before, or whether removing it might impact behavior, so it is better to 
check it in a separate ticket instead of a hotfix commit.




[GitHub] [flink] dianfu commented on a change in pull request #11682: [FLINK-16668][python] Refactor PythonDriver to read python dependency info both from command line and environment configuration.

2020-04-09 Thread GitBox
dianfu commented on a change in pull request #11682: [FLINK-16668][python] 
Refactor PythonDriver to read python dependency info both from command line and 
environment configuration.
URL: https://github.com/apache/flink/pull/11682#discussion_r406589029
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
 ##
 @@ -18,7 +18,10 @@
 
 package org.apache.flink.client.python;
 
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.cli.CliArgsException;
 
 Review comment:
   unused import




[GitHub] [flink] dianfu commented on a change in pull request #11682: [FLINK-16668][python] Refactor PythonDriver to read python dependency info both from command line and environment configuration.

2020-04-09 Thread GitBox
dianfu commented on a change in pull request #11682: [FLINK-16668][python] 
Refactor PythonDriver to read python dependency info both from command line and 
environment configuration.
URL: https://github.com/apache/flink/pull/11682#discussion_r406588477
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -159,7 +159,7 @@
"py37.zip/py37/bin/python). The data files could be 
accessed in Python UDF, e.g.: " +
"f = open('data/data.txt', 'r').");
 
-   static final Option PYEXEC_OPTION = new Option("pyexec", 
"pyExecutable", true,
+   public static final Option PYEXEC_OPTION = new Option("pyexec", 
"pyExecutable", true,
 
 Review comment:
   ditto




[GitHub] [flink] dianfu commented on a change in pull request #11682: [FLINK-16668][python] Refactor PythonDriver to read python dependency info both from command line and environment configuration.

2020-04-09 Thread GitBox
dianfu commented on a change in pull request #11682: [FLINK-16668][python] 
Refactor PythonDriver to read python dependency info both from command line and 
environment configuration.
URL: https://github.com/apache/flink/pull/11682#discussion_r406588349
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -124,29 +124,29 @@
public static final Option STOP_AND_DRAIN = new Option("d", "drain", false,
"Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");
 
-   static final Option PY_OPTION = new Option("py", "python", true,
+   public static final Option PY_OPTION = new Option("py", "python", true,
"Python script with the program entry point. " +
"The dependent resources can be configured with the `--pyFiles` option.");
 
-   static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
+   public static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
 
 Review comment:
   unnecessary change




[GitHub] [flink] dianfu commented on a change in pull request #11682: [FLINK-16668][python] Refactor PythonDriver to read python dependency info both from command line and environment configuration.

2020-04-09 Thread GitBox
dianfu commented on a change in pull request #11682: [FLINK-16668][python] 
Refactor PythonDriver to read python dependency info both from command line and 
environment configuration.
URL: https://github.com/apache/flink/pull/11682#discussion_r406588450
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -124,29 +124,29 @@
public static final Option STOP_AND_DRAIN = new Option("d", "drain", false,
"Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");
 
-   static final Option PY_OPTION = new Option("py", "python", true,
+   public static final Option PY_OPTION = new Option("py", "python", true,
"Python script with the program entry point. " +
"The dependent resources can be configured with the `--pyFiles` option.");
 
-   static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
+   public static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
"Attach custom python files for job. " +
"These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " +
"The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " +
"Comma (',') could be used as the separator to specify multiple files " +
"(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");
 
-   static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
+   public static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
"Python module with the program entry point. " +
"This option must be used in conjunction with `--pyFiles`.");
 
-   static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true,
+   public static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true,
 
 Review comment:
   ditto




[GitHub] [flink] dianfu commented on a change in pull request #11682: [FLINK-16668][python] Refactor PythonDriver to read python dependency info both from command line and environment configuration.

2020-04-09 Thread GitBox
dianfu commented on a change in pull request #11682: [FLINK-16668][python] 
Refactor PythonDriver to read python dependency info both from command line and 
environment configuration.
URL: https://github.com/apache/flink/pull/11682#discussion_r406588461
 
 

 ##
 File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
 ##
 @@ -124,29 +124,29 @@
public static final Option STOP_AND_DRAIN = new Option("d", "drain", false,
"Send MAX_WATERMARK before taking the savepoint and stopping the pipelne.");
 
-   static final Option PY_OPTION = new Option("py", "python", true,
+   public static final Option PY_OPTION = new Option("py", "python", true,
"Python script with the program entry point. " +
"The dependent resources can be configured with the `--pyFiles` option.");
 
-   static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
+   public static final Option PYFILES_OPTION = new Option("pyfs", "pyFiles", true,
"Attach custom python files for job. " +
"These files will be added to the PYTHONPATH of both the local client and the remote python UDF worker. " +
"The standard python resource file suffixes such as .py/.egg/.zip or directory are all supported. " +
"Comma (',') could be used as the separator to specify multiple files " +
"(e.g.: --pyFiles file:///tmp/myresource.zip,hdfs:///$namenode_address/myresource2.zip).");
 
-   static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
+   public static final Option PYMODULE_OPTION = new Option("pym", "pyModule", true,
"Python module with the program entry point. " +
"This option must be used in conjunction with `--pyFiles`.");
 
-   static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true,
+   public static final Option PYREQUIREMENTS_OPTION = new Option("pyreq", "pyRequirements", true,
"Specify a requirements.txt file which defines the third-party dependencies. " +
"These dependencies will be installed and added to the PYTHONPATH of the python UDF worker. " +
"A directory which contains the installation packages of these dependencies could be specified " +
"optionally. Use '#' as the separator if the optional parameter exists " +
"(e.g.: --pyRequirements file:///tmp/requirements.txt#file:///tmp/cached_dir).");
 
-   static final Option PYARCHIVE_OPTION = new Option("pyarch", "pyArchives", true,
+   public static final Option PYARCHIVE_OPTION = new Option("pyarch", "pyArchives", true,
 
 Review comment:
   ditto




[GitHub] [flink] dianfu commented on a change in pull request #11682: [FLINK-16668][python] Refactor PythonDriver to read python dependency info both from command line and environment configuration.

2020-04-09 Thread GitBox
dianfu commented on a change in pull request #11682: [FLINK-16668][python] 
Refactor PythonDriver to read python dependency info both from command line and 
environment configuration.
URL: https://github.com/apache/flink/pull/11682#discussion_r406600893
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonDriverEnvUtils.java
 ##
 @@ -226,6 +186,48 @@ public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes at
return libFiles;
}
 
+   private static void appendUserFiles(PythonEnvironment env, List<Path> userFileList) throws IOException {
+   List<String> pythonPathList = new ArrayList<>();
+   Path tmpDirPath = new Path(env.tempDirectory);
+
+   for (Path pythonFile : userFileList) {
+   String sourceFileName = pythonFile.getName();
+   // add random UUID parent directory to avoid name conflict.
+   Path targetPath = new Path(
+   tmpDirPath,
+   String.join(File.separator, UUID.randomUUID().toString(), sourceFileName));
+   try {
+   if (!pythonFile.getFileSystem().isDistributedFS()) {
+   if (!pythonFile.getFileSystem().exists(pythonFile)) {
+   LOG.error(pythonFile + " not exists, skipping...");
+   continue;
+   }
+   // if the path is local file, try to create symbolic link.
+   new File(targetPath.getParent().toString()).mkdir();
+   createSymbolicLinkForPyflinkLib(
+   Paths.get(new File(pythonFile.getPath()).getAbsolutePath()),
+   Paths.get(targetPath.toString()));
+   } else {
+   FileUtils.copy(pythonFile, targetPath, true);
+   }
+   } catch (Exception e) {
+   LOG.error("Error occured when copying user python files, skipping...", e);
 
 Review comment:
   It seems that we should throw the exception in this case.
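   The suggested change can be sketched as wrapping and rethrowing the failure instead of logging and skipping, so the caller learns that a user file could not be prepared. The helper name and signature below are illustrative, not the actual Flink code:

```java
import java.io.IOException;

public class RethrowSketch {
    // Instead of LOG.error(...) and continue, wrap the failure in an
    // IOException and rethrow it, so the driver fails fast on a bad file.
    static void appendUserFile(String file, boolean failCopy) throws IOException {
        try {
            if (failCopy) { // simulated copy/symlink failure
                throw new RuntimeException("copy failed for " + file);
            }
        } catch (Exception e) {
            throw new IOException(
                "Error occurred when copying user python file " + file, e);
        }
    }

    public static void main(String[] args) {
        try {
            appendUserFile("udf.py", true);
        } catch (IOException e) {
            System.out.println("caller sees: " + e.getMessage());
        }
    }
}
```

Silently skipping a dependency would otherwise surface much later as an import error inside the Python UDF worker, which is harder to diagnose.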




[GitHub] [flink] dianfu commented on a change in pull request #11682: [FLINK-16668][python] Refactor PythonDriver to read python dependency info both from command line and environment configuration.

2020-04-09 Thread GitBox
dianfu commented on a change in pull request #11682: [FLINK-16668][python] 
Refactor PythonDriver to read python dependency info both from command line and 
environment configuration.
URL: https://github.com/apache/flink/pull/11682#discussion_r406591224
 
 

 ##
 File path: 
flink-python/src/main/java/org/apache/flink/client/python/PythonDriverEnvUtils.java
 ##
 @@ -37,34 +36,31 @@
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.python.PythonOptions.PYTHON_CLIENT_EXECUTABLE;
+import static org.apache.flink.python.PythonOptions.PYTHON_FILES;
+import static org.apache.flink.python.util.PythonDependencyUtils.FILE_DELIMITER;
 
 /**
  * The util class help to prepare Python env and run the python process.
  */
 public final class PythonDriverEnvUtils {
private static final Logger LOG = LoggerFactory.getLogger(PythonDriverEnvUtils.class);
 
-   private static final String FLINK_OPT_DIR = System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR);
-
-   private static final String FLINK_OPT_DIR_PYTHON = FLINK_OPT_DIR + File.separator + "python";
-
@VisibleForTesting
-   public static final String PYFLINK_PY_FILES = "PYFLINK_PY_FILES";
+   static Map<String, String> systemEnv = System.getenv();
 
 Review comment:
   Just call `System.getenv()` when you need to access the system variables?
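   The suggestion is to avoid caching the environment in a mutable static field and instead read it on demand. A minimal sketch of the on-demand style (the accessor name is hypothetical, not Flink's API):

```java
import java.util.Map;

public class EnvAccessSketch {
    // Reading System.getenv() at the call site avoids a mutable static
    // snapshot that tests would otherwise have to overwrite.
    static String getEnvOrDefault(String key, String defaultValue) {
        Map<String, String> env = System.getenv();
        return env.getOrDefault(key, defaultValue);
    }

    public static void main(String[] args) {
        System.out.println(getEnvOrDefault("PYFLINK_PY_FILES", "<unset>"));
    }
}
```

Note `System.getenv()` returns an unmodifiable map, which is why the PR's mutable `static Map systemEnv` field stands out as test-only state.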




[GitHub] [flink] flinkbot edited a comment on issue #11671: [FLINK-17052] [client] Introduce PlanGenerator

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11671: [FLINK-17052] [client] Introduce 
PlanGenerator
URL: https://github.com/apache/flink/pull/11671#issuecomment-610971753
 
 
   
   ## CI report:
   
   * f82a2583fb82c50376de97b616de11db678d88e6 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/159725289) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7308)
 
   
   
   




[jira] [Commented] (FLINK-16921) "kubernetes session test" is unstable

2020-04-09 Thread Zhijiang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17080209#comment-17080209
 ] 

Zhijiang commented on FLINK-16921:
--

Another instance of the kubernetes session test failure: 
[https://travis-ci.org/github/apache/flink/builds/672997960]

The log only indicates:

Error from server (BadRequest): container "flink-task-manager" in pod 
"flink-task-manager-67959f4985-fd6tr" is terminated

so I am not sure the root cause is the same.

> "kubernetes session test" is unstable
> -
>
> Key: FLINK-16921
> URL: https://issues.apache.org/jira/browse/FLINK-16921
> Project: Flink
>  Issue Type: Bug
>  Components: Deployment / Kubernetes, Tests
>Affects Versions: 1.11.0
>Reporter: Robert Metzger
>Assignee: Yang Wang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
>  Time Spent: 1h
>  Remaining Estimate: 0h
>
> CI: 
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6915=logs=c88eea3b-64a0-564d-0031-9fdcd7b8abee=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5
> I assume some services didn't come up?
> {code}
> Caused by: java.util.concurrent.CompletionException: 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>  Connection refused: /10.1.0.4:30095
> {code}
> Full log
> {code}
> 2020-04-01T09:13:59.0673858Z Successfully built ba628fa7af0d
> 2020-04-01T09:13:59.0726818Z Successfully tagged 
> test_kubernetes_session:latest
> 2020-04-01T09:13:59.2547709Z 
> clusterrolebinding.rbac.authorization.k8s.io/flink-role-binding-default 
> created
> 2020-04-01T09:14:00.0586087Z 2020-04-01 09:14:00,055 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: jobmanager.rpc.address, localhost
> 2020-04-01T09:14:00.0608876Z 2020-04-01 09:14:00,060 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: jobmanager.rpc.port, 6123
> 2020-04-01T09:14:00.0611236Z 2020-04-01 09:14:00,060 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: jobmanager.heap.size, 1024m
> 2020-04-01T09:14:00.0613869Z 2020-04-01 09:14:00,061 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: taskmanager.memory.process.size, 1728m
> 2020-04-01T09:14:00.0616344Z 2020-04-01 09:14:00,061 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-04-01T09:14:00.0619384Z 2020-04-01 09:14:00,061 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: parallelism.default, 1
> 2020-04-01T09:14:00.0624467Z 2020-04-01 09:14:00,062 INFO  
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-04-01T09:14:00.9838038Z 2020-04-01 09:14:00,983 INFO  
> org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils [] - The 
> derived from fraction jvm overhead memory (172.800mb (181193935 bytes)) is 
> less than its min value 192.000mb (201326592 bytes), min value will be used 
> instead
> 2020-04-01T09:14:00.9922554Z 2020-04-01 09:14:00,991 INFO  
> org.apache.flink.kubernetes.utils.KubernetesUtils[] - Kubernetes 
> deployment requires a fixed port. Configuration blob.server.port will be set 
> to 6124
> 2020-04-01T09:14:00.9927409Z 2020-04-01 09:14:00,992 INFO  
> org.apache.flink.kubernetes.utils.KubernetesUtils[] - Kubernetes 
> deployment requires a fixed port. Configuration taskmanager.rpc.port will be 
> set to 6122
> 2020-04-01T09:14:01.0587014Z 2020-04-01 09:14:01,058 WARN  
> org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] 
> - Found 0 files in directory null/etc/hadoop, skip to mount the Hadoop 
> Configuration ConfigMap.
> 2020-04-01T09:14:01.0592498Z 2020-04-01 09:14:01,059 WARN  
> org.apache.flink.kubernetes.kubeclient.decorators.HadoopConfMountDecorator [] 
> - Found 0 files in directory null/etc/hadoop, skip to create the Hadoop 
> Configuration ConfigMap.
> 2020-04-01T09:14:01.8684880Z 2020-04-01 09:14:01,868 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Create 
> flink session cluster flink-native-k8s-session-1 successfully, JobManager Web 
> Interface: http://10.1.0.4:30095
> 2020-04-01T09:14:03.2952029Z Executing WordCount example with default input 
> data set.
> 2020-04-01T09:14:03.2955365Z Use --input to specify file input.
> 2020-04-01T09:15:31.5606577Z 
> 2020-04-01T09:15:31.5610358Z 
> 
> 

[jira] [Closed] (FLINK-16687) Support Python UDF in Java Correlate

2020-04-09 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng closed FLINK-16687.
---
Resolution: Resolved

> Support Python UDF in Java Correlate
> 
>
> Key: FLINK-16687
> URL: https://issues.apache.org/jira/browse/FLINK-16687
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: mayne wong
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
>  
> I tried to run a PyFlink UDF with SQL UNNEST, but execution of the job failed. 
> I defined a source from elements and used the UDF to split the string into a 
> list. The job raised org.codehaus.commons.compiler.CompileException: Cannot 
> determine simple type name "PythonScalarFunction$0"
> {code:python}
> import os
> 
> from pyflink.table.udf import udf
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes, CsvTableSink
> 
> 
> @udf(input_types=[DataTypes.STRING()],
>      result_type=DataTypes.ARRAY(DataTypes.STRING()))
> def format_string_to_array(item):
>     return item.replace('[', '').replace(']', '').replace(', ', ',').split(',')
> 
> 
> if __name__ == '__main__':
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     st_env = StreamTableEnvironment.create(env)
>     result_file = "result.csv"
>     if os.path.exists(result_file):
>         os.remove(result_file)
>     st_env.register_table_sink("result_tab",
>                                CsvTableSink(["id", "url"],
>                                             [DataTypes.STRING(), DataTypes.STRING()],
>                                             result_file))
>     st_env.register_function("format_string_to_array", format_string_to_array)
>     tab = st_env.from_elements([("1", "['www.bing.com', 'www.google.com']"),
>                                 ("2", "['www.taobao.com']")],
>                                ['id', 'urls'])
>     st_env.register_table("temp_table", tab)
>     st_env.sql_query("Select id, A.url from temp_table, "
>                      "UNNEST(format_string_to_array(temp_table.urls)) AS A(url)"
>                      ).insert_into("result_tab")
>     st_env.execute("udf")
> {code}
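The string-splitting logic of the UDF above can be checked in plain Python, outside of Flink. Note that the replace/split chain preserves the single quotes around each URL:

```python
def format_string_to_array(item):
    # Same chain of replacements as in the UDF body quoted above
    return item.replace('[', '').replace(']', '').replace(', ', ',').split(',')

result = format_string_to_array("['www.bing.com', 'www.google.com']")
assert result == ["'www.bing.com'", "'www.google.com'"]
```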
>  
> When I execute the program, I get the following exception:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute.
> : java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 5d63838ad2043bf4a5d0bca83623959d)
> 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
> 	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> 	at org.apache.flink.table.executor.StreamExecutor.execute(StreamExecutor.java:50)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> 	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 5d63838ad2043bf4a5d0bca83623959d)
> 	at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> 	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
> 	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
> 	at 
> 

[jira] [Commented] (FLINK-16687) Support Python UDF in Java Correlate

2020-04-09 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17080199#comment-17080199
 ] 

Hequn Cheng commented on FLINK-16687:
-

Resolved in 1.11.0 via 492d8fdfbe15cfa79a66d50dbabca981055696bb

> Support Python UDF in Java Correlate
> 
>
> Key: FLINK-16687
> URL: https://issues.apache.org/jira/browse/FLINK-16687
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: mayne wong
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>

[jira] [Updated] (FLINK-16687) PyFlink Cannot determine simple type name "PythonScalarFunction$0"

2020-04-09 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-16687:

Issue Type: New Feature  (was: Bug)

> PyFlink Cannot determine simple type name "PythonScalarFunction$0"
> --
>
> Key: FLINK-16687
> URL: https://issues.apache.org/jira/browse/FLINK-16687
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: mayne wong
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>

[jira] [Updated] (FLINK-16687) Support Python UDF in Java Correlate

2020-04-09 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-16687:

Summary: Support Python UDF in Java Correlate  (was: PyFlink Cannot 
determine simple type name "PythonScalarFunction$0")

> Support Python UDF in Java Correlate
> 
>
> Key: FLINK-16687
> URL: https://issues.apache.org/jira/browse/FLINK-16687
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: mayne wong
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
>  Time Spent: 20m
>  Remaining Estimate: 0h
>

[jira] [Updated] (FLINK-16687) Support Python UDF in Java Correlate

2020-04-09 Thread Hequn Cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hequn Cheng updated FLINK-16687:

Fix Version/s: 1.11.0

> Support Python UDF in Java Correlate
> 
>
> Key: FLINK-16687
> URL: https://issues.apache.org/jira/browse/FLINK-16687
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.10.0
>Reporter: mayne wong
>Assignee: Huang Xingbo
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>

[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406592247
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -120,6 +127,40 @@ public void closeOpenChannelConnections(ConnectionID connectionId) {
 		}
 	}
 
+	void connectChannelWithRetry(ConnectingChannel connectingChannel, ConnectionID connectionId)
+			throws IOException, InterruptedException {
+		int count = 0;
+		Exception exception = null;
+		do {
+			try {
+				LOG.info("Connecting to {} at {} attempt", connectionId.getAddress(), count);
+				nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
+
+				NettyPartitionRequestClient client = connectingChannel.waitForChannel();
+
+				clients.replace(connectionId, connectingChannel, client);
+				exception = null;
 Review comment:
   We should exit the loop here to avoid entering it again when no exception was thrown.

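The reviewer's point, leaving the retry loop as soon as the connection succeeds instead of iterating again, can be sketched in Python. The names and the retry limit are hypothetical, not Flink's actual API:

```python
def connect_with_retry(connect, max_retries=3):
    """Retry connect() up to max_retries times; return immediately on success."""
    last_exception = None
    for attempt in range(max_retries):
        try:
            # Success: return right away rather than re-entering the loop.
            return connect()
        except IOError as exc:
            last_exception = exc
    raise last_exception

# Simulated flaky connection: fails twice, then succeeds on the third attempt.
attempts = []
def flaky_connect():
    attempts.append(1)
    if len(attempts) < 3:
        raise IOError("connection refused")
    return "client"

assert connect_with_retry(flaky_connect) == "client"
assert len(attempts) == 3  # no extra attempt after success
```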



[GitHub] [flink] hequn8128 merged pull request #11650: [FLINK-16687][python] Support Python UDF in Java Correlate

2020-04-09 Thread GitBox
hequn8128 merged pull request #11650: [FLINK-16687][python] Support Python UDF 
in Java Correlate
URL: https://github.com/apache/flink/pull/11650
 
 
   




[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406591777
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -81,16 +92,12 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
 		Object old = clients.putIfAbsent(connectionId, connectingChannel);
 
 		if (old == null) {
-			nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
-
-			client = connectingChannel.waitForChannel();
-
-			clients.replace(connectionId, connectingChannel, client);
+			connectChannelWithRetry(connectingChannel, connectionId);
+			client = (NettyPartitionRequestClient) clients.get(connectionId);
 		}
 		else if (old instanceof ConnectingChannel) {
-			client = ((ConnectingChannel) old).waitForChannel();
-
-			clients.replace(connectionId, old, client);
+			connectChannelWithRetry((ConnectingChannel) old, connectionId);
 
 Review comment:
   I have some concerns with this operation. `nettyClient.connect(connectionId.getAddress())` should be called only once even if multiple threads invoke `createPartitionRequestClient` at the same time. But now `#connect` might be called multiple times by different threads, establishing multiple physical connections.
   
   E.g. the first thread's `clients.putIfAbsent(connectionId, connectingChannel)` enters the `if (old == null)` path, so it calls `connectChannelWithRetry` to establish a connection.
   
   The second thread's `clients.putIfAbsent(connectionId, connectingChannel)` returns `old instanceof ConnectingChannel`, so it enters that path and also calls `connectChannelWithRetry`, establishing another connection.

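The invariant the reviewer is defending, that only the thread winning the put-if-absent race dials the connection while all other threads wait on the same pending channel, can be illustrated with a minimal sketch (class and variable names are hypothetical, not Flink's):

```python
import threading

clients = {}
clients_lock = threading.Lock()
connect_calls = []

class ConnectingChannel:
    """Placeholder a thread publishes while the connection is in flight."""
    def __init__(self):
        self._done = threading.Event()
        self.client = None
    def complete(self, client):
        self.client = client
        self._done.set()
    def wait_for_channel(self):
        self._done.wait()
        return self.client

def create_client(connection_id):
    channel = ConnectingChannel()
    with clients_lock:  # emulates ConcurrentHashMap.putIfAbsent
        old = clients.setdefault(connection_id, channel)
    if old is channel:
        # Winner of the race: the only thread that actually dials.
        connect_calls.append(connection_id)
        old.complete("client-%s" % connection_id)
    # Losers (and the winner) all wait on the same pending channel.
    return old.wait_for_channel()

threads = [threading.Thread(target=create_client, args=("tm-1",)) for _ in range(4)]
for t in threads: t.start()
for t in threads: t.join()
assert connect_calls == ["tm-1"]  # exactly one physical connection
```

If both branches retried the connect themselves, as in the reviewed change, the `old is channel` guard would be lost and several threads could dial concurrently.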



[GitHub] [flink] flinkbot edited a comment on issue #11671: [FLINK-17052] [client] Introduce PlanGenerator

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11671: [FLINK-17052] [client] Introduce 
PlanGenerator
URL: https://github.com/apache/flink/pull/11671#issuecomment-610971753
 
 
   
   ## CI report:
   
   * f82a2583fb82c50376de97b616de11db678d88e6 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/159725289) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7308)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11697: [hotfix][docs] fix some typos in elasticsearch connector

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11697: [hotfix][docs] fix some typos in 
elasticsearch connector
URL: https://github.com/apache/flink/pull/11697#issuecomment-611840689
 
 
   
   ## CI report:
   
   * 11d9d6d466e5bef9f71d3c9f14d237d1ef55b152 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159725321) 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to 
the total number of slots
URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339
 
 
   
   ## CI report:
   
   * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN
   * 0a168d7a43629f9b4aa5741b2a43bad4ae8c2237 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/159726261) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7309)
 
   
   




[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406589156
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -81,16 +92,12 @@ NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connection
 		Object old = clients.putIfAbsent(connectionId, connectingChannel);
 
 		if (old == null) {
 Review comment:
   TBH I do not like the previous way of implementing this logic via `ConcurrentHashMap`, because it results in five different `if..else` code paths, some of which are almost identical.
   
   Instead, if we realize it via an explicit `synchronized` lock outside, the internal logic becomes very simple, because we no longer need to consider the race conditions of connect/wait among multiple threads. Only one thread would actually establish the connection and wait; all the other threads would block outside the `synchronized` section until the connection is established or an exception is thrown after the retries.
   
   There seems to be no benefit in the small-scope locking via `ConcurrentHashMap`, because this operation is done only once rather than frequently. But I am not forcing a refactor in this PR, since it can be treated as a separate issue.




[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to 
the total number of slots
URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339
 
 
   
   ## CI report:
   
   * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN
   * 0a168d7a43629f9b4aa5741b2a43bad4ae8c2237 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159726261) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7309)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406583546
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -81,16 +92,12 @@ NettyPartitionRequestClient 
createPartitionRequestClient(ConnectionID connection
Object old = clients.putIfAbsent(connectionId, 
connectingChannel);
 
if (old == null) {
-   
nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
-
-   client = 
connectingChannel.waitForChannel();
-
-   clients.replace(connectionId, 
connectingChannel, client);
+   
connectChannelWithRetry(connectingChannel, connectionId);
+   client = (NettyPartitionRequestClient) 
clients.get(connectionId);
}
else if (old instanceof ConnectingChannel) {
 
 Review comment:
   We can merge these branches into `if (old == null || old instanceof 
ConnectingChannel)`, because the actions are actually the same.
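   The suggested merge can be illustrated with a minimal stand-in (here a
`CharSequence` plays the role of `ConnectingChannel`; this is not the Flink
code):

```java
// Both the "no previous entry" and "another thread is already connecting"
// branches end up waiting on the connecting channel, so a single combined
// condition suffices.
class BranchMergeSketch {
    static String handle(Object old) {
        // stand-in: CharSequence plays the role of ConnectingChannel
        if (old == null || old instanceof CharSequence) {
            return "wait-for-channel";
        }
        return "reuse-existing-client";
    }
}
```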




[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406586623
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -120,6 +132,34 @@ public void closeOpenChannelConnections(ConnectionID 
connectionId) {
}
}
 
+   void connectChannelWithRetry(ConnectingChannel connectingChannel, 
ConnectionID connectionId)
 
 Review comment:
   Yes, actually IntelliJ gives this warning for a never-used argument.




[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406586376
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -120,6 +132,34 @@ public void closeOpenChannelConnections(ConnectionID 
connectionId) {
}
}
 
+   void connectChannelWithRetry(ConnectingChannel connectingChannel, 
ConnectionID connectionId)
+   throws IOException, InterruptedException {
+   int count = 0;
+   Exception exception = null;
+   do {
+   try {
+   connectingChannel = new 
ConnectingChannel(connectionId, this);
 
 Review comment:
   I see it is already out of the loop in the latest changes.




[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406585618
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -99,7 +106,7 @@ else if (old instanceof ConnectingChannel) {
 
// Make sure to increment the reference count before 
handing a client
// out to ensure correct bookkeeping for channel 
closing.
-   if (!client.incrementReferenceCounter()) {
+   if (client != null && 
!client.incrementReferenceCounter()) {
 
 Review comment:
   Is it still possible for `client` to be `null` here?




[jira] [Commented] (FLINK-17071) Kubernetes session CLI logging output is either misleading or concerning

2020-04-09 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17071?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17080185#comment-17080185
 ] 

Canbin Zheng commented on FLINK-17071:
--

Hi [~chesnay]! Could this be a subtask of FLINK-15856?

> Kubernetes session CLI logging output is either misleading or concerning
> 
>
> Key: FLINK-17071
> URL: https://issues.apache.org/jira/browse/FLINK-17071
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client, Deployment / Kubernetes
>Affects Versions: 1.11.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> When running any command against the KubernetesSessionCLI it prints a log 
> message about having created a session cluster.
> This should certainly not appear when running a stop/help command.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-17070) Kubernetes session CLI ignores stop/help command in detached mode

2020-04-09 Thread Canbin Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17080184#comment-17080184
 ] 

Canbin Zheng commented on FLINK-17070:
--

Hi [~chesnay]! Could this be a subtask of FLINK-15856?

> Kubernetes session CLI ignores stop/help command in detached mode
> -
>
> Key: FLINK-17070
> URL: https://issues.apache.org/jira/browse/FLINK-17070
> Project: Flink
>  Issue Type: Bug
>  Components: Command Line Client, Deployment / Kubernetes
>Affects Versions: 1.11.0
>Reporter: Chesnay Schepler
>Priority: Major
>
> The CLI experience for the KubernetesSessionCli is far from ideal.
> Getting the help information should be easier than
> {code}
> $ echo 'help' | ./bin/kubernetes-session.sh -Dexecution.attached=true
> {code}





[jira] [Updated] (FLINK-17078) Logging output is misleading when executing bin/flink -e kubernetes-session without specifying cluster-id

2020-04-09 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-17078:
-
Parent: FLINK-15856
Issue Type: Sub-task  (was: Bug)

> Logging output is misleading when executing bin/flink -e kubernetes-session 
> without specifying cluster-id
> -
>
> Key: FLINK-17078
> URL: https://issues.apache.org/jira/browse/FLINK-17078
> Project: Flink
>  Issue Type: Sub-task
>  Components: Deployment / Kubernetes
>Reporter: Canbin Zheng
>Priority: Minor
> Fix For: 1.11.0
>
>
> When executing the following command:
> {code:java}
> ./bin/flink run -d -e kubernetes-session 
> examples/streaming/SocketWindowWordCount.jar --hostname 172.16.0.6 --port 
> 12345
> {code}
> The exception stack would be:
> {quote}org.apache.flink.client.program.ProgramInvocationException: The main 
> method caused an error: 
> org.apache.flink.client.deployment.ClusterRetrieveException: Could not get 
> the rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
>  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
>  at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
>  Caused by: java.lang.RuntimeException: 
> org.apache.flink.client.deployment.ClusterRetrieveException: Could not get 
> the rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
>  at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:94)
>  at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:118)
>  at 
> org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:59)
>  at 
> org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:63)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1756)
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
>  at 
> org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
>  at 
> org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>  ... 8 more
>  Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: 
> Could not get the rest endpoint of 
> flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
>  ... 22 more
> {quote}
> The logging output is misleading; we'd better throw an exception indicating 
> that people should explicitly specify the value of {{kubernetes.cluster-id}}.
>  
>  





[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to 
the total number of slots
URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339
 
 
   
   ## CI report:
   
   * 97223a194aa83228049de648853725b9728b7ccc Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157719880) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6975)
 
   * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN
   * 0a168d7a43629f9b4aa5741b2a43bad4ae8c2237 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159726261) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7309)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   






[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406583145
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -120,6 +127,40 @@ public void closeOpenChannelConnections(ConnectionID 
connectionId) {
}
}
 
+   void connectChannelWithRetry(ConnectingChannel connectingChannel, 
ConnectionID connectionId)
+   throws IOException, InterruptedException {
+   int count = 0;
+   Exception exception = null;
+   do {
+   try {
+   LOG.info("Connecting to {} at {} attempt", 
connectionId.getAddress(), count);
+   
nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);
+
+   NettyPartitionRequestClient client = 
connectingChannel.waitForChannel();
+
+   clients.replace(connectionId, 
connectingChannel, client);
+   exception = null;
 
 Review comment:
   nit: this assignment seems redundant, because the variable is already 
initialized to `null`.
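   A cleaned-up version of the bounded-retry pattern under review could look
like this; the connect step is a stand-in `Supplier`, and the redundant
re-initialization disappears because the method returns directly on success.
This is a hedged sketch of the control flow, not the Flink implementation:

```java
import java.io.IOException;
import java.util.function.Supplier;

// Sketch in the spirit of connectChannelWithRetry: return on success, remember
// only the last failure, and wrap it in an IOException once the retry budget
// is exhausted. Names and signatures are illustrative.
class RetrySketch {
    static <T> T connectWithRetry(Supplier<T> connect, int maxRetries) throws IOException {
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return connect.get(); // success: no exception field to reset
            } catch (RuntimeException e) {
                lastFailure = e;
            }
        }
        throw new IOException(
            "Connecting failed after " + (maxRetries + 1) + " attempts", lastFailure);
    }
}
```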




[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406583022
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 ##
 @@ -120,6 +127,40 @@ public void closeOpenChannelConnections(ConnectionID 
connectionId) {
}
}
 
+   void connectChannelWithRetry(ConnectingChannel connectingChannel, 
ConnectionID connectionId)
 
 Review comment:
   nit: this can be `private` if we do not plan to access it from outside this class




[GitHub] [flink] zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] add task manager netty client retry mechenism

2020-04-09 Thread GitBox
zhijiangW commented on a change in pull request #11541: [FLINK-15416][network] 
add task manager netty client retry mechenism
URL: https://github.com/apache/flink/pull/11541#discussion_r406582084
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 ##
 @@ -234,6 +234,14 @@

.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec")
.withDescription("The Netty client connection 
timeout.");
 
+   @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+   public static final ConfigOption<Integer> NETWORK_RETRIES =
+   key("taskmanager.network.retries")
 
 Review comment:
   that is fine for me




[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-09 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r406580787
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -323,6 +330,18 @@ public void testGetCpuCoresNumSlots() {
assertThat(resourceManager.getCpuCores(configuration), is(3.0));
}
 
+   @Test
+   public void testCreateTaskManagerPodFailedAndRetry() throws Exception {
+   flinkKubeClient = spy(createFlinkKubeClient());
+   TestingKubernetesResourceManager testRM = 
createAndStartResourceManager(flinkConfig);
 
 Review comment:
   This could work because we override `createFlinkKubeClient` in 
`TestingKubernetesResourceManager`. Injecting the `kubeClient` into the 
constructor of `KubernetesResourceManager` also makes sense to me; I will go 
this way.




[jira] [Updated] (FLINK-17078) Logging output is misleading when executing bin/flink -e kubernetes-session without specifying cluster-id

2020-04-09 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17078?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-17078:
-
Description: 
When executing the following command:
{code:java}
./bin/flink run -d -e kubernetes-session 
examples/streaming/SocketWindowWordCount.jar --hostname 172.16.0.6 --port 12345
{code}
The exception stack would be:
{quote}org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the 
rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
 Caused by: java.lang.RuntimeException: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the 
rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:94)
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:118)
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:59)
 at 
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:63)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1756)
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
 at 
org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
 ... 8 more
 Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Could 
not get the rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
 ... 22 more
{quote}
The logging output is misleading; we'd better throw an exception indicating 
that people should explicitly specify the value of {{kubernetes.cluster-id}}.

 

 

  was:
When executing the following command:

 
{code:java}
./bin/flink run -d -e kubernetes-session 
examples/streaming/SocketWindowWordCount.jar --hostname 172.16.0.6 --port 12345
{code}
The exception stack would be:
{quote}org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the 
rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.RuntimeException: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the 
rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:94)
 

[jira] [Created] (FLINK-17078) Logging output is misleading when executing bin/flink -e kubernetes-session without specifying cluster-id

2020-04-09 Thread Canbin Zheng (Jira)
Canbin Zheng created FLINK-17078:


 Summary: Logging output is misleading when executing bin/flink -e 
kubernetes-session without specifying cluster-id
 Key: FLINK-17078
 URL: https://issues.apache.org/jira/browse/FLINK-17078
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Kubernetes
Reporter: Canbin Zheng
 Fix For: 1.11.0


When executing the following command:

 
{code:java}
./bin/flink run -d -e kubernetes-session 
examples/streaming/SocketWindowWordCount.jar --hostname 172.16.0.6 --port 12345
{code}
The exception stack would be:
{quote}org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the 
rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
 at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
 at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
 at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
 at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
 at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.RuntimeException: 
org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the 
rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.lambda$createClusterClientProvider$0(KubernetesClusterDescriptor.java:94)
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:118)
 at 
org.apache.flink.kubernetes.KubernetesClusterDescriptor.retrieve(KubernetesClusterDescriptor.java:59)
 at 
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:63)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1756)
 at 
org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
 at 
org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
 at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1643)
 at 
org.apache.flink.streaming.examples.socket.SocketWindowWordCount.main(SocketWindowWordCount.java:92)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
 ... 8 more
Caused by: org.apache.flink.client.deployment.ClusterRetrieveException: Could 
not get the rest endpoint of flink-cluster-6fa5f5dc-4b50-48c3-b57f-d91e686fa474
 ... 22 more
{quote}
The logging output is misleading; we'd better throw an exception indicating 
that people should explicitly specify the value of {{kubernetes.cluster-id}}.

 

 





[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-09 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r406578583
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -323,6 +330,18 @@ public void testGetCpuCoresNumSlots() {
assertThat(resourceManager.getCpuCores(configuration), is(3.0));
}
 
+   @Test
+   public void testCreateTaskManagerPodFailedAndRetry() throws Exception {
+   flinkKubeClient = spy(createFlinkKubeClient());
 
 Review comment:
   Actually, I just wanted to reuse Mockito's `verify` with a timeout, which is 
why I used the `spy`. But I think your suggestion makes sense; I will use the 
testing implementations instead. Then we will need to explicitly wait for some 
time for the retry.
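   A Mockito-free alternative along those lines could be a small testing
implementation that always fails and counts attempts. Names here are
illustrative, not the actual Flink test classes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a hand-written testing client: every createTaskManagerPod call
// fails with an exceptionally completed future, and an AtomicInteger records
// how often the resource manager retried, replacing Mockito's
// verify(..., timeout(...)).
class FailingKubeClientStub {
    final AtomicInteger attempts = new AtomicInteger();

    CompletableFuture<Void> createTaskManagerPod(String podName) {
        attempts.incrementAndGet();
        CompletableFuture<Void> result = new CompletableFuture<>();
        result.completeExceptionally(new RuntimeException("Exception"));
        return result;
    }
}
```

   A test can then poll or await until `attempts` reaches the expected retry
count instead of relying on spy verification.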




[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-09 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r406578583
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -323,6 +330,18 @@ public void testGetCpuCoresNumSlots() {
assertThat(resourceManager.getCpuCores(configuration), is(3.0));
}
 
+   @Test
+   public void testCreateTaskManagerPodFailedAndRetry() throws Exception {
+   flinkKubeClient = spy(createFlinkKubeClient());
 
 Review comment:
   Actually, I just wanted to reuse Mockito's `verify` with a timeout, which is 
why I used the `spy`. But I think your suggestion makes sense; I will use the 
testing implementations instead.




[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-09 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r406578227
 
 

 ##
 File path: 
flink-kubernetes/src/test/java/org/apache/flink/kubernetes/KubernetesResourceManagerTest.java
 ##
 @@ -398,4 +421,19 @@ private void terminatePod(Pod pod) {
.build())
.build());
}
+
+   private FlinkKubeClient createFlinkKubeClient() {
+   ExecutorService kubeClientExecutorService = 
Executors.newDirectExecutorService();
+   return new Fabric8FlinkKubeClient(flinkConfig, kubeClient, () 
-> kubeClientExecutorService) {
+   @Override
+   public CompletableFuture<Void> 
createTaskManagerPod(KubernetesPod kubernetesPod) {
+   return CompletableFuture.runAsync(
+   () -> {
+   throw new 
RuntimeException("Exception");
+   },
+   kubeClientExecutorService
+   );
 
 Review comment:
   I am not sure what you mean here, since `createTaskManagerPod` needs to 
return a `CompletableFuture`.
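As a side note, a future that should be failed from the start needs no executor at all. Java 9 added `CompletableFuture.failedFuture` for this; on Java 8 the usual idiom is the following sketch (the helper name is illustrative):

```java
import java.util.concurrent.CompletableFuture;

/** Small helper for building already-failed futures without an executor. */
public class FailedFutures {

    /** Returns a CompletableFuture that is already completed exceptionally. */
    public static <T> CompletableFuture<T> failedFuture(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }
}
```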




[GitHub] [flink] wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] Make FlinkKubeClient and its implementations asynchronous

2020-04-09 Thread GitBox
wangyang0918 commented on a change in pull request #11427: [FLINK-15790][k8s] 
Make FlinkKubeClient and its implementations asynchronous
URL: https://github.com/apache/flink/pull/11427#discussion_r406577957
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##
 @@ -61,158 +61,180 @@
private final String clusterId;
private final String nameSpace;
 
-   public Fabric8FlinkKubeClient(Configuration flinkConfig, 
KubernetesClient client) {
+   private final ExecutorWrapper executorWrapper;
+
+   public Fabric8FlinkKubeClient(Configuration flinkConfig, 
KubernetesClient client, ExecutorWrapper executorWrapper) {
this.flinkConfig = checkNotNull(flinkConfig);
this.internalClient = checkNotNull(client);
this.clusterId = 
checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID));
 
this.nameSpace = 
flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
+
+   this.executorWrapper = executorWrapper;
}
 
@Override
-   public void createJobManagerComponent(KubernetesJobManagerSpecification 
kubernetesJMSpec) {
+   public CompletableFuture<Void> 
createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
final List accompanyingResources = 
kubernetesJMSpec.getAccompanyingResources();
 
// create Deployment
LOG.debug("Start to create deployment with spec {}", 
deployment.getSpec().toString());
-   final Deployment createdDeployment = this.internalClient
-   .apps()
-   .deployments()
-   .inNamespace(this.nameSpace)
-   .create(deployment);
-
-   // Note that we should use the uid of the created Deployment 
for the OwnerReference.
-   setOwnerReference(createdDeployment, accompanyingResources);
 
-   this.internalClient
-   .resourceList(accompanyingResources)
-   .inNamespace(this.nameSpace)
-   .createOrReplace();
+   return CompletableFuture.runAsync(() -> {
+   final Deployment createdDeployment = this.internalClient
+   .apps()
+   .deployments()
+   .inNamespace(this.nameSpace)
+   .create(deployment);
+
+   // Note that we should use the uid of the created 
Deployment for the OwnerReference.
+   setOwnerReference(createdDeployment, 
accompanyingResources);
+
+   this.internalClient
+   .resourceList(accompanyingResources)
+   .inNamespace(this.nameSpace)
+   .createOrReplace();
+   }, executorWrapper.getExecutor());
}
 
@Override
public void createTaskManagerPod(KubernetesPod kubernetesPod) {
-   final Deployment masterDeployment = this.internalClient
-   .apps()
-   .deployments()
-   .inNamespace(this.nameSpace)
-   .withName(KubernetesUtils.getDeploymentName(clusterId))
-   .get();
-
-   if (masterDeployment == null) {
-   throw new RuntimeException(
-   "Failed to find Deployment named " + clusterId 
+ " in namespace " + this.nameSpace);
-   }
+   CompletableFuture.runAsync(() -> {
 
 Review comment:
   Thanks for your suggestion. I think making the initial and max back-off time 
configurable could be a follow-up ticket.
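For that follow-up, retrying an asynchronous action with an exponentially growing, capped delay typically looks like the sketch below; the `retryWithBackoff` helper and its parameters are illustrative and not the actual Flink API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class BackoffRetry {

    /**
     * Runs the given async action, retrying on failure with exponential
     * back-off until maxRetries attempts are exhausted.
     */
    public static <T> CompletableFuture<T> retryWithBackoff(
            Supplier<CompletableFuture<T>> action,
            int maxRetries,
            long initialBackoffMs,
            long maxBackoffMs,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(action, maxRetries, initialBackoffMs, maxBackoffMs, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> action,
            int retriesLeft,
            long backoffMs,
            long maxBackoffMs,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        action.get().whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesLeft <= 0) {
                result.completeExceptionally(error);
            } else {
                // Double the delay on each failure, capped at maxBackoffMs.
                long nextBackoff = Math.min(backoffMs * 2, maxBackoffMs);
                scheduler.schedule(
                        () -> attempt(action, retriesLeft - 1, nextBackoff,
                                maxBackoffMs, scheduler, result),
                        backoffMs, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

Each failure doubles the delay up to `maxBackoffMs`, so transient Kubernetes API hiccups are retried quickly while a persistently failing call backs off instead of hammering the API server.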




[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to 
the total number of slots
URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339
 
 
   
   ## CI report:
   
   * 97223a194aa83228049de648853725b9728b7ccc Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157719880) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6975)
 
   * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN
   * 0a168d7a43629f9b4aa5741b2a43bad4ae8c2237 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11671: [FLINK-17052] [client] Introduce PlanGenerator

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11671: [FLINK-17052] [client] Introduce 
PlanGenerator
URL: https://github.com/apache/flink/pull/11671#issuecomment-610971753
 
 
   
   ## CI report:
   
   * 0bb3e20d6406a3db0a0d735d2b78f77cadd7c59b Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/159593526) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7283)
 
   * f82a2583fb82c50376de97b616de11db678d88e6 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159725289) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7308)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11697: [hotfix][docs] fix some typos in elasticsearch connector

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11697: [hotfix][docs] fix some typos in 
elasticsearch connector
URL: https://github.com/apache/flink/pull/11697#issuecomment-611840689
 
 
   
   ## CI report:
   
   * 11d9d6d466e5bef9f71d3c9f14d237d1ef55b152 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159725321) 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11615: [FLINK-16605] Add max limitation to 
the total number of slots
URL: https://github.com/apache/flink/pull/11615#issuecomment-607717339
 
 
   
   ## CI report:
   
   * 97223a194aa83228049de648853725b9728b7ccc Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/157719880) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=6975)
 
   * 2db316d977d0e790de8fa98b27dd219b68abb136 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] KarmaGYZ commented on issue #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-09 Thread GitBox
KarmaGYZ commented on issue #11615: [FLINK-16605] Add max limitation to the 
total number of slots
URL: https://github.com/apache/flink/pull/11615#issuecomment-611844110
 
 
   @xintongsong Thanks for the review. I've updated the PR and rebased it on 
#11320.




[GitHub] [flink] xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

2020-04-09 Thread GitBox
xintongsong commented on a change in pull request #11615: [FLINK-16605] Add max 
limitation to the total number of slots
URL: https://github.com/apache/flink/pull/11615#discussion_r406571178
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -375,6 +375,12 @@ public void registerTaskManager(final 
TaskExecutorConnection taskExecutorConnect
if 
(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {

reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
} else {
+   if (getNumberRegisteredSlots() + 
Math.max(getNumberPendingTaskManagerSlots(), numSlotsPerWorker) > maxSlotNum) {
+   LOG.warn("The total number of slots exceeds the 
max limitation, release the excess resource.");
+   
resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new 
FlinkException("The total number of slots exceeds the max limitation."));
+   return;
+   }
 
 Review comment:
   It seems to me that this approach implicitly assumes that `releaseResource` 
returning false means we are running on a standalone cluster, which is not 
always true. Without this assumption, continuing to register the slot when the 
release action fails seems quite counter-intuitive.
   
   Instead of letting the slot manager behave differently according to the 
result of the release action, I would suggest simply passing in an infinitely 
large value for the max limit when creating the slot manager. That way, the 
slot manager does not need to behave differently on standalone and active 
resource managers.
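The suggestion keeps a single code path: the registration check stays identical, and a standalone cluster simply configures the limit as effectively infinite. A sketch of the check itself (names are illustrative):

```java
/** Extracted form of the max-slot check discussed above. */
public class SlotLimitCheck {

    /** Returns true if registering the new worker would exceed the slot limit. */
    public static boolean exceedsMaxSlots(
            int registeredSlots, int pendingSlots, int slotsPerWorker, int maxSlotNum) {
        // Use long arithmetic so an effectively infinite limit cannot overflow.
        return (long) registeredSlots + Math.max(pendingSlots, slotsPerWorker) > maxSlotNum;
    }
}
```

An active resource manager would pass its configured limit, while a standalone one passes `Integer.MAX_VALUE`; the check then never fires and no release action is attempted.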




[GitHub] [flink] flinkbot commented on issue #11697: [hotfix][docs] fix some typos in elasticsearch connector

2020-04-09 Thread GitBox
flinkbot commented on issue #11697: [hotfix][docs] fix some typos in 
elasticsearch connector
URL: https://github.com/apache/flink/pull/11697#issuecomment-611840689
 
 
   
   ## CI report:
   
   * 11d9d6d466e5bef9f71d3c9f14d237d1ef55b152 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11671: [FLINK-17052] [client] Introduce PlanGenerator

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11671: [FLINK-17052] [client] Introduce 
PlanGenerator
URL: https://github.com/apache/flink/pull/11671#issuecomment-610971753
 
 
   
   ## CI report:
   
   * 0bb3e20d6406a3db0a0d735d2b78f77cadd7c59b Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/159593526) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7283)
 
   * f82a2583fb82c50376de97b616de11db678d88e6 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Updated] (FLINK-15843) Gracefully shutdown TaskManagers on Kubernetes

2020-04-09 Thread Canbin Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-15843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Canbin Zheng updated FLINK-15843:
-
Parent: (was: FLINK-15788)
Issue Type: Improvement  (was: Sub-task)

> Gracefully shutdown TaskManagers on Kubernetes
> --
>
> Key: FLINK-15843
> URL: https://issues.apache.org/jira/browse/FLINK-15843
> Project: Flink
>  Issue Type: Improvement
>  Components: Deployment / Kubernetes
>Affects Versions: 1.10.0
>Reporter: Canbin Zheng
>Priority: Major
> Fix For: 1.11.0
>
>
> The current approach to stopping a TaskManager instance when the JobManager 
> sends a deletion request is to directly call 
> {{KubernetesClient.pods().withName().delete}}, so the instance is killed 
> abruptly with a _KILL_ signal and has no chance to clean up. This can cause 
> problems because we expect the process to terminate gracefully when it is no 
> longer needed.
> According to the guide on [Termination of Pods|#termination-of-pods], on 
> Kubernetes a _TERM_ signal is first sent to the main process in each 
> container, possibly followed by a forced _KILL_ signal once the graceful 
> shut-down period has expired; the Unix signal is sent to the process with 
> PID 1 ([Docker 
> Kill|https://docs.docker.com/engine/reference/commandline/kill/]). However, 
> the TaskManagerRunner process is spawned by 
> {color:#172b4d}/opt/flink/bin/kubernetes-entry.sh {color}and can never have 
> PID 1, so it never receives the TERM signal.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot commented on issue #11697: [hotfix][docs] fix some typos in elasticsearch connector

2020-04-09 Thread GitBox
flinkbot commented on issue #11697: [hotfix][docs] fix some typos in 
elasticsearch connector
URL: https://github.com/apache/flink/pull/11697#issuecomment-611836537
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the 
@flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress 
of the review.
   
   
   ## Automated Checks
   Last check on commit 11d9d6d466e5bef9f71d3c9f14d237d1ef55b152 (Fri Apr 10 
01:51:43 UTC 2020)
   
   **Warnings:**
* Documentation files were touched, but no `.zh.md` files: Update Chinese 
documentation or file Jira ticket.
   
   
   Mention the bot in a comment to re-run the automated checks.
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review 
Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full 
explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied 
according to the order of the review items. For consensus, approval by a Flink 
committer or PMC member is required.

   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot approve description` to approve one or more aspects (aspects: 
`description`, `consensus`, `architecture` and `quality`)
- `@flinkbot approve all` to approve all aspects
- `@flinkbot approve-until architecture` to approve everything until 
`architecture`
- `@flinkbot attention @username1 [@username2 ..]` to require somebody's 
attention
- `@flinkbot disapprove architecture` to remove an approval you gave earlier
   




[GitHub] [flink] oliveryunchang opened a new pull request #11697: [hotfix][docs] fix some typos in elasticsearch connector

2020-04-09 Thread GitBox
oliveryunchang opened a new pull request #11697: [hotfix][docs] fix some typos 
in elasticsearch connector
URL: https://github.com/apache/flink/pull/11697
 
 
   
   
   ## What is the purpose of the change
   Correct properties typo in [Elasticsearch 
Connector](https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/connect.html#elasticsearch-connector)
 documentation.
   
   
   ## Brief change log
   
   - Change "back-off" to "backoff" in `connect.md` docs.
   
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable)
   




[GitHub] [flink] flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce 
the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint
URL: https://github.com/apache/flink/pull/11696#issuecomment-611702183
 
 
   
   ## CI report:
   
   * 18fcfaf78cf5a5865e985a1f7ec24ba732d90807 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159701257) Azure: 
[CANCELED](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7306)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[jira] [Closed] (FLINK-16879) Disable the source-release-assembly execution goal when using the apache-release build profile

2020-04-09 Thread Tzu-Li (Gordon) Tai (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-16879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai closed FLINK-16879.
---
Fix Version/s: (was: statefun-2.1.0)
   (was: statefun-2.0.1)
   statefun-2.0.0
   Resolution: Fixed

Yes, sorry I forgot to close this one.

Fixed:

* master: 689e0c0ab80b2c541de2f55333fbaaf0880a1f11
* release-2.0: 7a6c8a6e21b65bdbba0b329a60917ca362e9171a

> Disable the source-release-assembly execution goal when using the 
> apache-release build profile
> --
>
> Key: FLINK-16879
> URL: https://issues.apache.org/jira/browse/FLINK-16879
> Project: Flink
>  Issue Type: Task
>  Components: Release System, Stateful Functions
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Tzu-Li (Gordon) Tai
>Priority: Major
>  Labels: pull-request-available
> Fix For: statefun-2.0.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The {{apache-release}} profile defined in the Apache Parent POM defines a 
> {{source-release-assembly}} execution that packages a source release 
> distribution to be published to Maven.
> We should disable this, because we use our own tools to package source 
> release distributions.





[jira] [Commented] (FLINK-16345) Computed column can not refer time attribute column

2020-04-09 Thread xingoo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16345?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17080146#comment-17080146
 ] 

xingoo commented on FLINK-16345:


I fixed the code as in 1.10.1: fa181e7759bca90bc83dc9f0eceac0f504386129.

But when I use a join in streaming, I get this exception:
{code:java}
// code placeholder
Flink SQL> CREATE TABLE kafka_test1 (
>   id varchar,
>   a varchar,
>   b int,
>   ts as PROCTIME()
> ) WITH (
>   'connector.type' = 'kafka',       
>   'connector.version' = '0.11',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localnode2:2181',
>   'connector.properties.bootstrap.servers' = 'localnode2:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'latest-offset',
>   'format.type' = 'json'
> )
> ;
[INFO] Table has been created.


Flink SQL> select a.*,b.* from kafka_test1 a join hbase_test1 FOR SYSTEM_TIME 
AS OF a.ts as b on a.id = b.rowkey;
{code}
exception:
{code:java}
// code placeholder
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve 
datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" a, INTEGER b, TIMESTAMP(3) NOT NULL ts, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, 
RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, 
INTEGER b) f) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id, VARCHAR(2147483647) 
CHARACTER SET "UTF-16LE" a, INTEGER b, TIME ATTRIBUTE(PROCTIME) NOT NULL ts, 
VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey, 
RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, 
INTEGER b) f) NOT NULL
rel:
LogicalProject(id=[$0], a=[$1], b=[$2], ts=[$3], rowkey=[$4], f=[$5])
  LogicalCorrelate(correlation=[$cor1], joinType=[inner], requiredColumns=[{0, 
3}])
    LogicalProject(id=[$0], a=[$1], b=[$2], ts=[PROCTIME()])
      LogicalTableScan(table=[[tgou, collie, kafka_test1, source: 
[Kafka011TableSource(id, a, b)]]])
    LogicalFilter(condition=[=($cor1.id, $0)])
      LogicalSnapshot(period=[$cor1.ts])
        LogicalTableScan(table=[[tgou, collie, hbase_test1, source: 
[HBaseTableSource[schema=[rowkey, f], projectFields=null)
{code}
Does this mean the code cannot work with proctime?

 

> Computed column can not refer time attribute column 
> 
>
> Key: FLINK-16345
> URL: https://issues.apache.org/jira/browse/FLINK-16345
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.10.0
>Reporter: Leonard Xu
>Assignee: Jark Wu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
>  Time Spent: 40m
>  Remaining Estimate: 0h
>
> If a computed column refers to a time attribute column, the computed column 
> will lose the time attribute and cause validation to fail.
> {code:java}
> CREATE TABLE orders (
>   order_id STRING,
>   order_time TIMESTAMP(3),
>   amount DOUBLE,
>   amount_kg as amount * 1000,
>   // cannot select the computed column standard_ts, which is derived from 
> column order_time that is used as the WATERMARK
>   standard_ts as order_time + INTERVAL '8' HOUR,
>   WATERMARK FOR order_time AS order_time
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = '0.10',
>   'connector.topic' = 'flink_orders',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );
> {code}
> The query `select amount_kg from orders` runs normally, while the query 
> `select standard_ts from orders` throws a validation exception with the 
> following message:
> {noformat}
> [ERROR] Could not execute SQL statement. Reason:
>  java.lang.AssertionError: Conversion to relational algebra failed to 
> preserve datatypes:
>  validated type:
>  RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME 
> ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIMESTAMP(3) 
> ts) NOT NULL
>  converted type:
>  RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" order_id, TIME 
> ATTRIBUTE(ROWTIME) order_time, DOUBLE amount, DOUBLE amount_kg, TIME 
> ATTRIBUTE(ROWTIME) ts) NOT NULL
>  rel:
>  LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[$3], 
> ts=[$4])
>  LogicalWatermarkAssigner(rowtime=[order_time], watermark=[$1])
>  LogicalProject(order_id=[$0], order_time=[$1], amount=[$2], amount_kg=[*($2, 
> 1000)], ts=[+($1, 2880:INTERVAL HOUR)])
>  LogicalTableScan(table=[[default_catalog, default_database, orders, source: 
> 

[jira] [Created] (FLINK-17077) FLINK_CONF_DIR environment variable to locate flink-conf.yaml in Docker container

2020-04-09 Thread Eui Heo (Jira)
Eui Heo created FLINK-17077:
---

 Summary: FLINK_CONF_DIR environment variable to locate 
flink-conf.yaml in Docker container
 Key: FLINK-17077
 URL: https://issues.apache.org/jira/browse/FLINK-17077
 Project: Flink
  Issue Type: Improvement
  Components: Deployment / Docker
Reporter: Eui Heo


To use flink-conf.yaml outside the Flink home directory, we should use 
FLINK_CONF_DIR.
But even when FLINK_CONF_DIR is provided, docker-entrypoint.sh in the official 
flink-docker repository is not aware of it, and it is ignored when appending 
additional Flink properties to flink-conf.yaml. It would be good to use 
FLINK_CONF_DIR as the location of flink-conf.yaml if the user provides it.

https://github.com/apache/flink-docker/blob/master/docker-entrypoint.sh#L23





[GitHub] [flink] flinkbot edited a comment on issue #11695: [FLINK-11523] Use OutputTag.typeInfo in harness

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11695: [FLINK-11523] Use OutputTag.typeInfo 
in harness
URL: https://github.com/apache/flink/pull/11695#issuecomment-611675716
 
 
   
   ## CI report:
   
   * 0f3c1cbfa554f3ba00bebcf688d155681d784c99 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159669415) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7300)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093
 
 
   
   ## CI report:
   
   * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN
   * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN
   * 35ec088ef24e841beae78c33488975d2d8d0a40d UNKNOWN
   * 334458afaa98b601eec12c490e71caa8274be055 UNKNOWN
   * 559816dcea4999dd46b09ee1091ddc0e1a5a7a54 UNKNOWN
   * 6cd76afe4990f56a5166e1592374aa8f86fe883f UNKNOWN
   * a7dd8d3321593ab8211487c0ad27eb299e876a8a Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/159695914) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7305)
 
   * ac996f5c703560ecbe6cad7394cc8ad54d6f0029 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11663: [FLINK-16820][jdbc] support reading timestamp, data, and time in JDBCTableSource

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11663: [FLINK-16820][jdbc] support reading 
timestamp, data, and time in JDBCTableSource
URL: https://github.com/apache/flink/pull/11663#issuecomment-610652042
 
 
   
   ## CI report:
   
   * f4247f80edd8aaebfe9f0d3fedc8e4cedfbb450d Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159692950) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7304)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce 
the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint
URL: https://github.com/apache/flink/pull/11696#issuecomment-611702183
 
 
   
   ## CI report:
   
   * 18fcfaf78cf5a5865e985a1f7ec24ba732d90807 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159701257) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7306)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11695: [FLINK-11523] Use OutputTag.typeInfo in harness

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11695: [FLINK-11523] Use OutputTag.typeInfo 
in harness
URL: https://github.com/apache/flink/pull/11695#issuecomment-611675716
 
 
   
   ## CI report:
   
   * 0f3c1cbfa554f3ba00bebcf688d155681d784c99 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159669415) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7300)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11663: [FLINK-16820][jdbc] support reading timestamp, data, and time in JDBCTableSource

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11663: [FLINK-16820][jdbc] support reading 
timestamp, data, and time in JDBCTableSource
URL: https://github.com/apache/flink/pull/11663#issuecomment-610652042
 
 
   
   ## CI report:
   
   * f4247f80edd8aaebfe9f0d3fedc8e4cedfbb450d Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159692950) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7304)
 
   
   






[GitHub] [flink] alexeyt820 commented on issue #11695: [FLINK-11523] Use OutputTag.typeInfo in harness

2020-04-09 Thread GitBox
alexeyt820 commented on issue #11695: [FLINK-11523] Use OutputTag.typeInfo in 
harness
URL: https://github.com/apache/flink/pull/11695#issuecomment-611782147
 
 
   @flinkbot run travis




[jira] [Updated] (FLINK-17076) Revamp Kafka Connector Documentation

2020-04-09 Thread Seth Wiesman (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-17076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Wiesman updated FLINK-17076:
-
Description: 
The Kafka connector is one of the most popular connectors in Flink. The 
documentation has grown organically over the years as new versions and features 
have become supported. The page is still based on the Kafka 0.8 connector, with 
asides for newer versions. For instance, the first paragraph says "For most 
users, the FlinkKafkaConsumer08 (part of flink-connector-kafka) is appropriate."

Since Kafka 1.0 was released in 2017, I believe the page should be restructured 
around the universal connector and Kafka(De)SerializationSchemas, with clear 
examples.


  was:
The Kafka connector is one of the most popular connectors in Flink. The 
documentation has grown organically over the years as new versions and features 
have become supported. The page is still based on the kafka 0.8 connector with 
asides for newer versions. 

Since Kafka 1.0 was released in 2017, I believe the page should be restructured 
around the universal connector and Kafka(De)SerializationSchema's with clear 
examples.  



> Revamp Kafka Connector Documentation
> 
>
> Key: FLINK-17076
> URL: https://issues.apache.org/jira/browse/FLINK-17076
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / Kafka, Documentation
>Reporter: Seth Wiesman
>Priority: Major
>
> The Kafka connector is one of the most popular connectors in Flink. The 
> documentation has grown organically over the years as new versions and 
> features have become supported. The page is still based on the Kafka 0.8 
> connector, with asides for newer versions. For instance, the first paragraph 
> says "For most users, the FlinkKafkaConsumer08 (part of 
> flink-connector-kafka) is appropriate."
> Since Kafka 1.0 was released in 2017, I believe the page should be 
> restructured around the universal connector and 
> Kafka(De)SerializationSchemas, with clear examples.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-17076) Revamp Kafka Connector Documentation

2020-04-09 Thread Seth Wiesman (Jira)
Seth Wiesman created FLINK-17076:


 Summary: Revamp Kafka Connector Documentation
 Key: FLINK-17076
 URL: https://issues.apache.org/jira/browse/FLINK-17076
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kafka, Documentation
Reporter: Seth Wiesman


The Kafka connector is one of the most popular connectors in Flink. The 
documentation has grown organically over the years as new versions and features 
have become supported. The page is still based on the Kafka 0.8 connector, with 
asides for newer versions. 

Since Kafka 1.0 was released in 2017, I believe the page should be restructured 
around the universal connector and Kafka(De)SerializationSchemas, with clear 
examples.






[GitHub] [flink] flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce 
the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint
URL: https://github.com/apache/flink/pull/11696#issuecomment-611702183
 
 
   
   ## CI report:
   
   * b0c89809b4f83438cf21736a1d668cc14a05c908 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/159678569) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7303)
 
   * 18fcfaf78cf5a5865e985a1f7ec24ba732d90807 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159701257) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7306)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11515: [FLINK-16744][task] implement channel state persistence for unaligned checkpoints

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11515: [FLINK-16744][task] implement 
channel state persistence for unaligned checkpoints
URL: https://github.com/apache/flink/pull/11515#issuecomment-604128992
 
 
   
   ## CI report:
   
   * 22e12ad32ac71e45f7c40fd08ad0cc4498664613 UNKNOWN
   * 8492dd04cdb1712069f6d1c1abf39acbae60aaa2 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159676092) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7301)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce 
the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint
URL: https://github.com/apache/flink/pull/11696#issuecomment-611702183
 
 
   
   ## CI report:
   
   * b0c89809b4f83438cf21736a1d668cc14a05c908 Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/159678569) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7303)
 
   * 18fcfaf78cf5a5865e985a1f7ec24ba732d90807 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159701257) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7306)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce 
the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint
URL: https://github.com/apache/flink/pull/11696#issuecomment-611702183
 
 
   
   ## CI report:
   
   * b0c89809b4f83438cf21736a1d668cc14a05c908 Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159678569) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7303)
 
   * 18fcfaf78cf5a5865e985a1f7ec24ba732d90807 UNKNOWN
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11515: [FLINK-16744][task] implement channel state persistence for unaligned checkpoints

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11515: [FLINK-16744][task] implement 
channel state persistence for unaligned checkpoints
URL: https://github.com/apache/flink/pull/11515#issuecomment-604128992
 
 
   
   ## CI report:
   
   * 22e12ad32ac71e45f7c40fd08ad0cc4498664613 UNKNOWN
   * 8492dd04cdb1712069f6d1c1abf39acbae60aaa2 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159676092) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7301)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11694: [FLINK-17064][table-planner] Improve literals conversion in ExpressionConverter

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11694: [FLINK-17064][table-planner] Improve 
literals conversion in ExpressionConverter
URL: https://github.com/apache/flink/pull/11694#issuecomment-611675489
 
 
   
   ## CI report:
   
   * 0831cb2823d0bcd39be4d83b8b31d6906d7734f8 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159669352) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7299)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093
 
 
   
   ## CI report:
   
   * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN
   * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN
   * 35ec088ef24e841beae78c33488975d2d8d0a40d UNKNOWN
   * 334458afaa98b601eec12c490e71caa8274be055 UNKNOWN
   * 559816dcea4999dd46b09ee1091ddc0e1a5a7a54 UNKNOWN
   * 6cd76afe4990f56a5166e1592374aa8f86fe883f UNKNOWN
   * a7dd8d3321593ab8211487c0ad27eb299e876a8a Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/159695914) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7305)
 
   * ac996f5c703560ecbe6cad7394cc8ad54d6f0029 UNKNOWN
   
   




[GitHub] [flink] kl0u edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint

2020-04-09 Thread GitBox
kl0u edited a comment on issue #11696: [FLINK-16658][FLINK-16660] Introduce the 
ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint
URL: https://github.com/apache/flink/pull/11696#issuecomment-611759520
 
 
   @flinkbot run azure
   @flinkbot run travis




[GitHub] [flink] kl0u commented on issue #11696: [FLINK-16658][FLINK-16660] Introduce the ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint

2020-04-09 Thread GitBox
kl0u commented on issue #11696: [FLINK-16658][FLINK-16660] Introduce the 
ApplicationDispatcherBootstrap and wire it to StandaloneJobEntrypoint
URL: https://github.com/apache/flink/pull/11696#issuecomment-611759520
 
 
   @flinkbot run azure @flinkbot run travis




[GitHub] [flink] flinkbot edited a comment on issue #11695: [FLINK-11523] Use OutputTag.typeInfo in harness

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11695: [FLINK-11523] Use OutputTag.typeInfo 
in harness
URL: https://github.com/apache/flink/pull/11695#issuecomment-611675716
 
 
   
   ## CI report:
   
   * 0f3c1cbfa554f3ba00bebcf688d155681d784c99 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/159669415) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7300)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093
 
 
   
   ## CI report:
   
   * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN
   * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN
   * 35ec088ef24e841beae78c33488975d2d8d0a40d UNKNOWN
   * 334458afaa98b601eec12c490e71caa8274be055 UNKNOWN
   * 559816dcea4999dd46b09ee1091ddc0e1a5a7a54 UNKNOWN
   * 6cd76afe4990f56a5166e1592374aa8f86fe883f UNKNOWN
   * 71a489567e8b02abdf0ccc7ec56f85d93549eedb Travis: 
[CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/159678447) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7302)
 
   * a7dd8d3321593ab8211487c0ad27eb299e876a8a Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159695914) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7305)
 
   * ac996f5c703560ecbe6cad7394cc8ad54d6f0029 UNKNOWN
   
   




[jira] [Commented] (FLINK-12692) Support disk spilling in HeapKeyedStateBackend

2020-04-09 Thread David Morin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-12692?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17080003#comment-17080003
 ] 

David Morin commented on FLINK-12692:
-

Hi,

Very interested in this feature. Any update?

Even a trial version on flink-packages would be welcome.

> Support disk spilling in HeapKeyedStateBackend
> --
>
> Key: FLINK-12692
> URL: https://issues.apache.org/jira/browse/FLINK-12692
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / State Backends
>Reporter: Yu Li
>Assignee: Yu Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.11.0
>
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> {{HeapKeyedStateBackend}} is one of the two {{KeyedStateBackends}} in Flink, 
> since state lives as Java objects on the heap and the de/serialization only 
> happens during state snapshot and restore, it outperforms 
> {{RocksDBKeyedStateBackend}} when all data could reside in memory.
> However, along with the advantage, {{HeapKeyedStateBackend}} also has its 
> shortcomings, and the most painful one is the difficulty to estimate the 
> maximum heap size (Xmx) to set, and we will suffer from GC impact once the 
> heap memory is not enough to hold all state data. There’re several 
> (inevitable) causes for such scenario, including (but not limited to):
> * Memory overhead of Java object representation (tens of times of the 
> serialized data size).
> * Data flood caused by burst traffic.
> * Data accumulation caused by source malfunction.
> To resolve this problem, we propose a solution to support spilling state data 
> to disk before heap memory is exhausted. We will monitor the heap usage and 
> choose the coldest data to spill, and reload them when heap memory is 
> regained after data removing or TTL expiration, automatically.
> For more details, please refer to the design doc and the mailing list 
> discussion.
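
The monitor-and-spill idea described in the issue can be reduced to a small, 
self-contained sketch. Everything below is a hypothetical illustration, not 
Flink's actual HeapKeyedStateBackend code (the class and method names are 
invented, `String` stands in for serialized state, and a `HashMap` stands in 
for the spill files): an access-ordered `LinkedHashMap` identifies the coldest 
entries, which are spilled once a heap budget is exceeded and reloaded when 
accessed again.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of FLINK-12692's idea: keep hot entries on heap, spill
// the coldest ones once a budget is exceeded, reload spilled entries on access.
public class SpillPolicySketch {
    private final int heapBudget; // max number of entries kept on heap
    // Access-ordered map: iteration starts at the least recently used (coldest) entry.
    private final LinkedHashMap<String, String> heap = new LinkedHashMap<>(16, 0.75f, true);
    private final Map<String, String> disk = new HashMap<>(); // stand-in for spill files

    public SpillPolicySketch(int heapBudget) {
        this.heapBudget = heapBudget;
    }

    public void put(String key, String value) {
        heap.put(key, value);
        maybeSpill();
    }

    public String get(String key) {
        String value = heap.get(key);
        if (value == null && (value = disk.remove(key)) != null) {
            heap.put(key, value); // reload on access; may push out another cold entry
            maybeSpill();
        }
        return value;
    }

    private void maybeSpill() {
        Iterator<Map.Entry<String, String>> it = heap.entrySet().iterator();
        while (heap.size() > heapBudget && it.hasNext()) {
            Map.Entry<String, String> coldest = it.next(); // first = least recently used
            disk.put(coldest.getKey(), coldest.getValue());
            it.remove();
        }
    }

    public int onHeap() { return heap.size(); }

    public int onDisk() { return disk.size(); }

    public static void main(String[] args) {
        SpillPolicySketch state = new SpillPolicySketch(2);
        state.put("a", "1");
        state.put("b", "2");
        state.put("c", "3"); // exceeds the budget: "a" (coldest) is spilled
        System.out.println(state.onHeap() + " on heap, " + state.onDisk() + " spilled");
        System.out.println(state.get("a")); // reloads "a", spilling "b" in its place
    }
}
```

The real backend additionally monitors actual heap usage and handles TTL 
expiration; the sketch only shows the coldest-first spill/reload mechanics.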





[GitHub] [flink] flinkbot edited a comment on issue #11667: [FLINK-15936] Harden TaskExecutorTest#testSlotAcceptance

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11667: [FLINK-15936] Harden 
TaskExecutorTest#testSlotAcceptance
URL: https://github.com/apache/flink/pull/11667#issuecomment-610810071
 
 
   
   ## CI report:
   
   * 82ce0e1dcd7ab74da0e33aeeb8c02dd1ef789e61 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159663627) Azure: 
[SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7298)
 
   
   




[GitHub] [flink] AHeise commented on a change in pull request #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-09 Thread GitBox
AHeise commented on a change in pull request #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#discussion_r406475213
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -542,6 +573,16 @@ public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOEx
 	if (backlog >= 0) {
 		onSenderBacklog(backlog);
 	}
+
+	if (checkpointBarrier != null) {
+		receivedCheckpointId = checkpointBarrier.getId();
+	}
+	if (checkpointBarrier != null && inputGate.bufferReceivedListener != null) {
+		inputGate.bufferReceivedListener.notifyBarrierReceived(checkpointBarrier, channelInfo);
+	}
+	if (notifyReceivedBuffer != null && inputGate.bufferReceivedListener != null) {
 Review comment:
   That's a tricky one. `receivedCheckpointId < lastRequestedCheckpointId` can 
only be true if the checkpoint has been triggered through `Unaligner`, which in 
turn guarantees that `bufferReceivedListener != null`. 
   
   Aside from your suggestions, we could also add this assumption at the place 
where we create `notifyReceivedBuffer`, to make it more explicit (i.e., don't 
retain it if `bufferReceivedListener == null`). Then we would just trust branch 
prediction to effectively eliminate the check. 
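
The "don't retain when the listener is null" idea can be sketched in isolation. 
This is a hypothetical illustration only (names like `captureNotification` are 
invented, not Flink's API): the null check is paid once, where the deferred 
work is created, so the later notification path needs no second check.

```java
import java.util.function.Consumer;

// Sketch of the review point: decide once, at creation time, whether a
// listener exists, instead of re-checking on every notification.
public class ListenerSketch {

    // Retain a notification only when a listener is registered; a null return
    // makes the "no listener => nothing retained" invariant explicit.
    static Runnable captureNotification(Consumer<String> listener, String payload) {
        if (listener == null) {
            return null; // don't retain anything when there is nobody to notify
        }
        return () -> listener.accept(payload);
    }

    public static void main(String[] args) {
        StringBuilder received = new StringBuilder();
        Runnable notification = captureNotification(received::append, "barrier");
        if (notification != null) {
            notification.run();
        }
        System.out.println(received); // barrier
        System.out.println(captureNotification(null, "ignored") == null); // true
    }
}
```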




[GitHub] [flink] flinkbot edited a comment on issue #11694: [FLINK-17064][table-planner] Improve literals conversion in ExpressionConverter

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11694: [FLINK-17064][table-planner] Improve 
literals conversion in ExpressionConverter
URL: https://github.com/apache/flink/pull/11694#issuecomment-611675489
 
 
   
   ## CI report:
   
   * 0831cb2823d0bcd39be4d83b8b31d6906d7734f8 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159669352) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7299)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11663: [FLINK-16820][jdbc] support reading timestamp, data, and time in JDBCTableSource

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11663: [FLINK-16820][jdbc] support reading 
timestamp, data, and time in JDBCTableSource
URL: https://github.com/apache/flink/pull/11663#issuecomment-610652042
 
 
   
   ## CI report:
   
   * 854993a43d90bc56a70ea6f23af0585ae0ce61d8 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/159178437) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7178)
 
   * f4247f80edd8aaebfe9f0d3fedc8e4cedfbb450d Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159692950) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7304)
 
   
   




[GitHub] [flink] flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#issuecomment-603776093
 
 
   
   ## CI report:
   
   * dee2b337e5e72d8f7b1f5098b74f2958d000fb3c UNKNOWN
   * 80b7f76f24b5fb6704a4b2292543f8764ec19053 UNKNOWN
   * 35ec088ef24e841beae78c33488975d2d8d0a40d UNKNOWN
   * 334458afaa98b601eec12c490e71caa8274be055 UNKNOWN
   * 559816dcea4999dd46b09ee1091ddc0e1a5a7a54 UNKNOWN
   * 6cd76afe4990f56a5166e1592374aa8f86fe883f UNKNOWN
   * 71a489567e8b02abdf0ccc7ec56f85d93549eedb Travis: 
[PENDING](https://travis-ci.com/github/flink-ci/flink/builds/159678447) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7302)
 
   * a7dd8d3321593ab8211487c0ad27eb299e876a8a UNKNOWN
   
   




[GitHub] [flink] AHeise commented on a change in pull request #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-09 Thread GitBox
AHeise commented on a change in pull request #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#discussion_r406472565
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 ##
 @@ -206,6 +214,23 @@ void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException {
 	return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
 }
 
+	@Override
+	public List<Buffer> getInflightBuffers(long checkpointId) throws IOException {
+		synchronized (receivedBuffers) {
+			final List<Buffer> inflightBuffers = new ArrayList<>(receivedBuffers.size());
+			for (Buffer buffer : receivedBuffers) {
+				if (!buffer.isBuffer() && parseCheckpointBarrierOrNull(buffer) != null) {
+					break;
+				}
+				inflightBuffers.add(buffer.retainBuffer());
+			}
+
+			triggeredCheckpointId = checkpointId;
 
 Review comment:
   Added the `checkState` and I agree that the naming is not ideal.
   
   I renamed `getInflightBuffers` to `requestInflightBuffers` (as suggested), 
and used `lastRequestedCheckpointId` instead of `triggeredCheckpointId` to 
highlight its usage.
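
The reviewed loop — collect everything in the received queue that precedes the 
first checkpoint barrier — can be reduced to a self-contained sketch 
(hypothetical types: `String`s stand in for Flink's `Buffer` objects, and the 
reference-counting `retainBuffer()` call is omitted):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch of the in-flight-buffer collection: walk the received queue in order
// and collect every element in front of the first checkpoint barrier.
public class InflightSketch {

    static List<String> inflightUpToBarrier(Queue<String> received, String barrier) {
        List<String> inflight = new ArrayList<>(received.size());
        for (String element : received) {
            if (barrier.equals(element)) {
                break; // elements after the barrier belong to the next checkpoint
            }
            inflight.add(element); // Flink additionally retains each buffer here
        }
        return inflight;
    }

    public static void main(String[] args) {
        Queue<String> received = new ArrayDeque<>(List.of("b1", "b2", "BARRIER", "b3"));
        System.out.println(inflightUpToBarrier(received, "BARRIER")); // [b1, b2]
    }
}
```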




[GitHub] [flink] AHeise commented on a change in pull request #11507: [FLINK-16587] Add basic CheckpointBarrierHandler for unaligned checkpoint

2020-04-09 Thread GitBox
AHeise commented on a change in pull request #11507: [FLINK-16587] Add basic 
CheckpointBarrierHandler for unaligned checkpoint
URL: https://github.com/apache/flink/pull/11507#discussion_r406469115
 
 

 ##
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 ##
 @@ -0,0 +1,430 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link CheckpointBarrierUnaligner} is used for triggering checkpoint while reading the first barrier
+ * and keeping track of the number of received barriers and consumed barriers.
+ */
+@Internal
+@NotThreadSafe
+public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler {
+
+   private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierUnaligner.class);
+
+   private final String taskName;
+
+   /**
+* Tag the state of which input channel has pending in-flight buffers; that is, already received buffers that
+* predate the checkpoint barrier of the current checkpoint.
+*/
+   private final boolean[] hasInflightBuffers;
+
+   private int numBarrierConsumed;
+
+   /**
+* Contains the offsets of the channel indices for each gate when flattening the channels of all gates.
+*
+* For example, consider 3 gates with 4 channels, {@code gateChannelOffsets = [0, 4, 8]}.
+*/
+   private final int[] gateChannelOffsets;
+
+   private final InputChannelInfo[] channelInfos;
+
+   /**
+* The checkpoint id to guarantee that we would trigger only one checkpoint when reading the same barrier from
+* different channels.
+*
+* Note: this checkpoint is valid in respect to consumed barriers in contrast to
+* {@link ThreadSafeUnaligner#currentReceivedCheckpointId}.
+*/
+   private long currentConsumedCheckpointId = -1L;
+
+   /** Encapsulates state that is shared between netty threads and task thread. */
+   private final ThreadSafeUnaligner threadSafeUnaligner;
+
+   CheckpointBarrierUnaligner(
+   int[] numberOfInputChannelsPerGate,
+   ChannelStateWriter channelStateWriter,
+   String taskName,
+   AbstractInvokable toNotifyOnCheckpoint) {
+   super(toNotifyOnCheckpoint);
+
+   this.taskName = taskName;
+
+   final int numGates = numberOfInputChannelsPerGate.length;
+
+   gateChannelOffsets = new int[numGates];
+   for (int index = 1; index < numGates; index++) {
+   gateChannelOffsets[index] = gateChannelOffsets[index - 1] + numberOfInputChannelsPerGate[index - 1];
+   }
+
+   final int totalNumChannels = gateChannelOffsets[numGates - 1] + numberOfInputChannelsPerGate[numGates - 1];
+   hasInflightBuffers = new boolean[totalNumChannels];
+
+   
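
The offset computation in the constructor above can be illustrated in isolation. This sketch (helper names are mine, not Flink's) flattens per-gate channel indices into one global index space, matching the javadoc example where 3 gates with 4 channels each give offsets `[0, 4, 8]`.

```java
import java.util.Arrays;

public class GateChannelOffsets {

    // Prefix sums of the per-gate channel counts: offsets[i] is the global
    // index of the first channel of gate i.
    static int[] computeOffsets(int[] numberOfInputChannelsPerGate) {
        int[] offsets = new int[numberOfInputChannelsPerGate.length];
        for (int index = 1; index < offsets.length; index++) {
            offsets[index] = offsets[index - 1] + numberOfInputChannelsPerGate[index - 1];
        }
        return offsets;
    }

    // Global index of channel `channelIndex` inside gate `gateIndex`.
    static int flattenedIndex(int[] offsets, int gateIndex, int channelIndex) {
        return offsets[gateIndex] + channelIndex;
    }

    public static void main(String[] args) {
        int[] offsets = computeOffsets(new int[] {4, 4, 4});
        System.out.println(Arrays.toString(offsets));      // prints [0, 4, 8]
        System.out.println(flattenedIndex(offsets, 2, 1)); // prints 9
    }
}
```

The flattened index is what lets a single `boolean[] hasInflightBuffers` array track all channels across all gates.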

[GitHub] [flink] flinkbot edited a comment on issue #11663: [FLINK-16820][jdbc] support reading timestamp, data, and time in JDBCTableSource

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11663: [FLINK-16820][jdbc] support reading 
timestamp, data, and time in JDBCTableSource
URL: https://github.com/apache/flink/pull/11663#issuecomment-610652042
 
 
   
   ## CI report:
   
   * 854993a43d90bc56a70ea6f23af0585ae0ce61d8 Travis: 
[FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/159178437) Azure: 
[FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7178)
 
   * f4247f80edd8aaebfe9f0d3fedc8e4cedfbb450d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   




[GitHub] [flink] bowenli86 commented on issue #11663: [FLINK-16820][jdbc] support reading timestamp, data, and time in JDBCTableSource

2020-04-09 Thread GitBox
bowenli86 commented on issue #11663: [FLINK-16820][jdbc] support reading 
timestamp, data, and time in JDBCTableSource
URL: https://github.com/apache/flink/pull/11663#issuecomment-611734963
 
 
   @wuchong Thanks for your review! I've addressed the feedback; please take another look.




[GitHub] [flink] bowenli86 commented on a change in pull request #11663: [FLINK-16820][jdbc] support reading timestamp, data, and time in JDBCTableSource

2020-04-09 Thread GitBox
bowenli86 commented on a change in pull request #11663: [FLINK-16820][jdbc] 
support reading timestamp, data, and time in JDBCTableSource
URL: https://github.com/apache/flink/pull/11663#discussion_r406450408
 
 

 ##
 File path: 
flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/dialect/JDBCDialects.java
 ##
 @@ -407,7 +407,6 @@ public int minTimestampPrecision() {
//  
PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in 
LegacyTypeInfoDataTypeConverter.
return Arrays.asList(
LogicalTypeRoot.BINARY,
-   
LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
 
 Review comment:
   They are orthogonal. Flink's PG catalog/source/sink support TIMESTAMP WITH LOCAL TIME ZONE; however, it cannot be used due to limitations of the legacy type system. Once we completely switch to the new type system, it should be good to go.
   
   Unsupported types defined here should be the ones that PG doesn't support at all.
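
The shape of the change can be sketched as follows. This is a hypothetical illustration (the enum values and `validate` helper are mine, not Flink's `JDBCDialects` API): a dialect lists the type roots the target database cannot represent at all, and validation rejects only those. Removing `TIMESTAMP_WITH_LOCAL_TIME_ZONE` from that list, as the diff above does, means the type is no longer rejected at the dialect level.

```java
import java.util.EnumSet;

public class DialectUnsupportedTypesSketch {

    // Illustrative subset of logical type roots.
    enum LogicalTypeRoot { BINARY, TIMESTAMP_WITH_LOCAL_TIME_ZONE, RAW, INTEGER }

    // Types the PostgreSQL dialect cannot handle at all in this sketch;
    // TIMESTAMP_WITH_LOCAL_TIME_ZONE is deliberately absent.
    static final EnumSet<LogicalTypeRoot> PG_UNSUPPORTED =
            EnumSet.of(LogicalTypeRoot.BINARY, LogicalTypeRoot.RAW);

    static void validate(LogicalTypeRoot typeRoot) {
        if (PG_UNSUPPORTED.contains(typeRoot)) {
            throw new UnsupportedOperationException("Unsupported type: " + typeRoot);
        }
    }

    public static void main(String[] args) {
        validate(LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE); // accepted now
        System.out.println("ok"); // prints ok
    }
}
```

Keeping "what the database cannot do" separate from "what the current type system cannot do" is exactly the orthogonality the comment describes.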






[GitHub] [flink] flinkbot edited a comment on issue #11667: [FLINK-15936] Harden TaskExecutorTest#testSlotAcceptance

2020-04-09 Thread GitBox
flinkbot edited a comment on issue #11667: [FLINK-15936] Harden 
TaskExecutorTest#testSlotAcceptance
URL: https://github.com/apache/flink/pull/11667#issuecomment-610810071
 
 
   
   ## CI report:
   
   * 82ce0e1dcd7ab74da0e33aeeb8c02dd1ef789e61 Travis: 
[SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/159663627) Azure: 
[PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7298)
 
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run travis` re-run the last Travis build
- `@flinkbot run azure` re-run the last Azure build
   



