Hi Jeremy,

the fix for the expansion service enables specifying ExperimentalOptions [1] and PortablePipelineOptions [2], so you can specify a default environment for the expansion. However, the Java SDK harness does not have the "worker_pool" implementation that the Python SDK harness has. So I think the correct environment for the expansion would be either DOCKER (which is a pain in Kubernetes) or PROCESS, which requires building a custom Flink Docker image for the TaskManager that includes the binaries from the Beam Java SDK image (/opt/apache/beam).

I didn't test whether the EMBEDDED environment works as well; you might try it. That would mean the expansion is completely inlined inside the TaskManager process.
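For illustration, launching the expansion service with a PROCESS default environment could look roughly like this. This is just a sketch in Python to assemble the arguments; the jar name and port are placeholders, and the flag names come from PortablePipelineOptions [2]:

```python
# Sketch: assemble arguments for launching the Java expansion service with a
# PROCESS default environment. The jar path and port are placeholders; the
# --defaultEnvironmentType / --defaultEnvironmentConfig flags are defined on
# PortablePipelineOptions [2].
import json

def expansion_service_args(jar, port, beam_home="/opt/apache/beam"):
    # A PROCESS environment takes a JSON config pointing at the SDK harness
    # boot binary baked into the TaskManager image.
    env_config = json.dumps({"command": f"{beam_home}/boot"})
    return [
        "java", "-jar", jar, str(port),
        "--defaultEnvironmentType=PROCESS",
        f"--defaultEnvironmentConfig={env_config}",
    ]

args = expansion_service_args("beam-sdks-java-io-expansion-service.jar", 8097)
print(args)
```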
Jan
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
[2]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
On 8/26/21 3:14 PM, Jeremy Lewi wrote:
Hi Jan,

That's very helpful. Do you have a timeline for 2.33? Until then I will try building from source.

So if I understand correctly, using 2.33 the solution would be to set the use_deprecated_read flag until the issues with SDFs [1] are fixed?

Does the fix for [3] allow specifying a different environment for different languages? When running in Kubernetes, I think the preferred solution is to use two sidecar containers, one running the Python SDK harness and the other running the Java SDK harness. So the environment config would need to be different for the two languages.
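Concretely, what I have in mind is something like the following. This is only a sketch; all addresses and ports are hypothetical, and as Jan notes above, the Java side may not support a worker-pool sidecar at all:

```python
# Sketch of per-language environments with sidecar harness containers.
# Addresses/ports are hypothetical placeholders.

# The Python harness is configured via the pipeline options the runner sees.
python_pipeline_opts = [
    "--runner=PortableRunner",
    "--environment_type=EXTERNAL",
    # Python SDK harness sidecar (started in worker-pool mode)
    "--environment_config=localhost:50000",
]

# The Java side would have to be configured on the expansion service instead,
# since that is where the Java graph segment gets its environment attached.
java_expansion_opts = [
    "--defaultEnvironmentType=EXTERNAL",
    # Java SDK harness sidecar (speculative: no worker-pool mode in the
    # Java harness, per the reply above)
    "--defaultEnvironmentConfig=localhost:50001",
]
```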
Thanks
J
On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <[email protected]> wrote:
Hi Jeremy,

unfortunately, there are several bugs affecting KafkaIO with Python on the FlinkRunner in the current releases:

a) there are some limitations to portable SDF support on Flink [1]
b) the use_deprecated_read flag cannot be passed to the ExpansionService; that is fixed for the upcoming 2.32.0 in [2]
c) the primitive Read transform needed for the use_deprecated_read flag to work does not function properly until 2.33.0; the fix was merged just yesterday, see [3]

Unfortunately, there are no known workarounds. If you can build Beam from source, you can try building it from the currently cut release branch 'release-2.33.0'. That would require building both the Java and Python SDKs. The alternative is to wait for release 2.33.0 to come out.
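Once on a release that forwards the flag (see b) above), passing the experiment from the Python side could look roughly like this. A sketch only; the job endpoint address is a placeholder:

```python
# Sketch: passing use_deprecated_read as an experiment when submitting the
# Python pipeline. The endpoint address is a placeholder; on releases before
# the fix in [2], this experiment was not forwarded to the expansion service.
def make_pipeline_args(job_endpoint, extra_experiments=()):
    experiments = ["use_deprecated_read", *extra_experiments]
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--experiments={','.join(experiments)}",
    ]

args = make_pipeline_args("localhost:8099")
print(args)
```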
Hope this helps; if you have any more questions, I'd be glad to help.
Jan
[1] https://issues.apache.org/jira/browse/BEAM-11998
[2] https://issues.apache.org/jira/browse/BEAM-12538
[3] https://issues.apache.org/jira/browse/BEAM-12704
On 8/26/21 2:36 AM, Jeremy Lewi wrote:
Is this the same issue as in this thread about specifying the environment to be used in cross-language transforms?
https://lists.apache.org/[email protected]:2021-5
Is the problem in the taskmanager or the expansion service? Are there environment variables I can override to force it to use an external environment, so I can use a sidecar for the Java SDK harness?
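For the sidecar part, the setup I'm picturing is roughly the following pod shape, written here as the dict a Kubernetes client would serialize. Image tags and ports are placeholders; the Beam Python container's worker-pool mode serves harness workers over gRPC for the EXTERNAL environment:

```python
# Sketch of a Flink TaskManager pod with a Python SDK harness sidecar.
# Image tags and ports are placeholders, not verified values.
taskmanager_pod = {
    "containers": [
        {"name": "taskmanager", "image": "flink:1.13"},
        {
            "name": "python-harness",
            "image": "apache/beam_python3.8_sdk:2.31.0",
            # Worker-pool mode: one long-lived sidecar serving SDK workers
            "args": ["--worker_pool"],
            "ports": [{"containerPort": 50000}],
        },
    ]
}
print(taskmanager_pod)
```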
Thanks
J
On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:
It is likely that the expansion service is returning a graph segment saying you should execute KafkaIO within a docker environment, which is what Flink is trying to do.
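A toy model of the mechanism, with simplified stand-in field names rather than the real portable pipeline proto: the expansion response embeds the environment the transform must run in, and the runner starts whatever that environment says, regardless of where the expansion service itself ran.

```python
# Toy model of why the TaskManager launches docker: the expanded transform
# carries its environment in the pipeline proto, and the runner obeys it.
# Field names are simplified stand-ins for the real proto.
expansion_response = {
    "transform": "KafkaIO.Read",
    "environment": {"urn": "beam:env:docker:v1",
                    "image": "apache/beam_java8_sdk:2.31.0"},
}

def start_environment(env):
    # A Flink TaskManager would shell out to docker here for a docker urn.
    if env["urn"] == "beam:env:docker:v1":
        return f"docker run {env['image']}"
    if env["urn"] == "beam:env:external:v1":
        return f"connect to {env['config']}"
    raise ValueError(f"unknown environment urn: {env['urn']}")

command = start_environment(expansion_response["environment"])
print(command)  # the runner would try to run docker here
```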
On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]> wrote:
Hi Folks,

So I tried putting the Beam job server on the Flink JobManager and TaskManager containers and setting classloader.resolve-order <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order> to parent-first.

My taskmanagers are now crashing because it looks like the KafkaIO transform is trying to run docker, and it can't because it's running in a K8s pod. Logs attached.

Why would KafkaIO try to launch docker? Does this have something to do with the expansion service? The docs <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines> make it seem like the expansion service only runs at job submission time and only needs to be accessible from the machine where you are running your Python program to submit the job.

Thanks
J
On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]> wrote:
Hi Luke,

Thanks. I've attached the full stack trace. When I reran it, it gave me an error about a different class.

I checked the Beam job server jar and as far as I can tell the classes are present. So it seems like a potential issue with the classpath or the staging of JARs on the task managers.

Does anyone happen to know how jars get staged onto Flink taskmanagers? On the jobmanager I was able to locate the jar in a /tmp directory, but I couldn't figure out how it was getting staged on taskmanagers.

I tried baking the job server jar into the Flink containers. That gave me an IllegalAccessError. I assume per the Flink docs <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order> this indicates a dependency conflict between the system JARs and the application JARs.

With the portable runner, is there any way to disable uploading of the JAR and instead rely on the JARs being baked into the docker container?

Thanks
J
On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:
Both those classes exist in
beam-vendor-grpc-1_36_0-0.1.jar:
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
Java has a tendency to only report the full cause on the first failure of this kind, with all subsequent failures only reporting the ClassNotFoundException. This happens because the ClassLoader remembers which classes failed and doesn't try loading them again.

Is there more of the stack trace pointing out the actual cause associated with the first time this exception occurred?
On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]> wrote:
Hi Folks,

I'm trying to run Beam Python 2.31 on Flink 1.13.

I've created a simple streaming program to count Kafka messages. Running on the DirectRunner this works fine, but when I try to submit to my Flink cluster I get the exception below in my taskmanager. I'm using the PortableRunner. Any suggestions on how to fix or debug this?

Running programs that don't use Kafka works.

Thanks
J
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
    at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 17 more
Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
    at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
    at java.base/java.net.URLClassLoader.findClass(Unknown Source)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
    ... 4 more