Thanks Robert, yes, I'm also thinking of running my own expansion service and
am trying it now, so:

[grpc-default-executor-3] INFO io.xxxxx.kafka.security.token.RefreshableTokenLoginModule - starting renewal task and exposing its jmx metrics
[grpc-default-executor-3] INFO io.xxxx.kafka.security.token.TokenRenewalTask - IAF Token renewal started
[grpc-default-executor-3] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[grpc-default-executor-3] INFO io.xxxx.kafka.security.token.TokenRenewalTask - proposed next checkpoint time Sat Oct 03 11:38:29 UTC 2020, now is Sat Oct 03 02:02:24 UTC 2020, min expiration Sat Oct 03 14:02:30 UTC 2020
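For anyone who finds this thread later: the two pieces that made this work for
us were setting sasl.jaas.config in the Kafka consumer config and overriding
the default expansion-service jar with a fat jar that bundles the LoginModule
classes. A rough sketch of how those are wired up on the Python side (the
module name, broker addresses, and jar path below are placeholders, not our
real values):

```python
import json

# Placeholder JAAS entry naming the custom LoginModule that must be on
# the expansion service / Java harness classpath.
jaas_config = "io.example.kafka.security.iaf.IAFLoginModule required;"

# Consumer config passed to ReadFromKafka(consumer_config=...).
consumer_config = {
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "security.protocol": "SASL_SSL",
    "sasl.jaas.config": jaas_config,
}

# Pipeline option that swaps the default IO expansion service for a fat
# jar bundling the expansion service plus the LoginModule classes.
beam_services_flag = "--beam_services=" + json.dumps(
    {"sdks:java:io:expansion-service:shadowJar": "path/to/your/jar"}
)
```

The same fat jar can instead be passed directly via
expansion_service=BeamJarExpansionService(...) on the ReadFromKafka call, as
in Robert's snippet below.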

I very much agree with your last statement!

Happy weekend!


On Fri, Oct 2, 2020 at 6:31 PM Robert Bradshaw <[email protected]> wrote:

> If you make sure that these extra jars are in your path when you
> execute your pipeline, they should get picked up when invoking the
> expansion service (though this may not be the case long term).
>
> The cleanest way would be to provide your own expansion service. If
> you build a jar that consists of Beam's IO expansion service plus any
> necessary dependencies, you should be able to do
>
>     ReadFromKafka(
>         [ordinary params],
>         expansion_service=BeamJarExpansionService('path/to/your/jar'))
>
> to use this "custom" expansion service. See
>
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py
> An alternative is to pass a pipeline option
>
>     --beam_services={"sdks:java:io:expansion-service:shadowJar": "path/to/your/jar"}
>
> which will override the default. (You can pass "host:port" rather than
> a path as well if you manually start the expansion service.)
>
> Exactly how to specify at a top level a set of extra dependencies to
> be applied to a particular subset of other-language transforms is
> still an open problem. Alternatively we could try to make expansion
> services themselves trivially easy to build, customize, and use.
>
> Hopefully that helps.
>
> - Robert
>
>
>
>
> On Fri, Oct 2, 2020 at 5:57 PM Kobe Feng <[email protected]> wrote:
> >
> > Thanks Robert, yes, our Kafka requires a JAAS configuration
> (sasl.jaas.config) on the client side for the security check, with a
> corresponding LoginModule that requires additional classes:
> >
> ==================================================================================================================
> > Caused by: javax.security.auth.login.LoginException: unable to find LoginModule class: io.${XXXX}.kafka.security.iaf.IAFLoginModule
> >         at javax.security.auth.login.LoginContext.invoke(LoginContext.java:794)
> >         at javax.security.auth.login.LoginContext.access$000(LoginContext.java:195)
> >         at javax.security.auth.login.LoginContext$4.run(LoginContext.java:682)
> >         at javax.security.auth.login.LoginContext$4.run(LoginContext.java:680)
> >         at java.security.AccessController.doPrivileged(Native Method)
> >         at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:680)
> >         at javax.security.auth.login.LoginContext.login(LoginContext.java:587)
> >         at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:52)
> >         at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:53)
> >         at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:76)
> >         at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:103)
> >         ... 42 more
> >
> >         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
> >         at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
> >
> > On Fri, Oct 2, 2020 at 5:14 PM Robert Bradshaw <[email protected]>
> wrote:
> >>
> >> Could you clarify a bit exactly what you're trying to do? When using
> KafkaIO, the provided jar should have all the necessary dependencies to
> construct and execute the kafka read/write. Is there some reason you need
> to inject additional dependencies into the environment provided by kafka?
> >>
> >> On Fri, Oct 2, 2020 at 3:20 PM Kobe Feng <[email protected]> wrote:
> >>>
> >>> Just a follow-up, since no one has replied yet.
> >>> My understanding is that for any expanded transform, Beam wants the
> environment to be self-described.
> >>> So I updated the boot script and Dockerfile for the Java harness
> environment and used --sdk_harness_container_image_overrides with the
> portable runner, but failed to see the updated image loaded (it is still the
> default). Glancing at the code, I guess only the Dataflow runner supports
> it, but I believe this is the correct approach; I just need to dig deeper
> into the code when I get back to it, and I will update this thread then.
> >>>
> >>> Kobe
> >>> On Wed, Sep 30, 2020 at 1:26 PM Kobe Feng <[email protected]>
> wrote:
> >>>>
> >>>> Hi everyone,
> >>>> Is there any recommended way to upload a third-party jar (runtime
> scope) for an expanded transform like KafkaIO.Read when using the Python
> portable runner? Thank you!
> >>>>
> >>>> I tried --experiments=jar_packages=abc.jar,d.jar but only found those
> artifacts in the Python harness (via the provision info); the Java harness
> still uses the default environment for its dependencies after the transform
> for reading Kafka messages is expanded by the gRPC expansion service (from
> the expansion jar).
> >>>>
> >>>> Also noticed that the above option will be removed in the future, so I
> tried --files_to_stage, but that option only exists in the Java SDK pipeline
> options.
> >>>>
> >>>> --
> >>>> Yours Sincerely
> >>>> Kobe Feng
> >>>
> >>>
> >>>
> >>> --
> >>> Yours Sincerely
> >>> Kobe Feng
> >
> >
> >
> > --
> > Yours Sincerely
> > Kobe Feng
>


-- 
Yours Sincerely
Kobe Feng
