Hi Jan,
I had not checked the harness log before. I have now checked it (the Apache
Beam worker log) and found this, but I'm currently not sure what it
means:
2022/06/01 13:34:40 Python exited: <nil>
2022/06/01 13:34:41 Python exited: <nil>
Exception in thread read_grpc_client_inputs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 587, in <lambda>
    target=lambda: self._read_inputs(elements_iterator),
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py", line 570, in _read_inputs
    for elements in elements_iterator:
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 416, in __next__
    return self._next()
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 803, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.CANCELLED
    details = "Multiplexer hanging up"
    debug_error_string = "{"created":"@1654090485.252525992","description":"Error received from peer ipv4:127.0.0.1:44439","file":"src/core/lib/surface/call.cc","file_line":1062,"grpc_message":"Multiplexer hanging up","grpc_status":1}"
>
2022/06/01 13:34:45 Python exited: <nil>
2022/06/01 13:34:46 Python exited: <nil>
2022/06/01 13:34:46 Python exited: <nil>
2022/06/01 13:34:47 Python exited: <nil>
Starting worker with command ['/opt/apache/beam/boot',
'--id=3-1', '--logging_endpoint=localhost:44267',
'--artifact_endpoint=localhost:36413',
'--provision_endpoint=localhost:42179',
'--control_endpoint=localhost:38825']
Starting worker with command ['/opt/apache/beam/boot',
'--id=3-3', '--logging_endpoint=localhost:38683',
'--artifact_endpoint=localhost:44867',
'--provision_endpoint=localhost:34833',
'--control_endpoint=localhost:44351']
Starting worker with command ['/opt/apache/beam/boot',
'--id=3-2', '--logging_endpoint=localhost:35391',
'--artifact_endpoint=localhost:46571',
'--provision_endpoint=localhost:44073',
'--control_endpoint=localhost:44133']
Starting work...
On Wed, Jun 1, 2022 at 11:21 AM Jan Lukavský <[email protected]> wrote:
Hi Gorjan,
+user@beam <mailto:[email protected]>
The trace you posted shows the runner just waiting for a bundle
to finish in the SDK harness. I would suspect the problem shows
up in the logs of the harness. Did you look for possible errors
there?
Jan
On 5/31/22 13:54, Gorjan Todorovski wrote:
Hi,
I am running a TensorFlow Extended (TFX) pipeline which uses
Apache Beam for data processing, which in turn runs on the Flink
Runner (basically a batch job on a Flink Session Cluster on
Kubernetes, Flink version 1.13.6), but the job (for gathering
stats) gets stuck.
There is nothing significant in the Job Manager or Task
Manager logs. The only thing that might hint at why the task
is stuck is this thread dump:
"MapPartition (MapPartition at [14]{TFXIORead[train],
GenerateStatistics[train]}) (1/32)#0" Id=188 WAITING on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at sun.misc.Unsafe.park(Native Method)
- waiting on
java.util.concurrent.CompletableFuture$Signaller@6f078632
at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)
at
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
at
java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at
org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
...
I use a parallelism of 32. The Task Managers are set up so that
each TM runs in one container with 1 CPU and a total process
memory of 20 GB. Each TM runs 1 task slot.
This fails with ~100 files totaling about 100 GB. If I run the
pipeline with a smaller number of files to process, it runs
fine.
I need Flink to be able to process varying amounts of data,
since it can scale by automatically adding pods depending on
the parallelism set for the specific job (I set the parallelism
to max(number of files, 32); a sketch is below).
Thanks,
Gorjan