Thanks Robert, yes, I'm also thinking of building my own expansion service and trying it, so:
[grpc-default-executor-3] INFO io.xxxxx.kafka.security.token.RefreshableTokenLoginModule - starting renewal task and exposing its jmx metrics
[grpc-default-executor-3] INFO io.xxxx.kafka.security.token.TokenRenewalTask - IAF Token renewal started
[grpc-default-executor-3] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[grpc-default-executor-3] INFO io.xxxx.kafka.security.token.TokenRenewalTask - proposed next checkpoint time Sat Oct 03 11:38:29 UTC 2020, now is Sat Oct 03 02:02:24 UTC 2020, min expiration Sat Oct 03 14:02:30 UTC 2020

I much agree with your last statement! Happy weekend!

On Fri, Oct 2, 2020 at 6:31 PM Robert Bradshaw <[email protected]> wrote:
> If you make sure that these extra jars are in your path when you
> execute your pipeline, they should get picked up when invoking the
> expansion service (though this may not be the case long term).
>
> The cleanest way would be to provide your own expansion service. If
> you build a jar that consists of Beam's IO expansion service plus any
> necessary dependencies, you should be able to do
>
>     ReadFromKafka(
>         [ordinary params],
>         expansion_service=BeamJarExpansionService('path/to/your/jar'))
>
> to use this "custom" expansion service. See
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py
>
> An alternative is to pass a pipeline option
>
>     --beam_services={"sdks:java:io:expansion-service:shadowJar": "path/to/your/jar"}
>
> which will override the default. (You can pass "host:port" rather than
> a path as well if you manually start the expansion service.)
>
> Exactly how to specify at a top level a set of extra dependencies to
> be applied to a particular subset of other-language transforms is
> still an open problem. Alternatively we could try to make expansion
> services themselves trivially easy to build, customize, and use.
>
> Hopefully that helps.
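[For reference, the --beam_services option Robert describes takes a JSON map from the expansion service's Gradle target to a jar path (or a "host:port" endpoint). A minimal sketch of assembling it in Python; the jar path here is a placeholder, not a real build artifact:]

```python
import json

# Placeholder path to a jar bundling Beam's Kafka IO expansion service
# plus the extra security/LoginModule dependencies.
expansion_jar = "path/to/your/expansion-service-with-deps.jar"

# JSON map from the Gradle shadowJar target to the jar; a "host:port"
# string would work here instead if the service is started manually.
beam_services = json.dumps(
    {"sdks:java:io:expansion-service:shadowJar": expansion_jar})

pipeline_args = ["--beam_services=" + beam_services]
```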
> - Robert
>
> On Fri, Oct 2, 2020 at 5:57 PM Kobe Feng <[email protected]> wrote:
> >
> > Thanks Robert, yes, our Kafka requires a JAAS configuration
> > (sasl.jaas.config) on the client side for the security check, with the
> > corresponding LoginModule, which requires additional classes:
> > ==================================================================================================================
> > Caused by: javax.security.auth.login.LoginException: unable to find
> > LoginModule class: io.${XXXX}.kafka.security.iaf.IAFLoginModule
> >     at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
> >     at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
> >     at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
> >     at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
> >     at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
> >     at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
> >     at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
> >     at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:76)
> >     at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
> >     ... 42 more
> >
> >     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
> >     at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
> >
> > On Fri, Oct 2, 2020 at 5:14 PM Robert Bradshaw <[email protected]> wrote:
> >>
> >> Could you clarify a bit exactly what you're trying to do?
> >> When using KafkaIO, the provided jar should have all the necessary
> >> dependencies to construct and execute the Kafka read/write. Is there
> >> some reason you need to inject additional dependencies into the
> >> environment provided by Kafka?
> >>
> >> On Fri, Oct 2, 2020 at 3:20 PM Kobe Feng <[email protected]> wrote:
> >>>
> >>> Just a follow-up, since no one has replied yet.
> >>> My understanding is that for any expanded transforms, Beam wants the
> >>> environment to be self-described.
> >>> So I updated the boot script and Dockerfile for the Java harness
> >>> environment and used --sdk_harness_container_image_overrides with the
> >>> portable runner, but failed to see the updated image loaded (it still
> >>> used the default). From glancing at the code, I guess only the Dataflow
> >>> runner supports it, but I believe it's the correct approach; I just
> >>> need to dig into the code when I get back to this, and then I will
> >>> update this thread too.
> >>>
> >>> Kobe
> >>>
> >>> On Wed, Sep 30, 2020 at 1:26 PM Kobe Feng <[email protected]> wrote:
> >>>>
> >>>> Hi everyone,
> >>>> Is there any recommended way to upload a third-party jar (runtime
> >>>> scope) for an expanded transform like KafkaIO.Read when using the
> >>>> Python portable runner? Thank you!
> >>>>
> >>>> I tried --experiments=jar_packages=abc.jar,d.jar but just found those
> >>>> artifacts in the Python harness with the provision info; the Java
> >>>> harness just uses the default environment for dependencies after
> >>>> expanding the transform via the gRPC expansion service jar for
> >>>> reading Kafka messages.
> >>>>
> >>>> I also noticed the above option will be removed in the future, so I
> >>>> tried --files_to_stage, but this option only exists in the Java SDK
> >>>> pipeline options.
> >>>>
> >>>> --
> >>>> Yours Sincerely
> >>>> Kobe Feng
> >>>
> >>> --
> >>> Yours Sincerely
> >>> Kobe Feng
> >
> > --
> > Yours Sincerely
> > Kobe Feng

--
Yours Sincerely
Kobe Feng
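[For readers following the thread: the sasl.jaas.config requirement from Kobe's earlier message is typically passed to ReadFromKafka through its consumer_config dict, which the expansion service hands to the Java Kafka client. A hedged sketch below; the LoginModule class, broker address, and SASL mechanism are hypothetical placeholders standing in for the redacted names in the thread, and the module jar must still be on the expansion service's classpath:]

```python
# JAAS line for the client-side login; the module class is a
# hypothetical stand-in for the redacted io.${XXXX}... class above.
jaas_line = ('io.example.kafka.security.iaf.IAFLoginModule required '
             'serviceName="kafka";')

# Consumer properties handed through to the Java KafkaConsumer.
consumer_config = {
    "bootstrap.servers": "broker-1:9093",  # placeholder
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "IAF",               # placeholder mechanism
    "sasl.jaas.config": jaas_line,
}

# These would then be passed as, e.g.:
#   ReadFromKafka(consumer_config=consumer_config,
#                 topics=["my_topic"],
#                 expansion_service=BeamJarExpansionService("path/to/your/jar"))
```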
