Re: Multiple firings on side input
OK, thanks! I'll give it a shot. Btw, maybe that part of the docs should be removed until retractions are in place? It made me quite sure that this would work with multiple firings. Again, thanks for the help!

// Vilhelm

On Mon, 16 Apr 2018, 19:10 Kenneth Knowles wrote:
Re: Multiple firings on side input
Hi Vilhelm,

This is a known issue in the Beam model. Trigger firings should automatically update downstream results, but instead they are treated as new elements. The design for retractions will alleviate this problem, but you can work around it yourself in specific cases like this.

You can use View.asMultimap(), so that each trigger firing for a key adds a new element to the set of values for that key. To distinguish the latest one, you will need to do some manual preparation:

    // This is your triggered input; there will be duplicate keys
    PCollection<KV<K, V>> input = ...

    // Here I am just making up the types you need to implement to keep
    // the index of the trigger firing
    PCollectionView<Map<K, Iterable<MyValuePlusSequenceNumber>>> sideInput =
        input
            .apply(ParDo.of(new DoFn<KV<K, V>, KV<K, MyValuePlusSequenceNumber>>() {
              @ProcessElement
              public void process(ProcessContext ctx) {
                // Tag each value with the index of the firing (pane) that produced it
                ctx.output(
                    KV.of(
                        ctx.element().getKey(),
                        new MyValuePlusSequenceNumber(
                            ctx.element().getValue(),
                            ctx.pane().getIndex())));
              }
            }))
            .apply(View.asMultimap());

This will allow you to grab all the trigger firings associated with a particular key and find the last. It is not ideal, either in clarity or performance, but it can work for some cases until we have retraction support.

Apologies for typos or broken code here, as I am just typing it in email without checking its compilation or behavior.

Kenn

On Mon, Apr 16, 2018 at 4:15 AM Vilhelm von Ehrenheim <vonehrenh...@gmail.com> wrote:
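The workaround above leaves the "find the last firing" step to the reader. Below is a minimal, Beam-free sketch of that step, assuming the side input has already been read as a Map<K, Iterable<...>> (for example via ctx.sideInput(sideInput) inside a DoFn). The names LatestFiring, ValueWithPaneIndex and latestPerKey are hypothetical; ValueWithPaneIndex stands in for the made-up MyValuePlusSequenceNumber type.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: for each key, keeps the value from the highest-indexed trigger firing. */
final class LatestFiring {

  /** Hypothetical stand-in for MyValuePlusSequenceNumber: a value tagged with its pane index. */
  static final class ValueWithPaneIndex<V> {
    final V value;
    final long paneIndex;

    ValueWithPaneIndex(V value, long paneIndex) {
      this.value = value;
      this.paneIndex = paneIndex;
    }
  }

  /**
   * Takes the multimap contents (key -> every firing seen so far) and returns
   * key -> value of the firing with the largest pane index. Keys with no values are skipped.
   */
  static <K, V> Map<K, V> latestPerKey(
      Map<K, ? extends Iterable<ValueWithPaneIndex<V>>> multimap) {
    Map<K, V> latest = new HashMap<>();
    for (Map.Entry<K, ? extends Iterable<ValueWithPaneIndex<V>>> entry : multimap.entrySet()) {
      ValueWithPaneIndex<V> best = null;
      // Linear scan: the multimap does not order values by firing, so compare indices explicitly
      for (ValueWithPaneIndex<V> candidate : entry.getValue()) {
        if (best == null || candidate.paneIndex > best.paneIndex) {
          best = candidate;
        }
      }
      if (best != null) {
        latest.put(entry.getKey(), best.value);
      }
    }
    return latest;
  }
}
```

Selecting by the largest pane index relies on PaneInfo.getIndex() increasing with each firing for a given key and window. Since every firing stays in the multimap, each lookup scans all firings seen so far for that key, which is part of why this approach is not ideal for performance.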
Multiple firings on side input
Hi!

I have a side input with streaming updates in a global window. I have tried to approach this several ways but can't figure out how to do it. What I really need is a side-input Map that is updated when the streaming input changes (i.e., when keys are updated).

I have tried to implement this with a View.asMap transform but got an error that I have duplicate keys in my set (which are there due to multiple triggered updates on the PCollection). I then tried to do it as a singleton, using a global CombineFn to build the map and Combine.globally().asSingletonView() instead. But then I got an error:

java.lang.IllegalArgumentException: PCollection with more than one element accessed as a singleton view.

How are you supposed to do this? There is a part of the documentation that suggests this should be possible:

> If the side input has multiple trigger firings, Beam uses the value from the latest trigger firing. This is particularly useful if you use a side input with a single global window and specify a trigger.

Thanks!

// Vilhelm von Ehrenheim