Hi Frank,

If you want to write a static PCollection, you can use a PTransform (better still, create a kind of ConstIO).

On the other hand, you can change/override the PCollection using:

.apply(ParDo.of(new DoFn() {
  public void processElement(ProcessContext c) {
    c.output(...);
  }
}));
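
For reference, here is a minimal self-contained sketch of that ParDo pattern. It assumes a recent Beam Java SDK (2.x, where DoFn methods are marked with @ProcessElement) and a direct runner on the classpath; older SDKs override processElement instead. The names OverrideExample, ReplaceFn and replaceElements, and the "replaced:" prefix, are made up for illustration only.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class OverrideExample {

  // Hypothetical DoFn: emits one replacement value for each input element.
  static class ReplaceFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // Assumption: the replacement is derived from the input element.
      c.output("replaced:" + c.element());
    }
  }

  // Applies the DoFn and returns the new (overriding) PCollection.
  static PCollection<String> replaceElements(PCollection<String> input) {
    return input.apply("ReplaceElements", ParDo.of(new ReplaceFn()));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    replaceElements(p.apply(Create.of("a", "b")));
    p.run().waitUntilFinish();
  }
}
```

The input PCollection itself is immutable, so "override" here means that downstream steps consume the PCollection returned by replaceElements instead of the original one.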

Regards
JB

On 06/15/2016 01:26 PM, Frank Wilson wrote:
How can I follow up a write with another operation that uses another
PCollection from somewhere else in the pipeline?

I came up with this 'Const' transform that returns a fixed PCollection,
is there anything similar to it in the SDK? I couldn't find anything
like it.

Here's my (not very well tested) code.

class Const<T> extends PTransform<PBegin, PCollection<T>> {
     private final PCollection<T> sourceFiles;

     public Const(PCollection<T> sourceFiles) {
         this.sourceFiles = sourceFiles;
     }

     @Override
     public PCollection<T> apply(PBegin input) {
         return sourceFiles;
     }
}

// usage

PCollection<KV<String, FileData>> data = inputFileDataByFileName();

PCollection<String> fileNames = data.apply("GetFileNames", Keys.create());

PDone writeOp = data.apply(new TransformFileData())
     .apply(TextIO.Write.named("WriteTransformedData")
         .to("myfile"));

writeOp.getPipeline().apply(new Const<>(fileNames))
     .apply(new DeleteInputFiles());

Some other tests that I ran (I used Create.of() instead of Const)
indicated that if the writeOp fails, the following operation
(DeleteInputFiles) will not be run. Is this true in general?

Thanks,


Frank

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com