Is this the same issue as in this thread,
https://lists.apache.org/[email protected]:2021-5,
about specifying the environment to be used in cross-language transforms?

Is the problem in the taskmanager or the expansion service? Are there
environment variables I can override to force it to use an external
environment so I can use a sidecar for the Java SDK harness?

Thanks
J

On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:

> It is likely that the expansion service is returning a graph segment
> that says to execute KafkaIO within a Docker environment, which is what
> Flink is trying to do.
>
> On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]> wrote:
>
>> Hi Folks,
>>
>> So I tried putting the Beam job server on the Flink JobManager and
>> TaskManager containers and setting classloader.resolve-order
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>
>> to parent-first.
>>
>> My taskmanagers are now crashing because it looks like the KafkaIO
>> transform is trying to run Docker, and it can't because it's running in a
>> K8s pod. Logs attached.
>>
>> Why would KafkaIO try to launch Docker? Does this have something to do
>> with the expansion service? The docs
>> <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
>> make it seem like the expansion service only runs at job submission time
>> and only needs to be accessible from the machine where you are running
>> your Python program to submit the job.
>>
>> Thanks
>> J
>>
>> On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]>
>> wrote:
>>
>>> Hi Luke,
>>>
>>> Thanks. I've attached the full stack trace. When I reran it, it gave me
>>> an error about a different class.
>>>
>>> I checked the Beam job server JAR and as far as I can tell the classes
>>> are present. So it seems like a potential issue with the classpath or the
>>> staging of JARs on the taskmanagers.
>>>
>>> Does anyone happen to know how jars get staged onto Flink taskmanagers?
>>> On the jobmanager I was able to locate the jar in a /tmp directory but I
>>> couldn't figure out how it was getting staged on taskmanagers.
>>>
>>> I tried baking the job server JAR into the Flink containers. That gave
>>> me an IllegalAccessError. I assume, per the Flink docs
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>,
>>> this indicates a dependency conflict between the system JARs and the
>>> application JARs.
>>>
>>> With the portable runner, is there any way to disable uploading of the
>>> JAR and instead rely on the JARs being baked into the Docker container?
>>>
>>> Thanks
>>> J
>>>
>>> On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:
>>>
>>>> Both those classes exist in beam-vendor-grpc-1_36_0-0.1.jar:
>>>>
>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
>>>>
>>>> Java has a tendency to only report the full cause on the first failure
>>>> of this kind, with all subsequent failures only reporting the
>>>> ClassNotFoundException. This happens because the ClassLoader remembers
>>>> which classes failed and doesn't try loading them again.
>>>>
>>>> Is there more of the stack trace pointing out the actual cause
>>>> associated with the first time this exception occurred?
>>>>
>>>>
>>>> On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Folks,
>>>>>
>>>>> I'm trying to run Beam Python 2.31 on Flink 1.13.
>>>>>
>>>>> I've created a simple streaming program to count Kafka messages.
>>>>> Running on the DirectRunner, this works fine. But when I try to submit
>>>>> to my Flink cluster, I get the exception below in my taskmanager.
>>>>>
>>>>> I'm using the PortableRunner. Any suggestions on how to fix or debug
>>>>> this?
>>>>>
>>>>> Running programs that don't use Kafka works.
>>>>>
>>>>> Thanks
>>>>> J
>>>>>
>>>>> WARNING: An illegal reflective access operation has occurred
>>>>> WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
>>>>> WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
>>>>> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
>>>>> WARNING: All illegal access operations will be denied in a future release
>>>>>
>>>>> Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
>>>>> SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
>>>>> java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
>>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>>     at java.base/java.lang.Thread.run(Unknown Source)
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
>>>>>     at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>     ... 17 more
>>>>>
>>>>>
>>>>> Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
>>>>>     at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
>>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>>     at java.base/java.lang.Thread.run(Unknown Source)
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
>>>>>     at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>     at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>     at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>     at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>     ... 4 more
>>>>>
>>>>
