It looks to me like https://github.com/apache/beam/pull/15082 added the ability to configure the default environment in the main entrypoint of the expansion service,
but not in the JobServer:
https://github.com/apache/beam/blob/master/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java#L260
Should I file a JIRA for this?

Thanks.
J

On Fri, Aug 27, 2021 at 12:36 PM Jeremy Lewi <[email protected]> wrote:

> Thank you very much for the pointers. I'm working on getting the code
> built from the 2.33 branch and trying that out.
>
> J
>
> On Thu, Aug 26, 2021 at 6:35 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi Jeremy,
>>
>> the fix for the expansion service enables specifying ExperimentalOptions [1]
>> and PortablePipelineOptions [2], so you can specify the default environment
>> for the expansion. However ... the Java SDK harness does not have the
>> "work_pool" implementation that the Python SDK harness has. So I think that
>> the correct environment for the expansion would be either DOCKER (which is a
>> pain in Kubernetes) or PROCESS - which requires building a custom Flink
>> docker image for the TaskManager that includes the binaries from the Beam
>> Java SDK image (/opt/apache/beam).
>>
>> I didn't test whether the EMBEDDED environment would work as well; you
>> might try it. That would mean that the expansion would be completely
>> inlined inside the TaskManager process.
>>
>> Jan
>>
>> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/ExperimentalOptions.java
>>
>> [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java
>>
>> On 8/26/21 3:14 PM, Jeremy Lewi wrote:
>>
>> Hi Jan,
>>
>> That's very helpful. Do you have a timeline for 2.33? Until then I will
>> try building from source.
>>
>> So if I understand correctly, using 2.33, the solution would be to set the
>> use_deprecated_read flag until the issues with SDFs [1] are fixed?
>>
>> Does the fix for [3] allow specifying a different environment for
>> different languages?
>> When running in Kubernetes, I think the preferred solution is to use two
>> sidecar containers, one running the Python SDK harness and the other
>> running the Java SDK harness. So the environment config would need to be
>> different for the two languages.
>>
>> Thanks
>> J
>>
>> On Thu, Aug 26, 2021 at 3:14 AM Jan Lukavský <[email protected]> wrote:
>>
>>> Hi Jeremy,
>>>
>>> unfortunately, there are several bugs affecting KafkaIO with Python on
>>> FlinkRunner in the current releases.
>>>
>>> a) there are some limitations to portable SDF support on Flink [1]
>>>
>>> b) the use_deprecated_read flag cannot be passed to the ExpansionService;
>>> that is fixed for the upcoming 2.32.0 in [2]
>>>
>>> c) the primitive Read transform needed for the use_deprecated_read flag
>>> to work is not working properly until 2.33.0; the fix was merged just
>>> yesterday, see [3]
>>>
>>> Unfortunately, there are no known workarounds. If you can build Beam from
>>> sources, you can try building it from the currently cut release branch
>>> 'release-2.33.0'. That would require building both the Java and Python
>>> SDKs. The alternative would be to wait for the 2.33.0 release to come out.
>>>
>>> Hope this helps; if you have any more questions, I'd be glad to help.
>>>
>>> Jan
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-11998
>>>
>>> [2] https://issues.apache.org/jira/browse/BEAM-12538
>>>
>>> [3] https://issues.apache.org/jira/browse/BEAM-12704
>>>
>>> On 8/26/21 2:36 AM, Jeremy Lewi wrote:
>>>
>>> Is this the same issue as in this thread
>>> https://lists.apache.org/[email protected]:2021-5
>>> about specifying the environment to be used in cross-language transforms?
>>>
>>> Is the problem in the taskmanager or the expansion service? Are there
>>> environment variables I can override to force it to use an external
>>> environment so I can use a sidecar for the Java SDK harness?
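A two-sidecar setup like the one asked about above is usually expressed through Beam pipeline options. The following is a minimal sketch only; the endpoints (`flink-jobserver:8099` for the job service, `localhost:50000` for the Python worker-pool sidecar) are hypothetical placeholders, not values from this thread:

```python
# Sketch: pipeline options for running the Python SDK harness as an
# EXTERNAL (sidecar) environment on Flink. All endpoints are illustrative.
python_harness_options = [
    "--runner=PortableRunner",
    "--job_endpoint=flink-jobserver:8099",   # hypothetical Beam job server
    "--environment_type=EXTERNAL",           # use an external worker pool
    "--environment_config=localhost:50000",  # python SDK harness sidecar
    "--streaming",
]

# The Java side of a cross-language transform is configured separately, on
# the expansion service (e.g. PROCESS or DOCKER as discussed above), since
# the Java SDK harness has no equivalent worker-pool mode.
```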
>>> Thanks
>>> J
>>>
>>> On Wed, Aug 25, 2021 at 5:18 PM Luke Cwik <[email protected]> wrote:
>>>
>>>> It is likely that the expansion service is returning a graph segment
>>>> saying you execute KafkaIO within a docker environment, which is what
>>>> Flink is trying to do.
>>>>
>>>> On Wed, Aug 25, 2021 at 4:26 PM Jeremy Lewi <[email protected]> wrote:
>>>>
>>>>> Hi Folks,
>>>>>
>>>>> So I tried putting the Beam job server on the Flink JobManager and
>>>>> TaskManager containers and setting classloader.resolve-order
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#classloader-resolve-order>
>>>>> to parent-first.
>>>>> My taskmanagers are now crashing because it looks like the KafkaIO
>>>>> transform is trying to run docker, and it can't because it's running
>>>>> in a K8s pod. Logs attached.
>>>>>
>>>>> Why would KafkaIO try to launch docker? Does this have something to do
>>>>> with the expansion service? The docs
>>>>> <https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines>
>>>>> make it seem like the expansion service only runs at job submission
>>>>> time and only needs to be accessible from the machine where you are
>>>>> running your Python program to submit the job.
>>>>>
>>>>> Thanks
>>>>> J
>>>>>
>>>>> On Wed, Aug 25, 2021 at 12:09 PM Jeremy Lewi <[email protected]> wrote:
>>>>>
>>>>>> Hi Luke,
>>>>>>
>>>>>> Thanks. I've attached the full stack trace. When I reran it, it gave
>>>>>> me an error about a different class.
>>>>>>
>>>>>> I checked the Beam job server jar, and as far as I can tell the
>>>>>> classes are present. So it seems like a potential issue with the
>>>>>> classpath or the staging of JARs on the task managers.
>>>>>>
>>>>>> Does anyone happen to know how jars get staged onto Flink taskmanagers?
>>>>>> On the jobmanager I was able to locate the jar in a /tmp directory,
>>>>>> but I couldn't figure out how it was getting staged on taskmanagers.
>>>>>>
>>>>>> I tried baking the job server jar into the Flink containers. That
>>>>>> gave me an IllegalAccessError. I assume, per the Flink docs
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/debugging_classloading/#inverted-class-loading-and-classloader-resolution-order>,
>>>>>> this indicates a dependency conflict between the system JARs and the
>>>>>> application JARs.
>>>>>>
>>>>>> With the portable runner, is there any way to disable uploading of
>>>>>> the JAR and instead rely on the JARs being baked into the docker
>>>>>> container?
>>>>>>
>>>>>> Thanks
>>>>>> J
>>>>>>
>>>>>> On Wed, Aug 25, 2021 at 9:20 AM Luke Cwik <[email protected]> wrote:
>>>>>>
>>>>>>> Both those classes exist in beam-vendor-grpc-1_36_0-0.1.jar:
>>>>>>>
>>>>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep Hpack
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$1.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Http2HeadersSink.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil$IndexType.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackUtil.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$HeaderType.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDynamicTable.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanDecoder.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$1.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackStaticTable.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHeaderField.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackEncoder$HeaderEntry.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$1.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodeProcessor.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackHuffmanEncoder$EncodedLengthProcessor.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder$Sink.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder.class
>>>>>>>
>>>>>>> lcwik@lcwik:~/Downloads$ jar tf beam-vendor-grpc-1_36_0-0.1.jar | grep DnsNameResolver
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolverFactory.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$ResourceResolver.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$AddressResolver.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$InternalResolutionResult.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$1.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$SrvRecord.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$JdkAddressResolver.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolverProvider.class
>>>>>>> org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver.class
>>>>>>>
>>>>>>> Java has a tendency to only report the full cause on the first
>>>>>>> failure of this kind, with all subsequent failures only reporting the
>>>>>>> ClassNotFoundException. This happens because the ClassLoader
>>>>>>> remembers which classes failed and doesn't try loading them again.
>>>>>>>
>>>>>>> Is there more of the stack trace pointing out the actual cause
>>>>>>> associated with the first time this exception occurred?
>>>>>>>
>>>>>>> On Tue, Aug 24, 2021 at 4:32 PM Jeremy Lewi <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Folks,
>>>>>>>>
>>>>>>>> I'm trying to run Beam Python 2.31 on Flink 1.13.
>>>>>>>>
>>>>>>>> I've created a simple streaming program to count Kafka messages.
>>>>>>>> Running on the DirectRunner this works fine. But when I try to
>>>>>>>> submit to my Flink cluster, I get the exception below in my
>>>>>>>> taskmanager.
>>>>>>>>
>>>>>>>> I'm using the PortableRunner. Any suggestions on how to fix or
>>>>>>>> debug this?
>>>>>>>>
>>>>>>>> Running programs that don't use Kafka works.
>>>>>>>> Thanks
>>>>>>>> J
>>>>>>>>
>>>>>>>> WARNING: An illegal reflective access operation has occurred
>>>>>>>> WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/opt/flink/lib/flink-dist_2.12-1.13.1.jar) to method java.nio.DirectByteBuffer.cleaner()
>>>>>>>> WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
>>>>>>>> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
>>>>>>>> WARNING: All illegal access operations will be denied in a future release
>>>>>>>>
>>>>>>>> Aug 24, 2021 11:17:12 PM org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$2 uncaughtException
>>>>>>>> SEVERE: [Channel<55>: (localhost:50000)] Uncaught exception in the SynchronizationContext. Panic!
>>>>>>>> java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/netty/handler/codec/http2/HpackDecoder
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:73)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.DefaultHttp2HeadersDecoder.<init>(DefaultHttp2HeadersDecoder.java:59)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.GrpcHttp2HeadersUtils$GrpcHttp2ClientHeadersDecoder.<init>(GrpcHttp2HeadersUtils.java:70)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientHandler.newHandler(NettyClientHandler.java:147)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.netty.NettyClientTransport.start(NettyClientTransport.java:230)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ForwardingConnectionClientTransport.start(ForwardingConnectionClientTransport.java:33)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.startNewTransport(InternalSubchannel.java:258)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel.access$400(InternalSubchannel.java:65)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.InternalSubchannel$2.run(InternalSubchannel.java:200)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.ManagedChannelImpl$NameResolverListener.onResult(ManagedChannelImpl.java:1815)
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:333)
>>>>>>>>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>>>>>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>>>>>   at java.base/java.lang.Thread.run(Unknown Source)
>>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.netty.handler.codec.http2.HpackDecoder
>>>>>>>>   at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>>>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>>>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>>>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>   ... 17 more
>>>>>>>>
>>>>>>>> Exception in thread "grpc-default-executor-0" java.lang.NoClassDefFoundError: org/apache/beam/vendor/grpc/v1p36p0/io/grpc/internal/DnsNameResolver$Resolve$1
>>>>>>>>   at org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve.run(DnsNameResolver.java:339)
>>>>>>>>   at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>>>>>>>>   at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>>>>>>>>   at java.base/java.lang.Thread.run(Unknown Source)
>>>>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.beam.vendor.grpc.v1p36p0.io.grpc.internal.DnsNameResolver$Resolve$1
>>>>>>>>   at java.base/java.net.URLClassLoader.findClass(Unknown Source)
>>>>>>>>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
>>>>>>>>   at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
>>>>>>>>   at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
>>>>>>>>   at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
>>>>>>>>   ... 4 more
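For reference, the PROCESS-environment approach suggested near the top of the thread would be passed to the Java expansion service roughly as follows. This is a sketch only: the port is illustrative, and the boot-binary path simply mirrors the /opt/apache/beam layout mentioned above.

```python
# Sketch: arguments for launching the Java expansion service with a
# PROCESS default environment, so expanded KafkaIO transforms run via a
# Beam boot binary baked into the TaskManager image rather than launching
# docker. Port and command path are illustrative assumptions.
import json

expansion_service_args = [
    "8097",  # port the expansion service listens on (illustrative)
    "--defaultEnvironmentType=PROCESS",
    "--defaultEnvironmentConfig=" + json.dumps({"command": "/opt/apache/beam/boot"}),
]
```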
