The Beam sink API doesn't currently support "after this completes, then
execute this transform". We would like to provide support for this type of
pipeline in the future, however.

With regard to reusing elements, you don't need the Const transform. A
PCollection is an immutable collection of elements: PTransforms do not
mutate its contents but produce a new PCollection of the transformed
elements. You can therefore reuse the PCollection you are passing to Const
directly, and your code above could be written as

PCollection<String> fileNames = data.apply("GetFileNames", Keys.create());

PDone writeOp = data.apply(new TransformFileData())
    .apply(TextIO.Write.named("WriteTransformedData")
        .to("myfile"));

fileNames.apply(new DeleteInputFiles());

However, because you can't currently enforce an ordering between the write
and DeleteInputFiles, the runner may delete the input files whenever it
chooses to execute that transform. That could be at any point, before or
after those files would otherwise be read.
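The ordering hazard can be illustrated outside Beam. The following is a plain-Java analogy, not Beam API. The names `OrderingSketch`, `DATA`, `runUnordered` and `runOrdered` are hypothetical. An unmodifiable map stands in for the immutable PCollection, and `CompletableFuture` stands in for the runner.

Two branches that share only an input, like the write and `DeleteInputFiles` above, may run in either order. Chaining with `thenRun` expresses the "after this completes, then execute this transform" dependency that the sink API does not yet offer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

public class OrderingSketch {
    // Analogy only: an unmodifiable map (file name -> contents) plays the role
    // of the immutable PCollection<KV<String, FileData>>.
    static final Map<String, String> DATA =
        Collections.unmodifiableMap(new TreeMap<>(Map.of("a.txt", "alpha", "b.txt", "beta")));

    // Both "transforms" read DATA and build new lists; DATA itself is never modified.
    static List<String> fileNames() {
        return new ArrayList<>(DATA.keySet());
    }

    static List<String> transformedData() {
        return DATA.values().stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    // Unordered: the two branches share only their input, so the executor
    // is free to run "write" and "delete" in either order.
    static List<String> runUnordered() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> write =
            CompletableFuture.runAsync(() -> log.add("write " + transformedData()));
        CompletableFuture<Void> delete =
            CompletableFuture.runAsync(() -> log.add("delete " + fileNames()));
        CompletableFuture.allOf(write, delete).join();
        return log;
    }

    // Ordered: "delete" is chained on the write's completion, so it runs only
    // after the write finishes, and not at all if the write throws.
    static List<String> runOrdered(boolean writeFails) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> chained = CompletableFuture
            .runAsync(() -> {
                if (writeFails) {
                    throw new IllegalStateException("write failed");
                }
                log.add("write " + transformedData());
            })
            .thenRun(() -> log.add("delete " + fileNames()));
        try {
            chained.join();
        } catch (CompletionException e) {
            log.add("failed: " + e.getCause().getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runUnordered());      // order of the two entries may vary
        System.out.println(runOrdered(false));   // write always precedes delete
        System.out.println(runOrdered(true));    // delete is skipped
    }
}
```

Skipping the dependent step when the first one throws is how `CompletableFuture` behaves. It is not a statement about how any Beam runner handles a failed write.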

On Wed, Jun 15, 2016 at 4:33 AM, Jean-Baptiste Onofré wrote:

> Hi Frank,
>
> If you want to write a static PCollection, you can use a PTransform (even
> better would be to create a kind of ConstIO).
>
> On the other hand, you can change/override the PCollection using:
>
> .apply(ParDo.of(new DoFn() {
>   public void processElement(ProcessContext c) {
>     c.output(...);
>   }
> }));
>
> Regards
> JB
>
>
> On 06/15/2016 01:26 PM, Frank Wilson wrote:
>
>> How can I follow up a write with another operation that uses another
>> PCollection from somewhere else in the pipeline?
>>
>> I came up with this 'Const' transform that returns a fixed PCollection.
>> Is there anything similar to it in the SDK? I couldn't find anything
>> like it.
>>
>> Here's my (not very well tested) code.
>>
>> class Const<T> extends PTransform<PBegin, PCollection<T>> {
>>      private final PCollection<T> sourceFiles;
>>
>>      public Const(PCollection<T> sourceFiles) {
>>          this.sourceFiles = sourceFiles;
>>      }
>>
>>      @Override
>>      public PCollection<T> apply(PBegin input) {
>>          return sourceFiles;
>>      }
>> }
>>
>> // usage
>>
>> PCollection<KV<String, FileData>> data = inputFileDataByFileName();
>>
>> PCollection<String> fileNames = data.apply("GetFileNames", Keys.create());
>>
>> PDone writeOp = data.apply(new TransformFileData())
>>      .apply(TextIO.Write.named("WriteTransformedData")
>>          .to("myfile"));
>>
>> writeOp.getPipeline().apply(new Const<>(fileNames))
>>      .apply(new DeleteInputFiles());
>>
>> Some other tests that I ran (using Create.of() instead of Const)
>> indicated that if the writeOp fails, the following operation,
>> DeleteInputFiles, will not be run. Is this true in general?
>>
>> Thanks,
>>
>>
>> Frank
>>
>
> --
> Jean-Baptiste Onofré
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
