Hi Kyle,
As reported earlier, LOOPBACK with Portable Runner/Job Server works fine.
Further to that, I tried PortableRunner with additional options as follows:
"--runner=PortableRunner",
"--job_endpoint=embed",
"--environment_config=apache/beam_python3.6_sdk"
I get an error message (see attachment) similar to the one I get with the Spark
and Flink runners when the clusters are external.
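The working and failing runs differ in where the SDK harness runs. The sketch below uses only the standard library to read that choice out of an argument list. `describe_environment` is a hypothetical helper, not a Beam API. It assumes PortableRunner falls back to a DOCKER environment when `--environment_type` is omitted, in which case `--environment_config` names the SDK container image.

```python
import argparse


def describe_environment(argv):
    """Return (environment_type, environment_config) selected by argv.

    Hypothetical helper; assumes DOCKER when --environment_type is absent.
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--environment_type", default="DOCKER")
    parser.add_argument("--environment_config", default=None)
    # Other pipeline options (runner, job_endpoint, hdfs_*) are ignored.
    known, _unknown = parser.parse_known_args(argv)
    return known.environment_type.upper(), known.environment_config


embed_docker = [
    "--runner=PortableRunner",
    "--job_endpoint=embed",
    "--environment_config=apache/beam_python3.6_sdk",
]
loopback = [
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXX:8099",
    "--environment_type=LOOPBACK",
]

print(describe_environment(embed_docker))  # ('DOCKER', 'apache/beam_python3.6_sdk')
print(describe_environment(loopback))      # ('LOOPBACK', None)
```

Under that assumption, the options above run the SDK harness inside the `apache/beam_python3.6_sdk` container. The LOOPBACK run executes it inside the submitting Python process.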
thanks,
Buvana
________________________________
From: Ramanan, Buvana (Nokia - US/Murray Hill)
<[email protected]>
Sent: Thursday, May 28, 2020 11:47 PM
To: [email protected] <[email protected]>
Subject: Re: HDFS I/O with Beam on Spark and Flink runners - consistent Error messages
Hello Kyle,
That works; it produces the expected output.
-Buvana
________________________________
From: Kyle Weaver <[email protected]>
Sent: Thursday, May 28, 2020 9:19 PM
To: [email protected] <[email protected]>
Subject: Re: HDFS I/O with Beam on Spark and Flink runners - consistent Error messages
Hi Buvana,
I suspect this is a bug. Could you try running your pipeline again with these
changes?
1. Remove `--spark-master-url spark://YYYYYYYY:7077` from your Docker run command.
2. Add `--environment_type=LOOPBACK` to your pipeline options.
That will help us confirm the cause of the issue.
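The sketch below applies the two changes to the Docker command and pipeline options from the original message further down this thread. `drop_flag` and `with_loopback` are hypothetical helpers written only for illustration. Presumably, dropping `--spark-master-url` makes the job server fall back to its default local Spark master. LOOPBACK runs the SDK harness in the submitting Python process instead of a Docker container.

```python
def drop_flag(argv, flag):
    """Return argv without `flag` and its value (`flag value` or `flag=value`)."""
    out = []
    skip_next = False
    for arg in argv:
        if skip_next:
            skip_next = False  # this token was the removed flag's value
            continue
        if arg == flag:
            skip_next = True
            continue
        if arg.startswith(flag + "="):
            continue
        out.append(arg)
    return out


def with_loopback(options):
    """Return options with any --environment_type replaced by LOOPBACK.

    Assumes the `--flag=value` form used for pipeline options in this thread.
    """
    kept = [o for o in options if not o.startswith("--environment_type")]
    return kept + ["--environment_type=LOOPBACK"]


docker_cmd = ["docker", "run", "--net=host", "apache/beam_spark_job_server:latest",
              "--job-host", "XXXXXXX", "--job-port", "8099",
              "--spark-master-url", "spark://YYYYYYYY:7077"]
pipeline_opts = ["--hdfs_host=ZZZZZZZZZ", "--hdfs_user=hdfs", "--hdfs_port=50070",
                 "--runner=PortableRunner", "--job_endpoint=XXXXXXXXX:8099"]

# Change 1: job server without an external Spark master.
print(" ".join(drop_flag(docker_cmd, "--spark-master-url")))
# Change 2: pipeline options with a LOOPBACK environment.
print(with_loopback(pipeline_opts))
```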
On Thu, May 28, 2020 at 7:12 PM Ramanan, Buvana (Nokia - US/Murray Hill)
<[email protected]> wrote:
Kyle, Max, All,
I am desperately trying to get Beam working on at least one of the Flink or
Spark runners, but I am facing failures in both cases with similar messages.
I reported the Flink runner issue (Beam v2.19.0) yesterday; here is the permalink:
https://lists.apache.org/thread.html/r4977083014eb2d252710ad24ed32d5ff3c402ba161e7b36328a3bd87%40%3Cuser.beam.apache.org%3E
I also came across this related discussion:
https://lists.apache.org/thread.html/a9b6f019b22a65640c272a9497a3c9cc34d68cc2b5c1c9bdebc7ff38%40%3Cuser.beam.apache.org%3E
I get a similar error message with the Spark runner as I did with the Flink
runner (although this time with a newer version of Beam). My environment
details, code and the error message are pasted below. The code runs fine on the
Direct Runner. HADOOP_CONF_DIR is configured appropriately before starting the
Spark master and slave.
I hope to make some headway soon. Please help. Maybe I have to downgrade to an
earlier version of Beam where this issue did not exist; if so, please let me
know the version number.
Thank you,
Regards,
Buvana
Spark Runner scenario:
Beam version 2.21.0 on both the client and the job server ends.
Docker Spark Job Server:
https://hub.docker.com/r/apache/beam_spark_job_server
docker run --net=host apache/beam_spark_job_server:latest --job-host XXXXXXX \
  --job-port 8099 --spark-master-url spark://YYYYYYYY:7077
Client code:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--hdfs_host=ZZZZZZZZZ",
    "--hdfs_user=hdfs",
    "--hdfs_port=50070",
    "--runner=PortableRunner",
    "--job_endpoint=XXXXXXXXX:8099"
])
p = beam.Pipeline(options=options)
input_file_hdfs = "hdfs://user/buvana/manifest"
lines = p | 'ReadMyFile' >> beam.io.ReadFromText(input_file_hdfs)
# Parentheses let the write transform span two lines without a syntax error.
res = lines | "WriteMyFile" >> beam.io.WriteToText(
    "hdfs://user/buvana/copy-manifest", ".csv")
p.run()
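For reference, here is a stdlib-only sketch of how an `hdfs://` path can be split into a server and a path depending on `--hdfs_full_urls`. It is an illustrative re-implementation based on the documented meaning of that flag: when the flag is unset, URLs are read as `hdfs://path/...`; when it is set, they are read as `hdfs://server/path/...`. It is not Beam's own code, and `parse_hdfs_url` is a hypothetical name.

```python
import re

# Illustrative only, not Beam's source.
_PATH_ONLY_URL = re.compile(r"^hdfs:/(/.*)$")       # hdfs://path/...
_FULL_URL = re.compile(r"^hdfs://([^/]+)(/.*)?$")   # hdfs://server/path/...


def parse_hdfs_url(url, full_urls=False):
    """Split an hdfs:// URL into (server, path); server is None without full URLs."""
    if full_urls:
        m = _FULL_URL.match(url)
        if m is None:
            raise ValueError("Could not parse url: %s" % url)
        server, path = m.groups()
        return server, path or "/"
    m = _PATH_ONLY_URL.match(url)
    if m is None:
        raise ValueError("Could not parse url: %s" % url)
    # No server in the URL: the connection uses --hdfs_host / --hdfs_port.
    return None, m.group(1)


print(parse_hdfs_url("hdfs://user/buvana/manifest"))                  # (None, '/user/buvana/manifest')
print(parse_hdfs_url("hdfs://user/buvana/manifest", full_urls=True))  # ('user', '/buvana/manifest')
```

The pipeline options dumped in the attached log show `hdfs_full_urls` as false. Under this reading, the input resolves to `/user/buvana/manifest` on the host given by `--hdfs_host`. The traceback, however, fails inside the filesystem constructor before any path is parsed, so the URL format is probably not the cause.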
Error message at the Spark Master UI:
worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/transforms/core.py", line 1501, in <lambda>
    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
  File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1005, in <lambda>
    lambda _, sink: sink.initialize_write(), self.sink)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
    return fnc(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 167, in initialize_write
    tmp_dir = self._create_temp_dir(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsink.py", line 172, in _create_temp_dir
    base_path, last_component = FileSystems.split(file_path_prefix)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 151, in split
    filesystem = FileSystems.get_filesystem(path)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
    return systems[0](pipeline_options=options)
  File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
    raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'WriteMyFile/Write/WriteImpl/InitializeWrite']
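One plausible reading of this traceback is that the HDFS filesystem is built from pipeline options held in process-wide state. That state would be populated in the client process but not in a separately started SDK worker. This reading is consistent with LOOPBACK working and with Kyle's suspicion of a bug. The stdlib-only model below mirrors the shape of the last frames: `get_filesystem` passes `pipeline_options=options`, and the constructor raises `ValueError` when that value is missing. The class names and `register_options` are hypothetical, and this is not Beam's implementation.

```python
class HdfsFileSystemModel:
    """Stands in for the HDFS filesystem: it cannot start without options."""

    def __init__(self, pipeline_options):
        if pipeline_options is None:
            raise ValueError("pipeline_options is not set")
        self.options = pipeline_options


class FileSystemsModel:
    """Stands in for the filesystem registry and its process-wide options."""

    _pipeline_options = None  # starts empty in every new process

    @classmethod
    def register_options(cls, options):
        cls._pipeline_options = options

    @classmethod
    def get_filesystem(cls, path):
        if not path.startswith("hdfs://"):
            raise ValueError("only hdfs:// paths are modelled here")
        # Same shape as the traceback: construct with whatever options are held.
        return HdfsFileSystemModel(pipeline_options=cls._pipeline_options)


# Submitting (client) process: options are registered, so the lookup succeeds.
FileSystemsModel.register_options({"hdfs_host": "ZZZZZZZZZ", "hdfs_port": 50070})
print(type(FileSystemsModel.get_filesystem("hdfs://user/buvana/manifest")).__name__)

# A separately started worker process: simulate its empty module state.
FileSystemsModel._pipeline_options = None
try:
    FileSystemsModel.get_filesystem("hdfs://user/buvana/copy-manifest")
except ValueError as err:
    print("ValueError:", err)  # ValueError: pipeline_options is not set
```

In this model, LOOPBACK corresponds to the worker running inside the process that already holds the options. That would explain why only that configuration succeeds.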
WARNING:root:Make sure that locally built Python SDK docker image has Python
3.6 interpreter.
Using default tag: latest
latest: Pulling from apache/beam_python3.6_sdk
Digest: sha256:48bd82920212ce2acea17d142048aa1c667f47c82b35c04b134df4638d7b8926
Status: Image is up to date for apache/beam_python3.6_sdk:latest
docker.io/apache/beam_python3.6_sdk:latest
ERROR:root:('info', pipeline_options {
fields {
key: "beam:option:artifact_port:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:dataflow_endpoint:v1"
value {
string_value: "https://dataflow.googleapis.com"
}
}
fields {
key: "beam:option:direct_num_workers:v1"
value {
string_value: "1"
}
}
fields {
key: "beam:option:direct_runner_bundle_repeat:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:direct_runner_use_stacked_bundle:v1"
value {
bool_value: true
}
}
fields {
key: "beam:option:direct_running_mode:v1"
value {
string_value: "in_memory"
}
}
fields {
key: "beam:option:dry_run:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:enable_streaming_engine:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:environment_cache_millis:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:environment_config:v1"
value {
string_value: "apache/beam_python3.6_sdk"
}
}
fields {
key: "beam:option:expansion_port:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:experiments:v1"
value {
list_value {
values {
string_value: "beam_fn_api"
}
}
}
}
fields {
key: "beam:option:flink_master:v1"
value {
string_value: "[auto]"
}
}
fields {
key: "beam:option:flink_submit_uber_jar:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:flink_version:v1"
value {
string_value: "1.10"
}
}
fields {
key: "beam:option:hdfs_full_urls:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:hdfs_host:v1"
value {
string_value: "ZZZZZZZZZZ"
}
}
fields {
key: "beam:option:hdfs_port:v1"
value {
string_value: "50070"
}
}
fields {
key: "beam:option:hdfs_user:v1"
value {
string_value: "hdfs"
}
}
fields {
key: "beam:option:job_endpoint:v1"
value {
string_value: "embed"
}
}
fields {
key: "beam:option:job_port:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:job_server_timeout:v1"
value {
string_value: "60"
}
}
fields {
key: "beam:option:no_auth:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:pipeline_type_check:v1"
value {
bool_value: true
}
}
fields {
key: "beam:option:profile_cpu:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:profile_memory:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:profile_sample_rate:v1"
value {
number_value: 1.0
}
}
fields {
key: "beam:option:runtime_type_check:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:save_main_session:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:sdk_location:v1"
value {
string_value: "container"
}
}
fields {
key: "beam:option:sdk_worker_parallelism:v1"
value {
string_value: "1"
}
}
fields {
key: "beam:option:spark_master_url:v1"
value {
string_value: "local[4]"
}
}
fields {
key: "beam:option:spark_submit_uber_jar:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:streaming:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:type_check_strictness:v1"
value {
string_value: "DEFAULT_TO_ANY"
}
}
fields {
key: "beam:option:update:v1"
value {
bool_value: false
}
}
}
retrieval_token:
"/tmp/tmpzjfxi5nc/b6aaf7812dbef9ccaef8f57e980b40b1005b6dc63f2966bd74d25ab66f9a6b37/MANIFEST"
, 'context', <grpc._server._Context object at 0x7fb61f745d30>)
ERROR:root:('info', pipeline_options {
fields {
key: "beam:option:artifact_port:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:dataflow_endpoint:v1"
value {
string_value: "https://dataflow.googleapis.com"
}
}
fields {
key: "beam:option:direct_num_workers:v1"
value {
string_value: "1"
}
}
fields {
key: "beam:option:direct_runner_bundle_repeat:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:direct_runner_use_stacked_bundle:v1"
value {
bool_value: true
}
}
fields {
key: "beam:option:direct_running_mode:v1"
value {
string_value: "in_memory"
}
}
fields {
key: "beam:option:dry_run:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:enable_streaming_engine:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:environment_cache_millis:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:environment_config:v1"
value {
string_value: "apache/beam_python3.6_sdk"
}
}
fields {
key: "beam:option:expansion_port:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:experiments:v1"
value {
list_value {
values {
string_value: "beam_fn_api"
}
}
}
}
fields {
key: "beam:option:flink_master:v1"
value {
string_value: "[auto]"
}
}
fields {
key: "beam:option:flink_submit_uber_jar:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:flink_version:v1"
value {
string_value: "1.10"
}
}
fields {
key: "beam:option:hdfs_full_urls:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:hdfs_host:v1"
value {
string_value: "ZZZZZZZZZZ"
}
}
fields {
key: "beam:option:hdfs_port:v1"
value {
string_value: "50070"
}
}
fields {
key: "beam:option:hdfs_user:v1"
value {
string_value: "hdfs"
}
}
fields {
key: "beam:option:job_endpoint:v1"
value {
string_value: "embed"
}
}
fields {
key: "beam:option:job_port:v1"
value {
string_value: "0"
}
}
fields {
key: "beam:option:job_server_timeout:v1"
value {
string_value: "60"
}
}
fields {
key: "beam:option:no_auth:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:pipeline_type_check:v1"
value {
bool_value: true
}
}
fields {
key: "beam:option:profile_cpu:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:profile_memory:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:profile_sample_rate:v1"
value {
number_value: 1.0
}
}
fields {
key: "beam:option:runtime_type_check:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:save_main_session:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:sdk_location:v1"
value {
string_value: "container"
}
}
fields {
key: "beam:option:sdk_worker_parallelism:v1"
value {
string_value: "1"
}
}
fields {
key: "beam:option:spark_master_url:v1"
value {
string_value: "local[4]"
}
}
fields {
key: "beam:option:spark_submit_uber_jar:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:streaming:v1"
value {
bool_value: false
}
}
fields {
key: "beam:option:type_check_strictness:v1"
value {
string_value: "DEFAULT_TO_ANY"
}
}
fields {
key: "beam:option:update:v1"
value {
bool_value: false
}
}
}
retrieval_token:
"/tmp/tmpzjfxi5nc/b6aaf7812dbef9ccaef8f57e980b40b1005b6dc63f2966bd74d25ab66f9a6b37/MANIFEST"
logging_endpoint {
url: "localhost:40674"
}
artifact_endpoint {
url: "localhost:39934"
}
control_endpoint {
url: "localhost:39934"
}
, 'worker_id', 'worker_0')
WARNING:root:severity: WARN
timestamp {
seconds: 1590724880
nanos: 840057611
}
message: "No session file found: /tmp/staged/pickled_main_session. Functions
defined in __main__ (interactive session) may fail."
log_location:
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker_main.py:240"
thread: "MainThread"
WARNING:root:severity: WARN
timestamp {
seconds: 1590724880
nanos: 844359397
}
message: "Discarding unparseable args: [\'--direct_runner_use_stacked_bundle\',
\'--job_server_timeout=60\', \'--pipeline_type_check\']"
log_location:
"/usr/local/lib/python3.6/site-packages/apache_beam/options/pipeline_options.py:309"
thread: "MainThread"
ERROR:root:severity: ERROR
timestamp {
seconds: 1590724880
nanos: 958515882
}
message: "Error processing instruction bundle_1. Original traceback
is\nTraceback (most recent call last):\n File
\"apache_beam/runners/common.py\", line 961, in
apache_beam.runners.common.DoFnRunner.process\n File
\"apache_beam/runners/common.py\", line 726, in
apache_beam.runners.common.PerWindowInvoker.invoke_process\n File
\"apache_beam/runners/common.py\", line 812, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window\n File
\"apache_beam/runners/common.py\", line 1095, in
apache_beam.runners.common._OutputProcessor.process_outputs\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py\",
line 1395, in process\n element)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py\", line 1448,
in initial_restriction\n range_tracker =
self._source.get_range_tracker(None, None)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py\",
line 210, in get_range_tracker\n return
self._get_concat_source().get_range_tracker(\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py\",
line 135, in _f\n return fnc(self, *args, **kwargs)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py\",
line 145, in _get_concat_source\n match_result =
FileSystems.match([pattern])[0]\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py\", line
203, in match\n filesystem = FileSystems.get_filesystem(patterns[0])\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py\", line
113, in get_filesystem\n return systems[0](pipeline_options=options)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py\",
line 114, in __init__\n raise ValueError(\'pipeline_options is not
set\')\nValueError: pipeline_options is not set\n\nDuring handling of the above
exception, another exception occurred:\n\nTraceback (most recent call last):\n
File
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py\",
line 245, in _execute\n response = task()\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py\",
line 302, in <lambda>\n lambda:
self.create_worker().do_instruction(request), request)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py\",
line 471, in do_instruction\n getattr(request, request_type),
request.instruction_id)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py\",
line 506, in process_bundle\n
bundle_processor.process_bundle(instruction_id))\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py\",
line 972, in process_bundle\n element.data)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py\",
line 218, in process_encoded\n self.output(decoded_value)\n File
\"apache_beam/runners/worker/operations.py\", line 330, in
apache_beam.runners.worker.operations.Operation.output\n File
\"apache_beam/runners/worker/operations.py\", line 332, in
apache_beam.runners.worker.operations.Operation.output\n File
\"apache_beam/runners/worker/operations.py\", line 195, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive\n File
\"apache_beam/runners/worker/operations.py\", line 670, in
apache_beam.runners.worker.operations.DoOperation.process\n File
\"apache_beam/runners/worker/operations.py\", line 671, in
apache_beam.runners.worker.operations.DoOperation.process\n File
\"apache_beam/runners/common.py\", line 963, in
apache_beam.runners.common.DoFnRunner.process\n File
\"apache_beam/runners/common.py\", line 1045, in
apache_beam.runners.common.DoFnRunner._reraise_augmented\n File
\"/usr/local/lib/python3.6/site-packages/future/utils/__init__.py\", line 421,
in raise_with_traceback\n raise exc.with_traceback(traceback)\n File
\"apache_beam/runners/common.py\", line 961, in
apache_beam.runners.common.DoFnRunner.process\n File
\"apache_beam/runners/common.py\", line 726, in
apache_beam.runners.common.PerWindowInvoker.invoke_process\n File
\"apache_beam/runners/common.py\", line 812, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window\n File
\"apache_beam/runners/common.py\", line 1095, in
apache_beam.runners.common._OutputProcessor.process_outputs\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py\",
line 1395, in process\n element)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py\", line 1448,
in initial_restriction\n range_tracker =
self._source.get_range_tracker(None, None)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py\",
line 210, in get_range_tracker\n return
self._get_concat_source().get_range_tracker(\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py\",
line 135, in _f\n return fnc(self, *args, **kwargs)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py\",
line 145, in _get_concat_source\n match_result =
FileSystems.match([pattern])[0]\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py\", line
203, in match\n filesystem = FileSystems.get_filesystem(patterns[0])\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py\", line
113, in get_filesystem\n return systems[0](pipeline_options=options)\n File
\"/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py\",
line 114, in __init__\n raise ValueError(\'pipeline_options is not
set\')\nValueError: pipeline_options is not set [while running
\'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction\']\n\n"
instruction_id: "bundle_1"
log_location:
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py:252"
thread: "Thread-13"
0f9c70c36c0a24caaa8fd6f4fd60e33168c25e63a79d4deff94712399c248266
ERROR:apache_beam.runners.portability.local_job_service:Error running pipeline.
Traceback (most recent call last):
File
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py",
line 280, in _run_job
self._pipeline_proto)
File
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 189, in run_via_runner_api
return self.run_stages(stage_context, stages)
File
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 335, in run_stages
bundle_context_manager,
File
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 545, in _run_stage
expected_timer_output)
File
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1051, in process_bundle
for result, split_result in executor.map(execute, part_inputs):
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in
result_iterator
yield fs.pop().result()
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in
__get_result
raise self._exception
File
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py",
line 44, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 1048, in execute
part_map, expected_outputs, fired_timers, expected_output_timers)
File
"/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py",
line 986, in process_bundle
raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 961, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 726, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 812, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1095, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1395, in process
element)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line
1448, in initial_restriction
range_tracker = self._source.get_range_tracker(None, None)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py",
line 210, in get_range_tracker
return self._get_concat_source().get_range_tracker(
File
"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
line 135, in _f
return fnc(self, *args, **kwargs)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py",
line 145, in _get_concat_source
match_result = FileSystems.match([pattern])[0]
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
line 203, in match
filesystem = FileSystems.get_filesystem(patterns[0])
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
line 113, in get_filesystem
return systems[0](pipeline_options=options)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py",
line 114, in __init__
raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 245, in _execute
response = task()
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 302, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 471, in do_instruction
getattr(request, request_type), request.instruction_id)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 506, in process_bundle
bundle_processor.process_bundle(instruction_id))
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 972, in process_bundle
element.data)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 218, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 330, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 332, in
apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 195, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 670, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 671, in
apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 963, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1045, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line
421, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 961, in
apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 726, in
apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 812, in
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1095, in
apache_beam.runners.common._OutputProcessor.process_outputs
File
"/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1395, in process
element)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line
1448, in initial_restriction
range_tracker = self._source.get_range_tracker(None, None)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py",
line 210, in get_range_tracker
return self._get_concat_source().get_range_tracker(
File
"/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py",
line 135, in _f
return fnc(self, *args, **kwargs)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py",
line 145, in _get_concat_source
match_result = FileSystems.match([pattern])[0]
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
line 203, in match
filesystem = FileSystems.get_filesystem(patterns[0])
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py",
line 113, in get_filesystem
return systems[0](pipeline_options=options)
File
"/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py",
line 114, in __init__
raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running
'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
ERROR:apache_beam.runners.portability.local_job_service:<module 'traceback'
from '/usr/lib64/python3.6/traceback.py'>
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib64/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib64/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/local_job_service.py", line 280, in _run_job
self._pipeline_proto)
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 189, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 335, in run_stages
bundle_context_manager,
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 545, in _run_stage
expected_timer_output)
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1051, in process_bundle
for result, split_result in executor.map(execute, part_inputs):
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
yield fs.pop().result()
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "/usr/lib64/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1048, in execute
part_map, expected_outputs, fired_timers, expected_output_timers)
File "/home/tfs/venv_beam_2.21.0/lib/python3.6/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 986, in process_bundle
raise RuntimeError(result.error)
RuntimeError: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
element)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
range_tracker = self._source.get_range_tracker(None, None)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
return self._get_concat_source().get_range_tracker(
File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
return fnc(self, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
match_result = FileSystems.match([pattern])[0]
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
filesystem = FileSystems.get_filesystem(patterns[0])
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
return systems[0](pipeline_options=options)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
response = task()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
element.data)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 812, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1395, in process
element)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/iobase.py", line 1448, in initial_restriction
range_tracker = self._source.get_range_tracker(None, None)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 210, in get_range_tracker
return self._get_concat_source().get_range_tracker(
File "/usr/local/lib/python3.6/site-packages/apache_beam/options/value_provider.py", line 135, in _f
return fnc(self, *args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filebasedsource.py", line 145, in _get_concat_source
match_result = FileSystems.match([pattern])[0]
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 203, in match
filesystem = FileSystems.get_filesystem(patterns[0])
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/filesystems.py", line 113, in get_filesystem
return systems[0](pipeline_options=options)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/hadoopfilesystem.py", line 114, in __init__
raise ValueError('pipeline_options is not set')
ValueError: pipeline_options is not set [while running 'ReadMyFile/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction']
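For context, the error is raised when FileSystems.get_filesystem (filesystems.py, line 113) constructs HadoopFileSystem and the options it passes in are None (hadoopfilesystem.py, line 114). The sketch below is a hypothetical, heavily simplified Python model of that lookup. It is not Beam's code. The class names, the two option sources, and the hdfs_host/hdfs_port values are all assumptions. The model illustrates one plausible reading that fits the fact that LOOPBACK works: a worker running in the submitting process sees the pipeline options, while a separately started Docker worker (note the /usr/local paths above, compared with /home/tfs/venv_beam_2.21.0 for the job service) may not.

```python
# Hypothetical, simplified model of the lookup chain in the traceback
# (FileSystems.get_filesystem -> HadoopFileSystem.__init__). Not Beam's code.
from typing import Optional


class RuntimeOptions:
    # Stand-in for options delivered to a worker at run time.
    # None means nothing was delivered (the assumed Docker-worker case).
    runtime_options: Optional[dict] = None


class HadoopFileSystemModel:
    def __init__(self, pipeline_options: Optional[dict]) -> None:
        # Mirrors the check that fails in the traceback.
        if pipeline_options is None:
            raise ValueError('pipeline_options is not set')
        self.options = pipeline_options


class FileSystemsModel:
    # Stand-in for a class-level options slot filled when a pipeline is
    # built in this same process.
    _pipeline_options: Optional[dict] = None

    @classmethod
    def set_options(cls, options: dict) -> None:
        cls._pipeline_options = options

    @classmethod
    def get_filesystem(cls, path: str) -> HadoopFileSystemModel:
        if not path.startswith('hdfs://'):
            raise ValueError('this model only handles hdfs:// paths')
        # Prefer options set in this process, else whatever the runtime delivered.
        options = cls._pipeline_options or RuntimeOptions.runtime_options
        return HadoopFileSystemModel(pipeline_options=options)


def read_hdfs(path: str, worker_shares_process: bool) -> str:
    # Reset state to mimic a worker process starting from scratch.
    FileSystemsModel._pipeline_options = None
    RuntimeOptions.runtime_options = None
    if worker_shares_process:
        # Assumed LOOPBACK case: the worker runs where the pipeline was
        # constructed, so options set at construction time are visible.
        # The host and port values are hypothetical.
        FileSystemsModel.set_options({'hdfs_host': 'namenode', 'hdfs_port': 50070})
    try:
        FileSystemsModel.get_filesystem(path)
        return 'ok'
    except ValueError as err:
        return str(err)


if __name__ == '__main__':
    print(read_hdfs('hdfs://data/input.txt', worker_shares_process=True))
    print(read_hdfs('hdfs://data/input.txt', worker_shares_process=False))
```

Under these assumptions, the first call prints "ok" and the second prints the same "pipeline_options is not set" message seen in the worker log.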