[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-10-05 Thread GitBox


ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r499941214



##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -53,361 +54,386 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+# [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar \

Review comment:
   Oh yeah, `--test_pipeline_options` is now required (even though it should be possible to leave it empty). Boyuan, would you mind filing a PR to fix this?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-10-01 Thread GitBox


ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r498358861



##########
File path: sdks/python/scripts/run_pytest.sh
##########
@@ -29,10 +29,16 @@ posargs=$2
 
 # Run with pytest-xdist and without.
 pytest -o junit_suite_name=${envname} \
-  --junitxml=pytest_${envname}.xml -m 'not no_xdist' -n 6 --pyargs ${posargs}
+  --junitxml=pytest_${envname}.xml -m 'not no_xdist' -n 6 \
+  --ignore=apache_beam/runners/portability/flink_runner_test.py \

Review comment:
   Done 
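
For context on what this --ignore split implies, a hedged sketch in Python (via pytest.main, which accepts the same flags): the excluded runner tests get their own separate invocation without xdist. The Spark test path below is inferred from the PR title, and the flags mirror the diff above; this is not the actual Beam tooling.

# Sketch: mirror run_pytest.sh's split using pytest.main. Assumes pytest and
# pytest-xdist are installed; the Spark path is an assumption.
import sys

import pytest

RUNNER_TESTS = [
    'apache_beam/runners/portability/flink_runner_test.py',
    'apache_beam/runners/portability/spark_runner_test.py',  # assumed
]


def main():
  ignore_flags = ['--ignore=%s' % f for f in RUNNER_TESTS]
  # Parallel (pytest-xdist) pass over everything that is xdist-safe.
  rc = pytest.main(['-m', 'not no_xdist', '-n', '6'] + ignore_flags)
  if rc != 0:
    sys.exit(int(rc))
  # The runner tests get a separate, serial invocation.
  sys.exit(int(pytest.main(RUNNER_TESTS)))


if __name__ == '__main__':
  main()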





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463348975



##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -53,361 +53,380 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+# [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar \
+#   --environment_type=DOCKER"] \
+# [FlinkRunnerTest.test_method, ...]
+
 _LOGGER = logging.getLogger(__name__)
 
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.flink_runner_test \
-  #     --flink_job_server_jar=/path/to/job_server.jar \
-  #     --environment_type=docker \
-  #     --extra_experiments=beam_experiments \
-  #     [FlinkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-      '--flink_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-      '--streaming',
-      default=False,
-      action='store_true',
-      help='Job type. batch or streaming')
-  parser.add_argument(
-      '--environment_type',
-      default='loopback',
-      help='Environment type. docker, process, or loopback.')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-      '--extra_experiments',
-      default=[],
-      action='append',
-      help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  flink_job_server_jar = (
-      known_args.flink_job_server_jar or
-      job_server.JavaJarJobServer.path_to_beam_jar(
-          'runners:flink:%s:job-server:shadowJar' %
-          FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))
-  streaming = known_args.streaming
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-      known_args.environment_config if known_args.environment_config else None)
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
-    _use_grpc = True
-    _use_subprocesses = True
-
-    conf_dir = None
-    expansion_port = None
-
-    @classmethod
-    def tearDownClass(cls):
-      if cls.conf_dir and exists(cls.conf_dir):
-        _LOGGER.info("removing conf dir: %s" % cls.conf_dir)
-        rmtree(cls.conf_dir)
-      super(FlinkRunnerTest, cls).tearDownClass()
-
-    @classmethod
-    def _create_conf_dir(cls):
-      """Create (and save a static reference to) a "conf dir", used to provide
-       metrics configs and verify metrics output
-
-       It gets cleaned up when the suite is done executing"""
-
-      if hasattr(cls, 'conf_dir'):
-        cls.conf_dir = mkdtemp(prefix='flinktest-conf')
-
-        # path for a FileReporter to write metrics to
-        cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
-
-        # path to write Flink configuration to
-        conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
-        file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
-        with open(conf_path, 'w') as f:
-          f.write(
-              linesep.join([
-                  'metrics.reporters: file',
-                  'metrics.reporter.file.class: %s' % file_reporter,
-                  'metrics.reporter.file.path: %s' % cls.test_metrics_path,
-                  'metrics.scope.operator: ',
-              ]))
-
-    @classmethod
-    def _subprocess_command(cls, job_port, expansion_port):
-      # will be cleaned up at the end of this method, and recreated and used by
-      # the job server
-      tmp_dir = mkdtemp(prefix='flinktest')
-
-      cls._create_conf_dir()
-      cls.expansion_port = expansion_port
-
-      try:
-        return [
-            'java',
-            '-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
-            '-jar',
-            flink_job_server_jar,
-            '--flink-master',
-            '[local]',
-            '--flink-conf-dir',
-            cls.conf_dir,
-            '--artifacts-dir',
-            tmp_dir,
-            '--job-port',
-            str(job_port),
-            '--artifact-port',
-            '0',
-            '--expansion-port',
-            str(expansion_port),
-        ]
-      finally:
-        rmtree(tmp_dir)
-
-    @classmethod
-    def get_runner(cls):
-      return portable_runner.PortableRunner()
-
-    @classmethod
-    def get_expansion_service(cls):
-      # TODO Move expansion address resides into PipelineOptions
-      return 'localhost:%s' % cls.expansion_port
-
-    def create_options(self):
-      options = super(FlinkRunnerTest, self).create_options()
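
To show the other side of this hunk (the quote above is cut off by the archiver), a hedged sketch of the pytest-era pattern: the same flags move from the deleted __main__ block into an autouse fixture that reads --test_pipeline_options from pytest's config. Class, fixture, and attribute names below are illustrative assumptions, not the actual migrated code.

# Sketch only: replaces argparse-over-sys.argv with pytest's option machinery.
# Assumes a conftest.py registers --test_pipeline_options (see the first
# message in this thread).
import argparse
import shlex
import unittest

import pytest


class FlinkRunnerTestSketch(unittest.TestCase):
  flink_job_server_jar = None
  environment_type = 'LOOPBACK'

  @pytest.fixture(autouse=True)
  def _parse_test_pipeline_options(self, request):
    # The flags mirror the deleted __main__ block; unrecognized options are
    # left for the pipeline itself.
    parser = argparse.ArgumentParser(add_help=True)
    parser.add_argument('--flink_job_server_jar')
    parser.add_argument('--environment_type', default='LOOPBACK')
    options_str = request.config.getoption('test_pipeline_options', default='')
    known_args, _ = parser.parse_known_args(shlex.split(options_str))
    type(self).flink_job_server_jar = known_args.flink_job_server_jar
    type(self).environment_type = known_args.environment_type

  def test_environment_type_parsed(self):
    self.assertIn(
        self.environment_type.upper(), ('LOOPBACK', 'DOCKER', 'PROCESS'))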

[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463346402



##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -53,361 +53,380 @@

[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

2020-07-30 Thread GitBox


ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463345888



##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -53,361 +53,380 @@