For each BillingModel you receive over Kafka, how "fresh" should the
account information be?
Does the account information in the external store change?
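
The answers matter because they decide how long a looked-up account may be reused. As a rough sketch only, and assuming some staleness is acceptable, a small cache that re-fetches entries older than a maximum age could look like the following. Everything here (FreshAccountCache, loader, maxAge, clock) is a hypothetical name for illustration, not a Beam API; loader stands in for the call to the external database.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical sketch: caches lookups and re-fetches entries older than maxAge. */
final class FreshAccountCache<K, V> {
  /** A cached value together with the time it was fetched. */
  private static final class Entry<T> {
    final T value;
    final Instant fetchedAt;

    Entry(T value, Instant fetchedAt) {
      this.value = value;
      this.fetchedAt = fetchedAt;
    }
  }

  private final Function<K, V> loader;    // stands in for the external database call
  private final Duration maxAge;          // how stale an entry is allowed to become
  private final Supplier<Instant> clock;  // injectable so staleness can be tested
  private final Map<K, Entry<V>> entries = new HashMap<>();

  FreshAccountCache(Function<K, V> loader, Duration maxAge, Supplier<Instant> clock) {
    this.loader = loader;
    this.maxAge = maxAge;
    this.clock = clock;
  }

  /** Returns the cached value, reloading it if it is missing or older than maxAge. */
  V get(K key) {
    Instant now = clock.get();
    Entry<V> cached = entries.get(key);
    boolean stale =
        cached == null || Duration.between(cached.fetchedAt, now).compareTo(maxAge) > 0;
    if (stale) {
      cached = new Entry<>(loader.apply(key), now);
      entries.put(key, cached);
    }
    return cached.value;
  }
}
```

This class is not thread-safe and never evicts entries, so it is only a starting point. In a pipeline, one possible place for such a cache would be a field of the DoFn that performs the lookup, created in its @Setup method. If the account data must always be current, skip the cache and query the store for every window.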

On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Hi,
>
> We have billing data that arrives from Kafka. The billing data is JSON
> and contains an account ID. To generate the final report, we need
> account data that is associated with the account ID and stored in an
> external database.
>
> We may get multiple billing messages for the same account. We want to
> look up the account information for the messages in a window and then
> supply it as a side input to the next PTransform.
>
> Is it possible to achieve that in Beam?
>
> Here is my attempt:
>
>     PCollection<KV<Integer, BillingModel>> billingDataPairs =
>         p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>                 .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>                 .withTopic(KAFKA_TOPIC)
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class))
>             .apply("Window",
>                 Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>             .apply("ProcessKafkaMessages", new KafkaProcessor());
>
>     PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>         billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>
>     PCollectionView<Map<Integer, Account>> accountData =
>         billingDataPairs
>             .apply("LookupAccounts", new AccountLookupClient())
>             .apply(View.asMap());
>
>     billingData.apply(ParDo.of(
>         new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>           @ProcessElement
>           public void processElement(ProcessContext ctx) {
>             Integer accountId = ctx.element().getKey();
>             Iterable<BillingModel> billingModels = ctx.element().getValue();
>             // Read the account from the side input for this element's window.
>             Account account = ctx.sideInput(accountData).get(accountId);
>           }
>         }).withSideInputs(accountData));
>
> Regards,
> Harsh
> --
>
> *Regards,Harshvardhan Agrawal*
> *267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
>
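
One thing to watch in the attempt above: View.asMap() expects a single value per key in each window, so the lookup step should emit each account once even when several billing messages share an account ID. Outside of Beam, the idea can be sketched in plain Java as below. WindowAccountLookup and lookupDistinct are hypothetical names, and the lookup function again stands in for the database call. In the pipeline itself, something like applying Distinct to the account IDs before the lookup might serve the same purpose, though that is an assumption about how AccountLookupClient is written.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: one lookup per distinct account ID among one window's messages. */
final class WindowAccountLookup {
  private WindowAccountLookup() {}

  /**
   * Builds an accountId -> account map, calling lookup once per distinct ID.
   * If lookup returns null for an ID, no entry is stored and it may be called again.
   */
  static <A> Map<Integer, A> lookupDistinct(
      Iterable<Integer> accountIds, Function<Integer, A> lookup) {
    Map<Integer, A> accounts = new LinkedHashMap<>();
    for (Integer id : accountIds) {
      // computeIfAbsent invokes the lookup only when the ID has no entry yet.
      accounts.computeIfAbsent(id, lookup);
    }
    return accounts;
  }
}
```

The resulting map has exactly one entry per account ID, which is the shape a map-valued side input needs.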
