Hi Jeremy,

Unfortunately, there are several known bugs affecting KafkaIO with Python on the FlinkRunner in current releases:

a) There are some limitations to portable SDF support on Flink [1].

b) The use_deprecated_read flag cannot be passed to the ExpansionService; that is fixed for the upcoming 2.32.0 release in [2].

c) The primitive Read transform, which use_deprecated_read needs in order to work, does not work properly until 2.33.0; the fix was merged just yesterday, see [3].
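For concreteness, once the fixes in (b) and (c) land, use_deprecated_read is passed as an ordinary experiment on the Python side. A minimal sketch of the relevant pipeline arguments (the job endpoint below is a placeholder for this setup, not a default to rely on):

```python
# Sketch only: the job endpoint is a placeholder for a Flink job
# server; --experiments=use_deprecated_read is the flag discussed in
# (b)/(c), which the fixes above also forward to the Java expansion
# service that expands ReadFromKafka.
pipeline_args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",  # placeholder job server address
    "--streaming",
    "--experiments=use_deprecated_read",
]

# Sanity check that the experiment flag is present in the args.
experiments = [a.split("=", 1)[1] for a in pipeline_args
               if a.startswith("--experiments=")]
print(experiments)  # ['use_deprecated_read']
```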

Unfortunately, there are no known workarounds. If you can build Beam from source, you can try building it from the currently cut release branch 'release-2.33.0'; you would need to build both the Java and Python SDKs. The alternative is to wait for the 2.33.0 release to come out.

Hope this helps. If you have any more questions, I'd be glad to help.

 Jan

[1] https://issues.apache.org/jira/browse/BEAM-11998

[2] https://issues.apache.org/jira/browse/BEAM-12538

[3] https://issues.apache.org/jira/browse/BEAM-12704

On 8/26/21 2:36 AM, Jeremy Lewi wrote:
Is this the same issue as in this thread
https://lists.apache.org/[email protected]:2021-5
about specifying the environment to be used in cross-language transforms?

Is the problem in the taskmanager or expansion service? Are there environment variables I can override to force it to use an external environment so I can use a sidecar for the Java SDK harness?
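For what it's worth, Beam's portable options for pointing the SDK harness at an external (sidecar) environment look like the sketch below. These flags are real Beam options, but in the affected releases they configure the Python harness; whether the Java environment emitted by the expansion service honors them is exactly what the linked thread discusses, so treat this as illustrative only (host/port are placeholders):

```python
# Illustrative sketch: --environment_type / --environment_config are
# Beam portable pipeline options; the host/port is a placeholder for a
# sidecar SDK harness. In the affected releases these may not reach the
# Java segment produced by the expansion service.
env_args = [
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",  # placeholder sidecar address
]

# Parse the flags into a dict for a quick look.
env_config = dict(a.lstrip("-").split("=", 1) for a in env_args)
print(env_config["environment_type"])  # EXTERNAL
```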

Thanks
J

On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:

    It is likely that the expansion service is returning a graph
    segment saying that KafkaIO should execute within a Docker
    environment, which is what Flink is then trying to do.

    On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]> wrote:

        Hi Folks,

        So I tried putting the Beam job server on the Flink JobManager
        and TaskManager containers and setting classloader.resolve-order
        <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>
        to parent-first. My taskmanagers are now crashing because it
        looks like the KafkaIO transform is trying to run docker, and it
        can't because it's running in a K8s pod. Logs attached.
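For readers following along, the setting mentioned above is a flink-conf.yaml entry; a minimal fragment (only resolve-order is the setting under discussion here):

```yaml
# flink-conf.yaml
# Resolve classes against the Flink/parent classpath before the
# user-code jar, instead of Flink's default child-first order.
classloader.resolve-order: parent-first
```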

        Why would KafkaIO try to launch docker? Does this have
        something to do with the expansion service? The docs
        <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
        make it seem like the expansion service only runs at job
        submission time and only needs to be accessible from the
        machine where you are running your Python program to submit
        the job.

        Thanks
        J

        On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]> wrote:

            Hi Luke,

            Thanks. I've attached the full stack trace. When I reran
            it, it gave me an error about a different class.

            I checked the Beam job server jar, and as far as I can tell
            the classes are present. So it seems like a potential issue
            with the classpath or the staging of JARs on the task
            managers.

            Does anyone happen to know how jars get staged onto Flink
            taskmanagers? On the jobmanager I was able to locate the
            jar in a /tmp directory but I couldn't figure out how it
            was getting staged on taskmanagers.

            I tried baking the job server jar into the Flink
            containers. That gave me an IllegalAccessError. Per the
            Flink docs
            <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>
            I assume this indicates a dependency conflict between the
            system JARs and the application JARs.

            With the PortableRunner, is there any way to disable
            uploading of the JAR and instead rely on the JARs being
            baked into the Docker container?

            Thanks
            J

            On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:

                Both those classes exist in beam-vendor-grpc-1_36_0-0.1.jar:

                lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
                org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
                lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
                org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class

                Java has a tendency to only report the full cause on
                the first failure of this kind with all subsequent
                failures only reporting the ClassNotFoundException.
                This happens because the ClassLoader remembers which
                classes failed and doesn't try loading them again.

                Is there more of the stack trace pointing out the
                actual cause associated with the first time
                this exception occurred?


                On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]> wrote:

                    Hi Folks,

                    I'm trying to run Beam Python 2.31 on Flink 1.13.

                    I've created a simple streaming program to count
                    Kafka messages. Running on the DirectRunner, this
                    works fine. But when I try to submit to my Flink
                    cluster, I get the exception below in my
                    taskmanager.

                    I'm using the PortableRunner. Any suggestions on
                    how to fix or debug this?

                    Running programs that don't use Kafka works.

                    Thanks
                    J

                    WARNING: An illegal reflective access operation has occurred
                    WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
                    WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
                    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
                    WARNING: All illegal access operations will be denied in a future release
                    Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
                    SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
                    java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
                        at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
                        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
                        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
                        at java.base/java.lang.Thread.run(Unknown Source)
                    Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
                        at java.base/java.net.URLClassLoader.findClass(Unknown Source)
                        at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
                        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
                        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
                        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
                        at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
                        ... 17 more

                    Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
                        at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
                        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
                        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
                        at java.base/java.lang.Thread.run(Unknown Source)
                    Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
                        at java.base/java.net.URLClassLoader.findClass(Unknown Source)
                        at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
                        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
                        at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
                        at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
                        at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
                        ... 4 more
