Hi Lydian,
I'm not sure about this one. Can you please clarify: these are the TaskManager logs, right? And can you please share the command-line options you pass to the pipeline?
I'm not sure why the SDK harness would need the job server; that seems
strange. Can you let the Python cross-language transform start its own
expansion service using the default expansion service? That would mean you have to:
 a) package the expansion service with its dependencies into a single jar
(shadow jar)
 b) create something like the get_expansion_service function in [1]
 c) pass this expansion service via ReadFromKafka(...,
expansion_service=get_expansion_service())
That should start the expansion service locally where you run your
Python main method and then submit the job.
Jan
[1]
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py
On 9/20/22 23:26, Lydian wrote:
Thanks Jan! I am now building my TaskManager image with:
```
COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
```
and start the expansion service with:
```
java -cp /opt/apache/beam/jars/* \
  org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/boot\"}
```
But now I get another error, which seems related to artifact staging:
```
2022/09/20 20:56:35 Initializing java harness: /opt/apache/beam/java_boot --id=1-1 --provision_endpoint=localhost:33487
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/3-external_1beam:env:proces-jaccess-oq6STrCYDJnGcSOIfKmQgoBfAcDfKf-zQ3-p2ZtEQsY.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/2-external_1beam:env:proces-dnsns-FkpviqOgSjgfQr2mi062RMyZ-gSRyWRKVwOvxTXdcFA.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/5-external_1beam:env:proces-nashorn-y9iNNWfaBYThUJK3bCSr0D1ntOzfXi13Zp8V-pzM2h0.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/1-external_1beam:env:proces-cldrdata-IHVqwlrHHtgWuTtrhzSTUFawFn1x1Qi97s3jYsygE0Y.jar
2022-09-20 20:56:35,923 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/4-external_1beam:env:proces-localedata-xQnGOSPHoBjAh1B5aVBszDCEI8BXc7-R_L7taFUKHg8.jar
java.io.FileNotFoundException: /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:128)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:84)
    at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:256)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:124)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:315)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
Does anyone know what else I need to configure? Thanks
Sincerely,
Lydian Lee
On Tue, Sep 20, 2022 at 1:57 AM Jan Lukavský <[email protected]> wrote:
Hi Lydian,
there are two parts involved:
a) the expansion service (which you run on port 8097) - this service
expands ReadFromKafka, which is a Java transform
b) the Java SDK environment, which is not the expansion service; it
must be some environment that is able to run the Java ReadFromKafka
transform. In Flink you can use the PROCESS environment type (e.g. [1]),
but there are other options (e.g. DOCKER); see [2]
Hope this helps,
Jan
[1]
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py
[2] https://beam.apache.org/documentation/runtime/sdk-harness-config/
On 9/20/22 10:45, Lydian wrote:
Hi,
I am using the portable runner (Flink) with the Python SDK. I am on the
latest version of Beam (2.41). The job is running on Kubernetes. I
launched the job manager with a sidecar container (using
image apache/beam_flink1.14_job_server:2.41.0) to start the
expansion service with the following command:
```
java -cp /opt/apache/beam/jars/ \
  org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=EXTERNAL \
  --defaultEnvironmentConfig=localhost:8097
```
In the code I am doing:
```
ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "BROKER",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{sasl_username}" password="{sasl_password}";',
    },
    topics=[self.options.topic],
    with_metadata=False,
    expansion_service="localhost:8097",
)
```
But it fails with this error:
```
2022-09-20 08:36:36,549 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
    at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
```
Does anyone know how I could fix this issue? Thanks!
Sincerely,
Lydian Lee