[GitHub] [flink] flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing
flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing
URL: https://github.com/apache/flink/pull/7013#issuecomment-532533051

## CI report:

* c81a918723a3204483b152909c3fc93f16a23ce0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128117334)
* f93c0c3ba82f53db498c46f22a4fb3bf9a730451 : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/128122205)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325490458

## File path: flink-python/pyflink/fn_execution/boot.py ##
@@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This script is a python implementation of the "boot.go" script in "beam-sdks-python-container"
+project of Apache Beam, see in:
+
+https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go
+
+It is implemented in golang and will introduce unnecessary dependencies if used in pure python
+project. So we add a python implementation which will be used when the python worker runs in
+process mode. It downloads and installs users' python artifacts, then launches the python SDK
+harness of Apache Beam.
+"""
+import argparse
+import hashlib
+import os
+from subprocess import call
+
+import grpc
+import logging
+import sys
+
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub
+from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import ArtifactRetrievalServiceStub
+from apache_beam.portability.api.beam_artifact_api_pb2 import (GetManifestRequest,
+                                                               GetArtifactRequest)
+from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor
+
+from google.protobuf import json_format, text_format
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--id", default="", help="Local identifier (required).")
+parser.add_argument("--logging_endpoint", default="",
+                    help="Logging endpoint (required).")
+parser.add_argument("--artifact_endpoint", default="",
+                    help="Artifact endpoint (required).")
+parser.add_argument("--provision_endpoint", default="",
+                    help="Provision endpoint (required).")
+parser.add_argument("--control_endpoint", default="",
+                    help="Control endpoint (required).")
+parser.add_argument("--semi_persist_dir", default="/tmp",
+                    help="Local semi-persistent directory (optional).")
+
+args = parser.parse_args()
+
+worker_id = args.id
+logging_endpoint = args.logging_endpoint
+artifact_endpoint = args.artifact_endpoint
+provision_endpoint = args.provision_endpoint
+control_endpoint = args.control_endpoint
+semi_persist_dir = args.semi_persist_dir
+
+if worker_id == "":
+    logging.fatal("No id provided.")
+
+if logging_endpoint == "":
+    logging.fatal("No logging endpoint provided.")
+
+if artifact_endpoint == "":
+    logging.fatal("No artifact endpoint provided.")
+
+if provision_endpoint == "":
+    logging.fatal("No provision endpoint provided.")
+
+if control_endpoint == "":
+    logging.fatal("No control endpoint provided.")
+
+logging.info("Initializing python harness: %s" % " ".join(sys.argv))
+
+metadata = [("worker_id", worker_id)]
+
+# read job information from provision stub
+with grpc.insecure_channel(provision_endpoint) as channel:
+    client = ProvisionServiceStub(channel=channel)
+    info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info
+    options = json_format.MessageToJson(info.pipeline_options)
+
+staged_dir = os.path.join(semi_persist_dir, "staged")
+files = []
+
+# download files
+with grpc.insecure_channel(artifact_endpoint) as channel:
+    client = ArtifactRetrievalServiceStub(channel=channel)
+    # get file list via retrieval token
+    response = client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token),
+                                  metadata=metadata)
+    artifacts = response.manifest.artifact
+    # download files and check hash values
+    for artifact in artifacts:
+        name = artifact.name
+        files.append(name)

Review comment: Why do we append names here? It seems the files have not been used by any
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325477942

## File path: flink-python/pyflink/fn_execution/boot.py ##
@@ -0,0 +1,155 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This script is a python implementation of the "boot.go" script in "beam-sdks-python-container"
+project of Apache Beam, see in:
+
+https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go
+
+It is implemented in golang and will introduce unnecessary dependencies if used in pure python
+project. So we add a python implementation which will be used when the python worker runs in
+process mode. It downloads and installs users' python artifacts, then launches the python SDK
+harness of Apache Beam.
+"""
+import argparse
+import hashlib
+import os
+from subprocess import call
+
+import grpc
+import logging
+import sys
+
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub
+from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import ArtifactRetrievalServiceStub
+from apache_beam.portability.api.beam_artifact_api_pb2 import (GetManifestRequest,
+                                                               GetArtifactRequest)
+from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor
+
+from google.protobuf import json_format, text_format
+
+parser = argparse.ArgumentParser()
+
+parser.add_argument("--id", default="", help="Local identifier (required).")
+parser.add_argument("--logging_endpoint", default="",
+                    help="Logging endpoint (required).")
+parser.add_argument("--artifact_endpoint", default="",
+                    help="Artifact endpoint (required).")
+parser.add_argument("--provision_endpoint", default="",
+                    help="Provision endpoint (required).")
+parser.add_argument("--control_endpoint", default="",
+                    help="Control endpoint (required).")
+parser.add_argument("--semi_persist_dir", default="/tmp",
+                    help="Local semi-persistent directory (optional).")
+
+args = parser.parse_args()
+
+worker_id = args.id
+logging_endpoint = args.logging_endpoint
+artifact_endpoint = args.artifact_endpoint
+provision_endpoint = args.provision_endpoint
+control_endpoint = args.control_endpoint
+semi_persist_dir = args.semi_persist_dir
+
+if worker_id == "":
+    logging.fatal("No id provided.")

Review comment: The script should also exit when one of these required arguments is missing; `logging.fatal` only writes a log record and does not terminate the process. Please also add validation tests that verify these errors.
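The point about `logging.fatal` can be made concrete with a small sketch. In the standard library `logging.fatal` is an alias of `logging.critical`: it records a message and returns, so execution continues. The helper below logs every missing argument and then calls `sys.exit(1)`. The names `REQUIRED`, `build_parser`, `validate_required` and `exits_with_error` are hypothetical and not part of the PR; only the flag names are taken from boot.py.

```python
import argparse
import logging
import sys

# Flags that boot.py documents as required. Collecting them in a tuple is an
# assumption made for this sketch, not something the PR does.
REQUIRED = ("id", "logging_endpoint", "artifact_endpoint",
            "provision_endpoint", "control_endpoint")


def build_parser():
    parser = argparse.ArgumentParser()
    for name in REQUIRED:
        parser.add_argument("--" + name, default="")
    parser.add_argument("--semi_persist_dir", default="/tmp")
    return parser


def validate_required(args):
    """Log every missing required argument, then exit with status 1."""
    missing = [name for name in REQUIRED if getattr(args, name) == ""]
    for name in missing:
        # logging.critical only logs; it never terminates the process.
        logging.critical("No %s provided.", name)
    if missing:
        sys.exit(1)
    return args


def exits_with_error(argv):
    """Return True if validation terminates the process for this argv."""
    try:
        validate_required(build_parser().parse_args(argv))
    except SystemExit as e:
        return e.code == 1
    return False
```

A validation test can then assert on `exits_with_error([...])` for each missing flag. Passing `required=True` to `add_argument` is another option; in that case argparse itself reports the error and exits with status 2.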
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode. URL: https://github.com/apache/flink/pull/9655#discussion_r325502497 ## File path: flink-python/pyflink/fn_execution/boot.py ## @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This script is a python implementation of the "boot.go" script in "beam-sdks-python-container" +project of Apache Beam, see in: + +https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go + +It is implemented in golang and will introduce unnecessary dependencies if used in pure python +project. So we add a python implementation which will be used when the python worker runs in +process mode. It downloads and installs users' python artifacts, then launches the python SDK +harness of Apache Beam. 
+""" +import argparse +import hashlib +import os +from subprocess import call + +import grpc +import logging +import sys + +from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub +from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest +from apache_beam.portability.api.beam_artifact_api_pb2_grpc import ArtifactRetrievalServiceStub +from apache_beam.portability.api.beam_artifact_api_pb2 import (GetManifestRequest, + GetArtifactRequest) +from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor + +from google.protobuf import json_format, text_format + +parser = argparse.ArgumentParser() + +parser.add_argument("--id", default="", help="Local identifier (required).") +parser.add_argument("--logging_endpoint", default="", +help="Logging endpoint (required).") +parser.add_argument("--artifact_endpoint", default="", +help="Artifact endpoint (required).") +parser.add_argument("--provision_endpoint", default="", +help="Provision endpoint (required).") +parser.add_argument("--control_endpoint", default="", +help="Control endpoint (required).") +parser.add_argument("--semi_persist_dir", default="/tmp", +help="Local semi-persistent directory (optional).") + +args = parser.parse_args() + +worker_id = args.id +logging_endpoint = args.logging_endpoint +artifact_endpoint = args.artifact_endpoint +provision_endpoint = args.provision_endpoint +control_endpoint = args.control_endpoint +semi_persist_dir = args.semi_persist_dir + +if worker_id == "": +logging.fatal("No id provided.") + +if logging_endpoint == "": +logging.fatal("No logging endpoint provided.") + +if artifact_endpoint == "": +logging.fatal("No artifact endpoint provided.") + +if provision_endpoint == "": +logging.fatal("No provision endpoint provided.") + +if control_endpoint == "": +logging.fatal("No control endpoint provided.") + +logging.info("Initializing python harness: %s" % " ".join(sys.argv)) + +metadata = [("worker_id", 
worker_id)] + +# read job information from provision stub +with grpc.insecure_channel(provision_endpoint) as channel: +client = ProvisionServiceStub(channel=channel) +info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info +options = json_format.MessageToJson(info.pipeline_options) + +staged_dir = os.path.join(semi_persist_dir, "staged") +files = [] + +# download files +with grpc.insecure_channel(artifact_endpoint) as channel: +client = ArtifactRetrievalServiceStub(channel=channel) +# get file list via retrieval token +response = client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token), + metadata=metadata) +artifacts = response.manifest.artifact +# download files and check hash values +for artifact in artifacts: +name = artifact.name +files.append(name) +permissions = artifact.permissions +sha256 = artifact.sha256 +
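The "download files and check hash values" step in the quoted loop could look roughly like the following. This is a sketch under stated assumptions: `write_and_verify` and `demo` are hypothetical names, the chunks are plain `bytes` rather than the `ArtifactChunk` messages streamed by the artifact service, and the expected digest is assumed to be a hex string.

```python
import hashlib
import os
import tempfile


def write_and_verify(chunks, path, expected_sha256):
    """Write an iterable of byte chunks to `path`, hashing while writing.

    Removes the file and raises ValueError if the digest does not match.
    """
    digest = hashlib.sha256()
    with open(path, "wb") as f:
        for chunk in chunks:
            digest.update(chunk)
            f.write(chunk)
    actual = digest.hexdigest()
    if actual != expected_sha256:
        os.remove(path)
        raise ValueError("sha256 mismatch for %s: expected %s, got %s"
                         % (path, expected_sha256, actual))
    return actual


def demo():
    """Stage one small file and return (digest_ok, mismatch_detected)."""
    staged_dir = tempfile.mkdtemp()
    path = os.path.join(staged_dir, "artifact.bin")
    expected = hashlib.sha256(b"abc").hexdigest()
    ok = write_and_verify([b"ab", b"c"], path, expected) == expected
    try:
        write_and_verify([b"xyz"], path, expected)
        mismatch = False
    except ValueError:
        mismatch = not os.path.exists(path)
    return ok, mismatch
```

Hashing chunk by chunk avoids holding the whole artifact in memory, which matters when the artifacts are large archives.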
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325509399

## File path: flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py ##
@@ -0,0 +1,140 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import json
+import os
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+from stat import ST_MODE
+
+import grpc
+from apache_beam.portability.api.beam_artifact_api_pb2 import GetManifestResponse, ArtifactChunk
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import (
+    ArtifactRetrievalServiceServicer, add_ArtifactRetrievalServiceServicer_to_server)
+from apache_beam.portability.api.beam_provision_api_pb2 import (ProvisionInfo,
+                                                                GetProvisionInfoResponse)
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import (
+    ProvisionServiceServicer, add_ProvisionServiceServicer_to_server)
+from concurrent import futures
+from google.protobuf import json_format
+
+from pyflink.fn_execution.tests.process_mode_test_data import (manifest, file_data,
+                                                               test_provision_info_json)
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class PythonBootTests(PyFlinkTestCase):
+
+    def setUp(self):
+        manifest_response = json_format.Parse(manifest, GetManifestResponse())
+        artifact_chunks = dict()
+        for file_name in file_data:
+            artifact_chunks[file_name] = json_format.Parse(file_data[file_name], ArtifactChunk())
+        provision_info = json_format.Parse(test_provision_info_json, ProvisionInfo())
+        response = GetProvisionInfoResponse(info=provision_info)
+
+        def get_unused_port():
+            sock = socket.socket()
+            sock.bind(('', 0))
+            port = sock.getsockname()[1]
+            sock.close()
+            return port
+
+        class ArtifactService(ArtifactRetrievalServiceServicer):
+            def GetManifest(self, request, context):
+                return manifest_response
+
+            def GetArtifact(self, request, context):
+                yield artifact_chunks[request.name]

Review comment: Can we also use `return` here?
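On the `yield` versus `return` question: gRPC Python is generally understood to expect a server-streaming handler to produce an iterator of response messages, so a generator function and a function that returns an iterator should be interchangeable, while returning the bare message would not be. The sketch below shows the difference in plain Python without gRPC. The three function names and the `chunks` dictionary are illustrative stand-ins, not code from the PR.

```python
chunks = {"a.txt": b"hello"}  # stand-in for artifact_chunks


def get_artifact_yield(name):
    # Generator function: the body runs lazily, on the first next().
    yield chunks[name]


def get_artifact_return_iter(name):
    # Plain function: the lookup runs at call time, an iterator is returned.
    return iter([chunks[name]])


def get_artifact_return_value(name):
    # Returns the item itself, not an iterator over response items.
    return chunks[name]


def raises_at_call_time(func, name):
    """Return True if merely calling func(name) raises KeyError."""
    try:
        func(name)
    except KeyError:
        return True
    return False
```

One practical difference is visible in `raises_at_call_time`: with `yield`, an unknown name only fails once the stream is consumed, whereas the `return iter(...)` variant fails as soon as the handler is called.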
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325446663

## File path: flink-dist/src/main/flink-bin/bin/pyflink-udf-runner.sh ##
@@ -0,0 +1,47 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+. "$bin"/find-flink-home.sh
+
+_FLINK_HOME_DETERMINED=1
+
+. "$FLINK_HOME"/bin/config.sh
+
+if [[ "$FLINK_IDENT_STRING" = "" ]]; then
+    FLINK_IDENT_STRING="$USER"
+fi
+
+if [[ -z "$python" ]]; then
+    python="python"
+fi
+
+# Add pyflink & py4j to PYTHONPATH
+PYFLINK_ZIP="$FLINK_OPT_DIR/python/pyflink.zip"
+if [[ ! ${PYTHONPATH} =~ ${PYFLINK_ZIP} ]]; then
+    export PYTHONPATH="$PYFLINK_ZIP:$PYTHONPATH"
+fi
+PY4J_ZIP=`echo "$FLINK_OPT_DIR"/python/py4j-*-src.zip`
+if [[ ! ${PYTHONPATH} =~ ${PY4J_ZIP} ]]; then
+    export PYTHONPATH="$PY4J_ZIP:$PYTHONPATH"
+fi
+
+log="$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-python-udf-boot-$HOSTNAME.log"
+
+${python} -m pyflink.fn_execution.boot $@ 2>&1 | tee ${log}

Review comment: Should we use `tee -a` so that the log is appended to rather than overwritten?
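The effect of `tee` versus `tee -a` on the log file corresponds to opening it in truncate mode versus append mode. A small Python sketch of that difference follows; `tee_lines`, `read` and `demo` are hypothetical helpers that only mimic the file side of `tee`, and the file name is made up for the example.

```python
import os
import tempfile


def tee_lines(lines, log_path, append=False):
    """Write lines to log_path and return them, like a minimal `tee`."""
    mode = "a" if append else "w"  # "w" truncates, "a" keeps old content
    with open(log_path, mode) as log:
        for line in lines:
            log.write(line + "\n")
    return list(lines)


def read(path):
    with open(path) as f:
        return f.read()


def demo():
    """Return the log content after an overwrite and after an append."""
    path = os.path.join(tempfile.mkdtemp(), "boot.log")
    tee_lines(["run 1"], path)
    tee_lines(["run 2"], path)               # truncates: "run 1" is lost
    overwritten = read(path)
    tee_lines(["run 3"], path, append=True)  # appended after "run 2"
    return overwritten, read(path)
```

Because the log name contains only the user and host, every worker started on the same machine writes to the same file, so without appending only the most recent start-up would be kept.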
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode. URL: https://github.com/apache/flink/pull/9655#discussion_r325510604 ## File path: flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py ## @@ -0,0 +1,140 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import hashlib +import json +import os +import socket +import subprocess +import sys +import tempfile +import time +from stat import ST_MODE + +import grpc +from apache_beam.portability.api.beam_artifact_api_pb2 import GetManifestResponse, ArtifactChunk +from apache_beam.portability.api.beam_artifact_api_pb2_grpc import ( +ArtifactRetrievalServiceServicer, add_ArtifactRetrievalServiceServicer_to_server) +from apache_beam.portability.api.beam_provision_api_pb2 import (ProvisionInfo, + GetProvisionInfoResponse) +from apache_beam.portability.api.beam_provision_api_pb2_grpc import ( +ProvisionServiceServicer, add_ProvisionServiceServicer_to_server) +from concurrent import futures +from google.protobuf import json_format + +from pyflink.fn_execution.tests.process_mode_test_data import (manifest, file_data, + test_provision_info_json) +from pyflink.testing.test_case_utils import PyFlinkTestCase + + +class PythonBootTests(PyFlinkTestCase): + +def setUp(self): +manifest_response = json_format.Parse(manifest, GetManifestResponse()) +artifact_chunks = dict() +for file_name in file_data: +artifact_chunks[file_name] = json_format.Parse(file_data[file_name], ArtifactChunk()) +provision_info = json_format.Parse(test_provision_info_json, ProvisionInfo()) +response = GetProvisionInfoResponse(info=provision_info) + +def get_unused_port(): +sock = socket.socket() +sock.bind(('', 0)) +port = sock.getsockname()[1] +sock.close() +return port + +class ArtifactService(ArtifactRetrievalServiceServicer): +def GetManifest(self, request, context): +return manifest_response + +def GetArtifact(self, request, context): +yield artifact_chunks[request.name] + +def start_test_artifact_server(): +server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) +add_ArtifactRetrievalServiceServicer_to_server(ArtifactService(), server) +port = get_unused_port() +server.add_insecure_port('[::]:' + str(port)) +server.start() +return server, port + +class ProvisionService(ProvisionServiceServicer): 
+def GetProvisionInfo(self, request, context): +return response + +def start_test_provision_server(): +server = grpc.server(futures.ThreadPoolExecutor(max_workers=1)) +add_ProvisionServiceServicer_to_server(ProvisionService(), server) +port = get_unused_port() +server.add_insecure_port('[::]:' + str(port)) +server.start() +return server, port + +self.artifact_server, self.artifact_port = start_test_artifact_server() +self.provision_server, self.provision_port = start_test_provision_server() + +self.env = dict(os.environ) +self.env["python"] = sys.executable +self.env["FLINK_BOOT_TESTING"] = "1" + +def check_downloaded_files(self, staged_dir, manifest): +expected_files_info = json.loads(manifest)["manifest"]["artifact"] +files = os.listdir(staged_dir) +self.assertEqual(len(expected_files_info), len(files)) +checked = 0 +for file_name in files: +for file_info in expected_files_info: +if file_name == file_info["name"]: +self.assertEqual( +oct(os.stat(os.path.join(staged_dir, file_name))[ST_MODE])[-3:], +
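The permission assertion in `check_downloaded_files` compares the last three octal digits of the file mode. An equivalent check can be sketched with `stat.S_IMODE`, which strips the file-type bits. `file_permissions` and `demo` are hypothetical helpers, and the result assumes a POSIX filesystem where `chmod` is honoured.

```python
import os
import stat
import tempfile


def file_permissions(path):
    """Return the rwx permission bits of `path` as a three-digit octal string."""
    return "%03o" % (stat.S_IMODE(os.stat(path).st_mode) & 0o777)


def demo():
    """Compare the S_IMODE form with the oct(...)[-3:] form used in the test."""
    fd, path = tempfile.mkstemp()
    os.close(fd)
    try:
        os.chmod(path, 0o644)
        sliced = oct(os.stat(path)[stat.ST_MODE])[-3:]
        return file_permissions(path), sliced
    finally:
        os.remove(path)
```

Both forms yield the same three digits for ordinary files; the masked form just makes the intent explicit.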
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode. URL: https://github.com/apache/flink/pull/9655#discussion_r325491976 ## File path: flink-python/pyflink/fn_execution/boot.py ## @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This script is a python implementation of the "boot.go" script in "beam-sdks-python-container" +project of Apache Beam, see in: + +https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go + +It is implemented in golang and will introduce unnecessary dependencies if used in pure python +project. So we add a python implementation which will be used when the python worker runs in +process mode. It downloads and installs users' python artifacts, then launches the python SDK +harness of Apache Beam. 
+""" +import argparse +import hashlib +import os +from subprocess import call + +import grpc +import logging +import sys + +from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub +from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest +from apache_beam.portability.api.beam_artifact_api_pb2_grpc import ArtifactRetrievalServiceStub +from apache_beam.portability.api.beam_artifact_api_pb2 import (GetManifestRequest, + GetArtifactRequest) +from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor + +from google.protobuf import json_format, text_format + +parser = argparse.ArgumentParser() + +parser.add_argument("--id", default="", help="Local identifier (required).") +parser.add_argument("--logging_endpoint", default="", +help="Logging endpoint (required).") +parser.add_argument("--artifact_endpoint", default="", +help="Artifact endpoint (required).") +parser.add_argument("--provision_endpoint", default="", +help="Provision endpoint (required).") +parser.add_argument("--control_endpoint", default="", +help="Control endpoint (required).") +parser.add_argument("--semi_persist_dir", default="/tmp", +help="Local semi-persistent directory (optional).") + +args = parser.parse_args() + +worker_id = args.id +logging_endpoint = args.logging_endpoint +artifact_endpoint = args.artifact_endpoint +provision_endpoint = args.provision_endpoint +control_endpoint = args.control_endpoint +semi_persist_dir = args.semi_persist_dir + +if worker_id == "": +logging.fatal("No id provided.") + +if logging_endpoint == "": +logging.fatal("No logging endpoint provided.") + +if artifact_endpoint == "": +logging.fatal("No artifact endpoint provided.") + +if provision_endpoint == "": +logging.fatal("No provision endpoint provided.") + +if control_endpoint == "": +logging.fatal("No control endpoint provided.") + +logging.info("Initializing python harness: %s" % " ".join(sys.argv)) + +metadata = [("worker_id", 
worker_id)] + +# read job information from provision stub +with grpc.insecure_channel(provision_endpoint) as channel: +client = ProvisionServiceStub(channel=channel) +info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info +options = json_format.MessageToJson(info.pipeline_options) + +staged_dir = os.path.join(semi_persist_dir, "staged") +files = [] + +# download files +with grpc.insecure_channel(artifact_endpoint) as channel: +client = ArtifactRetrievalServiceStub(channel=channel) +# get file list via retrieval token +response = client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token), + metadata=metadata) +artifacts = response.manifest.artifact +# download files and check hash values +for artifact in artifacts: +name = artifact.name +files.append(name) +permissions = artifact.permissions +sha256 = artifact.sha256 +
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
URL: https://github.com/apache/flink/pull/9655#discussion_r325504848

## File path: flink-python/pyflink/fn_execution/tests/test_process_mode_boot.py ##
@@ -0,0 +1,140 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import hashlib
+import json
+import os
+import socket
+import subprocess
+import sys
+import tempfile
+import time
+from stat import ST_MODE
+
+import grpc
+from apache_beam.portability.api.beam_artifact_api_pb2 import GetManifestResponse, ArtifactChunk
+from apache_beam.portability.api.beam_artifact_api_pb2_grpc import (
+    ArtifactRetrievalServiceServicer, add_ArtifactRetrievalServiceServicer_to_server)
+from apache_beam.portability.api.beam_provision_api_pb2 import (ProvisionInfo,
+                                                                GetProvisionInfoResponse)
+from apache_beam.portability.api.beam_provision_api_pb2_grpc import (
+    ProvisionServiceServicer, add_ProvisionServiceServicer_to_server)
+from concurrent import futures
+from google.protobuf import json_format
+
+from pyflink.fn_execution.tests.process_mode_test_data import (manifest, file_data,
+                                                               test_provision_info_json)
+from pyflink.testing.test_case_utils import PyFlinkTestCase
+
+
+class PythonBootTests(PyFlinkTestCase):
+
+    def setUp(self):
+        manifest_response = json_format.Parse(manifest, GetManifestResponse())
+        artifact_chunks = dict()
+        for file_name in file_data:
+            artifact_chunks[file_name] = json_format.Parse(file_data[file_name], ArtifactChunk())
+        provision_info = json_format.Parse(test_provision_info_json, ProvisionInfo())
+        response = GetProvisionInfoResponse(info=provision_info)
+
+        def get_unused_port():
+            sock = socket.socket()
+            sock.bind(('', 0))
+            port = sock.getsockname()[1]
+            sock.close()
+            return port
+
+        class ArtifactService(ArtifactRetrievalServiceServicer):
+            def GetManifest(self, request, context):
+                return manifest_response
+
+            def GetArtifact(self, request, context):
+                yield artifact_chunks[request.name]
+
+        def start_test_artifact_server():
+            server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+            add_ArtifactRetrievalServiceServicer_to_server(ArtifactService(), server)
+            port = get_unused_port()
+            server.add_insecure_port('[::]:' + str(port))
+            server.start()
+            return server, port
+
+        class ProvisionService(ProvisionServiceServicer):
+            def GetProvisionInfo(self, request, context):
+                return response
+
+        def start_test_provision_server():
+            server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
+            add_ProvisionServiceServicer_to_server(ProvisionService(), server)
+            port = get_unused_port()
+            server.add_insecure_port('[::]:' + str(port))
+            server.start()
+            return server, port
+
+        self.artifact_server, self.artifact_port = start_test_artifact_server()
+        self.provision_server, self.provision_port = start_test_provision_server()
+
+        self.env = dict(os.environ)
+        self.env["python"] = sys.executable

Review comment: Can we remove this, since boot.py already uses `sys.executable`?
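For context on `sys.executable`: where the platform can determine it, it holds the path of the running interpreter, so a child process started through it uses the same Python without any `python` entry in the environment. A minimal sketch follows; `child_python_version` is a hypothetical helper, and the inline `-c` program exists only for illustration.

```python
import subprocess
import sys


def child_python_version():
    """Start a child via sys.executable and return its 'major.minor' version."""
    out = subprocess.check_output(
        [sys.executable, "-c",
         "import sys; print('%d.%d' % sys.version_info[:2])"])
    return out.decode().strip()


def parent_python_version():
    return "%d.%d" % sys.version_info[:2]
```

Comparing the two return values confirms that the child ran under the same interpreter version as the test process.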
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode. URL: https://github.com/apache/flink/pull/9655#discussion_r325232945 ## File path: flink-dist/src/main/flink-bin/bin/pyflink-udf-runner.sh ## @@ -0,0 +1,47 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +bin=`dirname "$0"` +bin=`cd "$bin"; pwd` +. "$bin"/find-flink-home.sh + +_FLINK_HOME_DETERMINED=1 + +. "$FLINK_HOME"/bin/config.sh + +if [[ "$FLINK_IDENT_STRING" = "" ]]; then Review comment: Replace `"$FLINK_IDENT_STRING" = ""` with `-z "$FLINK_IDENT_STRING"`? If the two are equivalent, we can use just one form, to stay consistent with the `if [[ -z "$python" ]]` below. What do you think?
[GitHub] [flink] hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode.
hequn8128 commented on a change in pull request #9655: [FLINK-14017][python] Support to start up Python worker in process mode. URL: https://github.com/apache/flink/pull/9655#discussion_r325501314 ## File path: flink-python/pyflink/fn_execution/boot.py ## @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This script is a python implementation of the "boot.go" script in "beam-sdks-python-container" +project of Apache Beam, see in: + +https://github.com/apache/beam/blob/release-2.14.0/sdks/python/container/boot.go + +It is implemented in golang and will introduce unnecessary dependencies if used in pure python +project. So we add a python implementation which will be used when the python worker runs in +process mode. It downloads and installs users' python artifacts, then launches the python SDK +harness of Apache Beam. 
+""" +import argparse +import hashlib +import os +from subprocess import call + +import grpc +import logging +import sys + +from apache_beam.portability.api.beam_provision_api_pb2_grpc import ProvisionServiceStub +from apache_beam.portability.api.beam_provision_api_pb2 import GetProvisionInfoRequest +from apache_beam.portability.api.beam_artifact_api_pb2_grpc import ArtifactRetrievalServiceStub +from apache_beam.portability.api.beam_artifact_api_pb2 import (GetManifestRequest, + GetArtifactRequest) +from apache_beam.portability.api.endpoints_pb2 import ApiServiceDescriptor + +from google.protobuf import json_format, text_format + +parser = argparse.ArgumentParser() + +parser.add_argument("--id", default="", help="Local identifier (required).") +parser.add_argument("--logging_endpoint", default="", +help="Logging endpoint (required).") +parser.add_argument("--artifact_endpoint", default="", +help="Artifact endpoint (required).") +parser.add_argument("--provision_endpoint", default="", +help="Provision endpoint (required).") +parser.add_argument("--control_endpoint", default="", +help="Control endpoint (required).") +parser.add_argument("--semi_persist_dir", default="/tmp", +help="Local semi-persistent directory (optional).") + +args = parser.parse_args() + +worker_id = args.id +logging_endpoint = args.logging_endpoint +artifact_endpoint = args.artifact_endpoint +provision_endpoint = args.provision_endpoint +control_endpoint = args.control_endpoint +semi_persist_dir = args.semi_persist_dir + +if worker_id == "": +logging.fatal("No id provided.") + +if logging_endpoint == "": +logging.fatal("No logging endpoint provided.") + +if artifact_endpoint == "": +logging.fatal("No artifact endpoint provided.") + +if provision_endpoint == "": +logging.fatal("No provision endpoint provided.") + +if control_endpoint == "": +logging.fatal("No control endpoint provided.") + +logging.info("Initializing python harness: %s" % " ".join(sys.argv)) + +metadata = [("worker_id", 
worker_id)] + +# read job information from provision stub +with grpc.insecure_channel(provision_endpoint) as channel: +client = ProvisionServiceStub(channel=channel) +info = client.GetProvisionInfo(GetProvisionInfoRequest(), metadata=metadata).info +options = json_format.MessageToJson(info.pipeline_options) + +staged_dir = os.path.join(semi_persist_dir, "staged") +files = [] + +# download files +with grpc.insecure_channel(artifact_endpoint) as channel: +client = ArtifactRetrievalServiceStub(channel=channel) +# get file list via retrieval token +response = client.GetManifest(GetManifestRequest(retrieval_token=info.retrieval_token), + metadata=metadata) +artifacts = response.manifest.artifact +# download files and check hash values +for artifact in artifacts: +name = artifact.name +files.append(name) +permissions = artifact.permissions +sha256 = artifact.sha256 +
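The quoted boot.py diff is cut off just as it starts to download each artifact and check its hash, so the verification code itself is not visible here. As a rough illustration of that step only, the following minimal sketch writes streamed chunks to a file and compares the SHA-256 digest against the expected value. The helper name `write_and_verify`, its parameters, and the use of plain `bytes` chunks (instead of the `ArtifactChunk` messages returned by `GetArtifact`) are assumptions made for this example, not the code from the pull request.

```python
import hashlib
import os
import tempfile


def write_and_verify(chunks, path, expected_sha256, permissions=0o644):
    """Write an iterable of byte chunks to `path` and check its SHA-256.

    Hypothetical helper: in boot.py the chunks would come from the streaming
    GetArtifact call; here they are plain bytes objects.
    """
    digest = hashlib.sha256()
    with open(path, "wb") as f:
        for chunk in chunks:
            # Hash incrementally so the whole file never has to sit in memory.
            digest.update(chunk)
            f.write(chunk)
    actual = digest.hexdigest()
    if actual != expected_sha256:
        # Do not leave a corrupted artifact behind.
        os.remove(path)
        raise ValueError(
            "Hash mismatch for %s: expected %s, got %s" % (path, expected_sha256, actual))
    os.chmod(path, permissions)
    return actual


def demo():
    """Round-trip a small payload through write_and_verify."""
    data = [b"hello ", b"world"]
    expected = hashlib.sha256(b"hello world").hexdigest()
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "artifact.bin")
        return write_and_verify(data, path, expected) == expected
```

Hashing chunk by chunk mirrors the streaming shape of the artifact service; whether the actual script deletes a mismatching file or simply aborts is not something the truncated diff shows.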
[GitHub] [flink] flinkbot edited a comment on issue #9653: [FLINK-14014][python] Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution
flinkbot edited a comment on issue #9653: [FLINK-14014][python] Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution URL: https://github.com/apache/flink/pull/9653#issuecomment-529525552 ## CI report: * 852555dbd31eb4e36009c99a22771a28544a0d5e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/126471470) * 9b9156b15508efd40c5a8e5427a8cddb469913f9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/126547856) * 48d9619d06297af85382bde44c5cff9688364459 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/127053703) * fde6b67e4060b486d32f6e63e5bd8458ac1e5ac5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127058420) * 6b4727221ed393d2d18a99ad220c87ee224093b4 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127130673) * d367d3cbaa29d722741257e07603e9d33b03da28 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/127965086) * 3bac78e2cd1d22241dd82b5ec7c42cb66039412d : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/12796) * a0876069203faaff034af1aa2fcad831d5fec343 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127975554) * 0475b1342c19c2831f42dad3e4f43162f72526ab : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/128116060)
[GitHub] [flink] flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing
flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing URL: https://github.com/apache/flink/pull/7013#issuecomment-532533051 ## CI report: * c81a918723a3204483b152909c3fc93f16a23ce0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128117334) * f93c0c3ba82f53db498c46f22a4fb3bf9a730451 : UNKNOWN
[jira] [Created] (FLINK-14114) Shift down ClusterClient#timeout to RestClusterClient
TisonKun created FLINK-14114:
-----------------------------

Summary: Shift down ClusterClient#timeout to RestClusterClient
Key: FLINK-14114
URL: https://issues.apache.org/jira/browse/FLINK-14114
Project: Flink
Issue Type: Sub-task
Components: Client / Job Submission
Reporter: TisonKun
Fix For: 1.10.0

{{ClusterClient#timeout}} is only used in {{RestClusterClient}}; even without this prerequisite, we can always shift the {{timeout}} field down to subclasses of {{ClusterClient}}. This is a step towards an interface-ized {{ClusterClient}}.

As a side effect, we could reduce the dependency on parsing durations with Scala Duration on the fly.

CC [~till.rohrmann] [~zhuzh]

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (FLINK-14070) Use TimeUtils to parse duration configs
[ https://issues.apache.org/jira/browse/FLINK-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932133#comment-16932133 ]

TisonKun commented on FLINK-14070:
----------------------------------

Hi [~zhuzh], have you started working on this issue?

I'd like to create a JIRA about shifting {{ClusterClient#timeout}} down to {{RestClusterClient}}, which might reduce the use of Scala {{Duration}} for parsing duration configs. It is generally a separate task, but it will likely conflict with this one, so I think it is better to reach out to you first and see whether you prefer to:

1. start the two tasks concurrently and resolve any conflicts (if they occur, they should be trivial to resolve), or
2. work on the two tasks sequentially.

Alternatively, after I create the task described above, if you are interested in working on it as well, I can assign that ticket to you so that you coordinate both.

> Use TimeUtils to parse duration configs
> ---
>
> Key: FLINK-14070
> URL: https://issues.apache.org/jira/browse/FLINK-14070
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Configuration
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Fix For: 1.10.0
>
> FLINK-14069 makes TimeUtils able to parse all time unit labels supported by
> scala Duration.
> We can now use TimeUtils to parse duration configs instead of using scala
> Duration.
> Some config descriptors referring to scala FiniteDuration should be updated as
> well.
> This is one step for Flink core to get rid of scala dependencies.
[jira] [Commented] (FLINK-13987) add new logs api, see more log files and can see logs by pages
[ https://issues.apache.org/jira/browse/FLINK-13987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932102#comment-16932102 ]

vinoyang commented on FLINK-13987:
----------------------------------

I linked it to this issue a week ago; please see FLINK-11782.

> add new logs api, see more log files and can see logs by pages
> ---
>
> Key: FLINK-13987
> URL: https://issues.apache.org/jira/browse/FLINK-13987
> Project: Flink
> Issue Type: New Feature
> Components: Runtime / REST
> Reporter: lining
> Priority: Major
>
> As the job runs, the log files become large.
> The current log API returns the whole content, so it blocks or fails when the
> file is large, which is unfriendly for users.
> As the application runs on the JVM, users sometimes need to see the GC log, but
> it is not available.
> Above all, we need new APIs:
> * list all log files of a taskmanager
> ** /taskmanagers/taskmanagerid/logs
> **
> {code:java}
> {
>   "logs": [
>     {
>       "name": "taskmanager.log",
>       "size": 12529
>     }
>   ]
> } {code}
> * see a taskmanager log file by range
> ** /taskmanagers/taskmanagerid/logs/:filename?start=[start]=[count]
> **
> {code:java}
> {
>   "data": "logcontent",
>   "file_size": 342882
> }
> {code}
> * list all log files of the jobmanager
> ** /jobmanager/logs
> **
> {code:java}
> {
>   "logs": [
>     {
>       "name": "jobmanager.log",
>       "size": 12529
>     }
>   ]
> }
> {code}
> * see a jobmanager log file by range
> ** /jobmanager/logs/:filename?start=[start]=[count]
> **
> {code:java}
> {
>   "data": "logcontent",
>   "file_size": 342882
> }
> {code}
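The proposal above describes two kinds of responses: a listing of log files with their sizes, and a slice of one file together with its total size. The sketch below shows one way such payloads could be produced from a local directory. It is not Flink's REST handler code; the function names `list_logs` and `read_log_range`, and the reading of `start` as a byte offset and `count` as a number of bytes, are assumptions for illustration.

```python
import os


def list_logs(log_dir):
    """Return a payload shaped like the proposed log listing response."""
    files = []
    for name in sorted(os.listdir(log_dir)):
        path = os.path.join(log_dir, name)
        if os.path.isfile(path):
            files.append({"name": name, "size": os.path.getsize(path)})
    return {"logs": files}


def read_log_range(log_dir, filename, start, count):
    """Return `count` bytes of a log file beginning at byte offset `start`.

    Assumption: `start` and `count` are byte-based; the issue text does not
    say whether the real API would count bytes, characters, or lines.
    """
    if os.path.basename(filename) != filename:
        # Reject names such as "../secret" that would escape the log directory.
        raise ValueError("invalid log file name: %r" % filename)
    if start < 0 or count < 0:
        raise ValueError("start and count must be non-negative")
    path = os.path.join(log_dir, filename)
    file_size = os.path.getsize(path)
    with open(path, "rb") as f:
        f.seek(start)
        data = f.read(count)
    # A byte range may split a multi-byte character, so decode leniently.
    return {"data": data.decode("utf-8", errors="replace"), "file_size": file_size}
```

Returning `file_size` alongside the slice lets a client page through the file without a separate listing call, which seems to be the intent of the proposed response shape.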
[jira] [Commented] (FLINK-5601) Window operator does not checkpoint watermarks
[ https://issues.apache.org/jira/browse/FLINK-5601?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932103#comment-16932103 ]

Jiayi Liao commented on FLINK-5601:
-----------------------------------

[~aljoscha] Any new ideas for this issue? Theoretically, the watermark should be part of the state of a Flink job. To solve the backward-compatibility problem, maybe we can add an option for checkpointing watermarks?

> Window operator does not checkpoint watermarks
> ---
>
> Key: FLINK-5601
> URL: https://issues.apache.org/jira/browse/FLINK-5601
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Affects Versions: 1.5.0, 1.6.0, 1.7.0, 1.8.0, 1.9.0
> Reporter: Ufuk Celebi
> Assignee: Jiayi Liao
> Priority: Critical
> Labels: pull-request-available
>
> During release testing [~stefanrichte...@gmail.com] and I noticed that
> watermarks are not checkpointed in the window operator.
> This can lead to non-determinism when restoring checkpoints. I was running an
> adjusted {{SessionWindowITCase}} via Kafka for testing migration and
> rescaling and ran into failures, because the data generator required
> deterministic behaviour.
> What happened was that on restore it could happen that late elements were not
> dropped, because the watermarks needed to be re-established after restore
> first.
> [~aljoscha] Do you know whether there is a special reason for explicitly not
> checkpointing watermarks?
[GitHub] [flink] tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation
tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation URL: https://github.com/apache/flink/pull/9691#discussion_r325500227 ## File path: flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionTest.java ## @@ -86,4 +87,32 @@ public void testDeprecationFlagForMixedAlternativeKeys() { assertThat(fallbackKeys, containsInAnyOrder("fallback1", "fallback2")); assertThat(deprecatedKeys, containsInAnyOrder("deprecated1", "deprecated2")); } + + @Test + public void testDeprecationForDeprecatedKeys() { + String[] deprecatedKeys = new String[] { "deprecated1", "deprecated2" }; + final ConfigOption optionWithDeprecatedKeys = ConfigOptions + .key("key") + .defaultValue(0) + .withDeprecatedKeys(deprecatedKeys) + .withFallbackKeys("fallback1"); + + assertTrue(optionWithDeprecatedKeys.hasDeprecatedKeys()); + int counter = 0; + for (final String deprecatedKey : optionWithDeprecatedKeys.deprecatedKeys()) { + counter++; + assertTrue(Arrays.stream(deprecatedKeys).anyMatch(item -> item.equals(deprecatedKey))); + } + + assertEquals(2, counter); + + final ConfigOption optionWithFallbackKeys = ConfigOptions + .key("key") + .defaultValue(0) + .withFallbackKeys("fallback1"); Review comment: Let's split this test up into a separate one. It is usually better to have smaller and more targeted tests.
[GitHub] [flink] tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation
tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation URL: https://github.com/apache/flink/pull/9691#discussion_r325500646 ## File path: flink-core/src/test/java/org/apache/flink/configuration/ConfigOptionTest.java ## @@ -86,4 +87,32 @@ public void testDeprecationFlagForMixedAlternativeKeys() { assertThat(fallbackKeys, containsInAnyOrder("fallback1", "fallback2")); assertThat(deprecatedKeys, containsInAnyOrder("deprecated1", "deprecated2")); } + + @Test + public void testDeprecationForDeprecatedKeys() { + String[] deprecatedKeys = new String[] { "deprecated1", "deprecated2" }; + final ConfigOption optionWithDeprecatedKeys = ConfigOptions + .key("key") + .defaultValue(0) + .withDeprecatedKeys(deprecatedKeys) + .withFallbackKeys("fallback1"); + + assertTrue(optionWithDeprecatedKeys.hasDeprecatedKeys()); + int counter = 0; + for (final String deprecatedKey : optionWithDeprecatedKeys.deprecatedKeys()) { + counter++; + assertTrue(Arrays.stream(deprecatedKeys).anyMatch(item -> item.equals(deprecatedKey))); + } Review comment: nit: Instead of manually going over the `deprecatedKeys` one could convert them and the input keys into a `Set` and use `assertEquals`.
[GitHub] [flink] tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation
tillrohrmann commented on a change in pull request #9691: [FLINK-13965] Keep hasDeprecatedKeys and deprecatedKeys methods in ConfigOption and mark it with @Deprecated annotation URL: https://github.com/apache/flink/pull/9691#discussion_r325500814 ## File path: flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java ## @@ -195,6 +196,31 @@ public T defaultValue() { return defaultValue; } + /** +* Checks whether this option has deprecated keys. +* @return True if the option has deprecated keys, false if not. +* @deprecated Replaced by {@link #hasFallbackKeys()} +*/ + @Deprecated + public boolean hasDeprecatedKeys() { + return fallbackKeys == EMPTY ? true : Review comment: I think this needs to return `false` if `fallbackKeys` is `EMPTY`.
[jira] [Resolved] (FLINK-13949) Delete deduplicating JobVertexDetailsInfo.VertexTaskDetail
[ https://issues.apache.org/jira/browse/FLINK-13949?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-13949.
----------------------------------

Fix Version/s: 1.10.0
Resolution: Done

Done via 88a918bcb22808947537b34cb6d9dd396f08b51d

> Delete deduplicating JobVertexDetailsInfo.VertexTaskDetail
> ---
>
> Key: FLINK-13949
> URL: https://issues.apache.org/jira/browse/FLINK-13949
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / REST
> Reporter: lining
> Assignee: lining
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Since there is already SubtaskExecutionAttemptDetailsInfo for subtasks, we can
> use it to replace JobVertexDetailsInfo.VertexTaskDetail.
[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r325475374 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -474,60 +403,121 @@ public void releaseAll(Object owner) { return; } - // BEGIN CRITICAL SECTION --- - synchronized (lock) { - if (isShutDown) { - throw new IllegalStateException("Memory manager has been shut down."); - } - - // get all segments - final Set segments = allocatedSegments.remove(owner); + Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); - // all segments may have been freed previously individually - if (segments == null || segments.isEmpty()) { - return; - } + // get all segments + Set segments = allocatedSegments.remove(owner); - // free each segment - if (isPreAllocated) { - for (MemorySegment seg : segments) { - memoryPool.returnSegmentToPool(seg); - } - } - else { - for (MemorySegment seg : segments) { - seg.free(); - } - numNonAllocatedPages += segments.size(); - } + // all segments may have been freed previously individually + if (segments == null || segments.isEmpty()) { + return; + } - segments.clear(); + // free each segment + EnumMap releasedMemory = new EnumMap<>(MemoryType.class); + for (MemorySegment segment : segments) { + releaseSegment(segment, releasedMemory); } - // END CRITICAL SECTION --- + budgetByType.releaseBudgetForKeys(releasedMemory); + + segments.clear(); } - // - // Properties, sizes and size conversions - // + /** +* Reserves memory of a certain type for an owner from this memory manager. +* +* @param owner The owner to associate with the memory reservation, for the fallback release. +* @param memoryType type of memory to reserve (heap / off-heap). +* @param size size of memory to reserve. +* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount +* of memory any more. 
+*/ + public void reserveMemory(Object owner, MemoryType memoryType, long size) throws MemoryAllocationException { + checkMemoryReservationPreconditions(owner, memoryType, size); + if (size == 0L) { + return; + } + + long acquiredMemory = budgetByType.acquireBudgetForKey(memoryType, size); + if (acquiredMemory < size) { + throw new MemoryAllocationException( + String.format("Could not allocate %d bytes. Only %d bytes are remaining.", size, acquiredMemory)); + } + + reservedMemory.compute(owner, (o, reservations) -> { + Map newReservations = reservations; + if (reservations == null) { + newReservations = new EnumMap<>(MemoryType.class); + newReservations.put(memoryType, size); + } else { + reservations.compute( + memoryType, + (mt, currentlyReserved) -> currentlyReserved == null ? size : currentlyReserved + size); + } + return newReservations; + }); + + Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down."); + } /** -* Gets the type of memory (heap / off-heap) managed by this memory manager. +* Releases memory of a certain type from an owner to this memory manager. * -* @return The type of memory managed by this memory manager. +*
[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r325472245 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -394,74 +318,79 @@ public void release(Collection segments) { return; } - // BEGIN CRITICAL SECTION --- - synchronized (lock) { - if (isShutDown) { - throw new IllegalStateException("Memory manager has been shut down."); - } + Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); - // since concurrent modifications to the collection - // can disturb the release, we need to try potentially multiple times - boolean successfullyReleased = false; - do { - final Iterator segmentsIterator = segments.iterator(); + EnumMap releasedMemory = new EnumMap<>(MemoryType.class); - Object lastOwner = null; - Set segsForOwner = null; + // since concurrent modifications to the collection + // can disturb the release, we need to try potentially multiple times + boolean successfullyReleased = false; + do { + Iterator segmentsIterator = segments.iterator(); - try { - // go over all segments - while (segmentsIterator.hasNext()) { - - final MemorySegment seg = segmentsIterator.next(); - if (seg == null || seg.isFreed()) { - continue; - } - - final Object owner = seg.getOwner(); - - try { - // get the list of segments by this owner only if it is a different owner than for - // the previous one (or it is the first segment) - if (lastOwner != owner) { - lastOwner = owner; - segsForOwner = this.allocatedSegments.get(owner); - } - - // remove the segment from the list - if (segsForOwner != null) { - segsForOwner.remove(seg); - if (segsForOwner.isEmpty()) { - this.allocatedSegments.remove(owner); - } - } - - if (isPreAllocated) { - memoryPool.returnSegmentToPool(seg); - } - else { - seg.free(); - numNonAllocatedPages++; - } - } - catch (Throwable t) { - throw new RuntimeException( - "Error 
removing book-keeping reference to allocated memory segment.", t); - } + //noinspection ProhibitedExceptionCaught + try { + MemorySegment segment = null; + while (segment == null && segmentsIterator.hasNext()) { + segment = segmentsIterator.next(); + if (segment.isFreed()) { + segment = null; } + } + while (segment != null) { +
[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r325473963 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -474,60 +403,121 @@ public void releaseAll(Object owner) { return; } - // BEGIN CRITICAL SECTION --- - synchronized (lock) { - if (isShutDown) { - throw new IllegalStateException("Memory manager has been shut down."); - } - - // get all segments - final Set segments = allocatedSegments.remove(owner); + Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); - // all segments may have been freed previously individually - if (segments == null || segments.isEmpty()) { - return; - } + // get all segments + Set segments = allocatedSegments.remove(owner); - // free each segment - if (isPreAllocated) { - for (MemorySegment seg : segments) { - memoryPool.returnSegmentToPool(seg); - } - } - else { - for (MemorySegment seg : segments) { - seg.free(); - } - numNonAllocatedPages += segments.size(); - } + // all segments may have been freed previously individually + if (segments == null || segments.isEmpty()) { + return; + } - segments.clear(); + // free each segment + EnumMap releasedMemory = new EnumMap<>(MemoryType.class); + for (MemorySegment segment : segments) { + releaseSegment(segment, releasedMemory); } - // END CRITICAL SECTION --- + budgetByType.releaseBudgetForKeys(releasedMemory); + + segments.clear(); } - // - // Properties, sizes and size conversions - // + /** +* Reserves memory of a certain type for an owner from this memory manager. +* +* @param owner The owner to associate with the memory reservation, for the fallback release. +* @param memoryType type of memory to reserve (heap / off-heap). +* @param size size of memory to reserve. +* @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount +* of memory any more. 
+*/ + public void reserveMemory(Object owner, MemoryType memoryType, long size) throws MemoryAllocationException { + checkMemoryReservationPreconditions(owner, memoryType, size); + if (size == 0L) { + return; + } + + long acquiredMemory = budgetByType.acquireBudgetForKey(memoryType, size); + if (acquiredMemory < size) { + throw new MemoryAllocationException( + String.format("Could not allocate %d bytes. Only %d bytes are remaining.", size, acquiredMemory)); + } + + reservedMemory.compute(owner, (o, reservations) -> { + Map newReservations = reservations; + if (reservations == null) { + newReservations = new EnumMap<>(MemoryType.class); + newReservations.put(memoryType, size); + } else { + reservations.compute( + memoryType, + (mt, currentlyReserved) -> currentlyReserved == null ? size : currentlyReserved + size); + } + return newReservations; + }); + + Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down."); + } /** -* Gets the type of memory (heap / off-heap) managed by this memory manager. +* Releases memory of a certain type from an owner to this memory manager. * -* @return The type of memory managed by this memory manager. +*
[GitHub] [flink] tillrohrmann closed pull request #9699: [FLINK-13949][rest]Delete deduplicating JobVertexDetailsInfo.VertexTa…
tillrohrmann closed pull request #9699: [FLINK-13949][rest]Delete deduplicating JobVertexDetailsInfo.VertexTa… URL: https://github.com/apache/flink/pull/9699
[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools URL: https://github.com/apache/flink/pull/9693#discussion_r325466726 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java ## @@ -257,133 +216,98 @@ public boolean verifyEmpty() { * of memory pages any more. */ public List allocatePages(Object owner, int numPages) throws MemoryAllocationException { - final ArrayList segs = new ArrayList(numPages); - allocatePages(owner, segs, numPages); - return segs; + List segments = new ArrayList<>(numPages); + allocatePages(owner, segments, numPages); + return segments; } /** -* Allocates a set of memory segments from this memory manager. If the memory manager pre-allocated the -* segments, they will be taken from the pool of memory segments. Otherwise, they will be allocated -* as part of this call. +* Allocates a set of memory segments from this memory manager. +* +* The returned segments can have any memory type. The total allocated memory for each type will not exceed its +* size limit, announced in the constructor. * * @param owner The owner to associate with the memory segment, for the fallback release. * @param target The list into which to put the allocated memory pages. * @param numPages The number of pages to allocate. * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount * of memory pages any more. 
*/ - public void allocatePages(Object owner, List target, int numPages) - throws MemoryAllocationException { + public void allocatePages( + Object owner, + Collection target, + int numPages) throws MemoryAllocationException { // sanity check - if (owner == null) { - throw new IllegalArgumentException("The memory owner must not be null."); - } + Preconditions.checkNotNull(owner, "The memory owner must not be null."); + Preconditions.checkState(!isShutDown, "Memory manager has been shut down."); // reserve array space, if applicable if (target instanceof ArrayList) { ((ArrayList) target).ensureCapacity(numPages); } - // BEGIN CRITICAL SECTION --- - synchronized (lock) { - if (isShutDown) { - throw new IllegalStateException("Memory manager has been shut down."); - } - - // in the case of pre-allocated memory, the 'numNonAllocatedPages' is zero, in the - // lazy case, the 'freeSegments.size()' is zero. - if (numPages > (memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages)) { - throw new MemoryAllocationException("Could not allocate " + numPages + " pages. Only " + - (memoryPool.getNumberOfAvailableMemorySegments() + numNonAllocatedPages) - + " pages are remaining."); - } - - Set segmentsForOwner = allocatedSegments.get(owner); - if (segmentsForOwner == null) { - segmentsForOwner = new HashSet(numPages); - allocatedSegments.put(owner, segmentsForOwner); - } + // in the case of pre-allocated memory, the 'numNonAllocatedPages' is zero, in the Review comment: Since we removed memory preallocation, is this comment still valid?
[GitHub] [flink] flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing
flinkbot edited a comment on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing
URL: https://github.com/apache/flink/pull/7013#issuecomment-532533051

## CI report:

* c81a918723a3204483b152909c3fc93f16a23ce0 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128117334)
[GitHub] [flink] tillrohrmann commented on issue #9699: [FLINK-13949][rest]Delete deduplicating JobVertexDetailsInfo.VertexTa…
tillrohrmann commented on issue #9699: [FLINK-13949][rest]Delete deduplicating JobVertexDetailsInfo.VertexTa…
URL: https://github.com/apache/flink/pull/9699#issuecomment-532537561

Thanks a lot. Merging this PR now.
[jira] [Resolved] (FLINK-14069) Enable TimeUtils to parse all time units labels supported by scala Duration
[ https://issues.apache.org/jira/browse/FLINK-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann resolved FLINK-14069.
---
Resolution: Done

Done via d3137b29c3837d34ab734b1182c15b32572e41ba 155179fb5a21afd1dd69e28a2f851b51d5d5872e

> Enable TimeUtils to parse all time units labels supported by scala Duration
> ---
>
> Key: FLINK-14069
> URL: https://issues.apache.org/jira/browse/FLINK-14069
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Configuration
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Assignee: Zhu Zhu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Currently we are using scala Duration to parse duration configs.
> The supported time unit labels are
> {
> DAYS -> "d day",
> HOURS -> "h hour",
> MINUTES -> "min minute",
> SECONDS -> "s sec second",
> MILLISECONDS -> "ms milli millisecond",
> MICROSECONDS -> "µs micro microsecond",
> NANOSECONDS -> "ns nano nanosecond"
> }
> We want to use Flink {{TimeUtils}} to parse the duration configuration, as a
> step to let flink core get rid of scala dependencies.
> In order not to break existing jobs, {{TimeUtils}} must be able to parse all
> time unit labels supported by scala Duration.
> Current TimeUtils supported time unit labels are "h", "min", "s" and "ms".
> We need to enrich it.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
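The label table quoted in FLINK-14069 above can be illustrated with a small parser. What follows is a sketch under stated assumptions, not Flink's `TimeUtils`: the class name `DurationLabelParser` and its `parse` method are hypothetical, only the labels listed in the issue are registered (no plural forms), and a missing or unknown label is simply rejected.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration of label-based duration parsing; not Flink's TimeUtils.
final class DurationLabelParser {
    private static final Map<String, ChronoUnit> UNITS = new HashMap<>();

    private static void register(ChronoUnit unit, String... labels) {
        for (String label : labels) {
            UNITS.put(label, unit);
        }
    }

    static {
        // Labels copied from the table in the issue description.
        register(ChronoUnit.DAYS, "d", "day");
        register(ChronoUnit.HOURS, "h", "hour");
        register(ChronoUnit.MINUTES, "min", "minute");
        register(ChronoUnit.SECONDS, "s", "sec", "second");
        register(ChronoUnit.MILLIS, "ms", "milli", "millisecond");
        register(ChronoUnit.MICROS, "\u00b5s", "micro", "microsecond"); // \u00b5 is the micro sign
        register(ChronoUnit.NANOS, "ns", "nano", "nanosecond");
    }

    // Parses "<non-negative integer><optional spaces><label>", e.g. "10 min" or "3d".
    static Duration parse(String text) {
        String trimmed = text.trim();
        int pos = 0;
        while (pos < trimmed.length() && Character.isDigit(trimmed.charAt(pos))) {
            pos++;
        }
        if (pos == 0) {
            throw new IllegalArgumentException("No number found in '" + text + "'");
        }
        long value = Long.parseLong(trimmed.substring(0, pos));
        String label = trimmed.substring(pos).trim().toLowerCase(Locale.US);
        ChronoUnit unit = UNITS.get(label);
        if (unit == null) {
            throw new IllegalArgumentException("Unknown time unit label '" + label + "' in '" + text + "'");
        }
        // Duration.of treats DAYS as exactly 24 hours.
        return Duration.of(value, unit);
    }
}
```

A lookup table keyed by label keeps the set of accepted units easy to extend, which matches the issue's stated goal of enriching the supported labels without changing how existing values are read.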
[GitHub] [flink] tillrohrmann closed pull request #9686: [FLINK-14069] [core] Enable TimeUtils for all time units labels supported by scala Duration
tillrohrmann closed pull request #9686: [FLINK-14069] [core] Enable TimeUtils for all time units labels supported by scala Duration
URL: https://github.com/apache/flink/pull/9686
[GitHub] [flink] flinkbot edited a comment on issue #9689: [FLINK-7151] add a basic function ddl
flinkbot edited a comment on issue #9689: [FLINK-7151] add a basic function ddl
URL: https://github.com/apache/flink/pull/9689#issuecomment-531747685

## CI report:

* bd2624914db1147588ea838ae542333c310290cc : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127790175)
* b5523d10152123f45cf883e446872b90532879c3 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/128113059)
[jira] [Commented] (FLINK-11936) Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter issue.
[ https://issues.apache.org/jira/browse/FLINK-11936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16932096#comment-16932096 ]

Danny Chan commented on FLINK-11936:

[~walterddr], since CALCITE-3008 has not been resolved yet, we may still keep the AuxiliaryConverter class in flink-table-planner when upgrading to Calcite release 1.21.0.

> Remove AuxiliaryConverter pull-in from calcite and fix auxiliary converter
> issue.
> -
>
> Key: FLINK-11936
> URL: https://issues.apache.org/jira/browse/FLINK-11936
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / API
> Reporter: Rong Rong
> Assignee: Rong Rong
> Priority: Major
>
> AuxiliaryConverter was pulled in FLINK-6409. Since CALCITE-1761 has been
> fixed, we should sync back with the calcite version.
> After a quick glance, I think it is not so simple to just delete the class so
> I opened a follow up Jira on this issue.
[GitHub] [flink] flinkbot edited a comment on issue #9653: [FLINK-14014][python] Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution
flinkbot edited a comment on issue #9653: [FLINK-14014][python] Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution
URL: https://github.com/apache/flink/pull/9653#issuecomment-529525552

## CI report:

* 852555dbd31eb4e36009c99a22771a28544a0d5e : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/126471470)
* 9b9156b15508efd40c5a8e5427a8cddb469913f9 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/126547856)
* 48d9619d06297af85382bde44c5cff9688364459 : CANCELED [Build](https://travis-ci.com/flink-ci/flink/builds/127053703)
* fde6b67e4060b486d32f6e63e5bd8458ac1e5ac5 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127058420)
* 6b4727221ed393d2d18a99ad220c87ee224093b4 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127130673)
* d367d3cbaa29d722741257e07603e9d33b03da28 : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/127965086)
* 3bac78e2cd1d22241dd82b5ec7c42cb66039412d : FAILURE [Build](https://travis-ci.com/flink-ci/flink/builds/12796)
* a0876069203faaff034af1aa2fcad831d5fec343 : SUCCESS [Build](https://travis-ci.com/flink-ci/flink/builds/127975554)
* 0475b1342c19c2831f42dad3e4f43162f72526ab : PENDING [Build](https://travis-ci.com/flink-ci/flink/builds/128116060)
[GitHub] [flink] flinkbot commented on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing
flinkbot commented on issue #7013: [FLINK-5601][Checkpointing] Watermark checkpointing
URL: https://github.com/apache/flink/pull/7013#issuecomment-532533051

## CI report:

* c81a918723a3204483b152909c3fc93f16a23ce0 : UNKNOWN