Hi Jeremy,

the fix for the expansion service enables specifying ExperimentalOptions [1] and PortablePipelineOptions [2], so you can specify a default environment for the expansion. However, the Java SDK harness does not have the "worker_pool" implementation that the Python SDK harness has. So I think the correct environment for the expansion would be either DOCKER (which is a pain in Kubernetes) or PROCESS, which requires building a custom Flink Docker image for the TaskManager that includes the binaries from the Beam Java SDK image (/opt/apache/beam).
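For illustration, with that fix the default environment could be passed directly when starting the Java expansion service. This is only a sketch: the jar name/version, port, and boot command below are assumptions, so adjust them to your setup.

```shell
# Start the Kafka expansion service with a PROCESS default environment
# (jar name, port, and boot path are assumed, not verified values).
java -jar beam-sdks-java-io-expansion-service-2.33.0.jar 8097 \
  --defaultEnvironmentType=PROCESS \
  --defaultEnvironmentConfig='{"command": "/opt/apache/beam/boot"}'
```

The Python pipeline would then point the transform's expansion_service parameter at that endpoint (e.g. localhost:8097).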

I didn't test whether the EMBEDDED environment would work as well; you might try it. That would mean the expansion is completely inlined inside the TaskManager process.

 Jan

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java

[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java

On 8/26/21 3:14 PM, Jeremy Lewi wrote:
Hi Jan,

That's very helpful. Do you have a timeline for 2.33? Until then, I will try building from source.

So if I understand correctly: using 2.33, the solution would be to set the use_deprecated_read flag until the issues with SDFs [1] are fixed?

Does the fix for [3] allow specifying a different environment for different languages? When running in Kubernetes, I think the preferred solution is to use two sidecar containers: one running the Python SDK harness and the other running the Java SDK harness. So the environment config would need to be different for the two languages.
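For context, a sidecar-based setup like this would typically be expressed through Beam's EXTERNAL environment, where the harness runs as a worker pool the runner connects to. The script name, endpoints, and ports below are hypothetical, just to illustrate the shape of the flags:

```shell
# Hypothetical submission of a Python pipeline whose SDK harness runs as
# a worker-pool sidecar in the same pod (addresses are placeholders).
python my_pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=localhost:8099 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000
```

The open question above is whether the Java expansion can be pointed at a second, Java-specific EXTERNAL endpoint in the same way.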

Thanks
J

On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <[email protected]> wrote:

    Hi Jeremy,

    unfortunately, there are several bugs affecting KafkaIO with
    Python on FlinkRunner in current releases.

     a) there are some limitations to portable SDF support on Flink [1]

     b) the use_deprecated_read flag cannot be passed to the
    ExpansionService; that is fixed for the upcoming 2.32.0 in [2]

     c) the primitive Read transform needed for the use_deprecated_read
    flag to work does not work properly until 2.33.0; the fix was merged
    just yesterday, see [3]

    Unfortunately, there are no known workarounds. If you can build Beam
    from source, you can try building it from the currently cut release
    branch 'release-2.33.0'. That would require building both the Java
    and Python SDKs. The alternative is to wait for release 2.33.0 to
    come out.
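    A build from the release branch might look roughly like the
    following. The workflow, source layout, and Gradle task name are
    assumptions, so consult the Beam contribution guide for the exact
    commands:

```shell
# Check out the cut release branch (assumed workflow, not verified).
git clone https://github.com/apache/beam.git
cd beam
git checkout release-2.33.0

# Install the Python SDK from source into the current environment.
pip install ./sdks/python

# Build the Java artifacts, e.g. the Kafka expansion service
# (Gradle task name is an assumption; see the Beam build docs).
./gradlew :sdks:java:io:expansion-service:build
```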

    Hope this helps. If you have any more questions, I'd be glad to help.

     Jan

    [1] https://issues.apache.org/jira/browse/BEAM-11998

    [2] https://issues.apache.org/jira/browse/BEAM-12538

    [3] https://issues.apache.org/jira/browse/BEAM-12704

    On 8/26/21 2:36 AM, Jeremy Lewi wrote:
    Is this the same issue as in this thread
    https://lists.apache.org/[email protected]:2021-5
    about specifying the environment to be used in cross-language
    transforms?

    Is the problem in the TaskManager or the expansion service? Are there
    environment variables I can override to force it to use an external
    environment so I can use a sidecar for the Java SDK harness?

    Thanks
    J






    On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:

        It is likely that the expansion service is returning a graph
        segment saying to execute KafkaIO within a Docker environment,
        which is what Flink is trying to do.

        On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi
        <[email protected]> wrote:

            Hi Folks,

            So I tried putting the Beam job server on the Flink
            JobManager and TaskManager containers and setting
            classloader.resolve-order
            <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>
            to parent-first.

            My TaskManagers are now crashing because it looks like the
            KafkaIO transform is trying to run Docker, and it can't
            because it's running in a K8s pod. Logs attached.

            Why would KafkaIO try to launch Docker? Does this have
            something to do with the expansion service? The docs
            <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
            make it seem like the expansion service only runs at job
            submission time and only needs to be accessible from the
            machine where you run your Python program to submit the job.

            Thanks
            J

            On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi
            <[email protected]> wrote:

                Hi Luke,

                Thanks. I've attached the full stack trace. When I
                reran it, I got an error about a different class.

                I checked the Beam job server jar, and as far as I can
                tell the classes are present. So it seems like a
                potential issue with the classpath or with the staging
                of JARs on the task managers.

                Does anyone happen to know how JARs get staged onto
                Flink TaskManagers? On the JobManager I was able to
                locate the jar in a /tmp directory, but I couldn't
                figure out how it was getting staged on the TaskManagers.

                I tried baking the job server jar into the Flink
                containers. That gave me an IllegalAccessError. Per the
                Flink docs
                <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>
                I assume this indicates a dependency conflict between
                the system JARs and the application JARs.

                With the portable runner, is there any way to disable
                uploading of the JAR and instead rely on the JARs being
                baked into the Docker container?

                Thanks
                J

                On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik
                <[email protected]> wrote:

                    Both those classes exist in
                    beam-vendor-grpc-1_36_0-0.1.jar:

                    lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
                    lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
                    org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class

                    Java has a tendency to report the full cause only on
                    the first failure of this kind, with all subsequent
                    failures reporting only the ClassNotFoundException.
                    This happens because the ClassLoader remembers which
                    classes failed and doesn't try loading them again.

                    Is there more of the stack trace pointing out the
                    actual cause associated with the first time this
                    exception occurred?


                    On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi
                    <[email protected]> wrote:

                        Hi Folks,

                        I'm trying to run Beam Python 2.31 on Flink 1.13.

                        I've created a simple streaming program to count
                        Kafka messages. Running on the DirectRunner,
                        this works fine. But when I try to submit to my
                        Flink cluster, I get the exception below in my
                        TaskManager.

                        I'm using the PortableRunner. Any suggestions on
                        how to fix or debug this?

                        Running programs that don't use Kafka works.

                        Thanks
                        J

                        WARNING: An illegal reflective access operation has occurred
                        WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
                        WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
                        WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
                        WARNING: All illegal access operations will be denied in a future release

                        Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
                        SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
                        java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
                            at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
                            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
                            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
                            at java.base/java.lang.Thread.run(Unknown Source)
                        Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
                            at java.base/java.net.URLClassLoader.findClass(Unknown Source)
                            at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
                            at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
                            at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
                            at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
                            at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
                            ... 17 more

                        Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
                            at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
                            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
                            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
                            at java.base/java.lang.Thread.run(Unknown Source)
                        Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
                            at java.base/java.net.URLClassLoader.findClass(Unknown Source)
                            at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
                            at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
                            at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
                            at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
                            at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
                            ... 4 more
