Hi, I’m currently trying to figure out the best way to approach the following task:
I have an unbounded stream of events carrying payment information in various
currencies. For further processing, I need to convert these payments into one
common currency.
For this purpose I have a set of currency conversion rates, which is
periodically refreshed by fetching it from an external REST endpoint at
relatively long intervals. I’m now trying to figure out the best way to:
1) schedule the update of the currency rates at regular intervals, and
2) provide these conversion rates to the ParDo that actually does the
conversion.
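For reference, the conversion step in item 2 is straightforward once a rates map is available; the hard part is getting an up-to-date map into the ParDo. The following minimal plain-Java sketch (no Beam classes) shows roughly what the body of something like `DoCurrencyConversionFn` might do. The class name `CurrencyConverter`, the `convert` method, the two-decimal rounding, the example rates, and the convention that each rate means "units of the common currency per one unit of the source currency" are all assumptions for illustration.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Map;

// Hypothetical helper: converts an amount into one common currency using a
// snapshot of rates (assumed: rate = units of common currency per 1 unit of source).
public final class CurrencyConverter {
    private CurrencyConverter() {}

    public static BigDecimal convert(BigDecimal amount, String currency,
                                     Map<String, BigDecimal> rates) {
        BigDecimal rate = rates.get(currency);
        if (rate == null) {
            // No rate known for this currency: fail loudly instead of guessing.
            throw new IllegalArgumentException("No conversion rate for " + currency);
        }
        // Multiply, then round to 2 decimal places (assumed target precision).
        return amount.multiply(rate).setScale(2, RoundingMode.HALF_EVEN);
    }

    public static void main(String[] args) {
        // Made-up example rates, with EUR assumed as the common currency.
        Map<String, BigDecimal> rates = Map.of(
            "EUR", BigDecimal.ONE,
            "USD", new BigDecimal("0.90"));
        System.out.println(convert(new BigDecimal("10.00"), "USD", rates)); // 9.00
    }
}
```

Throwing on an unknown currency is one choice among several; routing such events to a dead-letter output would be another.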
I mainly considered two different ways to solve it, but they either didn’t
feel right to me or didn’t work.
1) A ParDo with a Timer that periodically outputs the conversion rates, which
are then used as a side input to the ParDo that does the conversion. Here I’m
struggling with how to update the data provided to the conversion ParDo via
the PCollectionView side input whenever the currency rates are updated.
My first attempt went in the following direction:
PCollectionView<Map<String, BigDecimal>> rates = pipeline
    // dummy event to set up the timer & get the initial set of conversion rates
    .apply(Create.of(KV.of("bootstrap", "")))
    .apply(ParDo.of(new CurrencyRates()))  // fetch conversion rates
    .apply(View.asSingleton());

pipeline.apply(testStream)
    .apply(ParDo.of(new DoCurrencyConversionFn(rates))
        .withSideInputs(rates));
This of course doesn’t work, since I assume I somehow need to define how the
“singleton view” gets swapped out when a new map of conversion rates is
generated.
Specifically, I’m getting the following exception:
java.lang.IllegalArgumentException: Can't add an element past the end of time (294247-01-10T04:00:54.775Z), got timestamp 294247-01-10T04:00:54.775Z
That makes sense to me, since I consider the PCollectionView to be static.
So my question is: is there any way to make this “singleton view”
updatable?
I was wondering whether windowing would help here, but I’m not sure how to
implement that either.
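The behaviour being asked for in approach 1 is essentially "each event is converted with the most recently published rates map". Purely as a conceptual model outside Beam (this is not how Beam side inputs are implemented), that semantics resembles an atomically swapped, immutable snapshot: readers never see a half-updated map, and a new map simply replaces the old one. The class `RatesSnapshot` and its `publish`/`latest` methods are hypothetical names.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Conceptual model only (not Beam): the whole rates map is replaced atomically,
// so each reader sees either the old map or the new one, never a mix.
public final class RatesSnapshot {
    private final AtomicReference<Map<String, BigDecimal>> current;

    public RatesSnapshot(Map<String, BigDecimal> initial) {
        // Map.copyOf makes an immutable copy, so later changes to the caller's map can't leak in.
        this.current = new AtomicReference<>(Map.copyOf(initial));
    }

    // Called by the (hypothetical) refresh logic whenever new rates arrive.
    public void publish(Map<String, BigDecimal> fresh) {
        current.set(Map.copyOf(fresh));
    }

    // Called by the conversion logic for each event.
    public Map<String, BigDecimal> latest() {
        return current.get();
    }

    public static void main(String[] args) {
        RatesSnapshot rates = new RatesSnapshot(Map.of("USD", new BigDecimal("0.90")));
        Map<String, BigDecimal> before = rates.latest();
        rates.publish(Map.of("USD", new BigDecimal("0.92")));
        System.out.println(before.get("USD") + " -> " + rates.latest().get("USD")); // 0.90 -> 0.92
    }
}
```

A reader that grabbed the old map keeps a consistent view of it even after a new map is published, which is the property an updatable singleton view would need to preserve.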
2) The other option I considered was a stateful ParDo with a timer that
periodically triggers fetching the currency updates and stores them in a
state cell.
I decided against this because it didn’t feel right, as I don’t have a good
key to group by.
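One way to sidestep the missing key is a different technique from the stateful-ParDo-with-timer idea: refresh lazily, re-fetching the rates only when the cached copy is older than a chosen interval. The plain-Java sketch below injects the clock and the fetcher so it can be exercised without a real REST endpoint. `RefreshingRates`, the `Supplier` standing in for the REST call, and the 10-minute interval are all hypothetical. If something like this lived inside a DoFn, each worker would presumably hold and refresh its own copy, trading a few extra fetches for not needing a key.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical refresh-on-access cache: re-fetches rates only when the cached
// copy is at least maxAge old. The fetcher stands in for the external REST call.
public final class RefreshingRates {
    private final Supplier<Map<String, BigDecimal>> fetcher;
    private final LongSupplier clockMillis; // injectable so tests control time
    private final long maxAgeMillis;
    private Map<String, BigDecimal> cached;
    private long fetchedAtMillis;

    public RefreshingRates(Supplier<Map<String, BigDecimal>> fetcher,
                           LongSupplier clockMillis, Duration maxAge) {
        this.fetcher = fetcher;
        this.clockMillis = clockMillis;
        this.maxAgeMillis = maxAge.toMillis();
    }

    // synchronized so concurrent callers in one JVM don't fetch at the same time.
    public synchronized Map<String, BigDecimal> get() {
        long now = clockMillis.getAsLong();
        if (cached == null || now - fetchedAtMillis >= maxAgeMillis) {
            cached = Map.copyOf(fetcher.get());
            fetchedAtMillis = now;
        }
        return cached;
    }

    public static void main(String[] args) {
        long[] now = {0L};
        int[] fetches = {0};
        RefreshingRates rates = new RefreshingRates(
            () -> { fetches[0]++; return Map.of("USD", new BigDecimal("0.90")); },
            () -> now[0],
            Duration.ofMinutes(10));
        rates.get();              // first call fetches
        now[0] = 5 * 60_000L;
        rates.get();              // 5 minutes old: still fresh, no fetch
        now[0] = 10 * 60_000L;
        rates.get();              // 10 minutes old: fetches again
        System.out.println("fetches = " + fetches[0]); // fetches = 2
    }
}
```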
Any help or suggestions on this?
Thanks & Best,
Carsten
