Deduplicating and then using side inputs gives you a consistent view of
the account information for the entire window, which is nice because it
gives consistent processing semantics. However, a simple in-memory cache
that reduces the number of lookups will likely be much easier to debug
and simpler to implement and maintain.
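
As a rough sketch of the in-memory cache option, assuming a hypothetical
LookupCache class and a loader function that stands in for your database
call (neither is part of Beam), a small bounded LRU cache could be created
once per DoFn instance, for example in a @Setup method, and consulted from
@ProcessElement:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical bounded LRU cache that calls the loader only on a miss. */
final class LookupCache<K, V> {
    private final Function<K, V> loader;
    private final Map<K, V> entries;

    LookupCache(int maxEntries, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder=true keeps the least recently used entry first,
        // so removeEldestEntry evicts it once the cache is over capacity.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached value, loading and caching it on a miss. */
    synchronized V get(K key) {
        V value = entries.get(key);
        if (value == null) {
            value = loader.apply(key); // stands in for the external lookup
            if (value != null) {
                entries.put(key, value);
            }
        }
        return value;
    }

    /** Number of entries currently held in the cache. */
    synchronized int entryCount() {
        return entries.size();
    }
}
```

Each worker still does its own lookups with this approach, so an account
may be fetched once per worker rather than once per window, and a cached
entry can be stale until it is evicted.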

On Tue, May 15, 2018 at 2:31 PM Harshvardhan Agrawal <
harshvardhan.ag...@gmail.com> wrote:

> Thanks Raghu!
>
> Lukasz,
>
> Do you think lookups would be a better option than side inputs in my case?
>
>
> On Tue, May 15, 2018 at 16:33 Raghu Angadi <rang...@google.com> wrote:
>
>> It should work. I think you need to apply Distinct before looking up
>> account info:
>> billingDataPairs.apply(Keys.create()).apply(Distinct.create()).apply("LookupAccounts",
>> ...).
>> Note that all of the accounts are stored in a single in-memory map. It
>> should be small enough for that.
>>
>> On Tue, May 15, 2018 at 1:15 PM Harshvardhan Agrawal <
>> harshvardhan.ag...@gmail.com> wrote:
>>
>>> Well, I actually simplified the example a little. In the actual use
>>> case I have multiple reference datasets. Say I have a tuple of Account
>>> and Product as the key. The reason we don’t do the lookup in the DoFn
>>> directly is that we don’t want to look up the data for the same account
>>> or the same product multiple times across workers in a window.
>>>
>>> What I was thinking was that it might be better to perform the lookup
>>> only once for each account and product in a window and then supply the
>>> results as side inputs to the main input.
>>>
>>> On Tue, May 15, 2018 at 16:03 Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> Is there a reason you don't want to read the accounting information
>>>> within the DoFn directly from the datastore? It seems like that would
>>>> be your simplest approach.
>>>>
>>>> On Tue, May 15, 2018 at 12:43 PM Harshvardhan Agrawal <
>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> No, we don’t receive any such information from Kafka.
>>>>>
>>>>> The account information in the external store does change. Every time
>>>>> we have a change in the account information we will have to recompute all
>>>>> the billing info. Our source systems will make sure that they publish
>>>>> messages for those accounts again.
>>>>>
>>>>>
>>>>> On Tue, May 15, 2018 at 15:11 Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> For each BillingModel you receive over Kafka, how "fresh" should the
>>>>>> account information be?
>>>>>> Does the account information in the external store change?
>>>>>>
>>>>>> On Tue, May 15, 2018 at 11:22 AM Harshvardhan Agrawal <
>>>>>> harshvardhan.ag...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> We have certain billing data that arrives from Kafka. The billing
>>>>>>> data is in JSON and contains an account ID. To generate the final
>>>>>>> report, we need some account data that is associated with the
>>>>>>> account ID and stored in an external database.
>>>>>>>
>>>>>>> It is possible that we get multiple billing info messages for the
>>>>>>> same account. We want to be able to look up the account information
>>>>>>> for the messages in a window and then supply that as a side input to
>>>>>>> the next PTransform.
>>>>>>>
>>>>>>> Is it possible to achieve that in Beam?
>>>>>>>
>>>>>>> Here is my attempt:
>>>>>>>
>>>>>>>     // Read billing messages from Kafka, window them, and key by account ID.
>>>>>>>     PCollection<KV<Integer, BillingModel>> billingDataPairs =
>>>>>>>         p.apply("ReadBillingInfo", KafkaIO.<String, String>read()
>>>>>>>                 .withBootstrapServers(KAFKA_BOOTSTRAP_SERVER)
>>>>>>>                 .withTopic(KAFKA_TOPIC)
>>>>>>>                 .withKeyDeserializer(StringDeserializer.class)
>>>>>>>                 .withValueDeserializer(StringDeserializer.class))
>>>>>>>          .apply("Window",
>>>>>>>                 Window.into(FixedWindows.of(Duration.standardSeconds(30))))
>>>>>>>          .apply("ProcessKafkaMessages", new KafkaProcessor());
>>>>>>>
>>>>>>>     // Group all billing messages for the same account within a window.
>>>>>>>     PCollection<KV<Integer, Iterable<BillingModel>>> billingData =
>>>>>>>         billingDataPairs.apply(GroupByKey.<Integer, BillingModel>create());
>>>>>>>
>>>>>>>     // Look up account data and expose it as a map-valued side input.
>>>>>>>     PCollectionView<Map<Integer, Account>> accountData =
>>>>>>>         billingDataPairs.apply("LookupAccounts", new AccountLookupClient())
>>>>>>>                         .apply(View.asMap());
>>>>>>>
>>>>>>>     // Join each account's billing messages with its account data.
>>>>>>>     // The Void output type is a placeholder; the report type is not shown here.
>>>>>>>     billingData.apply(ParDo.of(
>>>>>>>         new DoFn<KV<Integer, Iterable<BillingModel>>, Void>() {
>>>>>>>           @ProcessElement
>>>>>>>           public void processElement(ProcessContext ctx) {
>>>>>>>             Integer accountId = ctx.element().getKey();
>>>>>>>             Iterable<BillingModel> billingModels = ctx.element().getValue();
>>>>>>>             Account account = ctx.sideInput(accountData).get(accountId);
>>>>>>>           }
>>>>>>>         }).withSideInputs(accountData));
>>>>>>>
>>>>>>> Regards,
>>>>>>> Harsh
>>>>>>> --
>>>>>>>
>>>>>>> *Regards,Harshvardhan Agrawal*
>>>>>>> *267.991.6618 | LinkedIn
>>>>>>> <https://www.linkedin.com/in/harshvardhanagr/>*
>>>>>>>
