Hi Folks,

Thank you so much for all your help. I was able to get this working
although I had to hack the python SDK to work around the issue with
connecting to a remote expansion service mentioned in the other thread
<https://lists.apache.org/x/thread.html/r6c02f6c80d35929a46587ac5d6662ca2e5d8997ae6adfb5902314a35@%3Cuser.beam.apache.org%3E>.

Here's a summary of everything I had to do:

   - I built from the 2.33 release branch to pick up the mentioned fixes.
   - To deal with the NoClassDefFoundErrors, I ended up baking the Beam job
   server jar into the Flink workers.
      - I'm still not quite sure why the jar isn't being staged correctly,
      but I'll have to dig into it further.
   - I set up my taskmanagers to run Docker-in-Docker so they could use the
   Docker environment for the Java SDK harness.
   - I ran the expansion service as a separate service from the job server
   so that I could set the options to control the default environment and
   pass the use_deprecated_read flag.
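A rough sketch of the resulting submission options (host names and ports are placeholders, not the exact values from my setup; the flag values follow the fixes discussed in this thread):

```python
# Sketch of the PipelineOptions argv used when submitting the Python
# pipeline. Host names and ports are placeholders; use_deprecated_read
# is passed because of the portable SDF limitations on Flink.

def submission_args(job_endpoint):
    """Build the argv handed to PipelineOptions for the portable Flink runner."""
    return [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        "--streaming",
        "--experiments=use_deprecated_read",
    ]

args = submission_args("flink-jobserver:8099")

# The separately running expansion service is passed per transform, e.g.:
#   ReadFromKafka(consumer_config=..., topics=...,
#                 expansion_service="expansion-svc:8097")
```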


J

On Fri, Aug 27, 2021 at 7:16 PM Jeremy Lewi <[email protected]> wrote:

> It looks to me like https://github.com/apache/beam/pull/15082 added the
> ability to configure the default environment to the main entrypoint of the
> expansion service, but not to the JobServer:
>
> https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L260
>
> Should I file a JIRA for this?
>
> Thanks.
> J
>
>
> On Fri, Aug 27, 2021 at 12:36 PM Jeremy Lewi <[email protected]>
> wrote:
>
>> Thank you very much for the pointers. I'm working on getting the code
>> built from the 2.33 branch and trying that out.
>>
>> J
>>
>> On Thu, Aug 26, 2021 at 6:35 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi Jeremy,
>>>
>>> the fix for the expansion service enables specifying ExperimentalOptions [1]
>>> and PortablePipelineOptions [2], so you can specify a default environment for
>>> the expansion. However ... the Java SDK harness does not have the "work_pool"
>>> implementation that the Python SDK harness has. So I think that the correct
>>> environment for the expansion would be either DOCKER (which is a pain in
>>> Kubernetes) or PROCESS - that requires building a custom Flink docker image
>>> for the TaskManager that includes the binaries from the Beam Java SDK image
>>> (/opt/apache/beam).
>>>
>>> I didn't test whether the EMBEDDED environment would work as well; you
>>> might try it. That would mean that the expansion is completely inlined
>>> inside the TaskManager process.
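A minimal sketch of starting such an expansion service with a PROCESS default environment (the jar name, port, and boot-binary path are assumptions to verify against your Beam build; the flag names come from PortablePipelineOptions):

```python
import json

# Build the command line for a standalone expansion service whose
# expansions default to the PROCESS environment. The flag names mirror
# PortablePipelineOptions; the jar name and boot path are assumptions.
env_config = json.dumps({"command": "/opt/apache/beam/boot"})
expansion_cmd = [
    "java", "-jar", "beam-sdks-java-io-expansion-service-2.33.0.jar",
    "8097",  # port the expansion service listens on
    "--defaultEnvironmentType=PROCESS",
    "--defaultEnvironmentConfig=" + env_config,
]
# subprocess.run(expansion_cmd) would launch it; omitted here.
```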
>>>
>>>  Jan
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
>>>
>>> [2]
>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
>>> On 8/26/21 3:14 PM, Jeremy Lewi wrote:
>>>
>>> Hi Jan,
>>>
>>> That's very helpful. Do you have a timeline for 2.33? Until then I will
>>> try building from source.
>>>
>>> So if I understand correctly, using 2.33, the solution would be to set
>>> the use_deprecated_read flag until the issues with SDFs [1] are fixed?
>>>
>>> Does the fix for (c) allow specifying a different environment for
>>> different languages? When running in Kubernetes, I think the preferred
>>> solution is to use two sidecar containers, one running the Python SDK
>>> harness and the other running the Java SDK harness. So the environment
>>> config would need to be different for the two languages.
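A minimal sketch of what per-language environment settings might look like in that sidecar setup (addresses, ports, and paths are placeholders; note that elsewhere in this thread it's pointed out that the Java SDK harness has no worker-pool mode, so the Java side here falls back to PROCESS):

```python
# Per-language environment settings for a k8s sidecar setup
# (addresses, ports, and paths are placeholders).

# Python side: an EXTERNAL environment pointing at a worker-pool
# sidecar started with --worker_pool.
python_pipeline_args = [
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",
]

# Java side: the Java SDK harness has no worker-pool mode, so the
# expansion service's default environment would use PROCESS instead,
# with the harness binaries baked into the TaskManager image.
java_expansion_args = [
    "--defaultEnvironmentType=PROCESS",
    '--defaultEnvironmentConfig={"command": "/opt/apache/beam/boot"}',
]
```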
>>>
>>> Thanks
>>> J
>>>
>>> On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi Jeremy,
>>>>
>>>> unfortunately, there are several bugs affecting KafkaIO with Python on
>>>> FlinkRunner in current releases.
>>>>
>>>>  a) there are some limitations to portable SDF support on Flink [1]
>>>>
>>>>  b) the use_deprecated_read flag cannot be passed to ExpansionService,
>>>> that is fixed for upcoming 2.32.0 in [2]
>>>>
>>>>  c) the primitive Read transform needed for the use_deprecated_read flag
>>>> to work is not working properly until 2.33.0; the fix was merged just
>>>> yesterday, see [3].
>>>>
>>>> Unfortunately, there are no known workarounds. If you can build Beam from
>>>> source, you can try building it from the currently cut release branch
>>>> 'release-2.33.0'; that requires building both the Java and Python SDKs.
>>>> The alternative would be to wait for the release 2.33.0 to come out.
>>>>
>>>> Hope this helps; if you have any more questions, I'd be glad to help.
>>>>
>>>>  Jan
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-11998
>>>>
>>>> [2] https://issues.apache.org/jira/browse/BEAM-12538
>>>>
>>>> [3] https://issues.apache.org/jira/browse/BEAM-12704
>>>> On 8/26/21 2:36 AM, Jeremy Lewi wrote:
>>>>
>>>> Is this the same issue as in this thread
>>>> https://lists.apache.org/[email protected]:2021-5
>>>> about specifying the environment to be used in cross-language
>>>> transforms?
>>>>
>>>> Is the problem in the taskmanager or expansion service? Are there
>>>> environment variables I can override to force it to use an external
>>>> environment so I can use a sidecar for the Java SDK harness?
>>>>
>>>> Thanks
>>>> J
>>>>
>>>>
>>>> On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:
>>>>
>>>>> It is likely that the expansion service is returning a graph segment
>>>>> saying that KafkaIO executes within a docker environment, which is what
>>>>> Flink is then trying to launch.
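In proto terms, the expanded segment returned for ReadFromKafka carries a docker environment roughly like the following (a schematic Python rendering with simplified field names, not the actual RunnerApi proto; the image tag is a guess for Beam 2.31):

```python
# Schematic of the environment the expansion service embeds in the
# returned graph segment. Field names are simplified stand-ins for the
# RunnerApi.Environment proto; the container image tag is an assumption.
kafkaio_environment = {
    "urn": "beam:env:docker:v1",
    "payload": {"container_image": "apache/beam_java8_sdk:2.31.0"},
}

# The environment is instantiated where the transform runs -- on the
# TaskManagers at execution time, not at job-submission time.
instantiated_by = "TaskManager"
```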
>>>>>
>>>>> On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Folks,
>>>>>>
>>>>>> So I tried putting the beam job server on the Flink JobManager and
>>>>>> Taskmanager containers and setting classloader.resolve-order
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>
>>>>>> to parent-first.
>>>>>> My taskmanagers are now crashing because it looks like the KafkaIO
>>>>>> transform is trying to run docker, and it can't because it's running in
>>>>>> a K8s pod. Logs attached.
>>>>>>
>>>>>> Why would KafkaIO try to launch docker? Does this have something to
>>>>>> do with the expansion service? The docs
>>>>>> <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
>>>>>> make it seem like the expansion service only runs at job submission
>>>>>> time and only needs to be accessible from the machine where you are
>>>>>> running your Python program to submit the job.
>>>>>>
>>>>>> Thanks
>>>>>> J
>>>>>>
>>>>>> On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Luke,
>>>>>>>
>>>>>>> Thanks. I've attached the full stack trace. When I reran it gave me
>>>>>>> an error about a different class.
>>>>>>>
>>>>>>> I checked the Beam job server jar and, as far as I can tell, the
>>>>>>> classes are present. So it seems like a potential issue with the
>>>>>>> classpath or the staging of jars on the task managers.
>>>>>>>
>>>>>>> Does anyone happen to know how jars get staged onto Flink
>>>>>>> taskmanagers? On the jobmanager I was able to locate the jar in a /tmp
>>>>>>> directory but I couldn't figure out how it was getting staged on
>>>>>>> taskmanagers.
>>>>>>>
>>>>>>> I tried baking the job server jar into the Flink containers. That
>>>>>>> gave me an IllegalAccessError. Per the Flink docs
>>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>,
>>>>>>> I assume this indicates a dependency conflict between the system jars
>>>>>>> and the application jars.
>>>>>>>
>>>>>>> With the portable runner, is there any way to disable uploading of the
>>>>>>> jar and instead rely on the jars being baked into the docker container?
>>>>>>>
>>>>>>> Thanks
>>>>>>> J
>>>>>>>
>>>>>>> On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>>> Both those classes exist in beam-vendor-grpc-1_36_0-0.1.jar:
>>>>>>>>
>>>>>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar |
>>>>>>>> grep Hpack
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
>>>>>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar |
>>>>>>>> grep DnsNameResolver
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
>>>>>>>>
>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
>>>>>>>>
>>>>>>>> Java has a tendency to only report the full cause on the first
>>>>>>>> failure of this kind with all subsequent failures only reporting the
>>>>>>>> ClassNotFoundException. This happens because the ClassLoader remembers
>>>>>>>> which classes failed and doesn't try loading them again.
>>>>>>>>
>>>>>>>> Is there more of the stack trace pointing out the actual cause
>>>>>>>> associated with the first time this exception occurred?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Folks,
>>>>>>>>>
>>>>>>>>> I'm trying to run Beam Python 2.31 on Flink 1.13.
>>>>>>>>>
>>>>>>>>> I've created a simple streaming program to count Kafka messages.
>>>>>>>>> Running on the DirectRunner, this works fine. But when I try to
>>>>>>>>> submit to my Flink cluster, I get the exception below in my
>>>>>>>>> taskmanager.
>>>>>>>>>
>>>>>>>>> I'm using the PortableRunner. Any suggestions on how to fix or
>>>>>>>>> debug this?
>>>>>>>>>
>>>>>>>>> Running programs that don't use Kafka works.
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> J
>>>>>>>>>
>>>>>>>>> WARNING: An illegal reflective access operation has occurred
>>>>>>>>>
>>>>>>>>> WARNING: Illegal reflective access by
>>>>>>>>> org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
>>>>>>>>>  (
>>>>>>>>> file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method
>>>>>>>>> java.nio.DirectByteBuffer.cleaner()
>>>>>>>>>
>>>>>>>>> WARNING: Please consider reporting this to the maintainers of
>>>>>>>>> org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
>>>>>>>>>
>>>>>>>>> WARNING: Use --illegal-access=warn to enable warnings of further
>>>>>>>>> illegal reflective access operations
>>>>>>>>>
>>>>>>>>> WARNING: All illegal access operations will be denied in a future
>>>>>>>>> release
>>>>>>>>>
>>>>>>>>> Aug 24, 2021 11:17:12 PM
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2
>>>>>>>>> uncaughtException
>>>>>>>>>
>>>>>>>>> SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the
>>>>>>>>> SynchronizationContext. Panic!
>>>>>>>>>
>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
>>>>>>>>> Source)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
>>>>>>>>> Source)
>>>>>>>>>
>>>>>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
>>>>>>>>>
>>>>>>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>>>>
>>>>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>>>>
>>>>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>>
>>>>>>>>> ... 17 more
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Exception in thread "grpc-default-executor-0"
>>>>>>>>> java.lang.NoClassDefFoundError:
>>>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
>>>>>>>>> Source)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
>>>>>>>>> Source)
>>>>>>>>>
>>>>>>>>> at java.base/java.lang.Thread.run(Unknown Source)
>>>>>>>>>
>>>>>>>>> Caused by: java.lang.ClassNotFoundException:
>>>>>>>>> org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
>>>>>>>>>
>>>>>>>>> at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>>>>
>>>>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>>>>
>>>>>>>>> at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>>
>>>>>>>>> ... 4 more
>>>>>>>>>
>>>>>>>>
