Hi Jan,

That's very helpful. Do you have a timeline for 2.33? Until then I will try
building from source.

So if I understand correctly, using 2.33, the solution would be to set the
use_deprecated_read flag until the issues with SDFs [1] are fixed?
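To make sure I have the mechanics right, here is a hedged sketch of the options I would pass (this is my reading of the thread, not verified against 2.33; the job endpoint address is an illustrative assumption):

```python
# Hedged sketch: pipeline options for a Beam Python pipeline on the
# PortableRunner/Flink, falling back to the primitive Read transform
# via the use_deprecated_read experiment discussed in this thread.
# The job_endpoint value is a made-up example address.
pipeline_args = [
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",  # beam job server (assumed address)
    "--streaming",
    # Avoid the SDF-based read path, per the Flink limitations in [1]:
    "--experiments=use_deprecated_read",
]
```

My understanding from [2] is that before 2.32.0 this experiment never reached the expansion service, so the Java side ignored it; with the fix it should be forwarded as well.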

Does the fix for [3] allow specifying a different environment for different
languages? When running in Kubernetes, I think the preferred solution is
to use two sidecar containers, one running the Python SDK harness and the
other running the Java SDK harness. So the environment config would need to
be different for the two languages.
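To make the question concrete, here is a hedged sketch of what I imagine the per-language config would look like (the option names follow Beam's EXTERNAL environment convention as I understand it; the sidecar ports are made-up assumptions):

```python
# Hedged sketch: each SDK harness runs as a sidecar container in the same
# pod, and the runner is pointed at it via an EXTERNAL environment.
# Ports and addresses below are illustrative assumptions, not real config.

# Environment for the Python SDK harness sidecar:
python_env_args = [
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",  # python harness sidecar (assumed port)
]

# Hypothetical equivalent for the Java harness that KafkaIO expands into;
# today I don't see a way to specify this separately per language, which
# is exactly what I'm asking about:
java_env_args = [
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50001",  # java harness sidecar (assumed port)
]
```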

Thanks
J

On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <[email protected]> wrote:

> Hi Jeremy,
>
> unfortunately, there are several bugs affecting KafkaIO with Python on
> FlinkRunner in current releases.
>
>  a) there are some limitations to portable SDF support on Flink [1]
>
>  b) the use_deprecated_read flag cannot be passed to ExpansionService,
> that is fixed for upcoming 2.32.0 in [2]
>
>  c) primitive Read transform needed for the use_deprecated_read flag to
> work is not working properly until 2.33.0, fix was merged just yesterday,
> see [3]
> Unfortunately, there are no known workarounds. If you can build Beam from
> source, you can try building it from the currently cut release branch
> 'release-2.33.0'; that would require building both the Java and Python SDKs.
> The alternative would be to wait for the 2.33.0 release to come out.
>
> Hope this helps. If you have any more questions, I'd be glad to help.
>
>  Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-11998
>
> [2] https://issues.apache.org/jira/browse/BEAM-12538
>
> [3] https://issues.apache.org/jira/browse/BEAM-12704
> On 8/26/21 2:36 AM, Jeremy Lewi wrote:
>
> Is this the same issue as in this thread
> https://lists.apache.org/[email protected]:2021-5
> about specifying the environment to be used in cross-language transforms?
>
> Is the problem in the taskmanager or expansion service? Are there
> environment variables I can override to force it to use an external
> environment so I can use a sidecar for the Java SDK harness?
>
> Thanks
> J
>
>
>
>
>
>
> On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:
>
>> It is likely that the expansion service is returning a graph segment
>> saying to execute KafkaIO within a Docker environment, which is what Flink
>> is trying to do.
>>
>> On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]>
>> wrote:
>>
>>> Hi Folks,
>>>
>>> So I tried putting the beam job server on the Flink JobManager and
>>> Taskmanager containers and setting classloader.resolve-order
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>
>>> to parent-first.
>>> My taskmanagers are now crashing because it looks like the KafkaIO
>>> transform is trying to run Docker, and it can't because it's running in a
>>> K8s pod. Logs attached.
>>>
>>> Why would KafkaIO try to launch Docker? Does this have something to do
>>> with the expansion service? The docs
>>> <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
>>> make it seem like the expansion service only runs at job submission time
>>> and only needs to be accessible from the machine where you are running
>>> your Python program to submit the job.
>>>
>>> Thanks
>>> J
>>>
>>> On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]>
>>> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>> Thanks. I've attached the full stack trace. When I reran it gave me an
>>>> error about a different class.
>>>>
>>>> I checked the beam job server jar and, as far as I can tell, the classes
>>>> are present. So it seems like a potential issue with the classpath or the
>>>> staging of JARs on the taskmanagers.
>>>>
>>>> Does anyone happen to know how jars get staged onto Flink taskmanagers?
>>>> On the jobmanager I was able to locate the jar in a /tmp directory but I
>>>> couldn't figure out how it was getting staged on taskmanagers.
>>>>
>>>> I tried baking the job server jar into the Flink containers. That gave
>>>> me an IllegalAccessError. Per the Flink docs
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>,
>>>> I assume this indicates a dependency conflict between the system JARs
>>>> and the application JARs.
>>>>
>>>> With the portable runner, is there any way to disable uploading of the
>>>> JAR and instead rely on the JARs being baked into the Docker container?
>>>>
>>>> Thanks
>>>> J
>>>>
>>>> On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> Both those classes exist in beam-vendor-grpc-1_36_0-0.1.jar:
>>>>>
>>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar |
>>>>> grep Hpack
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
>>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar |
>>>>> grep DnsNameResolver
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
>>>>>
>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
>>>>>
>>>>> Java has a tendency to only report the full cause on the first failure
>>>>> of this kind with all subsequent failures only reporting the
>>>>> ClassNotFoundException. This happens because the ClassLoader remembers
>>>>> which classes failed and doesn't try loading them again.
>>>>>
>>>>> Is there more of the stack trace pointing out the actual cause
>>>>> associated with the first time this exception occurred?
>>>>>
>>>>>
>>>>> On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Folks,
>>>>>>
>>>>>> I'm trying to run Beam Python 2.31 on Flink 1.13.
>>>>>>
>>>>>> I've created a simple streaming program to count Kafka messages.
>>>>>> Running on the DirectRunner, this works fine. But when I try to submit
>>>>>> to my Flink cluster, I get the exception below in my taskmanager.
>>>>>>
>>>>>> I'm using the PortableRunner. Any suggestions on how to fix or debug
>>>>>> this?
>>>>>>
>>>>>> Running programs that don't use Kafka works.
>>>>>>
>>>>>> Thanks
>>>>>> J
>>>>>>
>>>>>> WARNING: An illegal reflective access operation has occurred
>>>>>>
>>>>>> WARNING: Illegal reflective access by
>>>>>> org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
>>>>>>  (
>>>>>> file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method
>>>>>> java.nio.DirectByteBuffer.cleaner()
>>>>>>
>>>>>> WARNING: Please consider reporting this to the maintainers of
>>>>>> org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
>>>>>>
>>>>>> WARNING: Use --illegal-access=warn to enable warnings of further
>>>>>> illegal reflective access operations
>>>>>>
>>>>>> WARNING: All illegal access operations will be denied in a future
>>>>>> release
>>>>>>
>>>>>> Aug 24, 2021 11:17:12 PM
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2
>>>>>> uncaughtException
>>>>>>
>>>>>> SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the
>>>>>> SynchronizationContext. Panic!
>>>>>>
>>>>>> java.lang.NoClassDefFoundError:
>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
>>>>>>
>>>>>> at
>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
>>>>>> Source)
>>>>>>
>>>>>> at
>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
>>>>>> Source)
>>>>>>
>>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>>>
>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
>>>>>>
>>>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>
>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>
>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>
>>>>>> ... 17 more
>>>>>>
>>>>>>
>>>>>> Exception in thread "grpc-default-executor-0"
>>>>>> java.lang.NoClassDefFoundError:
>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
>>>>>>
>>>>>> at
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
>>>>>>
>>>>>> at
>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
>>>>>> Source)
>>>>>>
>>>>>> at
>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
>>>>>> Source)
>>>>>>
>>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>>>
>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
>>>>>>
>>>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>
>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>
>>>>>> at
>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>
>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>
>>>>>> ... 4 more
>>>>>>
>>>>>