Hi Jeremy,
Unfortunately, there are several bugs affecting KafkaIO with Python on
FlinkRunner in the current releases:
a) there are some limitations to portable SDF support on Flink [1]
b) the use_deprecated_read flag cannot be passed to the ExpansionService;
that is fixed for the upcoming 2.32.0 in [2]
c) the primitive Read transform needed for the use_deprecated_read flag
to work does not work properly until 2.33.0; the fix was merged just
yesterday, see [3]
Unfortunately, there are no known workarounds. If you can build Beam
from source, you can try building it from the currently cut release
branch 'release-2.33.0'; that would require building both the Java and
Python SDKs. The alternative is to wait for the 2.33.0 release to come out.
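For reference, once you are on a release where these fixes have landed,
enabling the flag from the Python side might look roughly like the sketch
below. This is only an illustration; the job endpoint, broker address,
and topic are placeholders, not values from your setup.

# Minimal sketch, assuming a Beam release that honors use_deprecated_read
# end-to-end. Job endpoint, broker, and topic are placeholders.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',      # placeholder Flink job service
    '--environment_type=LOOPBACK',
    '--experiments=use_deprecated_read',  # the flag discussed above
    '--streaming',
])

with beam.Pipeline(options=options) as p:
    (p
     | 'ReadKafka' >> ReadFromKafka(
           consumer_config={'bootstrap.servers': 'kafka:9092'},  # placeholder
           topics=['my-topic'])                                  # placeholder
     | 'Print' >> beam.Map(print))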
Hope this helps. If you have any more questions, I'd be glad to help.
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11998
[2] https://issues.apache.org/jira/browse/BEAM-12538
[3] https://issues.apache.org/jira/browse/BEAM-12704
On 8/26/21 2:36 AM, Jeremy Lewi wrote:
Is this the same issue as in this thread
https://lists.apache.org/[email protected]:2021-5
about specifying the environment to be used in cross-language transforms?
Is the problem in the taskmanager or expansion service? Are there
environment variables I can override to force it to use an external
environment so I can use a sidecar for the Java SDK harness?
Thanks
J
On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:
It is likely that the expansion service is returning a graph segment
saying that KafkaIO should execute within a Docker environment, which is
what Flink is then trying to do.
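As a rough illustration (not a confirmed fix), the Python side can at
least point the cross-language read at an expansion service you run and
manage yourself; the address below is a placeholder, and whether the
expanded Kafka transform ends up in a Docker or external environment
still depends on how that expansion service is configured for your Beam
version.

# Sketch only: explicitly choosing the expansion service used for the
# cross-language KafkaIO expansion. The address is a placeholder.
from apache_beam.io.kafka import ReadFromKafka

records = (
    p  # an existing beam.Pipeline
    | 'ReadKafka' >> ReadFromKafka(
        consumer_config={'bootstrap.servers': 'kafka:9092'},  # placeholder
        topics=['my-topic'],                                   # placeholder
        expansion_service='localhost:8097'))                   # placeholder address

The expansion itself happens at pipeline construction time, but the
environment it embeds in the expanded transform is what Flink later tries
to bring up on the workers, which is why Docker shows up on the
TaskManagers.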
On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]> wrote:
Hi Folks,
So I tried putting the Beam job server on the Flink JobManager and
TaskManager containers and setting classloader.resolve-order
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order)
to parent-first.
My TaskManagers are now crashing because it looks like the KafkaIO
transform is trying to run docker, and it can't because it's running in
a K8s pod. Logs attached.
Why would KafkaIO try to launch docker? Does this have something to do
with the expansion service? The docs
(https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines)
make it seem like the expansion service only runs at job submission time
and only needs to be accessible from the machine where you are running
your Python program to submit the job.
Thanks
J
On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]> wrote:
Hi Luke,
Thanks. I've attached the full stack trace. When I reran it, it gave me
an error about a different class.
I checked the Beam job server JAR, and as far as I can tell the classes
are present. So it seems like a potential issue with the classpath or
with the staging of JARs on the TaskManagers.
Does anyone happen to know how JARs get staged onto Flink TaskManagers?
On the JobManager I was able to locate the JAR in a /tmp directory, but
I couldn't figure out how it gets staged on the TaskManagers.
I tried baking the job server JAR into the Flink containers. That gave
me an IllegalAccessError. Per the Flink docs
(https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order),
I assume this indicates a dependency conflict between the system JARs
and the application JARs.
With the PortableRunner, is there any way to disable uploading of the
JAR and instead rely on the JARs being baked into the Docker container?
Thanks
J
On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:
Both those classes exist in
beam-vendor-grpc-1_36_0-0.1.jar:
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
Java has a tendency to only report the full cause on the first failure
of this kind, with all subsequent failures reporting only the
ClassNotFoundException. This happens because the ClassLoader remembers
which classes failed and doesn't try loading them again. Is there more
of the stack trace pointing out the actual cause associated with the
first time this exception occurred?
On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]> wrote:
Hi Folks,
I'm trying to run Beam Python 2.31 on Flink 1.13. I've created a simple
streaming program to count Kafka messages. Running on the DirectRunner,
this works fine, but when I try to submit to my Flink cluster, I get the
exception below in my TaskManager. I'm using the PortableRunner. Any
suggestions on how to fix or debug this?
Running programs that don't use Kafka works fine.
Thanks
J
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 17 more
Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 4 more