Hi Lydian,

I'm not sure about this one. Can you please clarify: are these the TaskManager's logs? And can you share the command-line options you pass to the Pipeline?

I'm not sure why the SDK harness would need the job server; that seems strange. Could you let the Python cross-language transform start its own expansion service (the default one)? That would mean you have to:

 a) pack the expansion service with its dependencies into a single jar (shadow jar)

 b) create something like the get_expansion_service function in [1]

 c) pass this expansion service to ReadFromKafka(..., expansion_service=get_expansion_service())

That should start the expansion service locally, where you run your Python main method, and then submit the job.

  Jan

[1] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py

On 9/20/22 23:26, Lydian wrote:
Thanks Jan! I am now building my TaskManager image with
```
COPY --from=apache/beam_java8_sdk:2.41.0 /opt/apache/beam/ /opt/apache/beam/
```
and starting the expansion service with:
```
java -cp /opt/apache/beam/jars/* org.apache.beam.sdk.expansion.service.ExpansionService 8097 \
  --javaClassLookupAllowlistFile=* \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/boot\"}
```
But now I get another error, which seems related to artifact staging:
```
2022/09/20 20:56:35 Initializing java harness: /opt/apache/beam/java_boot --id=1-1 --provision_endpoint=localhost:33487
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/3-external_1beam:env:proces-jaccess-oq6STrCYDJnGcSOIfKmQgoBfAcDfKf-zQ3-p2ZtEQsY.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/2-external_1beam:env:proces-dnsns-FkpviqOgSjgfQr2mi062RMyZ-gSRyWRKVwOvxTXdcFA.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/5-external_1beam:env:proces-nashorn-y9iNNWfaBYThUJK3bCSr0D1ntOzfXi13Zp8V-pzM2h0.jar
2022-09-20 20:56:35,922 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/1-external_1beam:env:proces-cldrdata-IHVqwlrHHtgWuTtrhzSTUFawFn1x1Qi97s3jYsygE0Y.jar
2022-09-20 20:56:35,923 DEBUG org.apache.beam.sdk.io.LocalFileSystem [] - opening file /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/4-external_1beam:env:proces-localedata-xQnGOSPHoBjAh1B5aVBszDCEI8BXc7-R_L7taFUKHg8.jar
java.io.FileNotFoundException: /tmp/beam-artifact-staging/0d0db4641fb81a65612a1322f110d045af5b04fc864669ba4b4a93779cd0ced1/6-external_1beam:env:proces-beam-runners-flink-job-server-k70RnE5z6gAhYmm-_6mK6Qgftuj4uScSj7D-ZM0Siy (No such file or directory)
        at java.io.FileInputStream.open0(Native Method)
        at java.io.FileInputStream.open(FileInputStream.java:195)
        at java.io.FileInputStream.<init>(FileInputStream.java:138)
        at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:128)
        at org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:84)
        at org.apache.beam.sdk.io.FileSystems.open(FileSystems.java:256)
        at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:124)
        at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
        at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:315)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:340)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at org.apache.beam.vendor.grpc.v1p43p2.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
```

Does anyone know what else I would need to configure? Thanks!


Sincerely,
Lydian Lee



On Tue, Sep 20, 2022 at 1:57 AM Jan Lukavský <[email protected]> wrote:

    Hi Lydian,

    there are two parts involved.

     a) the expansion service (which you run on port 8097) - this
    service expands the ReadFromKafka transform, which is a Java
    transform

     b) the Java SDK environment, which is not the expansion service;
    it must be some environment that is able to run the Java
    ReadFromKafka transform. With Flink you can use the PROCESS
    environment type (e.g. [1]), but there might be other options
    (e.g. DOCKER), see [2]

    Hope this helps,

     Jan

    [1] https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter6/src/main/python/beam_utils.py

    [2] https://beam.apache.org/documentation/runtime/sdk-harness-config/

    On 9/20/22 10:45, Lydian wrote:
    Hi,
    I am using the portable runner (Flink) with the Python SDK. I am
    on the latest version of Beam (2.41).
    The job is running on Kubernetes. I launched the JobManager with a
    sidecar container (using the
    image apache/beam_flink1.14_job_server:2.41.0) to start the
    expansion service with the following command:
    ```
    java \
      -cp /opt/apache/beam/jars/ \
      org.apache.beam.sdk.expansion.service.ExpansionService \
      8097 \
      --javaClassLookupAllowlistFile=* \
      --defaultEnvironmentType=EXTERNAL \
      --defaultEnvironmentConfig=localhost:8097
    ```
    In the code I am doing:
    ```
    ReadFromKafka(
        consumer_config={
            "bootstrap.servers": "BROKER",
            "security.protocol": "SASL_SSL",
            "sasl.mechanism": "SCRAM-SHA-512",
            "sasl.jaas.config": (
                'org.apache.kafka.common.security.scram.ScramLoginModule '
                f'required username="{sasl_username}" password="{sasl_password}";'
            ),
        },
        topics=[self.options.topic],
        with_metadata=False,
        expansion_service="localhost:8097",
    )
    ```
    But it fails with this error:
    ```
    2022-09-20 08:36:36,549 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaSDF/{ParDo(GenerateKafkaSourceDescriptor), KafkaIO.ReadSourceDescriptors} (1/1) (da27593d9232b9781fa1db3fd49d228e) switched from INITIALIZING to FAILED on 10.0.69.250:35101-76f99c @ ip-10-0-69-250.ec2.internal (dataPort=43553).
    org.apache.flink.util.SerializedThrowable: org.apache.beam.vendor.grpc.v1p43p2.io.grpc.StatusRuntimeException: UNIMPLEMENTED: Method not found: org.apache.beam.model.fn_execution.v1.BeamFnExternalWorkerPool/StartWorker
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[?:?]
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[?:?]
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[?:?]
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[?:?]
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[?:?]
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[?:?]
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[?:?]
        at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[?:?]
        at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38) ~[?:?]
        at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:202) ~[?:?]
        at org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator.open(ExecutableStageDoFnOperator.java:249) ~[?:?]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:110) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.5.jar:1.14.5]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_292]
    ```

    Does anyone know how I could fix this issue? Thanks!
    Sincerely,
    Lydian Lee
