Hello Jeremy, if you need an example of deploying Flink on k8s, see
https://github.com/sambvfx/beam-flink-k8s; for Spark, see
https://github.com/cometta/python-apache-beam-spark.

Thank you
    On Thursday, August 26, 2021, 08:37:09 AM GMT+8, Jeremy Lewi 
<[email protected]> wrote:  
 
 Is this the same issue as in this thread
https://lists.apache.org/[email protected]:2021-5
about specifying the environment to be used in cross-language transforms?

Is the problem in the taskmanager or the expansion service? Are there environment
variables I can override to force it to use an external environment, so that I can
use a sidecar for the Java SDK harness?
Thanks,
J
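For context on the external-environment question, here is a minimal sketch of the flags a Beam Python pipeline might pass to the PortableRunner. It assumes a job server at a hypothetical flink-jobserver:8099 and a Python SDK worker-pool sidecar on localhost:50000, the address that appears in the taskmanager log in this thread. These options only set the environment of the Python SDK harness. The environment attached to a cross-language transform such as KafkaIO comes from the expansion service, which is why they do not, by themselves, stop Flink from trying to start Docker for the Java part.

```python
from typing import List


def portable_flink_args(job_endpoint: str, worker_pool: str, streaming: bool = True) -> List[str]:
    """Build argv-style flags for apache_beam's PipelineOptions (a sketch)."""
    args = [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        # EXTERNAL: connect to an already-running SDK worker pool (e.g. a sidecar)
        # instead of having the runner start a Docker container for the Python harness.
        "--environment_type=EXTERNAL",
        f"--environment_config={worker_pool}",
    ]
    if streaming:
        args.append("--streaming")
    return args


if __name__ == "__main__":
    # Hypothetical host name. With Beam installed, the list could be passed as
    # apache_beam.options.pipeline_options.PipelineOptions(flags).
    flags = portable_flink_args("flink-jobserver:8099", "localhost:50000")
    print("\n".join(flags))
```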





On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:

It is likely that the expansion service is returning a graph segment that says
KafkaIO should execute within a Docker environment, which is what Flink is trying to do.

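Luke's point can be made concrete with a toy model. It uses a simplified dict layout, not Beam's real protobuf representation (that lives in beam_runner_api.proto). Each transform references an environment, and the runner starts whatever harness that environment describes. The two URNs are Beam's standard Docker and external environment URNs; the transform names and the Java image tag are illustrative only.

```python
from typing import Dict, Tuple

DOCKER_URN = "beam:env:docker:v1"
EXTERNAL_URN = "beam:env:external:v1"

# Hypothetical simplification: environment id -> (urn, config).
Environments = Dict[str, Tuple[str, str]]


def harness_plan(transforms: Dict[str, str], environments: Environments) -> Dict[str, str]:
    """Map each transform name to the action a runner would take for its environment."""
    plan = {}
    for name, env_id in transforms.items():
        urn, config = environments[env_id]
        if urn == DOCKER_URN:
            plan[name] = f"docker run {config}"
        elif urn == EXTERNAL_URN:
            plan[name] = f"connect to worker pool at {config}"
        else:
            plan[name] = f"other environment: {urn}"
    return plan


# The Python transforms use the EXTERNAL environment from the pipeline options, but
# the expanded Kafka read carries the Docker environment chosen by the expansion service.
environments = {
    "py": (EXTERNAL_URN, "localhost:50000"),
    "java": (DOCKER_URN, "apache/beam_java8_sdk:2.31.0"),  # illustrative image tag
}
transforms = {"CountMessages": "py", "ReadFromKafka": "java"}

for name, action in harness_plan(transforms, environments).items():
    print(f"{name}: {action}")
```

In this model the taskmanager has no way to honour the Docker environment inside a K8s pod, which matches the crash Jeremy describes.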
On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]> wrote:

Hi Folks,

So I tried putting the Beam job server on the Flink JobManager and TaskManager
containers and setting classloader.resolve-order to parent-first.
My taskmanagers are now crashing because it looks like the KafkaIO transform
is trying to run Docker, and it can't because it's running in a K8s pod. Logs
attached.
Why would KafkaIO try to launch Docker? Does this have something to do with the
expansion service? The docs make it seem like the expansion service only runs
at job submission time and only needs to be accessible from the machine where
you are running your Python program to submit the job.
Thanks,
J
On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]> wrote:

Hi Luke,

Thanks. I've attached the full stack trace. When I reran it, it gave me an error
about a different class.
I checked the Beam job server JAR and, as far as I can tell, the classes are
present. So it seems like a potential issue with the classpath or with how JARs
are staged on the taskmanagers.
Does anyone happen to know how JARs get staged onto Flink taskmanagers? On the
jobmanager I was able to locate the JAR in a /tmp directory, but I couldn't
figure out how it was getting staged on the taskmanagers.
I tried baking the job server JAR into the Flink containers. That gave me an
IllegalAccessError. I assume, per the Flink docs, this indicates a dependency
conflict between the system JARs and the application JARs.
With the portable runner, is there any way to disable uploading of the JAR and
instead rely on the JARs being baked into the Docker container?
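One way to look for the kind of dependency conflict suspected here is to list classes that ship in more than one JAR on the same classpath. This is a minimal sketch. It assumes Python is available in the container and that the Flink system JARs live in /opt/flink/lib, the directory that appears in the log in this thread. A JAR is a ZIP archive, so the standard-library zipfile module can read it. duplicate_classes is a hypothetical helper name.

```python
import zipfile
from collections import defaultdict
from pathlib import Path
from typing import Dict, List


def duplicate_classes(jar_dir: str) -> Dict[str, List[str]]:
    """Return {class entry: [jar names]} for .class entries found in two or more JARs."""
    seen: Dict[str, List[str]] = defaultdict(list)
    for jar in sorted(Path(jar_dir).glob("*.jar")):
        with zipfile.ZipFile(jar) as zf:
            for entry in zf.namelist():
                if entry.endswith(".class"):
                    seen[entry].append(jar.name)
    return {cls: jars for cls, jars in seen.items() if len(jars) > 1}


if __name__ == "__main__":
    target = "/opt/flink/lib"  # assumed location of the Flink system JARs
    for cls, jars in sorted(duplicate_classes(target).items()):
        print(f"{cls}: {', '.join(jars)}")
```

A class reported in both a Flink JAR and a baked-in Beam JAR is a candidate for the conflict, although vendored (relocated) Beam classes should not collide by name.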
Thanks,
J
On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:

Both those classes exist in beam-vendor-grpc-1_36_0-0.1.jar:
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
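The same check can be done where JDK tools such as `jar` are not installed, for example inside a slim container image. Here is a sketch using Python's zipfile module, which works because a JAR is a ZIP archive. grep_jar is a hypothetical helper name, and the pattern is treated as a Python regular expression rather than grep syntax.

```python
import re
import zipfile
from typing import List


def grep_jar(jar_path: str, pattern: str) -> List[str]:
    """Return the JAR entries whose names match the regular expression `pattern`."""
    regex = re.compile(pattern)
    with zipfile.ZipFile(jar_path) as zf:
        return [name for name in zf.namelist() if regex.search(name)]


# Usage (path is illustrative), roughly `jar tf <jar> | grep Hpack`:
# for entry in grep_jar("beam-vendor-grpc-1_36_0-0.1.jar", "Hpack"):
#     print(entry)
```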

Java has a tendency to only report the full cause on the first failure of this 
kind with all subsequent failures only reporting the ClassNotFoundException. 
This happens because the ClassLoader remembers which classes failed and doesn't 
try loading them again.

Is there more of the stack trace pointing out the actual cause associated with 
the first time this exception occurred?
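Only the first failure tends to carry the full cause, so it can help to pull the first matching stack trace out of a long taskmanager log. This is a sketch that assumes standard JVM trace formatting: indented `at` frames and `... N more` lines, plus unindented `Caused by:` headers. first_error_block is a hypothetical helper name.

```python
from typing import List, Optional


def first_error_block(log_lines: List[str], marker: str = "NoClassDefFoundError") -> Optional[List[str]]:
    """Return the lines of the first stack trace whose header line contains `marker`."""
    for i, line in enumerate(log_lines):
        if marker not in line:
            continue
        block = [line]
        for nxt in log_lines[i + 1:]:
            # Frames and "... N more" lines are indented; nested causes start with "Caused by:".
            if nxt.startswith((" ", "\t")) or nxt.startswith("Caused by:"):
                block.append(nxt)
            else:
                break
        return block
    return None


# Usage (file name is hypothetical):
# with open("taskmanager.log") as f:
#     print("\n".join(first_error_block(f.read().splitlines()) or []))
```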

On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]> wrote:

Hi Folks,

I'm trying to run Beam Python 2.31 on Flink 1.13.
I've created a simple streaming program to count Kafka messages. Running on the
DirectRunner, this works fine, but when I submit it to my Flink cluster, I
get the exception below in my taskmanager.
I'm using the PortableRunner. Any suggestions on how to fix or debug this?
Running programs that don't use Kafka works.
Thanks,
J

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release

Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 17 more

Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 4 more