Thanks, Jan! I am now building my TaskManager image with:
```
COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
```
and starting the expansion service with:
```
java -cp /opt/apache/beam/jars/* \
  org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/boot\"}
```
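For completeness, this is roughly how I keep the pieces of that command straight (a sketch only: the argv-list form is just to show which parts must reach the JVM as single arguments, e.g. the classpath wildcard and the JSON config; the paths and port mirror my container and are otherwise arbitrary):

```python
import json
import shlex

def expansion_service_argv(jars_dir="/opt/apache/beam/jars",
                           port=8097,
                           boot_cmd="/opt/apache/beam/boot"):
    """Assemble the expansion-service launch command as an argv list.

    Building the JSON with json.dumps avoids hand-escaping the quotes,
    and keeping the classpath as one element lets the JVM (not the
    shell) expand the '*' wildcard.
    """
    env_config = json.dumps({"command": boot_cmd})
    return [
        "java",
        "-cp", jars_dir + "/*",  # single argument; JVM expands the wildcard
        "org.apache.beam.sdk.expansion.service.ExpansionService",
        str(port),
        "--javaClassLookupAllowlistFile=*",
        "--defaultEnvironmentType=PROCESS",
        "--defaultEnvironmentConfig=" + env_config,
    ]

# Render it as a shell-safe one-liner for a container entrypoint:
print(shlex.join(expansion_service_argv()))
```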
But now I get another error, which seems related to artifact staging:
```
2022/09/20 20:56:35 Initializing java harness: /opt/apache/beam/java_boot --id=1-1 --provision_endpoint=localhost:33487
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/3-external_1beam:env:proces-jaccess-oq6STrCYDJnGcSOIfKmQgoBfAcDfKf-zQ3-p2ZtEQsY.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/2-external_1beam:env:proces-dnsns-FkpviqOgSjgfQr2mi062RMyZ-gSRyWRKVwOvxTXdcFA.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/5-external_1beam:env:proces-nashorn-y9iNNWfaBYThUJK3bCSr0D1ntOzfXi13Zp8V-pzM2h0.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/1-external_1beam:env:proces-cldrdata-IHVqwlrHHtgWuTtrhzSTUFawFn1x1Qi97s3jYsygE0Y.jar
2022-09-20 20:56:35,923 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/4-external_1beam:env:proces-localedata-xQnGOSPHoBjAh1B5aVBszDCEI8BXc7-R_L7taFUKHg8.jar
java.io.FileNotFoundException: /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:128)
    at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:84)
    at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:256)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:124)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:315)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
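From the log it looks like ArtifactRetrievalService is reading the staged artifacts from the local filesystem of whichever process serves it, so I suspect /tmp/beam-artifact-staging needs to be visible (e.g. as a shared volume) to that container. As a quick sanity check I list what is actually present under a staging token on a given machine; this helper is my own (names and layout are assumptions, not a Beam API):

```python
import os
import tempfile

def missing_artifacts(staging_root, token, expected_names):
    """Return the expected artifact file names NOT present under
    <staging_root>/<token> on this machine's filesystem."""
    token_dir = os.path.join(staging_root, token)
    if not os.path.isdir(token_dir):
        # Whole staging dir for this job is absent on this node.
        return list(expected_names)
    present = set(os.listdir(token_dir))
    return [name for name in expected_names if name not in present]

# Demo against a throwaway directory with hypothetical names:
root = tempfile.mkdtemp()
os.makedirs(os.path.join(root, "token123"))
open(os.path.join(root, "token123", "a.jar"), "w").close()
print(missing_artifacts(root, "token123", ["a.jar", "b.jar"]))  # ['b.jar']
```

Running something like this inside each container (with the token taken from the error path) should show which side of the deployment is missing the files.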
Does anyone know what else I would need to configure? Thanks!
Sincerely,
Lydian Lee
On Tue, Sep 20, 2022 at 1:57 AM Jan Lukavský <[email protected]> wrote:
> Hi Lydian,
>
> there are two parts involved.
>
> a) expansion service (which you run on port 8097) - this service expands
> the ReadFromKafka which is Java transform
>
> b) Java SDK environment, which is not the expansion service, it must be
> some environment that is able to run the Java ReadFromKafka transform. In
> flink, you can use PROCESS environment type (e.g. [1]), but there might be
> other options (e.g. DOCKER), see [2]
>
> Hope this helps,
>
> Jan
>
> [1]
> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py
>
> [2] https://beam.apache.org/documentation/runtime/sdk-harness-config/
> On 9/20/22 10:45, Lydian wrote:
>
> Hi,
> I am using the portable runner (Flink) with the Python SDK, on the latest
> version of Beam (2.41).
> The job is running on Kubernetes. I launched the JobManager with a sidecar
> container (using image: apache/beam_flink1.14_job_server:2.41.0) to start
> the expansion service with the following command:
> ```
> java
> -cp /opt/apache/beam/jars/
> org.apache.beam.sdk.expansion.service.ExpansionService
> 8097
> --javaClassLookupAllowlistFile=*
> --defaultEnvironmentType=EXTERNAL
> --defaultEnvironmentConfig=localhost:8097
> ```
> In the code I am doing:
> ```
> ReadFromKafka(
>     consumer_config={
>         "bootstrap.servers": 'BROKER',
>         "security.protocol": "SASL_SSL",
>         "sasl.mechanism": "SCRAM-SHA-512",
>         "sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{sasl_username}" password="{sasl_password}";',
>     },
>     topics=[self.options.topic],
>     with_metadata=False,
>     expansion_service="localhost:8097",
> )
> ```
> But it fails with the following error:
> ```
> 2022-09-20 08:36:36,549 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
> org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
>     at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
>     at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
>     at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
> ```
> Does anyone know how I could fix this issue? Thanks!
> Sincerely,
> Lydian Lee
>
>