For each BillingModel you receive over Kafka, how "fresh" should the account information be? Does the account information in the external store change?
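The answer to the freshness question determines how long a looked-up account can be reused. If account data changes rarely, one possible approach is to cache each lookup and only go back to the external store once an entry is older than some maximum age. Below is a minimal plain-Java sketch of that idea. It uses no Beam APIs, and `CachedAccountLookup`, `maxAgeMillis`, the `loader` function and the injected `clock` are all hypothetical names chosen for illustration, not anything from the original pipeline.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper: caches account lookups and reloads an entry
 * once it is older than maxAgeMillis. Not thread-safe.
 */
final class CachedAccountLookup<A> {

    /** A cached value together with the time it was loaded. */
    private static final class Entry<A> {
        final A value;
        final long loadedAtMillis;

        Entry(A value, long loadedAtMillis) {
            this.value = value;
            this.loadedAtMillis = loadedAtMillis;
        }
    }

    private final Map<Integer, Entry<A>> cache = new HashMap<>();
    private final Function<Integer, A> loader; // stands in for the external database query (assumption)
    private final long maxAgeMillis;           // how stale an account is allowed to be
    private final LongSupplier clock;          // injected so the behavior can be tested

    CachedAccountLookup(Function<Integer, A> loader, long maxAgeMillis, LongSupplier clock) {
        this.loader = loader;
        this.maxAgeMillis = maxAgeMillis;
        this.clock = clock;
    }

    /** Returns the cached account, reloading it if it is missing or too old. */
    A get(int accountId) {
        long now = clock.getAsLong();
        Entry<A> entry = cache.get(accountId);
        if (entry == null || now - entry.loadedAtMillis > maxAgeMillis) {
            entry = new Entry<>(loader.apply(accountId), now);
            cache.put(accountId, entry);
        }
        return entry.value;
    }
}
```

With this shape, several billing messages for the same account ID within `maxAgeMillis` cause a single call to the loader. If the account information must always be current, the cache buys nothing and a lookup per window (or per element) is the safer choice.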

On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:

> Hi,
>
> We have certain billing data that arrives to us from Kafka. The billing
> data is in json and it contains an account ID. In order for us to generate
> the final report we need to use some account data associated with the
> account id and is stored in an external database.
>
> It is possible that we get multiple billing info messages for the same
> account. We want to be able to lookup the account information for the
> messages in a window and then supply that as a side input to the next
> PTransform.
>
> Is it possible to achieve that in Beam?
>
> Here is my attempt:
>
> PCollection<KV<Integer, BillingModel>> billingDataPairs =
>     p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>             .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>             .withTopic(KAFKA_TOPIC)
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>         )
>         .apply("Window",
>             Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>         .apply("ProcessKafkaMessages",new KafkaProcessor());
>
> PCollection<KV<Integer, Iterable<BillingModel>> billingData =
>     billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>
> PCollectionView<Map<Integer, Account>> accountData =
>     billingDataPairs.apply("LookupAccounts",new
>         AccountLookupClient()).apply(View.asMap());
>
> billingDataPairs.apply(ParDo.of(new DoFn<KV<Integer, BillingModel>>(){
>     @ProcessElement
>     public void processElement(ProcessContext ctx) {
>         Integer accountId = ctx.element().getKey();
>         Iterable<BillingModel> billingModel = ctx.element().getValue();
>         Account account = ctx.sideinput(accountData).get(accountId);
>     }
> }));
>
> Regards,
> Harsh
> --
>
> *Regards,Harshvardhan Agrawal*
> *267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
>