Using deduplication plus side inputs will give you a consistent view of the account information for the entire window, which can be nice since it gives consistent processing semantics. However, a simple in-memory cache that reduces the number of lookups will likely be much easier to debug, and simpler to implement and maintain.
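A minimal sketch of the in-memory cache approach, written in plain Java so it runs without Beam. The class `CachingAccountLookup`, its lookup function and the max-age setting are hypothetical. In a pipeline you might create one instance per DoFn instance, for example in the DoFn's `@Setup` method, and call `get` from `@ProcessElement`. The max age is an added assumption, motivated by the account information in the external store being able to change.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
 * Hypothetical cache in front of an expensive account lookup. Entries older than
 * maxAgeMillis are looked up again, so changes in the external store are picked up
 * eventually.
 */
public class CachingAccountLookup<K, V> {

  /** A cached value together with the time it was loaded. */
  private static final class Entry<V> {
    final V value;
    final long loadedAtMillis;

    Entry(V value, long loadedAtMillis) {
      this.value = value;
      this.loadedAtMillis = loadedAtMillis;
    }
  }

  private final Map<K, Entry<V>> cache = new ConcurrentHashMap<>();
  private final Function<K, V> lookup; // stand-in for the external database call
  private final long maxAgeMillis;
  private final LongSupplier clock;    // injectable so expiry can be tested

  public CachingAccountLookup(Function<K, V> lookup, long maxAgeMillis, LongSupplier clock) {
    this.lookup = lookup;
    this.maxAgeMillis = maxAgeMillis;
    this.clock = clock;
  }

  /** Returns the value for key, calling the lookup only on a miss or an expired entry. */
  public V get(K key) {
    long now = clock.getAsLong();
    Entry<V> entry = cache.get(key);
    if (entry == null || now - entry.loadedAtMillis >= maxAgeMillis) {
      // The lookup runs outside any map lock; concurrent misses for the same key
      // may each call it, which only costs an extra lookup.
      entry = new Entry<>(lookup.apply(key), now);
      cache.put(key, entry);
    }
    return entry.value;
  }

  public static void main(String[] args) {
    int[] lookups = {0};
    long[] now = {0L};
    CachingAccountLookup<Integer, String> accounts = new CachingAccountLookup<>(
        id -> { lookups[0]++; return "account-" + id; },
        60_000L,
        () -> now[0]);

    // Five billing messages for two accounts trigger only two lookups.
    for (int id : new int[] {1, 2, 1, 1, 2}) {
      accounts.get(id);
    }
    System.out.println(lookups[0]); // 2

    // Once the max age has passed, account 1 is looked up again.
    now[0] = 60_000L;
    accounts.get(1);
    System.out.println(lookups[0]); // 3
  }
}
```

The cache is unbounded, which matches this thread's assumption that all accounts fit in memory; a size limit would be a natural extension.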
On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:

> Thanks Raghu!
>
> Lukasz,
>
> Do you think lookups would be a better option than side inputs in my case?
>
> On Tue, May 15, 2018 at 16:33 Raghu Angadi <rang...@google.com> wrote:
>
>> It should work. I think you need to apply Distinct before looking up the account info:
>>
>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts", ...).
>>
>> Note that all of the accounts are stored in a single in-memory map, so the account data needs to be small enough for that.
>>
>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>
>>> Well, I actually simplified the example a little. In the actual use case I have multiple reference datasets. Say I have a tuple of Account and Product as the key. The reason we don't do the lookup in the DoFn directly is that we don't want to look up the data for the same account or the same product multiple times across workers in a window.
>>>
>>> What I was thinking was that it might be better to perform the lookup only once for each account and product in a window and then supply them as side inputs to the main input.
>>>
>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Is there a reason you don't want to read the account information within the DoFn directly from the datastore? It seems like that would be your simplest approach.
>>>>
>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> No, we don't receive any such information from Kafka.
>>>>>
>>>>> The account information in the external store does change. Every time the account information changes, we will have to recompute all the billing info. Our source systems will make sure that they publish messages for those accounts again.
>>>>>
>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> For each BillingModel you receive over Kafka, how "fresh" should the account information be?
>>>>>> Does the account information in the external store change?
>>>>>>
>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <harshvardhan.ag...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We have certain billing data that arrives from Kafka. The billing data is in JSON and contains an account ID. To generate the final report, we need some account data that is associated with the account ID and stored in an external database.
>>>>>>>
>>>>>>> We may get multiple billing messages for the same account. We want to be able to look up the account information for the messages in a window and then supply that as a side input to the next PTransform.
>>>>>>>
>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>
>>>>>>> Here is my attempt:
>>>>>>>
>>>>>>> PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>     p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>             .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>             .withTopic(KAFKA_TOPIC)
>>>>>>>             .withKeyDeserializer(StringDeserializer.class)
>>>>>>>             .withValueDeserializer(StringDeserializer.class))
>>>>>>>         .apply("Window", Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>         .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>
>>>>>>> // Group all billing messages for the same account within each window.
>>>>>>> PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>     billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>
>>>>>>> // Look up the accounts and expose them as a per-window Map side input.
>>>>>>> PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>     billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
>>>>>>>         .apply(View.asMap());
>>>>>>>
>>>>>>> // The DoFn reads the grouped collection, so its input is KV<Integer, Iterable<BillingModel>>.
>>>>>>> // The output type (Void) is a placeholder; the side input must be registered via withSideInputs.
>>>>>>> billingData.apply(ParDo.of(new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>   @ProcessElement
>>>>>>>   public void processElement(ProcessContext ctx) {
>>>>>>>     Integer accountId = ctx.element().getKey();
>>>>>>>     Iterable<BillingModel> billingModel = ctx.element().getValue();
>>>>>>>     Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>   }
>>>>>>> }).withSideInputs(accountData));
>>>>>>>
>>>>>>> Regards,
>>>>>>> Harsh
>>>>>>> --
>>>>>>>
>>>>>>> *Regards, Harshvardhan Agrawal*
>>>>>>> *267.991.6618 | LinkedIn <https://www.linkedin.com/in/harshvardhanagr/>*
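Raghu's suggestion to deduplicate the account IDs before the lookup can be illustrated without Beam. The plain-Java sketch below handles the elements of a single window roughly the way `Keys.create()`, `Distinct.create()`, the lookup step and `View.asMap()` would together: each distinct account ID is looked up once, however many billing messages reference it. `DistinctLookupSketch`, `buildAccountView` and the lookup function are hypothetical stand-ins, not Beam APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DistinctLookupSketch {

  /**
   * For one window's (accountId, billingMessage) pairs: take the keys, drop duplicates,
   * look up each distinct key once, and return the map a Map side input would hold.
   */
  static <K, V, A> Map<K, A> buildAccountView(List<Map.Entry<K, V>> billingPairs,
                                              Function<K, A> lookup) {
    Map<K, A> accounts = new LinkedHashMap<>();
    billingPairs.stream()
        .map(Map.Entry::getKey) // analogous to Keys.create()
        .distinct()             // analogous to Distinct.create()
        .forEach(id -> accounts.put(id, lookup.apply(id))); // one lookup per account
    return accounts;
  }

  public static void main(String[] args) {
    // Hypothetical contents of one 30-second window: four messages, two accounts.
    List<Map.Entry<Integer, String>> window = List.of(
        Map.entry(1, "bill-a"), Map.entry(2, "bill-b"),
        Map.entry(1, "bill-c"), Map.entry(1, "bill-d"));
    int[] lookups = {0};
    Map<Integer, String> view = buildAccountView(window, id -> {
      lookups[0]++;
      return "account-" + id;
    });
    System.out.println(view);       // {1=account-1, 2=account-2}
    System.out.println(lookups[0]); // 2
  }
}
```

In an actual pipeline the distinct IDs and lookups are spread across workers, and the resulting map may then be held in memory by each worker that reads the side input, which is why the account data needs to be small enough to fit.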