[GitHub] [flink] flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing

2019-09-18 Thread GitBox
flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark 
checkpointing
URL: https://github.com/apache/flink/pull/7013#issuecomment-532533051
 
 
   
   ## CI report:
   
   * c81a918723a3204483b152909c3fc93f16a23ce0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128117334)
   * f93c0c3ba82f53db498c46f22a4fb3bf9a730451 : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128122205)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325490458
 
 

 ##
 File path: flink-python/pyflink/fn_execution/boot.py
 ##
 @@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This script is a python implementation of the "boot.go" script in 
"beam-sdks-python-container"
+project of Apache Beam, see in:
+
+https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go
+
+It is implemented in golang and will introduce unnecessary dependencies if 
used in pure python
+project. So we add a python implementation which will be used when the python 
worker runs in
+process mode. It downloads and installs users' python artifacts, then launches 
the python SDK
+harness of Apache Beam.
+"""
+import argparse
+import hashlib
+import os
+from subprocess import call
+
+import grpc
+import logging
+import sys
+
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import 
ProvisionServiceStub
+from apache_beam.portability.api.beam_provision_api_pb2 import 
GetProvisionInfoRequest
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import 
ArtifactRetrievalServiceStub
+from apache_beam.portability.api.beam_artifact_api_pb2 import 
(GetManifestRequest,
+   
GetArtifactRequest)
+from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor
+
+from google.protobuf import json_format, text_format
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--id", default="", help="Local identifier (required).")
+parser.add_argument("--logging_endpoint", default="",
+help="Logging endpoint (required).")
+parser.add_argument("--artifact_endpoint", default="",
+help="Artifact endpoint (required).")
+parser.add_argument("--provision_endpoint", default="",
+help="Provision endpoint (required).")
+parser.add_argument("--control_endpoint", default="",
+help="Control endpoint (required).")
+parser.add_argument("--semi_persist_dir", default="/tmp",
+help="Local semi-persistent directory (optional).")
+
+args = parser.parse_args()
+
+worker_id = args.id
+logging_endpoint = args.logging_endpoint
+artifact_endpoint = args.artifact_endpoint
+provision_endpoint = args.provision_endpoint
+control_endpoint = args.control_endpoint
+semi_persist_dir = args.semi_persist_dir
+
+if worker_id == "":
+logging.fatal("No id provided.")
+
+if logging_endpoint == "":
+logging.fatal("No logging endpoint provided.")
+
+if artifact_endpoint == "":
+logging.fatal("No artifact endpoint provided.")
+
+if provision_endpoint == "":
+logging.fatal("No provision endpoint provided.")
+
+if control_endpoint == "":
+logging.fatal("No control endpoint provided.")
+
+logging.info("Initializing python harness: %s" % " ".join(sys.argv))
+
+metadata = [("worker_id", worker_id)]
+
+# read job information from provision stub
+with grpc.insecure_channel(provision_endpoint) as channel:
+client = ProvisionServiceStub(channel=channel)
+info = client.GetProvisionInfo(GetProvisionInfoRequest(), 
metadata=metadata).info
+options = json_format.MessageToJson(info.pipeline_options)
+
+staged_dir = os.path.join(semi_persist_dir, "staged")
+files = []
+
+# download files
+with grpc.insecure_channel(artifact_endpoint) as channel:
+client = ArtifactRetrievalServiceStub(channel=channel)
+# get file list via retrieval token
+response = 
client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token),
+  metadata=metadata)
+artifacts = response.manifest.artifact
+# download files and check hash values
+for artifact in artifacts:
+name = artifact.name
+files.append(name)
 
 Review comment:
   Why do we append the names here? It seems the `files` list has not been used by any

[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325477942
 
 

 ##
 File path: flink-python/pyflink/fn_execution/boot.py
 ##
 @@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This script is a python implementation of the "boot.go" script in 
"beam-sdks-python-container"
+project of Apache Beam, see in:
+
+https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go
+
+It is implemented in golang and will introduce unnecessary dependencies if 
used in pure python
+project. So we add a python implementation which will be used when the python 
worker runs in
+process mode. It downloads and installs users' python artifacts, then launches 
the python SDK
+harness of Apache Beam.
+"""
+import argparse
+import hashlib
+import os
+from subprocess import call
+
+import grpc
+import logging
+import sys
+
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import 
ProvisionServiceStub
+from apache_beam.portability.api.beam_provision_api_pb2 import 
GetProvisionInfoRequest
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import 
ArtifactRetrievalServiceStub
+from apache_beam.portability.api.beam_artifact_api_pb2 import 
(GetManifestRequest,
+   
GetArtifactRequest)
+from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor
+
+from google.protobuf import json_format, text_format
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--id", default="", help="Local identifier (required).")
+parser.add_argument("--logging_endpoint", default="",
+help="Logging endpoint (required).")
+parser.add_argument("--artifact_endpoint", default="",
+help="Artifact endpoint (required).")
+parser.add_argument("--provision_endpoint", default="",
+help="Provision endpoint (required).")
+parser.add_argument("--control_endpoint", default="",
+help="Control endpoint (required).")
+parser.add_argument("--semi_persist_dir", default="/tmp",
+help="Local semi-persistent directory (optional).")
+
+args = parser.parse_args()
+
+worker_id = args.id
+logging_endpoint = args.logging_endpoint
+artifact_endpoint = args.artifact_endpoint
+provision_endpoint = args.provision_endpoint
+control_endpoint = args.control_endpoint
+semi_persist_dir = args.semi_persist_dir
+
+if worker_id == "":
+logging.fatal("No id provided.")
 
 Review comment:
   We should also exit for these required arguments, since `logging.fatal` only prints a log message and does not terminate the process.
   Also, add some validation tests to verify these errors.
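   For illustration, a minimal sketch of such a check (the `check_not_empty` helper and the exit code are hypothetical, not part of this PR; `worker_id` and `logging_endpoint` are assumed to be the variables parsed above in boot.py):

```python
import logging
import sys


def check_not_empty(value, error_message):
    """Log the error and terminate the boot process instead of only logging it."""
    if value == "":
        logging.error(error_message)
        sys.exit(1)


# would run right after argument parsing in boot.py
check_not_empty(worker_id, "No id provided.")
check_not_empty(logging_endpoint, "No logging endpoint provided.")
```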


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325502497
 
 

 ##
 File path: flink-python/pyflink/fn_execution/boot.py
 ##
 @@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This script is a python implementation of the "boot.go" script in 
"beam-sdks-python-container"
+project of Apache Beam, see in:
+
+https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go
+
+It is implemented in golang and will introduce unnecessary dependencies if 
used in pure python
+project. So we add a python implementation which will be used when the python 
worker runs in
+process mode. It downloads and installs users' python artifacts, then launches 
the python SDK
+harness of Apache Beam.
+"""
+import argparse
+import hashlib
+import os
+from subprocess import call
+
+import grpc
+import logging
+import sys
+
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import 
ProvisionServiceStub
+from apache_beam.portability.api.beam_provision_api_pb2 import 
GetProvisionInfoRequest
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import 
ArtifactRetrievalServiceStub
+from apache_beam.portability.api.beam_artifact_api_pb2 import 
(GetManifestRequest,
+   
GetArtifactRequest)
+from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor
+
+from google.protobuf import json_format, text_format
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--id", default="", help="Local identifier (required).")
+parser.add_argument("--logging_endpoint", default="",
+help="Logging endpoint (required).")
+parser.add_argument("--artifact_endpoint", default="",
+help="Artifact endpoint (required).")
+parser.add_argument("--provision_endpoint", default="",
+help="Provision endpoint (required).")
+parser.add_argument("--control_endpoint", default="",
+help="Control endpoint (required).")
+parser.add_argument("--semi_persist_dir", default="/tmp",
+help="Local semi-persistent directory (optional).")
+
+args = parser.parse_args()
+
+worker_id = args.id
+logging_endpoint = args.logging_endpoint
+artifact_endpoint = args.artifact_endpoint
+provision_endpoint = args.provision_endpoint
+control_endpoint = args.control_endpoint
+semi_persist_dir = args.semi_persist_dir
+
+if worker_id == "":
+logging.fatal("No id provided.")
+
+if logging_endpoint == "":
+logging.fatal("No logging endpoint provided.")
+
+if artifact_endpoint == "":
+logging.fatal("No artifact endpoint provided.")
+
+if provision_endpoint == "":
+logging.fatal("No provision endpoint provided.")
+
+if control_endpoint == "":
+logging.fatal("No control endpoint provided.")
+
+logging.info("Initializing python harness: %s" % " ".join(sys.argv))
+
+metadata = [("worker_id", worker_id)]
+
+# read job information from provision stub
+with grpc.insecure_channel(provision_endpoint) as channel:
+client = ProvisionServiceStub(channel=channel)
+info = client.GetProvisionInfo(GetProvisionInfoRequest(), 
metadata=metadata).info
+options = json_format.MessageToJson(info.pipeline_options)
+
+staged_dir = os.path.join(semi_persist_dir, "staged")
+files = []
+
+# download files
+with grpc.insecure_channel(artifact_endpoint) as channel:
+client = ArtifactRetrievalServiceStub(channel=channel)
+# get file list via retrieval token
+response = 
client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token),
+  metadata=metadata)
+artifacts = response.manifest.artifact
+# download files and check hash values
+for artifact in artifacts:
+name = artifact.name
+files.append(name)
+permissions = artifact.permissions
+sha256 = artifact.sha256
+

[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325509399
 
 

 ##
 File path: flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
 ##
 @@ -0,0 +1,140 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import json
+import os
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+from stat import ST_MODE
+
+import grpc
+from apache_beam.portability.api.beam_artifact_api_pb2 import 
GetManifestResponse, ArtifactChunk
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import (
+ArtifactRetrievalServiceServicer, 
add_ArtifactRetrievalServiceServicer_to_server)
+from apache_beam.portability.api.beam_provision_api_pb2 import (ProvisionInfo,
+
GetProvisionInfoResponse)
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import (
+ProvisionServiceServicer, add_ProvisionServiceServicer_to_server)
+from concurrent import futures
+from google.protobuf import json_format
+
+from pyflink.fn_execution.tests.process_mode_test_data import (manifest, 
file_data,
+   
test_provision_info_json)
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class PythonBootTests(PyFlinkTestCase):
+
+def setUp(self):
+manifest_response = json_format.Parse(manifest, GetManifestResponse())
+artifact_chunks = dict()
+for file_name in file_data:
+artifact_chunks[file_name] = 
json_format.Parse(file_data[file_name], ArtifactChunk())
+provision_info = json_format.Parse(test_provision_info_json, 
ProvisionInfo())
+response = GetProvisionInfoResponse(info=provision_info)
+
+def get_unused_port():
+sock = socket.socket()
+sock.bind(('', 0))
+port = sock.getsockname()[1]
+sock.close()
+return port
+
+class ArtifactService(ArtifactRetrievalServiceServicer):
+def GetManifest(self, request, context):
+return manifest_response
+
+def GetArtifact(self, request, context):
+yield artifact_chunks[request.name]
 
 Review comment:
   Can we also use return here? 
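   For context, a minimal sketch of both forms (a gRPC server-streaming handler just has to produce an iterable of responses; `artifact_chunks` is assumed to be the dict built in `setUp`):

```python
from apache_beam.portability.api.beam_artifact_api_pb2_grpc import ArtifactRetrievalServiceServicer


class ArtifactServiceWithYield(ArtifactRetrievalServiceServicer):
    def GetArtifact(self, request, context):
        # generator form: lazily yields a single chunk
        yield artifact_chunks[request.name]


class ArtifactServiceWithReturn(ArtifactRetrievalServiceServicer):
    def GetArtifact(self, request, context):
        # plain return form: any iterable of response chunks works as well
        return iter([artifact_chunks[request.name]])
```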


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325446663
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/pyflink-udf-runner.sh
 ##
 @@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/find-flink-home.sh
+
+_FLINK_HOME_DETERMINED=1
+
+. "$FLINK_HOME"/bin/config.sh
+
+if [[ "$FLINK_IDENT_STRING" = "" ]]; then
+FLINK_IDENT_STRING="$USER"
+fi
+
+if [[ -z "$python" ]]; then
+python="python"
+fi
+
+# Add pyflink & py4j to PYTHONPATH
+PYFLINK_ZIP="$FLINK_OPT_DIR/python/pyflink.zip"
+if [[ ! ${PYTHONPATH} =~ ${PYFLINK_ZIP} ]]; then
+export PYTHONPATH="$PYFLINK_ZIP:$PYTHONPATH"
+fi
+PY4J_ZIP=`echo "$FLINK_OPT_DIR"/python/py4j-*-src.zip`
+if [[ ! ${PYTHONPATH} =~ ${PY4J_ZIP} ]]; then
+export PYTHONPATH="$PY4J_ZIP:$PYTHONPATH"
+fi
+
+log="$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-python-udf-boot-$HOSTNAME.log"
+
+${python} -m pyflink.fn_execution.boot $@ 2>&1 | tee ${log}
 
 Review comment:
   Use `tee -a` to append to the log?
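   For example, a sketch of the last line with appending enabled (same variables as defined earlier in this script):

```bash
log="$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-python-udf-boot-$HOSTNAME.log"
# -a appends to the log instead of truncating it on every worker start
${python} -m pyflink.fn_execution.boot "$@" 2>&1 | tee -a "${log}"
```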


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325510604
 
 

 ##
 File path: flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
 ##
 @@ -0,0 +1,140 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import json
+import os
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+from stat import ST_MODE
+
+import grpc
+from apache_beam.portability.api.beam_artifact_api_pb2 import 
GetManifestResponse, ArtifactChunk
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import (
+ArtifactRetrievalServiceServicer, 
add_ArtifactRetrievalServiceServicer_to_server)
+from apache_beam.portability.api.beam_provision_api_pb2 import (ProvisionInfo,
+
GetProvisionInfoResponse)
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import (
+ProvisionServiceServicer, add_ProvisionServiceServicer_to_server)
+from concurrent import futures
+from google.protobuf import json_format
+
+from pyflink.fn_execution.tests.process_mode_test_data import (manifest, 
file_data,
+   
test_provision_info_json)
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class PythonBootTests(PyFlinkTestCase):
+
+def setUp(self):
+manifest_response = json_format.Parse(manifest, GetManifestResponse())
+artifact_chunks = dict()
+for file_name in file_data:
+artifact_chunks[file_name] = 
json_format.Parse(file_data[file_name], ArtifactChunk())
+provision_info = json_format.Parse(test_provision_info_json, 
ProvisionInfo())
+response = GetProvisionInfoResponse(info=provision_info)
+
+def get_unused_port():
+sock = socket.socket()
+sock.bind(('', 0))
+port = sock.getsockname()[1]
+sock.close()
+return port
+
+class ArtifactService(ArtifactRetrievalServiceServicer):
+def GetManifest(self, request, context):
+return manifest_response
+
+def GetArtifact(self, request, context):
+yield artifact_chunks[request.name]
+
+def start_test_artifact_server():
+server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+add_ArtifactRetrievalServiceServicer_to_server(ArtifactService(), 
server)
+port = get_unused_port()
+server.add_insecure_port('[::]:' + str(port))
+server.start()
+return server, port
+
+class ProvisionService(ProvisionServiceServicer):
+def GetProvisionInfo(self, request, context):
+return response
+
+def start_test_provision_server():
+server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+add_ProvisionServiceServicer_to_server(ProvisionService(), server)
+port = get_unused_port()
+server.add_insecure_port('[::]:' + str(port))
+server.start()
+return server, port
+
+self.artifact_server, self.artifact_port = start_test_artifact_server()
+self.provision_server, self.provision_port = 
start_test_provision_server()
+
+self.env = dict(os.environ)
+self.env["python"] = sys.executable
+self.env["FLINK_BOOT_TESTING"] = "1"
+
+def check_downloaded_files(self, staged_dir, manifest):
+expected_files_info = json.loads(manifest)["manifest"]["artifact"]
+files = os.listdir(staged_dir)
+self.assertEqual(len(expected_files_info), len(files))
+checked = 0
+for file_name in files:
+for file_info in expected_files_info:
+if file_name == file_info["name"]:
+self.assertEqual(
+oct(os.stat(os.path.join(staged_dir, 
file_name))[ST_MODE])[-3:],
+  

[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325491976
 
 

 ##
 File path: flink-python/pyflink/fn_execution/boot.py
 ##
 @@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This script is a python implementation of the "boot.go" script in 
"beam-sdks-python-container"
+project of Apache Beam, see in:
+
+https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go
+
+It is implemented in golang and will introduce unnecessary dependencies if 
used in pure python
+project. So we add a python implementation which will be used when the python 
worker runs in
+process mode. It downloads and installs users' python artifacts, then launches 
the python SDK
+harness of Apache Beam.
+"""
+import argparse
+import hashlib
+import os
+from subprocess import call
+
+import grpc
+import logging
+import sys
+
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import 
ProvisionServiceStub
+from apache_beam.portability.api.beam_provision_api_pb2 import 
GetProvisionInfoRequest
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import 
ArtifactRetrievalServiceStub
+from apache_beam.portability.api.beam_artifact_api_pb2 import 
(GetManifestRequest,
+   
GetArtifactRequest)
+from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor
+
+from google.protobuf import json_format, text_format
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--id", default="", help="Local identifier (required).")
+parser.add_argument("--logging_endpoint", default="",
+help="Logging endpoint (required).")
+parser.add_argument("--artifact_endpoint", default="",
+help="Artifact endpoint (required).")
+parser.add_argument("--provision_endpoint", default="",
+help="Provision endpoint (required).")
+parser.add_argument("--control_endpoint", default="",
+help="Control endpoint (required).")
+parser.add_argument("--semi_persist_dir", default="/tmp",
+help="Local semi-persistent directory (optional).")
+
+args = parser.parse_args()
+
+worker_id = args.id
+logging_endpoint = args.logging_endpoint
+artifact_endpoint = args.artifact_endpoint
+provision_endpoint = args.provision_endpoint
+control_endpoint = args.control_endpoint
+semi_persist_dir = args.semi_persist_dir
+
+if worker_id == "":
+logging.fatal("No id provided.")
+
+if logging_endpoint == "":
+logging.fatal("No logging endpoint provided.")
+
+if artifact_endpoint == "":
+logging.fatal("No artifact endpoint provided.")
+
+if provision_endpoint == "":
+logging.fatal("No provision endpoint provided.")
+
+if control_endpoint == "":
+logging.fatal("No control endpoint provided.")
+
+logging.info("Initializing python harness: %s" % " ".join(sys.argv))
+
+metadata = [("worker_id", worker_id)]
+
+# read job information from provision stub
+with grpc.insecure_channel(provision_endpoint) as channel:
+client = ProvisionServiceStub(channel=channel)
+info = client.GetProvisionInfo(GetProvisionInfoRequest(), 
metadata=metadata).info
+options = json_format.MessageToJson(info.pipeline_options)
+
+staged_dir = os.path.join(semi_persist_dir, "staged")
+files = []
+
+# download files
+with grpc.insecure_channel(artifact_endpoint) as channel:
+client = ArtifactRetrievalServiceStub(channel=channel)
+# get file list via retrieval token
+response = 
client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token),
+  metadata=metadata)
+artifacts = response.manifest.artifact
+# download files and check hash values
+for artifact in artifacts:
+name = artifact.name
+files.append(name)
+permissions = artifact.permissions
+sha256 = artifact.sha256
+

[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325504848
 
 

 ##
 File path: flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py
 ##
 @@ -0,0 +1,140 @@
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import json
+import os
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+from stat import ST_MODE
+
+import grpc
+from apache_beam.portability.api.beam_artifact_api_pb2 import 
GetManifestResponse, ArtifactChunk
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import (
+ArtifactRetrievalServiceServicer, 
add_ArtifactRetrievalServiceServicer_to_server)
+from apache_beam.portability.api.beam_provision_api_pb2 import (ProvisionInfo,
+
GetProvisionInfoResponse)
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import (
+ProvisionServiceServicer, add_ProvisionServiceServicer_to_server)
+from concurrent import futures
+from google.protobuf import json_format
+
+from pyflink.fn_execution.tests.process_mode_test_data import (manifest, 
file_data,
+   
test_provision_info_json)
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class PythonBootTests(PyFlinkTestCase):
+
+def setUp(self):
+manifest_response = json_format.Parse(manifest, GetManifestResponse())
+artifact_chunks = dict()
+for file_name in file_data:
+artifact_chunks[file_name] = 
json_format.Parse(file_data[file_name], ArtifactChunk())
+provision_info = json_format.Parse(test_provision_info_json, 
ProvisionInfo())
+response = GetProvisionInfoResponse(info=provision_info)
+
+def get_unused_port():
+sock = socket.socket()
+sock.bind(('', 0))
+port = sock.getsockname()[1]
+sock.close()
+return port
+
+class ArtifactService(ArtifactRetrievalServiceServicer):
+def GetManifest(self, request, context):
+return manifest_response
+
+def GetArtifact(self, request, context):
+yield artifact_chunks[request.name]
+
+def start_test_artifact_server():
+server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+add_ArtifactRetrievalServiceServicer_to_server(ArtifactService(), 
server)
+port = get_unused_port()
+server.add_insecure_port('[::]:' + str(port))
+server.start()
+return server, port
+
+class ProvisionService(ProvisionServiceServicer):
+def GetProvisionInfo(self, request, context):
+return response
+
+def start_test_provision_server():
+server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+add_ProvisionServiceServicer_to_server(ProvisionService(), server)
+port = get_unused_port()
+server.add_insecure_port('[::]:' + str(port))
+server.start()
+return server, port
+
+self.artifact_server, self.artifact_port = start_test_artifact_server()
+self.provision_server, self.provision_port = 
start_test_provision_server()
+
+self.env = dict(os.environ)
+self.env["python"] = sys.executable
 
 Review comment:
   Remove this since we use `sys.executable` in boot.py?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325232945
 
 

 ##
 File path: flink-dist/src/main/flink-bin/bin/pyflink-udf-runner.sh
 ##
 @@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/find-flink-home.sh
+
+_FLINK_HOME_DETERMINED=1
+
+. "$FLINK_HOME"/bin/config.sh
+
+if [[ "$FLINK_IDENT_STRING" = "" ]]; then
 
 Review comment:
   Replace `"$FLINK_IDENT_STRING" = ""` with `-z "$FLINK_IDENT_STRING"`? If 
these two are same, we can just use one to simply keep consistent with the `if 
[[ -z "$python" ]]` below.
   What do you think?
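   A quick sketch of the equivalence (both branches fire for an empty variable):

```bash
FLINK_IDENT_STRING=""
if [[ "$FLINK_IDENT_STRING" = "" ]]; then echo "empty (string comparison)"; fi
if [[ -z "$FLINK_IDENT_STRING" ]]; then echo "empty (-z test)"; fi
```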
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.

2019-09-18 Thread GitBox
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] 
Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325501314
 
 

 ##
 File path: flink-python/pyflink/fn_execution/boot.py
 ##
 @@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This script is a python implementation of the "boot.go" script in 
"beam-sdks-python-container"
+project of Apache Beam, see in:
+
+https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go
+
+It is implemented in golang and will introduce unnecessary dependencies if 
used in pure python
+project. So we add a python implementation which will be used when the python 
worker runs in
+process mode. It downloads and installs users' python artifacts, then launches 
the python SDK
+harness of Apache Beam.
+"""
+import argparse
+import hashlib
+import os
+from subprocess import call
+
+import grpc
+import logging
+import sys
+
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import 
ProvisionServiceStub
+from apache_beam.portability.api.beam_provision_api_pb2 import 
GetProvisionInfoRequest
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import 
ArtifactRetrievalServiceStub
+from apache_beam.portability.api.beam_artifact_api_pb2 import 
(GetManifestRequest,
+   
GetArtifactRequest)
+from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor
+
+from google.protobuf import json_format, text_format
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--id", default="", help="Local identifier (required).")
+parser.add_argument("--logging_endpoint", default="",
+help="Logging endpoint (required).")
+parser.add_argument("--artifact_endpoint", default="",
+help="Artifact endpoint (required).")
+parser.add_argument("--provision_endpoint", default="",
+help="Provision endpoint (required).")
+parser.add_argument("--control_endpoint", default="",
+help="Control endpoint (required).")
+parser.add_argument("--semi_persist_dir", default="/tmp",
+help="Local semi-persistent directory (optional).")
+
+args = parser.parse_args()
+
+worker_id = args.id
+logging_endpoint = args.logging_endpoint
+artifact_endpoint = args.artifact_endpoint
+provision_endpoint = args.provision_endpoint
+control_endpoint = args.control_endpoint
+semi_persist_dir = args.semi_persist_dir
+
+if worker_id == "":
+logging.fatal("No id provided.")
+
+if logging_endpoint == "":
+logging.fatal("No logging endpoint provided.")
+
+if artifact_endpoint == "":
+logging.fatal("No artifact endpoint provided.")
+
+if provision_endpoint == "":
+logging.fatal("No provision endpoint provided.")
+
+if control_endpoint == "":
+logging.fatal("No control endpoint provided.")
+
+logging.info("Initializing python harness: %s" % " ".join(sys.argv))
+
+metadata = [("worker_id", worker_id)]
+
+# read job information from provision stub
+with grpc.insecure_channel(provision_endpoint) as channel:
+client = ProvisionServiceStub(channel=channel)
+info = client.GetProvisionInfo(GetProvisionInfoRequest(), 
metadata=metadata).info
+options = json_format.MessageToJson(info.pipeline_options)
+
+staged_dir = os.path.join(semi_persist_dir, "staged")
+files = []
+
+# download files
+with grpc.insecure_channel(artifact_endpoint) as channel:
+client = ArtifactRetrievalServiceStub(channel=channel)
+# get file list via retrieval token
+response = 
client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token),
+  metadata=metadata)
+artifacts = response.manifest.artifact
+# download files and check hash values
+for artifact in artifacts:
+name = artifact.name
+files.append(name)
+permissions = artifact.permissions
+sha256 = artifact.sha256
+

[GitHub] [flink] flinkbot edited a comment on issue #9653: [FLINK-14014][python] Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution

2019-09-18 Thread GitBox
flinkbot edited a comment on issue #9653: [FLINK-14014][python] Introduce 
PythonScalarFunctionRunner to handle the communication with Python worker for 
Python ScalarFunction execution
URL: https://github.com/apache/flink/pull/9653#issuecomment-529525552
 
 
   
   ## CI report:
   
   * 852555dbd31eb4e36009c99a22771a28544a0d5e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/126471470)
   * 9b9156b15508efd40c5a8e5427a8cddb469913f9 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/126547856)
   * 48d9619d06297af85382bde44c5cff9688364459 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/127053703)
   * fde6b67e4060b486d32f6e63e5bd8458ac1e5ac5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127058420)
   * 6b4727221ed393d2d18a99ad220c87ee224093b4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127130673)
   * d367d3cbaa29d722741257e07603e9d33b03da28 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/127965086)
   * 3bac78e2cd1d22241dd82b5ec7c42cb66039412d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/12796)
   * a0876069203faaff034af1aa2fcad831d5fec343 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127975554)
   * 0475b1342c19c2831f42dad3e4f43162f72526ab : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/128116060)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing

2019-09-18 Thread GitBox
flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark 
checkpointing
URL: https://github.com/apache/flink/pull/7013#issuecomment-532533051
 
 
   
   ## CI report:
   
   * c81a918723a3204483b152909c3fc93f16a23ce0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128117334)
   * f93c0c3ba82f53db498c46f22a4fb3bf9a730451 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-14114) Shift down ClusterClient#timeout to RestClusterClient

2019-09-18 Thread TisonKun (Jira)
TisonKun created FLINK-14114:


 Summary: Shift down ClusterClient#timeout to RestClusterClient
 Key: FLINK-14114
 URL: https://issues.apache.org/jira/browse/FLINK-14114
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission
Reporter: TisonKun
 Fix For: 1.10.0


{{ClusterClient#timeout}} is only used in {{RestClusterClient}}; even without this prerequisite we could always shift the {{timeout}} field down into the subclasses of {{ClusterClient}}. This is a step towards an interface-ized {{ClusterClient}}. As a side effect, it would reduce the dependency on parsing durations with Scala {{Duration}} on the fly.

CC [~till.rohrmann] [~zhuzh]
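A rough sketch of the direction (class shapes simplified for illustration, not the actual code):

{code:java}
// today: the abstract ClusterClient owns a timeout parsed via Scala Duration
public abstract class ClusterClient {
    // protected final FiniteDuration timeout;  <- field to be removed here
}

// proposed: only RestClusterClient keeps a timeout, parsed without Scala involved
public class RestClusterClient extends ClusterClient {
    private final java.time.Duration timeout;

    public RestClusterClient(java.time.Duration timeout) {
        this.timeout = timeout;
    }
}
{code}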



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-14070) Use TimeUtils to parse duration configs

2019-09-18 Thread TisonKun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932133#comment-16932133
 ] 

TisonKun commented on FLINK-14070:
--

Hi [~zhuzh], have you started working on this thread? I'd like to create a JIRA about shifting {{ClusterClient#timeout}} down to {{RestClusterClient}}, which might reduce the usage of Scala {{Duration}} for parsing duration configs. Generally it is a separate task, but it may conflict with this one, so I think it is better to reach out to you first to see which you prefer:

1. start these two threads concurrently and resolve any conflict (if one occurs, it should be trivial to resolve), or
2. work on the two threads sequentially.

Alternatively, after creating the task described above, and if you are interested in working on it as well, I can assign that ticket to you and let you be the coordinator.

> Use TimeUtils to parse duration configs
> ---
>
> Key: FLINK-14070
> URL: https://issues.apache.org/jira/browse/FLINK-14070
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
> Fix For: 1.10.0
>
>
> FLINK-14069 makes TimeUtils able to parse all time unit labels supported by 
> scala Duration.
> We can now use TimeUtils to parse duration configs instead of using scala 
> Duration.
> Some config descriptors referring scala FiniteDuration should be updated as 
> well.
> This is one step for Flink core to get rid of scala dependencies.
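
For illustration, the kind of replacement intended (assuming {{TimeUtils#parseDuration}} as extended in FLINK-14069):

{code:java}
// before: scala.concurrent.duration.Duration("60 s")
// after: plain Java, no Scala dependency
java.time.Duration timeout = org.apache.flink.util.TimeUtils.parseDuration("60 s");
long timeoutMillis = timeout.toMillis();
{code}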



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-13987) add new logs api, see more log files and can see logs by pages

2019-09-18 Thread vinoyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-13987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932102#comment-16932102
 ] 

vinoyang commented on FLINK-13987:
--

I linked it to this issue a week ago; please see FLINK-11782.

> add new logs api, see more log files and can see logs by pages 
> ---
>
> Key: FLINK-13987
> URL: https://issues.apache.org/jira/browse/FLINK-13987
> Project: Flink
>  Issue Type: New Feature
>  Components: Runtime / REST
>Reporter: lining
>Priority: Major
>
> As the job keeps running, the log files grow large.
> The current log API returns the whole content, so it can block or stop working when the file is large, which is unfriendly for users.
> Since the application runs on the JVM, users sometimes also need to see the GC logs, but these are not available through the current API.
> Therefore, we need new APIs:
>  *  list all log files of a taskmanager
>  ** /taskmanagers/taskmanagerid/logs
>  ** 
> {code:java}
> {
>   "logs": [
> {
>   "name": "taskmanager.log",
>   "size": 12529
> }
>   ]
> } {code}
>  * see a taskmanager log file by range
>  ** /taskmanagers/taskmanagerid/logs/:filename?start=[start]&count=[count]
>  ** 
> {code:java}
>  {
> "data": "logcontent",
> "file_size": 342882
>  }
> {code}
>  * list all log files of the jobmanager
>  ** /jobmanager/logs
>  ** 
> {code:java}
> {
>   "logs": [
> {
>   "name": "jobmanager.log",
>   "size": 12529
> }
>   ]
> }
> {code}
>  * see a jobmanager log file by range
>  ** /jobmanager/logs/:filename?start=[start]&count=[count]
>  ** 
> {code:java}
> {
>   "data": "logcontent",
>   "file_size": 342882
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-5601) Window operator does not checkpoint watermarks

2019-09-18 Thread Jiayi Liao (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932103#comment-16932103
 ] 

Jiayi Liao commented on FLINK-5601:
---

[~aljoscha] Any new ideas for this issue? Theoretically, the watermark should be part of the 
state of a Flink job. To solve the backward-compatibility problem, maybe we can 
add an option for checkpointing the watermark?
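
As a rough illustration of the idea (names and the option are hypothetical, not an actual WindowOperator change), an operator could snapshot its last seen watermark in union list state and take the maximum on restore:

{code:java}
private transient ListState<Long> watermarkState;
private long currentWatermark = Long.MIN_VALUE;

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    watermarkState.clear();
    watermarkState.add(currentWatermark);
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    watermarkState = context.getOperatorStateStore().getUnionListState(
        new ListStateDescriptor<>("watermarks", Long.class));
    for (Long restored : watermarkState.get()) {
        currentWatermark = Math.max(currentWatermark, restored);
    }
}
{code}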

> Window operator does not checkpoint watermarks
> --
>
> Key: FLINK-5601
> URL: https://issues.apache.org/jira/browse/FLINK-5601
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing
>Affects Versions: 1.5.0, 1.6.0, 1.7.0, 1.8.0, 1.9.0
>Reporter: Ufuk Celebi
>Assignee: Jiayi Liao
>Priority: Critical
>  Labels: pull-request-available
>
> During release testing [~stefanrichte...@gmail.com] and I noticed that 
> watermarks are not checkpointed in the window operator.
> This can lead to non determinism when restoring checkpoints. I was running an 
> adjusted {{SessionWindowITCase}} via Kafka for testing migration and 
> rescaling and ran into failures, because the data generator required 
> deterministic behaviour.
> What happened was that on restore it could happen that late elements were not 
> dropped, because the watermarks needed to be re-established after restore 
> first.
> [~aljoscha] Do you know whether there is a special reason for explicitly not 
> checkpointing watermarks?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation

2019-09-18 Thread GitBox
tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep 
hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with 
@Deprecated annotation
URL: https://github.com/apache/flink/pull/9691#discussion_r325500227
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionTest.java
 ##
 @@ -86,4 +87,32 @@ public void testDeprecationFlagForMixedAlternativeKeys() {
 		assertThat(fallbackKeys, containsInAnyOrder("fallback1", "fallback2"));
 		assertThat(deprecatedKeys, containsInAnyOrder("deprecated1", "deprecated2"));
 	}
+
+	@Test
+	public void testDeprecationForDeprecatedKeys() {
+		String[] deprecatedKeys = new String[] { "deprecated1", "deprecated2" };
+		final ConfigOption optionWithDeprecatedKeys = ConfigOptions
+			.key("key")
+			.defaultValue(0)
+			.withDeprecatedKeys(deprecatedKeys)
+			.withFallbackKeys("fallback1");
+
+		assertTrue(optionWithDeprecatedKeys.hasDeprecatedKeys());
+		int counter = 0;
+		for (final String deprecatedKey : optionWithDeprecatedKeys.deprecatedKeys()) {
+			counter++;
+			assertTrue(Arrays.stream(deprecatedKeys).anyMatch(item -> item.equals(deprecatedKey)));
+		}
+
+		assertEquals(2, counter);
+
+		final ConfigOption optionWithFallbackKeys = ConfigOptions
+			.key("key")
+			.defaultValue(0)
+			.withFallbackKeys("fallback1");
 
 Review comment:
   Let's split this off into a separate test. It is usually better to have 
smaller and more targeted tests.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation

2019-09-18 Thread GitBox
tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep 
hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with 
@Deprecated annotation
URL: https://github.com/apache/flink/pull/9691#discussion_r325500646
 
 

 ##
 File path: 
flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionTest.java
 ##
 @@ -86,4 +87,32 @@ public void testDeprecationFlagForMixedAlternativeKeys() {
 		assertThat(fallbackKeys, containsInAnyOrder("fallback1", "fallback2"));
 		assertThat(deprecatedKeys, containsInAnyOrder("deprecated1", "deprecated2"));
 	}
+
+	@Test
+	public void testDeprecationForDeprecatedKeys() {
+		String[] deprecatedKeys = new String[] { "deprecated1", "deprecated2" };
+		final ConfigOption optionWithDeprecatedKeys = ConfigOptions
+			.key("key")
+			.defaultValue(0)
+			.withDeprecatedKeys(deprecatedKeys)
+			.withFallbackKeys("fallback1");
+
+		assertTrue(optionWithDeprecatedKeys.hasDeprecatedKeys());
+		int counter = 0;
+		for (final String deprecatedKey : optionWithDeprecatedKeys.deprecatedKeys()) {
+			counter++;
+			assertTrue(Arrays.stream(deprecatedKeys).anyMatch(item -> item.equals(deprecatedKey)));
+		}
 
 Review comment:
   nit: Instead of manually going over the `deprecatedKeys` one could convert 
them and the input keys into a `Set` and use `assertEquals`.
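   For illustration, a sketch of that (assuming `deprecatedKeys()` keeps returning an `Iterable<String>` as iterated above):

```java
Set<String> expectedKeys = new HashSet<>(Arrays.asList(deprecatedKeys));
Set<String> actualKeys = new HashSet<>();
optionWithDeprecatedKeys.deprecatedKeys().forEach(actualKeys::add);
assertEquals(expectedKeys, actualKeys);
```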


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation

2019-09-18 Thread GitBox
tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep 
hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with 
@Deprecated annotation
URL: https://github.com/apache/flink/pull/9691#discussion_r325500814
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java
 ##
 @@ -195,6 +196,31 @@ public T defaultValue() {
 		return defaultValue;
 	}
 
+	/**
+	 * Checks whether this option has deprecated keys.
+	 * @return True if the option has deprecated keys, false if not.
+	 * @deprecated Replaced by {@link #hasFallbackKeys()}
+	 */
+	@Deprecated
+	public boolean hasDeprecatedKeys() {
+		return fallbackKeys == EMPTY ? true :
 
 Review comment:
   I think one needs to return `false` if `fallbackKeys` are `EMPTY`.
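   Something along these lines (a sketch, assuming `FallbackKey` exposes an `isDeprecated()` flag as used elsewhere in this PR):

```java
@Deprecated
public boolean hasDeprecatedKeys() {
    // no fallback keys at all -> no deprecated keys either
    return fallbackKeys != EMPTY
        && Arrays.stream(fallbackKeys).anyMatch(FallbackKey::isDeprecated);
}
```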


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-13949) Delete deduplicating JobVertexDetailsInfo.VertexTaskDetail

2019-09-18 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-13949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-13949.
---
Fix Version/s: 1.10.0
   Resolution: Done

Done via 88a918bcb22808947537b34cb6d9dd396f08b51d

> Delete deduplicating JobVertexDetailsInfo.VertexTaskDetail
> --
>
> Key: FLINK-13949
> URL: https://issues.apache.org/jira/browse/FLINK-13949
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / REST
>Reporter: lining
>Assignee: lining
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> As there is SubtaskExecutionAttemptDetailsInfo for subtasks, we can use it to 
> replace JobVertexDetailsInfo.VertexTaskDetail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-18 Thread GitBox
KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r325475374
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -474,60 +403,121 @@ public void releaseAll(Object owner) {
 			return;
 		}
 
-		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock) {
-			if (isShutDown) {
-				throw new IllegalStateException("Memory manager has been shut down.");
-			}
-
-			// get all segments
-			final Set<MemorySegment> segments = allocatedSegments.remove(owner);
+		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
 
-			// all segments may have been freed previously individually
-			if (segments == null || segments.isEmpty()) {
-				return;
-			}
+		// get all segments
+		Set<MemorySegment> segments = allocatedSegments.remove(owner);
 
-			// free each segment
-			if (isPreAllocated) {
-				for (MemorySegment seg : segments) {
-					memoryPool.returnSegmentToPool(seg);
-				}
-			}
-			else {
-				for (MemorySegment seg : segments) {
-					seg.free();
-				}
-				numNonAllocatedPages += segments.size();
-			}
+		// all segments may have been freed previously individually
+		if (segments == null || segments.isEmpty()) {
+			return;
+		}
 
-			segments.clear();
+		// free each segment
+		EnumMap<MemoryType, Long> releasedMemory = new EnumMap<>(MemoryType.class);
+		for (MemorySegment segment : segments) {
+			releaseSegment(segment, releasedMemory);
 		}
-		// -------------------- END CRITICAL SECTION -------------------
+		budgetByType.releaseBudgetForKeys(releasedMemory);
+
+		segments.clear();
 	}
 
-	// ------------------------------------------------------------------------
-	//  Properties, sizes and size conversions
-	// ------------------------------------------------------------------------
+	/**
+	 * Reserves memory of a certain type for an owner from this memory manager.
+	 *
+	 * @param owner The owner to associate with the memory reservation, for the fallback release.
+	 * @param memoryType type of memory to reserve (heap / off-heap).
+	 * @param size size of memory to reserve.
+	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
+	 *                                   of memory any more.
+	 */
+	public void reserveMemory(Object owner, MemoryType memoryType, long size) throws MemoryAllocationException {
+		checkMemoryReservationPreconditions(owner, memoryType, size);
+		if (size == 0L) {
+			return;
+		}
+
+		long acquiredMemory = budgetByType.acquireBudgetForKey(memoryType, size);
+		if (acquiredMemory < size) {
+			throw new MemoryAllocationException(
+				String.format("Could not allocate %d bytes. Only %d bytes are remaining.", size, acquiredMemory));
+		}
+
+		reservedMemory.compute(owner, (o, reservations) -> {
+			Map<MemoryType, Long> newReservations = reservations;
+			if (reservations == null) {
+				newReservations = new EnumMap<>(MemoryType.class);
+				newReservations.put(memoryType, size);
+			} else {
+				reservations.compute(
+					memoryType,
+					(mt, currentlyReserved) -> currentlyReserved == null ? size : currentlyReserved + size);
+			}
+			return newReservations;
+		});
+
+		Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
+	}
 
 	/**
-	 * Gets the type of memory (heap / off-heap) managed by this memory manager.
+	 * Releases memory of a certain type from an owner to this memory manager.
 	 *
-	 * @return The type of memory managed by this memory manager.
+	 *

[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-18 Thread GitBox
KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r325472245
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -394,74 +318,79 @@ public void release(Collection<MemorySegment> segments) {
 			return;
 		}
 
-		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock) {
-			if (isShutDown) {
-				throw new IllegalStateException("Memory manager has been shut down.");
-			}
+		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
 
-			// since concurrent modifications to the collection
-			// can disturb the release, we need to try potentially multiple times
-			boolean successfullyReleased = false;
-			do {
-				final Iterator<MemorySegment> segmentsIterator = segments.iterator();
+		EnumMap<MemoryType, Long> releasedMemory = new EnumMap<>(MemoryType.class);
 
-				Object lastOwner = null;
-				Set<MemorySegment> segsForOwner = null;
+		// since concurrent modifications to the collection
+		// can disturb the release, we need to try potentially multiple times
+		boolean successfullyReleased = false;
+		do {
+			Iterator<MemorySegment> segmentsIterator = segments.iterator();
 
-				try {
-					// go over all segments
-					while (segmentsIterator.hasNext()) {
-
-						final MemorySegment seg = segmentsIterator.next();
-						if (seg == null || seg.isFreed()) {
-							continue;
-						}
-
-						final Object owner = seg.getOwner();
-
-						try {
-							// get the list of segments by this owner only if it is a different owner than for
-							// the previous one (or it is the first segment)
-							if (lastOwner != owner) {
-								lastOwner = owner;
-								segsForOwner = this.allocatedSegments.get(owner);
-							}
-
-							// remove the segment from the list
-							if (segsForOwner != null) {
-								segsForOwner.remove(seg);
-								if (segsForOwner.isEmpty()) {
-									this.allocatedSegments.remove(owner);
-								}
-							}
-
-							if (isPreAllocated) {
-								memoryPool.returnSegmentToPool(seg);
-							}
-							else {
-								seg.free();
-								numNonAllocatedPages++;
-							}
-						}
-						catch (Throwable t) {
-							throw new RuntimeException(
-								"Error removing book-keeping reference to allocated memory segment.", t);
-						}
+			//noinspection ProhibitedExceptionCaught
+			try {
+				MemorySegment segment = null;
+				while (segment == null && segmentsIterator.hasNext()) {
+					segment = segmentsIterator.next();
+					if (segment.isFreed()) {
+						segment = null;
 					}
+				}
+				while (segment != null) {
+

[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-18 Thread GitBox
KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r325473963
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -474,60 +403,121 @@ public void releaseAll(Object owner) {
 			return;
 		}
 
-		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock) {
-			if (isShutDown) {
-				throw new IllegalStateException("Memory manager has been shut down.");
-			}
-
-			// get all segments
-			final Set<MemorySegment> segments = allocatedSegments.remove(owner);
+		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
 
-			// all segments may have been freed previously individually
-			if (segments == null || segments.isEmpty()) {
-				return;
-			}
+		// get all segments
+		Set<MemorySegment> segments = allocatedSegments.remove(owner);
 
-			// free each segment
-			if (isPreAllocated) {
-				for (MemorySegment seg : segments) {
-					memoryPool.returnSegmentToPool(seg);
-				}
-			}
-			else {
-				for (MemorySegment seg : segments) {
-					seg.free();
-				}
-				numNonAllocatedPages += segments.size();
-			}
+		// all segments may have been freed previously individually
+		if (segments == null || segments.isEmpty()) {
+			return;
+		}
 
-			segments.clear();
+		// free each segment
+		EnumMap<MemoryType, Long> releasedMemory = new EnumMap<>(MemoryType.class);
+		for (MemorySegment segment : segments) {
+			releaseSegment(segment, releasedMemory);
 		}
-		// -------------------- END CRITICAL SECTION -------------------
+		budgetByType.releaseBudgetForKeys(releasedMemory);
+
+		segments.clear();
 	}
 
-	// ------------------------------------------------------------------------
-	//  Properties, sizes and size conversions
-	// ------------------------------------------------------------------------
+	/**
+	 * Reserves memory of a certain type for an owner from this memory manager.
+	 *
+	 * @param owner The owner to associate with the memory reservation, for the fallback release.
+	 * @param memoryType type of memory to reserve (heap / off-heap).
+	 * @param size size of memory to reserve.
+	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
+	 *                                   of memory any more.
+	 */
+	public void reserveMemory(Object owner, MemoryType memoryType, long size) throws MemoryAllocationException {
+		checkMemoryReservationPreconditions(owner, memoryType, size);
+		if (size == 0L) {
+			return;
+		}
+
+		long acquiredMemory = budgetByType.acquireBudgetForKey(memoryType, size);
+		if (acquiredMemory < size) {
+			throw new MemoryAllocationException(
+				String.format("Could not allocate %d bytes. Only %d bytes are remaining.", size, acquiredMemory));
+		}
+
+		reservedMemory.compute(owner, (o, reservations) -> {
+			Map<MemoryType, Long> newReservations = reservations;
+			if (reservations == null) {
+				newReservations = new EnumMap<>(MemoryType.class);
+				newReservations.put(memoryType, size);
+			} else {
+				reservations.compute(
+					memoryType,
+					(mt, currentlyReserved) -> currentlyReserved == null ? size : currentlyReserved + size);
+			}
+			return newReservations;
+		});
+
+		Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
+	}
 
 	/**
-	 * Gets the type of memory (heap / off-heap) managed by this memory manager.
+	 * Releases memory of a certain type from an owner to this memory manager.
 	 *
-	 * @return The type of memory managed by this memory manager.
+	 *
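
A minimal usage sketch of the typed reservation API quoted above: reserveMemory(owner, MemoryType, size) is taken from the diff, while the matching releaseMemory(owner, MemoryType, size) call and the import paths are assumptions that may differ from the merged code.

import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;

final class TypedReservationSketch {

	// Reserve an off-heap budget for an owner, run some work, then hand the budget back.
	// reserveMemory(owner, MemoryType, size) appears in the hunk above; the matching
	// releaseMemory(owner, MemoryType, size) counterpart is assumed from its javadoc fragment.
	static void withOffHeapBudget(MemoryManager memoryManager, Object owner, long bytes, Runnable work)
			throws MemoryAllocationException {
		memoryManager.reserveMemory(owner, MemoryType.OFF_HEAP, bytes);
		try {
			work.run();
		} finally {
			memoryManager.releaseMemory(owner, MemoryType.OFF_HEAP, bytes);
		}
	}
}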

[GitHub] [flink] tillrohrmann closed pull request #9699: [FLINK-13949][rest]Delete deduplicating JobVertexDetailsInfo.VertexTa…

2019-09-18 Thread GitBox
tillrohrmann closed pull request #9699: [FLINK-13949][rest]Delete deduplicating 
JobVertexDetailsInfo.VertexTa…
URL: https://github.com/apache/flink/pull/9699
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

2019-09-18 Thread GitBox
KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate 
on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r325466726
 
 

 ##
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##
 @@ -257,133 +216,98 @@ public boolean verifyEmpty() {
 	 *                                   of memory pages any more.
 	 */
 	public List<MemorySegment> allocatePages(Object owner, int numPages) throws MemoryAllocationException {
-		final ArrayList<MemorySegment> segs = new ArrayList<MemorySegment>(numPages);
-		allocatePages(owner, segs, numPages);
-		return segs;
+		List<MemorySegment> segments = new ArrayList<>(numPages);
+		allocatePages(owner, segments, numPages);
+		return segments;
 	}
 
 	/**
-	 * Allocates a set of memory segments from this memory manager. If the memory manager pre-allocated the
-	 * segments, they will be taken from the pool of memory segments. Otherwise, they will be allocated
-	 * as part of this call.
+	 * Allocates a set of memory segments from this memory manager.
+	 *
+	 * <p>The returned segments can have any memory type. The total allocated memory for each type will not exceed its
+	 * size limit, announced in the constructor.
 	 *
 	 * @param owner The owner to associate with the memory segment, for the fallback release.
 	 * @param target The list into which to put the allocated memory pages.
 	 * @param numPages The number of pages to allocate.
 	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
 	 *                                   of memory pages any more.
 	 */
-	public void allocatePages(Object owner, List<MemorySegment> target, int numPages)
-			throws MemoryAllocationException {
+	public void allocatePages(
+			Object owner,
+			Collection<MemorySegment> target,
+			int numPages) throws MemoryAllocationException {
 		// sanity check
-		if (owner == null) {
-			throw new IllegalArgumentException("The memory owner must not be null.");
-		}
+		Preconditions.checkNotNull(owner, "The memory owner must not be null.");
+		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
 
 		// reserve array space, if applicable
 		if (target instanceof ArrayList) {
 			((ArrayList<MemorySegment>) target).ensureCapacity(numPages);
 		}
 
-		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock) {
-			if (isShutDown) {
-				throw new IllegalStateException("Memory manager has been shut down.");
-			}
-
-			// in the case of pre-allocated memory, the 'numNonAllocatedPages' is zero, in the
-			// lazy case, the 'freeSegments.size()' is zero.
-			if (numPages > (memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)) {
-				throw new MemoryAllocationException("Could not allocate " + numPages + " pages. Only " +
-					(memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)
-					+ " pages are remaining.");
-			}
-
-			Set<MemorySegment> segmentsForOwner = allocatedSegments.get(owner);
-			if (segmentsForOwner == null) {
-				segmentsForOwner = new HashSet<MemorySegment>(numPages);
-				allocatedSegments.put(owner, segmentsForOwner);
-			}
+		// in the case of pre-allocated memory, the 'numNonAllocatedPages' is zero, in the
 
 Review comment:
   Since we removed memory preallocation, is this comment still valid?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
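
A short sketch of how a caller would use the two allocatePages overloads shown in the hunk above; the signatures are taken from the diff, while obtaining the MemoryManager instance and releasing the segments afterwards are left out of the sketch.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;

final class AllocationSketch {

	// Variant 1: let the memory manager create and return the list of pages.
	static List<MemorySegment> allocateIntoNewList(MemoryManager memoryManager, Object owner, int numPages)
			throws MemoryAllocationException {
		return memoryManager.allocatePages(owner, numPages);
	}

	// Variant 2: fill a caller-provided collection; an ArrayList gets pre-sized internally.
	static List<MemorySegment> allocateIntoExistingList(MemoryManager memoryManager, Object owner, int numPages)
			throws MemoryAllocationException {
		List<MemorySegment> target = new ArrayList<>();
		memoryManager.allocatePages(owner, target, numPages);
		return target;
	}
}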


[GitHub] [flink] flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing

2019-09-18 Thread GitBox
flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark 
checkpointing
URL: https://github.com/apache/flink/pull/7013#issuecomment-532533051
 
 
   
   ## CI report:
   
   * c81a918723a3204483b152909c3fc93f16a23ce0 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128117334)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] tillrohrmann commented on issue #9699: [FLINK-13949][rest]Delete deduplicating JobVertexDetailsInfo.VertexTa…

2019-09-18 Thread GitBox
tillrohrmann commented on issue #9699: [FLINK-13949][rest]Delete deduplicating 
JobVertexDetailsInfo.VertexTa…
URL: https://github.com/apache/flink/pull/9699#issuecomment-532537561
 
 
   Thanks a lot. Merging this PR now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (FLINK-14069) Enable TimeUtils to parse all time units labels supported by scala Duration

2019-09-18 Thread Till Rohrmann (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-14069.
---
Resolution: Done

Done via

d3137b29c3837d34ab734b1182c15b32572e41ba
155179fb5a21afd1dd69e28a2f851b51d5d5872e

> Enable TimeUtils to parse all time units labels supported by scala Duration
> ---
>
> Key: FLINK-14069
> URL: https://issues.apache.org/jira/browse/FLINK-14069
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Configuration
>Affects Versions: 1.10.0
>Reporter: Zhu Zhu
>Assignee: Zhu Zhu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.10.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> Currently we are using scala Duration to parse duration configs.
> The supported time unit labels are
> {
> DAYS -> "d day",
> HOURS -> "h hour",
> MINUTES -> "min minute",
> SECONDS -> "s sec second",
> MILLISECONDS -> "ms milli millisecond",
> MICROSECONDS -> "µs micro microsecond",
> NANOSECONDS -> "ns nano nanosecond"
> }
> We want to use Flink {{TimeUtils}} to parse the duration configuration, as a
> step towards letting flink-core get rid of its Scala dependencies.
> In order not to break existing jobs, {{TimeUtils}} must be able to parse all
> time unit labels supported by scala Duration.
> The time unit labels currently supported by {{TimeUtils}} are "h", "min", "s" and "ms";
> we need to enrich this set.
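
A minimal, self-contained sketch of what label-aware duration parsing amounts to; this is not Flink's actual TimeUtils implementation, the label table is illustrative (plural aliases are an assumption), and it only covers milliseconds and coarser units.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class DurationLabelParsingSketch {

	// Hypothetical label table covering part of the scala Duration alias set listed above.
	private static final Map<String, Long> MILLIS_PER_LABEL = new HashMap<>();
	static {
		for (String label : new String[] {"d", "day", "days"}) {
			MILLIS_PER_LABEL.put(label, 24L * 60 * 60 * 1000);
		}
		for (String label : new String[] {"h", "hour", "hours"}) {
			MILLIS_PER_LABEL.put(label, 60L * 60 * 1000);
		}
		for (String label : new String[] {"min", "minute", "minutes"}) {
			MILLIS_PER_LABEL.put(label, 60L * 1000);
		}
		for (String label : new String[] {"s", "sec", "second", "seconds"}) {
			MILLIS_PER_LABEL.put(label, 1000L);
		}
		for (String label : new String[] {"ms", "milli", "millisecond", "milliseconds"}) {
			MILLIS_PER_LABEL.put(label, 1L);
		}
	}

	// Splits "<number><optional whitespace><label>" and converts it to a java.time.Duration.
	public static Duration parse(String text) {
		String trimmed = text.trim();
		int pos = 0;
		while (pos < trimmed.length() && Character.isDigit(trimmed.charAt(pos))) {
			pos++;
		}
		long value = Long.parseLong(trimmed.substring(0, pos));
		String label = trimmed.substring(pos).trim().toLowerCase();
		Long millisPerUnit = MILLIS_PER_LABEL.get(label);
		if (millisPerUnit == null) {
			throw new IllegalArgumentException("Unknown time unit label: " + label);
		}
		return Duration.ofMillis(value * millisPerUnit);
	}

	public static void main(String[] args) {
		System.out.println(parse("10 min"));  // PT10M
		System.out.println(parse("3 h"));     // PT3H
		System.out.println(parse("500 ms"));  // PT0.5S
	}
}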



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] tillrohrmann closed pull request #9686: [FLINK-14069] [core] Enable TimeUtils for all time units labels supported by scala Duration

2019-09-18 Thread GitBox
tillrohrmann closed pull request #9686: [FLINK-14069] [core] Enable TimeUtils 
for all time units labels supported by scala Duration
URL: https://github.com/apache/flink/pull/9686
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot edited a comment on issue #9689: [FLINK-7151] add a basic function ddl

2019-09-18 Thread GitBox
flinkbot edited a comment on issue #9689: [FLINK-7151] add a basic function ddl
URL: https://github.com/apache/flink/pull/9689#issuecomment-531747685
 
 
   
   ## CI report:
   
   * bd2624914db1147588ea838ae542333c310290cc : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127790175)
   * b5523d10152123f45cf883e446872b90532879c3 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/128113059)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-11936) Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter issue.

2019-09-18 Thread Danny Chan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-11936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932096#comment-16932096
 ] 

Danny Chan commented on FLINK-11936:


[~walterddr], since CALCITE-3008 has not been resolved yet, we may still keep
the AuxiliaryConverter class in flink-table-planner when upgrading to Calcite
release 1.21.0.

> Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter 
> issue.
> -
>
> Key: FLINK-11936
> URL: https://issues.apache.org/jira/browse/FLINK-11936
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>
> AuxiliaryConverter was pulled in via FLINK-6409. Since CALCITE-1761 has been
> fixed, we should sync back with the Calcite version.
> After a quick glance, I think it is not so simple to just delete the class, so
> I opened a follow-up Jira for this issue.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [flink] flinkbot edited a comment on issue #9653: [FLINK-14014][python] Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution

2019-09-18 Thread GitBox
flinkbot edited a comment on issue #9653: [FLINK-14014][python] Introduce 
PythonScalarFunctionRunner to handle the communication with Python worker for 
Python ScalarFunction execution
URL: https://github.com/apache/flink/pull/9653#issuecomment-529525552
 
 
   
   ## CI report:
   
   * 852555dbd31eb4e36009c99a22771a28544a0d5e : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/126471470)
   * 9b9156b15508efd40c5a8e5427a8cddb469913f9 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/126547856)
   * 48d9619d06297af85382bde44c5cff9688364459 : CANCELED 
[Build](https://travis-ci.com/flink-ci/flink/builds/127053703)
   * fde6b67e4060b486d32f6e63e5bd8458ac1e5ac5 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127058420)
   * 6b4727221ed393d2d18a99ad220c87ee224093b4 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127130673)
   * d367d3cbaa29d722741257e07603e9d33b03da28 : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/127965086)
   * 3bac78e2cd1d22241dd82b5ec7c42cb66039412d : FAILURE 
[Build](https://travis-ci.com/flink-ci/flink/builds/12796)
   * a0876069203faaff034af1aa2fcad831d5fec343 : SUCCESS 
[Build](https://travis-ci.com/flink-ci/flink/builds/127975554)
   * 0475b1342c19c2831f42dad3e4f43162f72526ab : PENDING 
[Build](https://travis-ci.com/flink-ci/flink/builds/128116060)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] flinkbot commented on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing

2019-09-18 Thread GitBox
flinkbot commented on issue #7013: [FLINK-5601][Checkpointing] Watermark 
checkpointing
URL: https://github.com/apache/flink/pull/7013#issuecomment-532533051
 
 
   
   ## CI report:
   
   * c81a918723a3204483b152909c3fc93f16a23ce0 : UNKNOWN
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

