Is this the same issue as in this thread https://lists.apache.org/[email protected]:2021-5 about specifying the environment to be used in cross-language transforms?

Is the problem in the taskmanager or in the expansion service? Are there environment variables I can override to force it to use an external environment, so that I can use a sidecar for the Java SDK harness?
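For concreteness, the Kafka read on the Python side is wired up roughly like the sketch below; the broker, topic, and expansion service address are all illustrative, and the expansion_service argument only matters if you run your own expansion service instead of the default one ReadFromKafka starts for you.

from apache_beam.io.kafka import ReadFromKafka

# All values here are illustrative. The expansion service address points at a
# Java expansion service we would run ourselves, rather than the default one
# that ReadFromKafka spins up automatically at submission time.
kafka_read = ReadFromKafka(
    consumer_config={"bootstrap.servers": "kafka:9092"},
    topics=["my-topic"],
    expansion_service="localhost:8097",
)

My understanding from the replies below is that whatever environment that expansion service embeds in the graph segment it returns (docker by default) is what the runner later tries to bring up on the taskmanager, so presumably any override to an external/sidecar setup has to happen on the expansion service side rather than in the taskmanager itself.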
Thanks,
J

On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:

It is likely that the expansion service is returning a graph segment saying that you execute KafkaIO within a docker environment, which is what Flink is then trying to do.

On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]> wrote:

Hi Folks,

So I tried putting the Beam job server on the Flink jobmanager and taskmanager containers and setting classloader.resolve-order <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order> to parent-first.

My taskmanagers are now crashing because it looks like the KafkaIO transform is trying to run docker, and it can't because it's running in a K8s pod. Logs attached.

Why would KafkaIO try to launch docker? Does this have something to do with the expansion service? The docs <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines> make it seem like the expansion service only runs at job submission time and only needs to be accessible from the machine where you run your Python program to submit the job.

Thanks,
J

On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]> wrote:

Hi Luke,

Thanks. I've attached the full stack trace. When I reran it, it gave me an error about a different class.

I checked the Beam job server jar and, as far as I can tell, the classes are present. So it seems like a potential issue with the classpath or with how JARs are staged on the taskmanagers.

Does anyone happen to know how jars get staged onto Flink taskmanagers? On the jobmanager I was able to locate the jar in a /tmp directory, but I couldn't figure out how it was getting staged on the taskmanagers.

I tried baking the job server jar into the Flink containers. That gave me an IllegalAccessError. I assume, per the Flink docs <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>, this indicates a dependency conflict between the system JARs and the application JARs.

With the portable runner, is there any way to disable uploading of the JAR and instead rely on the JARs being baked into the docker container?
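For reference, the submission side is wired up roughly like the sketch below, with illustrative endpoints. As far as I can tell, --environment_type and --environment_config only describe the environment for the Python SDK harness, not the environment the expansion service chooses for KafkaIO, and nothing in this sketch controls how the job server jar itself gets staged.

from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative endpoints: a Beam job server fronting the Flink cluster, and an
# external (sidecar) worker pool address for the Python SDK harness.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
    "--streaming",
])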
Thanks,
J

On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:

Both those classes exist in beam-vendor-grpc-1_36_0-0.1.jar:

lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class

Java has a tendency to only report the full cause on the first failure of this kind, with all subsequent failures only reporting the ClassNotFoundException. This happens because the ClassLoader remembers which classes failed and doesn't try loading them again.
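(If the jar tool isn't handy inside the container, a rough Python equivalent of the check above is sketched below; the jar path is illustrative and should point at wherever the jar is actually staged on the taskmanager.)

import zipfile

# Illustrative path: replace with wherever the vendored gRPC jar actually
# lives on the taskmanager, e.g. a /tmp staging directory or /opt/flink/lib.
JAR = "/tmp/beam-vendor-grpc-1_36_0-0.1.jar"

with zipfile.ZipFile(JAR) as zf:
    names = zf.namelist()
    for needle in ("HpackDecoder", "DnsNameResolver"):
        hits = [n for n in names if needle in n]
        print(f"{needle}: {len(hits)} matching entries")
        for name in hits:
            print("  " + name)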
Is there more of the stack trace pointing out the actual cause associated with the first time this exception occurred?

On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]> wrote:

Hi Folks,

I'm trying to run Beam Python 2.31 on Flink 1.13.

I've created a simple streaming program to count Kafka messages. Running on the DirectRunner this works fine, but when I try to submit to my Flink cluster I get the exception below in my taskmanager.

I'm using the PortableRunner. Any suggestions on how to fix or debug this?

Running programs that don't use Kafka works.
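Roughly, the counting pipeline is shaped like the sketch below; the broker, topic, and window size are illustrative.

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.transforms import window

with beam.Pipeline() as p:
    _ = (
        p
        | "ReadKafka" >> ReadFromKafka(
            consumer_config={"bootstrap.servers": "kafka:9092"},
            topics=["my-topic"],
        )
        | "Window" >> beam.WindowInto(window.FixedWindows(60))
        | "ToOnes" >> beam.Map(lambda _record: 1)
        | "Count" >> beam.CombineGlobally(sum).without_defaults()
        | "Print" >> beam.Map(print)
    )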
Thanks,
J

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 17 more

Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
at java.base/java.net.URLClassLoader.findClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 4 more
