Re: Write to multiple IOs in linear fashion

2021-03-25 Thread Kenneth Knowles
On Thu, Mar 25, 2021 at 12:55 PM Robert Bradshaw 
wrote:

> On Wed, Mar 24, 2021 at 7:29 PM Vincent Marquez 
> wrote:
>
>>
>> *~Vincent*
>>
>>
>> On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles  wrote:
>>
>>> The reason I was checking out the code is that sometimes a natural thing
>>> to output would be a summary of what was written. So each chunk of writes
>>> and the final chunk written in @FinishBundle. This is, for example, what
>>> SQL engines do (output # of rows written).
>>>
>>> You could output both the summary and the full list of written elements
>>> to different outputs, and users can choose. Outputs that are never consumed
>>> should be very low or zero cost.
>>>
>>>
>> I like this approach.  I would much prefer two outputs (one of which is
>> all elements written) to returning an existential/wildcard PCollection.
>>
>
> +1, this would work well too. Returning a PCollectionTuple is extensible
> too, as one could add more (or better) outputs in the future without
> changing the signature.
>

This comment is dangerously close to sparking a philosophical conversation!

Kenn


>
>>
>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw 
>>> wrote:
>>>
 Yeah, the entire input is not always what is needed, and can generally
 be achieved via

 input -> wait(side input of write) -> do something with the input

 Of course one could also do

 entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
 CombineGlobally(TrivialCombineFn)

 to reduce this to a more minimal set with at least one element per
 Window.

 The file writing operations emit the actual files that were written,
 which can be handy. My suggestion of PCollection was just so that we can
 emit something usable, and decide exactly what is most useful later.


 On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax  wrote:

> I believe that the Wait transform turns this output into a side input,
> so outputting the input PCollection might be problematic.
>
> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles 
> wrote:
>
>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>> just reading really quickly so sorry if I missed something...
>>
>> Checking out the code for the WriteFn I see a big problem:
>>
>> @Setup
>> public void setup() {
>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext c)
>>     throws ExecutionException, InterruptedException {
>>   writer.mutate(c.element());
>> }
>>
>> @Teardown
>> public void teardown() throws Exception {
>>   writer.close();
>>   writer = null;
>> }
>>
>> It is only in writer.close() that all async writes are waited on.
>> This needs to happen in @FinishBundle.
>>
>> Did you discover this when implementing your own Cassandra.Write?
>>
>> Until you have waited on the future, you should not output the
>> element as "has been written". And you cannot output from the @Teardown
>> method which is just for cleaning up resources.
>>
>> Am I reading this wrong?
>>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato 
>> wrote:
>>
>>> How about a PCollection containing every element which was
>>> successfully written?
>>> Basically the same things which were passed into it.
>>>
>>> Then you could act on every element after it's been successfully
>>> written to the sink.
>>>
>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
>>> wrote:
>>>
 On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía 
 wrote:

> +dev
>
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
>

 My proposal is that one returns a PCollection that consists,
 internally, of something contentless like nulls. This is future
 compatible with returning something more meaningful based on the source
 or write process itself, but at least this would be followable.


> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
>
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E


 Yeah, we should go ahead and finally do something.


>
> > Returning PDone is an anti-pattern that should be avoided, but
> 

Re: Write to multiple IOs in linear fashion

2021-03-25 Thread Robert Bradshaw
On Wed, Mar 24, 2021 at 7:29 PM Vincent Marquez 
wrote:

>
> *~Vincent*
>
>
> On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles  wrote:
>
>> The reason I was checking out the code is that sometimes a natural thing
>> to output would be a summary of what was written. So each chunk of writes
>> and the final chunk written in @FinishBundle. This is, for example, what
>> SQL engines do (output # of rows written).
>>
>> You could output both the summary and the full list of written elements
>> to different outputs, and users can choose. Outputs that are never consumed
>> should be very low or zero cost.
>>
>>
> I like this approach.  I would much prefer two outputs (one of which is
> all elements written) to returning an existential/wildcard PCollection.
>

+1, this would work well too. Returning a PCollectionTuple is extensible
too, as one could add more (or better) outputs in the future without
changing the signature.


>
>
>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw 
>> wrote:
>>
>>> Yeah, the entire input is not always what is needed, and can generally
>>> be achieved via
>>>
>>> input -> wait(side input of write) -> do something with the input
>>>
>>> Of course one could also do
>>>
>>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>>> CombineGlobally(TrivialCombineFn)
>>>
>>> to reduce this to a more minimal set with at least one element per
>>> Window.
>>>
>>> The file writing operations emit the actual files that were written,
>>> which can be handy. My suggestion of PCollection was just so that we can
>>> emit something usable, and decide exactly what is most useful later.
>>>
>>>
>>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax  wrote:
>>>
 I believe that the Wait transform turns this output into a side input,
 so outputting the input PCollection might be problematic.

 On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles 
 wrote:

> Alex's idea sounds good and like what Vincent maybe implemented. I am
> just reading really quickly so sorry if I missed something...
>
> Checking out the code for the WriteFn I see a big problem:
>
> @Setup
> public void setup() {
>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
> }
>
> @ProcessElement
> public void processElement(ProcessContext c)
>     throws ExecutionException, InterruptedException {
>   writer.mutate(c.element());
> }
>
> @Teardown
> public void teardown() throws Exception {
>   writer.close();
>   writer = null;
> }
>
> It is only in writer.close() that all async writes are waited on. This
> needs to happen in @FinishBundle.
>
> Did you discover this when implementing your own Cassandra.Write?
>
> Until you have waited on the future, you should not output the element
> as "has been written". And you cannot output from the @Teardown method
> which is just for cleaning up resources.
>
> Am I reading this wrong?
>
> Kenn
>
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:
>
>> How about a PCollection containing every element which was
>> successfully written?
>> Basically the same things which were passed into it.
>>
>> Then you could act on every element after it's been successfully
>> written to the sink.
>>
>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
>> wrote:
>>
>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía 
>>> wrote:
>>>
 +dev

 Since we all agree that we should return something different than
 PDone the real question is what should we return.

>>>
>>> My proposal is that one returns a PCollection that consists,
>>> internally, of something contentless like nulls. This is future
>>> compatible with returning something more meaningful based on the source
>>> or write process itself, but at least this would be followable.
>>>
>>>
 As a reminder we had a pretty interesting discussion about this
 already in the past but uniformization of our return values has not
 happened.
 This thread is worth reading for Vincent or anyone who wants to
 contribute Write transforms that return.

 https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>
>>>
>>> Yeah, we should go ahead and finally do something.
>>>
>>>

 > Returning PDone is an anti-pattern that should be avoided, but
 changing it now would be backwards incompatible.

 Periodic reminder: most IOs are still Experimental, so I suppose it is
 up to the maintainers to judge whether the upgrade to return something
 different from PDone is worth it; in that case we can deprecate and

Re: Write to multiple IOs in linear fashion

2021-03-25 Thread Alexey Romanenko
I think you are right: since "writer.close()" contains business logic, it
must be moved to @FinishBundle. The same applies to DeleteFn.
I’ll create a Jira for that.

> On 25 Mar 2021, at 00:49, Kenneth Knowles  wrote:
> 
> Alex's idea sounds good and like what Vincent maybe implemented. I am just 
> reading really quickly so sorry if I missed something...
> 
> Checking out the code for the WriteFn I see a big problem:
> 
> @Setup
> public void setup() {
>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
> }
> 
> @ProcessElement
> public void processElement(ProcessContext c)
>     throws ExecutionException, InterruptedException {
>   writer.mutate(c.element());
> }
> 
> @Teardown
> public void teardown() throws Exception {
>   writer.close();
>   writer = null;
> }
> 
> It is only in writer.close() that all async writes are waited on. This needs 
> to happen in @FinishBundle.
> 
> Did you discover this when implementing your own Cassandra.Write?
> 
> Until you have waited on the future, you should not output the element as
> "has been written". And you cannot output from the @Teardown method which is
> just for cleaning up resources.
> 
> Am I reading this wrong?
> 
> Kenn
> 
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato wrote:
> How about a PCollection containing every element which was successfully 
> written?
> Basically the same things which were passed into it.
> 
> Then you could act on every element after it's been successfully written to
> the sink.
> 
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw wrote:
> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía wrote:
> +dev
> 
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
> 
> My proposal is that one returns a PCollection that consists, internally, 
> of something contentless like nulls. This is future compatible with returning 
> something more meaningful based on the source or write process itself,
> but at least this would be followable. 
>  
> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>  
> 
> 
> Yeah, we should go ahead and finally do something. 
>  
> 
> > Returning PDone is an anti-pattern that should be avoided, but changing it 
> > now would be backwards incompatible.
> 
> Periodic reminder: most IOs are still Experimental, so I suppose it is
> up to the maintainers to judge whether the upgrade to return something
> different from PDone is worth it; in that case we can deprecate and remove
> the previous signature in short time (2 releases was the average for
> previous cases).
> 
> 
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
> <aromanenko@gmail.com> wrote:
> >
> > I thought that was said about returning a PCollection of write results as 
> > it’s done in other IOs (as I mentioned as examples) that have _additional_ 
> > write methods, like “withWriteResults()” etc, that return PTransform<…, 
> > PCollection>.
> > In this case, we keep backwards compatibility and just add new
> > functionality. Though, we need to follow the same pattern for user API and
> > maybe even naming for this feature across different IOs (like we have for
> > “readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing it 
> > now would be backwards incompatible. PRs to add non-PDone returning 
> > variants (probably as another option to the builders) that compose well 
> > with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it was 
> >> mentioned before, it doesn’t work with PDone, only with PCollection as a 
> >> signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great to 
> >> contribute it back to Beam in the way as it was done for other IOs (for 
> >> example, JdbcIO [1] or BigtableIO [2])
> >>
> >> In general, I think we need to have it for all IOs, at least to use with
> >> “Wait”, because this pattern is quite often required.
> >>
> >> [1] 
> >> 

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Vincent Marquez
*~Vincent*


On Wed, Mar 24, 2021 at 6:07 PM Kenneth Knowles  wrote:

> The reason I was checking out the code is that sometimes a natural thing
> to output would be a summary of what was written. So each chunk of writes
> and the final chunk written in @FinishBundle. This is, for example, what
> SQL engines do (output # of rows written).
>
> You could output both the summary and the full list of written elements to
> different outputs, and users can choose. Outputs that are never consumed
> should be very low or zero cost.
>
>
I like this approach.  I would much prefer two outputs (one of which is all
elements written) to returning an existential/wildcard PCollection.



> Kenn
>
> On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw 
> wrote:
>
>> Yeah, the entire input is not always what is needed, and can generally be
>> achieved via
>>
>> input -> wait(side input of write) -> do something with the input
>>
>> Of course one could also do
>>
>> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
>> CombineGlobally(TrivialCombineFn)
>>
>> to reduce this to a more minimal set with at least one element per
>> Window.
>>
>> The file writing operations emit the actual files that were written,
>> which can be handy. My suggestion of PCollection was just so that we can
>> emit something usable, and decide exactly what is most useful later.
>>
>>
>> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax  wrote:
>>
>>> I believe that the Wait transform turns this output into a side input,
>>> so outputting the input PCollection might be problematic.
>>>
>>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles  wrote:
>>>
 Alex's idea sounds good and like what Vincent maybe implemented. I am
 just reading really quickly so sorry if I missed something...

 Checking out the code for the WriteFn I see a big problem:

 @Setup
 public void setup() {
   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
 }

 @ProcessElement
 public void processElement(ProcessContext c)
     throws ExecutionException, InterruptedException {
   writer.mutate(c.element());
 }

 @Teardown
 public void teardown() throws Exception {
   writer.close();
   writer = null;
 }

 It is only in writer.close() that all async writes are waited on. This
 needs to happen in @FinishBundle.

 Did you discover this when implementing your own Cassandra.Write?

 Until you have waited on the future, you should not output the element
 as "has been written". And you cannot output from the @Teardown method
 which is just for cleaning up resources.

 Am I reading this wrong?

 Kenn

 On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:

> How about a PCollection containing every element which was
> successfully written?
> Basically the same things which were passed into it.
>
> Then you could act on every element after it's been successfully
> written to the sink.
>
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
> wrote:
>
>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía 
>> wrote:
>>
>>> +dev
>>>
>>> Since we all agree that we should return something different than
>>> PDone the real question is what should we return.
>>>
>>
>> My proposal is that one returns a PCollection that consists,
>> internally, of something contentless like nulls. This is future
>> compatible with returning something more meaningful based on the source
>> or write process itself, but at least this would be followable.
>>
>>
>>> As a reminder we had a pretty interesting discussion about this
>>> already in the past but uniformization of our return values has not
>>> happened.
>>> This thread is worth reading for Vincent or anyone who wants to
>>> contribute Write transforms that return.
>>>
>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>
>>
>> Yeah, we should go ahead and finally do something.
>>
>>
>>>
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible.
>>>
>>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>>> up to the maintainers to judge whether the upgrade to return something
>>> different from PDone is worth it; in that case we can deprecate and remove
>>> the previous signature in short time (2 releases was the average for
>>> previous cases).
>>>
>>>
>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>  wrote:
>>> >
>>> > I thought that was said about returning a PCollection of write
>>> results as it’s done in other IOs (as I mentioned as examples) that have
>>> 

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Kenneth Knowles
The reason I was checking out the code is that sometimes a natural thing to
output would be a summary of what was written: one summary for each chunk of
writes, plus one for the final chunk written in @FinishBundle. This is, for
example, what SQL engines do (output # of rows written).

You could output both the summary and the full list of written elements to
different outputs, and users can choose. Outputs that are never consumed
should be very low or zero cost.
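
As a rough illustration of the two-output idea, here is a plain-Java model rather than Beam code. The names `TwoOutputWriteSketch`, `BundleResult`, and `write`, as well as the fixed chunk size, are assumptions made up for this sketch; a real IO would expose the two outputs through Beam's own output mechanisms and would wait for the sink before reporting anything.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Plain-Java model of a write that exposes two outputs; hypothetical, not a Beam API. */
final class TwoOutputWriteSketch {

  /** What one flushed chunk contributes to each of the two outputs. */
  static final class BundleResult<T> {
    final long rowsWritten; // the "summary" output
    final List<T> elements; // the "written elements" output

    BundleResult(List<T> elements) {
      this.elements = Collections.unmodifiableList(new ArrayList<>(elements));
      this.rowsWritten = this.elements.size();
    }
  }

  /** Splits the input into chunks of at most bundleSize and reports each chunk. */
  static <T> List<BundleResult<T>> write(List<T> input, int bundleSize) {
    if (bundleSize <= 0) {
      throw new IllegalArgumentException("bundleSize must be positive");
    }
    List<BundleResult<T>> results = new ArrayList<>();
    for (int start = 0; start < input.size(); start += bundleSize) {
      int end = Math.min(start + bundleSize, input.size());
      // A real sink would perform and await the writes here before reporting them.
      results.add(new BundleResult<>(input.subList(start, end)));
    }
    return results;
  }

  public static void main(String[] args) {
    for (BundleResult<String> r : write(Arrays.asList("a", "b", "c", "d", "e"), 2)) {
      // A consumer interested only in counts reads rowsWritten and ignores elements.
      System.out.println(r.rowsWritten + " rows: " + r.elements);
    }
  }
}
```

A consumer that only needs the row counts never touches `elements`, which is the choice the two outputs are meant to leave to users.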

Kenn

On Wed, Mar 24, 2021 at 5:36 PM Robert Bradshaw  wrote:

> Yeah, the entire input is not always what is needed, and can generally be
> achieved via
>
> input -> wait(side input of write) -> do something with the input
>
> Of course one could also do
>
> entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
> CombineGlobally(TrivialCombineFn)
>
> to reduce this to a more minimal set with at least one element per Window.
>
> The file writing operations emit the actual files that were written, which
> can be handy. My suggestion of PCollection was just so that we can emit
> something usable, and decide exactly what is most useful later.
>
>
> On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax  wrote:
>
>> I believe that the Wait transform turns this output into a side input, so
>> outputting the input PCollection might be problematic.
>>
>> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles  wrote:
>>
>>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>>> just reading really quickly so sorry if I missed something...
>>>
>>> Checking out the code for the WriteFn I see a big problem:
>>>
>>> @Setup
>>> public void setup() {
>>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>>> }
>>>
>>> @ProcessElement
>>> public void processElement(ProcessContext c)
>>>     throws ExecutionException, InterruptedException {
>>>   writer.mutate(c.element());
>>> }
>>>
>>> @Teardown
>>> public void teardown() throws Exception {
>>>   writer.close();
>>>   writer = null;
>>> }
>>>
>>> It is only in writer.close() that all async writes are waited on. This
>>> needs to happen in @FinishBundle.
>>>
>>> Did you discover this when implementing your own Cassandra.Write?
>>>
>>> Until you have waited on the future, you should not output the element
>>> as "has been written". And you cannot output from the @Teardown method
>>> which is just for cleaning up resources.
>>>
>>> Am I reading this wrong?
>>>
>>> Kenn
>>>
>>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:
>>>
 How about a PCollection containing every element which was successfully
 written?
 Basically the same things which were passed into it.

 Then you could act on every element after it's been successfully written
 to the sink.

 On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
 wrote:

> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía 
> wrote:
>
>> +dev
>>
>> Since we all agree that we should return something different than
>> PDone the real question is what should we return.
>>
>
> My proposal is that one returns a PCollection that consists,
> internally, of something contentless like nulls. This is future compatible
> with returning something more meaningful based on the source or write
> process itself, but at least this would be followable.
>
>
>> As a reminder we had a pretty interesting discussion about this
>> already in the past but uniformization of our return values has not
>> happened.
>> This thread is worth reading for Vincent or anyone who wants to
>> contribute Write transforms that return.
>>
>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>
>
> Yeah, we should go ahead and finally do something.
>
>
>>
>> > Returning PDone is an anti-pattern that should be avoided, but
>> changing it now would be backwards incompatible.
>>
>> Periodic reminder: most IOs are still Experimental, so I suppose it is
>> up to the maintainers to judge whether the upgrade to return something
>> different from PDone is worth it; in that case we can deprecate and remove
>> the previous signature in short time (2 releases was the average for
>> previous cases).
>>
>>
>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>  wrote:
>> >
>> > I thought that was said about returning a PCollection of write
>> results as it’s done in other IOs (as I mentioned as examples) that have
>> _additional_ write methods, like “withWriteResults()” etc, that return
>> PTransform<…, PCollection>.
>> > In this case, we keep backwards compatibility and just add new
>> functionality. Though, we need to follow the same pattern for user API and
>> maybe even naming for this feature across different IOs (like we have for
>> “readAll()” methods).
>> >

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Robert Bradshaw
Yeah, the entire input is not always what is needed, and can generally be
achieved via

input -> wait(side input of write) -> do something with the input

Of course one could also do

entire_input_as_output_of_wait -> MapTo(KV.of(null, null)) ->
CombineGlobally(TrivialCombineFn)

to reduce this to a more minimal set with at least one element per Window.

The file writing operations emit the actual files that were written, which
can be handy. My suggestion of PCollection was just so that we can emit
something usable, and decide exactly what is most useful later.
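
To make the "minimal set" idea concrete, the following is a plain-Java model, not Beam code, of discarding the written elements and keeping a single contentless signal per window. `WindowSignalSketch`, `Written`, and the string window labels are hypothetical names for this sketch only. It does not model how a runner treats windows that received no elements.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Plain-Java model of collapsing written elements to one signal per window; not Beam code. */
final class WindowSignalSketch {

  /** A written element tagged with the (hypothetical) label of its window. */
  static final class Written {
    final String window;
    final String element;

    Written(String window, String element) {
      this.window = window;
      this.element = element;
    }
  }

  /** Drops the element contents and keeps a single count per window. */
  static Map<String, Long> signalPerWindow(List<Written> written) {
    return written.stream()
        .collect(Collectors.groupingBy(w -> w.window, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    List<Written> written = Arrays.asList(
        new Written("w1", "a"), new Written("w1", "b"), new Written("w2", "c"));
    // Each window ends up with exactly one entry, regardless of how many elements it held.
    System.out.println(signalPerWindow(written));
  }
}
```

The count is kept here only so the signal carries something inspectable; a downstream step that just waits would need nothing more than the presence of one entry per window.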


On Wed, Mar 24, 2021 at 5:30 PM Reuven Lax  wrote:

> I believe that the Wait transform turns this output into a side input, so
> outputting the input PCollection might be problematic.
>
> On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles  wrote:
>
>> Alex's idea sounds good and like what Vincent maybe implemented. I am
>> just reading really quickly so sorry if I missed something...
>>
>> Checking out the code for the WriteFn I see a big problem:
>>
>> @Setup
>> public void setup() {
>>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext c)
>>     throws ExecutionException, InterruptedException {
>>   writer.mutate(c.element());
>> }
>>
>> @Teardown
>> public void teardown() throws Exception {
>>   writer.close();
>>   writer = null;
>> }
>>
>> It is only in writer.close() that all async writes are waited on. This
>> needs to happen in @FinishBundle.
>>
>> Did you discover this when implementing your own Cassandra.Write?
>>
>> Until you have waited on the future, you should not output the element as
>> "has been written". And you cannot output from the @Teardown method which
>> is just for cleaning up resources.
>>
>> Am I reading this wrong?
>>
>> Kenn
>>
>> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:
>>
>>> How about a PCollection containing every element which was successfully
>>> written?
>>> Basically the same things which were passed into it.
>>>
>>> Then you could act on every element after it's been successfully written
>>> to the sink.
>>>
>>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
>>> wrote:
>>>
 On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:

> +dev
>
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
>

 My proposal is that one returns a PCollection that consists,
 internally, of something contentless like nulls. This is future compatible
 with returning something more meaningful based on the source or write
 process itself, but at least this would be followable.


> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
>
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E


 Yeah, we should go ahead and finally do something.


>
> > Returning PDone is an anti-pattern that should be avoided, but
> changing it now would be backwards incompatible.
>
> Periodic reminder: most IOs are still Experimental, so I suppose it is
> up to the maintainers to judge whether the upgrade to return something
> different from PDone is worth it; in that case we can deprecate and remove
> the previous signature in short time (2 releases was the average for
> previous cases).
>
>
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>  wrote:
> >
> > I thought that was said about returning a PCollection of write
> results as it’s done in other IOs (as I mentioned as examples) that have
> _additional_ write methods, like “withWriteResults()” etc, that return
> PTransform<…, PCollection>.
> > In this case, we keep backwards compatibility and just add new
> functionality. Though, we need to follow the same pattern for user API and
> maybe even naming for this feature across different IOs (like we have for
> “readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw 
> wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but
> changing it now would be backwards incompatible. PRs to add non-PDone
> returning variants (probably as another option to the builders) that
> compose well with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as
> it was mentioned before, it 

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Reuven Lax
I believe that the Wait transform turns this output into a side input, so
outputting the input PCollection might be problematic.

On Wed, Mar 24, 2021 at 4:49 PM Kenneth Knowles  wrote:

> Alex's idea sounds good and like what Vincent maybe implemented. I am just
> reading really quickly so sorry if I missed something...
>
> Checking out the code for the WriteFn I see a big problem:
>
> @Setup
> public void setup() {
>   writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
> }
>
> @ProcessElement
> public void processElement(ProcessContext c)
>     throws ExecutionException, InterruptedException {
>   writer.mutate(c.element());
> }
>
> @Teardown
> public void teardown() throws Exception {
>   writer.close();
>   writer = null;
> }
>
> It is only in writer.close() that all async writes are waited on. This
> needs to happen in @FinishBundle.
>
> Did you discover this when implementing your own Cassandra.Write?
>
> Until you have waited on the future, you should not output the element as
> "has been written". And you cannot output from the @Teardown method which
> is just for cleaning up resources.
>
> Am I reading this wrong?
>
> Kenn
>
> On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:
>
>> How about a PCollection containing every element which was successfully
>> written?
>> Basically the same things which were passed into it.
>>
>> Then you could act on every element after it's been successfully written
>> to the sink.
>>
>> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
>> wrote:
>>
>>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:
>>>
 +dev

 Since we all agree that we should return something different than
 PDone the real question is what should we return.

>>>
>>> My proposal is that one returns a PCollection that consists,
>>> internally, of something contentless like nulls. This is future compatible
>>> with returning something more meaningful based on the source or write
>>> process itself, but at least this would be followable.
>>>
>>>
 As a reminder we had a pretty interesting discussion about this
 already in the past but uniformization of our return values has not
 happened.
 This thread is worth reading for Vincent or anyone who wants to
 contribute Write transforms that return.

 https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>>
>>>
>>> Yeah, we should go ahead and finally do something.
>>>
>>>

 > Returning PDone is an anti-pattern that should be avoided, but
 changing it now would be backwards incompatible.

 Periodic reminder: most IOs are still Experimental, so I suppose it is
 up to the maintainers to judge whether the upgrade to return something
 different from PDone is worth it; in that case we can deprecate and remove
 the previous signature in short time (2 releases was the average for
 previous cases).


 On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
  wrote:
 >
 > I thought that was said about returning a PCollection of write
 results as it’s done in other IOs (as I mentioned as examples) that have
 _additional_ write methods, like “withWriteResults()” etc, that return
 PTransform<…, PCollection>.
 > In this case, we keep backwards compatibility and just add new
 functionality. Though, we need to follow the same pattern for user API and
 maybe even naming for this feature across different IOs (like we have for
 “readAll()” methods).
 >
 >  I agree that we have to avoid returning PDone for such cases.
 >
 > On 24 Mar 2021, at 20:05, Robert Bradshaw 
 wrote:
 >
 > Returning PDone is an anti-pattern that should be avoided, but
 changing it now would be backwards incompatible. PRs to add non-PDone
 returning variants (probably as another option to the builders) that
 compose well with Wait, etc. would be welcome.
 >
 > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
 aromanenko@gmail.com> wrote:
 >>
 >> In this way, I think “Wait” PTransform should work for you but, as
 it was mentioned before, it doesn’t work with PDone, only with PCollection
 as a signal.
 >>
 >> Since you already adjusted your own writer for that, it would be
 great to contribute it back to Beam in the way as it was done for other IOs
 (for example, JdbcIO [1] or BigtableIO [2])
 >>
 >> In general, I think we need to have it for all IOs, at least to use
 with “Wait” because this pattern is quite often required.
 >>
 >> [1]
 https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
 >> [2]
 

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Kenneth Knowles
Alex's idea sounds good and like what Vincent maybe implemented. I am just
reading really quickly so sorry if I missed something...

Checking out the code for the WriteFn I see a big problem:

@Setup
public void setup() {
  writer = new Mutator<>(spec, Mapper::saveAsync, "writes");
}

@ProcessElement
public void processElement(ProcessContext c)
    throws ExecutionException, InterruptedException {
  writer.mutate(c.element());
}

@Teardown
public void teardown() throws Exception {
  writer.close();
  writer = null;
}

It is only in writer.close() that all async writes are waited on. This
needs to happen in @FinishBundle.

Did you discover this when implementing your own Cassandra.Write?

Until you have waited on the future, you should not output the element as
"has been written". And you cannot output from the @Teardown method, which
is just for cleaning up resources.
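
To make the ordering concrete, here is a minimal plain-Java sketch of what I
mean. It deliberately has no Beam dependency and is not the CassandraIO code:
BundleFlushSketch, saveAsync, processElement and finishBundle are hypothetical
names that only stand in for the async writer, @ProcessElement and
@FinishBundle. The assumption is that each async write hands back a future
that completes once the write has succeeded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Plain-Java stand-in for a DoFn-style writer; this is not Beam code. */
public class BundleFlushSketch {

  /** Hypothetical async sink call; a real writer would call the database driver here. */
  static CompletableFuture<String> saveAsync(String element) {
    return CompletableFuture.supplyAsync(() -> element);
  }

  // Writes started in the current bundle that have not been waited on yet.
  private final List<CompletableFuture<String>> pending = new ArrayList<>();

  /** Plays the role of @ProcessElement: start the write, emit nothing yet. */
  public void processElement(String element) {
    pending.add(saveAsync(element));
  }

  /** Plays the role of @FinishBundle: wait on every future, then report what was written. */
  public List<String> finishBundle() throws ExecutionException, InterruptedException {
    List<String> written = new ArrayList<>();
    for (CompletableFuture<String> future : pending) {
      // Blocks until this write has completed; throws if the write failed.
      written.add(future.get());
    }
    pending.clear();
    return written;
  }

  public static void main(String[] args) throws Exception {
    BundleFlushSketch fn = new BundleFlushSketch();
    fn.processElement("a");
    fn.processElement("b");
    // Elements are only reported after their writes have been waited on.
    System.out.println(fn.finishBundle());
  }
}
```

The point of the sketch is only that nothing is reported as written until its
future has been waited on, and that this happens once per bundle rather than
in teardown.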

Am I reading this wrong?

Kenn

On Wed, Mar 24, 2021 at 4:35 PM Alex Amato  wrote:

> How about a PCollection containing every element which was successfully
> written?
> Basically the same things which were passed into it.
>
> Then you could act on every element after it's been successfully written to
> the sink.
>
> On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw 
> wrote:
>
>> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:
>>
>>> +dev
>>>
>>> Since we all agree that we should return something different than
>>> PDone the real question is what should we return.
>>>
>>
>> My proposal is that one returns a PCollection that consists,
>> internally, of something contentless like nulls. This is future compatible
>> with returning something more meaningful based on the source or write
>> process itself, but at least this would be followable.
>>
>>
>>> As a reminder we had a pretty interesting discussion about this
>>> already in the past but uniformization of our return values has not
>>> happened.
>>> This thread is worth reading for Vincent or anyone who wants to
>>> contribute Write transforms that return.
>>>
>>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>>
>>
>> Yeah, we should go ahead and finally do something.
>>
>>
>>>
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible.
>>>
>>> Periodic reminder most IOs are still Experimental so I suppose it is
>>> worth to the maintainers to judge if the upgrade to return something
>>> different of PDone is worth, in that case we can deprecate and remove
>>> the previous signature in short time (2 releases was the average for
>>> previous cases).
>>>
>>>
>>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>>  wrote:
>>> >
>>> > I thought that was said about returning a PCollection of write results
>>> as it’s done in other IOs (as I mentioned as examples) that have
>>> _additional_ write methods, like “withWriteResults()” etc, that return
>>> PTransform<…, PCollection>.
>>> > In this case, we keep backwards compatibility and just add new
>>> functionality. Though, we need to follow the same pattern for user API and
>>> maybe even naming for this feature across different IOs (like we have for
>>> "readAll()” methods).
>>> >
>>> >  I agree that we have to avoid returning PDone for such cases.
>>> >
>>> > On 24 Mar 2021, at 20:05, Robert Bradshaw  wrote:
>>> >
>>> > Returning PDone is an anti-pattern that should be avoided, but
>>> changing it now would be backwards incompatible. PRs to add non-PDone
>>> returning variants (probably as another option to the builders) that
>>> compose well with Wait, etc. would be welcome.
>>> >
>>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>>> aromanenko@gmail.com> wrote:
>>> >>
>>> >> In this way, I think “Wait” PTransform should work for you but, as it
>>> was mentioned before, it doesn’t work with PDone, only with PCollection as
>>> a signal.
>>> >>
>>> >> Since you already adjusted your own writer for that, it would be
>>> great to contribute it back to Beam in the way as it was done for other IOs
>>> (for example, JdbcIO [1] or BigtableIO [2])
>>> >>
>>> >> In general, I think we need to have it for all IOs, at least to use
>>> with “Wait” because this pattern is quite often required.
>>> >>
>>> >> [1]
>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>>> >> [2]
>>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>> >>
>>> >> On 24 Mar 2021, at 18:01, Vincent Marquez 
>>> wrote:
>>> >>
>>> >> No, it only needs to ensure that one record seen on Pubsub has
>>> successfully written to a database.  So "record by record" is fine, or even
>>> "bundle".
>>> >>
>>> >> ~Vincent
>>> >>
>>> >>
>>> >> On Wed, 

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Alex Amato
How about a PCollection containing every element which was successfully
written?
Basically the same things which were passed into it.

Then you could act on every element after it's been successfully written to
the sink.

On Wed, Mar 24, 2021 at 3:16 PM Robert Bradshaw  wrote:

> On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:
>
>> +dev
>>
>> Since we all agree that we should return something different than
>> PDone the real question is what should we return.
>>
>
> My proposal is that one returns a PCollection that consists,
> internally, of something contentless like nulls. This is future compatible
> with returning something more meaningful based on the source or write
> process itself, but at least this would be followable.
>
>
>> As a reminder we had a pretty interesting discussion about this
>> already in the past but uniformization of our return values has not
>> happened.
>> This thread is worth reading for Vincent or anyone who wants to
>> contribute Write transforms that return.
>>
>> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E
>
>
> Yeah, we should go ahead and finally do something.
>
>
>>
>> > Returning PDone is an anti-pattern that should be avoided, but changing
>> it now would be backwards incompatible.
>>
>> Periodic reminder most IOs are still Experimental so I suppose it is
>> worth to the maintainers to judge if the upgrade to return something
>> different of PDone is worth, in that case we can deprecate and remove
>> the previous signature in short time (2 releases was the average for
>> previous cases).
>>
>>
>> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>>  wrote:
>> >
>> > I thought that was said about returning a PCollection of write results
>> as it’s done in other IOs (as I mentioned as examples) that have
>> _additional_ write methods, like “withWriteResults()” etc, that return
>> PTransform<…, PCollection>.
>> > In this case, we keep backwards compatibility and just add new
>> functionality. Though, we need to follow the same pattern for user API and
>> maybe even naming for this feature across different IOs (like we have for
>> "readAll()” methods).
>> >
>> >  I agree that we have to avoid returning PDone for such cases.
>> >
>> > On 24 Mar 2021, at 20:05, Robert Bradshaw  wrote:
>> >
>> > Returning PDone is an anti-pattern that should be avoided, but changing
>> it now would be backwards incompatible. PRs to add non-PDone returning
>> variants (probably as another option to the builders) that compose well
>> with Wait, etc. would be welcome.
>> >
>> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>> >>
>> >> In this way, I think “Wait” PTransform should work for you but, as it
>> was mentioned before, it doesn’t work with PDone, only with PCollection as
>> a signal.
>> >>
>> >> Since you already adjusted your own writer for that, it would be great
>> to contribute it back to Beam in the way as it was done for other IOs (for
>> example, JdbcIO [1] or BigtableIO [2])
>> >>
>> >> In general, I think we need to have it for all IOs, at least to use
>> with “Wait” because this pattern is quite often required.
>> >>
>> >> [1]
>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>> >> [2]
>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>> >>
>> >> On 24 Mar 2021, at 18:01, Vincent Marquez 
>> wrote:
>> >>
>> >> No, it only needs to ensure that one record seen on Pubsub has
>> successfully written to a database.  So "record by record" is fine, or even
>> "bundle".
>> >>
>> >> ~Vincent
>> >>
>> >>
>> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
>> aromanenko@gmail.com> wrote:
>> >>>
>> >>> Do you want to wait for ALL records are written for Cassandra and
>> then write all successfully written records to PubSub or it should be
>> performed "record by record"?
>> >>>
>> >>> On 24 Mar 2021, at 04:58, Vincent Marquez 
>> wrote:
>> >>>
>> >>> I have a common use case where my pipeline looks like this:
>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write ->
>> PubSubIO.write
>> >>>
>> >>> I do NOT want my pipeline to look like the following:
>> >>>
>> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>> >>>  |
>> >>>   ->
>> PubsubIO.write
>> >>>
>> >>> Because I need to ensure that only items written to Pubsub have
>> successfully finished a (quorum) write.
>> >>>
>> >>> Since CassandraIO.write is a PTransform I can't actually
>> use it here so I often roll my own 'writer', but maybe there is a
>> recommended way of doing this?
>> >>>
>> >>> Thanks in advance for any help.
>> 

Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Robert Bradshaw
On Wed, Mar 24, 2021 at 2:36 PM Ismaël Mejía  wrote:

> +dev
>
> Since we all agree that we should return something different than
> PDone the real question is what should we return.
>

My proposal is that one returns a PCollection that consists, internally,
of something contentless like nulls. This is future compatible
with returning something more meaningful based on the source or write
process itself, but at least this would be followable.


> As a reminder we had a pretty interesting discussion about this
> already in the past but uniformization of our return values has not
> happened.
> This thread is worth reading for Vincent or anyone who wants to
> contribute Write transforms that return.
>
> https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E


Yeah, we should go ahead and finally do something.


>
> > Returning PDone is an anti-pattern that should be avoided, but changing
> it now would be backwards incompatible.
>
> Periodic reminder most IOs are still Experimental so I suppose it is
> worth to the maintainers to judge if the upgrade to return something
> different of PDone is worth, in that case we can deprecate and remove
> the previous signature in short time (2 releases was the average for
> previous cases).
>
>
> On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
>  wrote:
> >
> > I thought that was said about returning a PCollection of write results
> as it’s done in other IOs (as I mentioned as examples) that have
> _additional_ write methods, like “withWriteResults()” etc, that return
> PTransform<…, PCollection>.
> > In this case, we keep backwards compatibility and just add new
> functionality. Though, we need to follow the same pattern for user API and
> maybe even naming for this feature across different IOs (like we have for
> "readAll()” methods).
> >
> >  I agree that we have to avoid returning PDone for such cases.
> >
> > On 24 Mar 2021, at 20:05, Robert Bradshaw  wrote:
> >
> > Returning PDone is an anti-pattern that should be avoided, but changing
> it now would be backwards incompatible. PRs to add non-PDone returning
> variants (probably as another option to the builders) that compose well
> with Wait, etc. would be welcome.
> >
> > On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >>
> >> In this way, I think “Wait” PTransform should work for you but, as it
> was mentioned before, it doesn’t work with PDone, only with PCollection as
> a signal.
> >>
> >> Since you already adjusted your own writer for that, it would be great
> to contribute it back to Beam in the way as it was done for other IOs (for
> example, JdbcIO [1] or BigtableIO [2])
> >>
> >> In general, I think we need to have it for all IOs, at least to use
> with “Wait” because this pattern is quite often required.
> >>
> >> [1]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
> >> [2]
> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
> >>
> >> On 24 Mar 2021, at 18:01, Vincent Marquez 
> wrote:
> >>
> >> No, it only needs to ensure that one record seen on Pubsub has
> successfully written to a database.  So "record by record" is fine, or even
> "bundle".
> >>
> >> ~Vincent
> >>
> >>
> >> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko <
> aromanenko@gmail.com> wrote:
> >>>
> >>> Do you want to wait for ALL records are written for Cassandra and then
> write all successfully written records to PubSub or it should be performed
> "record by record"?
> >>>
> >>> On 24 Mar 2021, at 04:58, Vincent Marquez 
> wrote:
> >>>
> >>> I have a common use case where my pipeline looks like this:
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
> >>>
> >>> I do NOT want my pipeline to look like the following:
> >>>
> >>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
> >>>  |
> >>>   ->
> PubsubIO.write
> >>>
> >>> Because I need to ensure that only items written to Pubsub have
> successfully finished a (quorum) write.
> >>>
> >>> Since CassandraIO.write is a PTransform I can't actually use
> it here so I often roll my own 'writer', but maybe there is a recommended
> way of doing this?
> >>>
> >>> Thanks in advance for any help.
> >>>
> >>> ~Vincent
> >>>
> >>>
> >>
> >
>


Re: Write to multiple IOs in linear fashion

2021-03-24 Thread Ismaël Mejía
+dev

Since we all agree that we should return something other than
PDone, the real question is what we should return.
As a reminder, we already had a pretty interesting discussion about this
in the past, but our return values have still not been made
uniform.
This thread is worth reading for Vincent or anyone who wants to
contribute Write transforms that return.
https://lists.apache.org/thread.html/d1a4556a1e13a661cce19021926a5d0997fbbfde016d36989cf75a07%40%3Cdev.beam.apache.org%3E

> Returning PDone is an anti-pattern that should be avoided, but changing it 
> now would be backwards incompatible.

Periodic reminder: most IOs are still Experimental, so I suppose it is
up to the maintainers to judge whether the upgrade to return something
other than PDone is worthwhile. In that case we can deprecate and remove
the previous signature in a short time (2 releases was the average for
previous cases).


On Wed, Mar 24, 2021 at 10:24 PM Alexey Romanenko
 wrote:
>
> I thought that was said about returning a PCollection of write results as 
> it’s done in other IOs (as I mentioned as examples) that have _additional_ 
> write methods, like “withWriteResults()” etc, that return PTransform<…, 
> PCollection>.
> In this case, we keep backwards compatibility and just add new functionality.
> Though, we need to follow the same pattern for user API and maybe even naming 
> for this feature across different IOs (like we have for "readAll()” methods).
>
>  I agree that we have to avoid returning PDone for such cases.
>
> On 24 Mar 2021, at 20:05, Robert Bradshaw  wrote:
>
> Returning PDone is an anti-pattern that should be avoided, but changing it 
> now would be backwards incompatible. PRs to add non-PDone returning variants 
> (probably as another option to the builders) that compose well with Wait, 
> etc. would be welcome.
>
> On Wed, Mar 24, 2021 at 11:14 AM Alexey Romanenko  
> wrote:
>>
>> In this way, I think “Wait” PTransform should work for you but, as it was 
>> mentioned before, it doesn’t work with PDone, only with PCollection as a 
>> signal.
>>
>> Since you already adjusted your own writer for that, it would be great to 
>> contribute it back to Beam in the way as it was done for other IOs (for 
>> example, JdbcIO [1] or BigtableIO [2])
>>
>> In general, I think we need to have it for all IOs, at least to use with 
>> “Wait” because this pattern is quite often required.
>>
>> [1] 
>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java#L1078
>> [2] 
>> https://github.com/apache/beam/blob/ab1dfa13a983d41669e70e83b11f58a83015004c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L715
>>
>> On 24 Mar 2021, at 18:01, Vincent Marquez  wrote:
>>
>> No, it only needs to ensure that one record seen on Pubsub has successfully 
>> written to a database.  So "record by record" is fine, or even "bundle".
>>
>> ~Vincent
>>
>>
>> On Wed, Mar 24, 2021 at 9:49 AM Alexey Romanenko  
>> wrote:
>>>
>>> Do you want to wait for ALL records are written for Cassandra and then 
>>> write all successfully written records to PubSub or it should be performed 
>>> "record by record"?
>>>
>>> On 24 Mar 2021, at 04:58, Vincent Marquez  wrote:
>>>
>>> I have a common use case where my pipeline looks like this:
>>> CassandraIO.readAll -> Aggregate -> CassandraIO.write -> PubSubIO.write
>>>
>>> I do NOT want my pipeline to look like the following:
>>>
>>> CassandraIO.readAll -> Aggregate -> CassandraIO.write
>>>  |
>>>   -> PubsubIO.write
>>>
>>> Because I need to ensure that only items written to Pubsub have 
>>> successfully finished a (quorum) write.
>>>
>>> Since CassandraIO.write is a PTransform I can't actually use it 
>>> here so I often roll my own 'writer', but maybe there is a recommended way 
>>> of doing this?
>>>
>>> Thanks in advance for any help.
>>>
>>> ~Vincent
>>>
>>>
>>
>