Hi Frank,
If you want to write a static PCollection, you can use a PTransform
(better still would be to create a kind of ConstIO).
Alternatively, you can change or override the PCollection with a ParDo:
.apply(ParDo.of(new DoFn() {
    public void processElement(ProcessContext c) {
        c.output(...);
    }
}));
Regards
JB
On 06/15/2016 01:26 PM, Frank Wilson wrote:
How can I follow up a write with another operation that uses another
PCollection from somewhere else in the pipeline?
I came up with this 'Const' transform that returns a fixed PCollection,
is there anything similar to it in the SDK? I couldn't find anything
like it.
Here's my (not very well tested) code.
class Const<T> extends PTransform<PBegin, PCollection<T>> {
    private final PCollection<T> sourceFiles;

    public Const(PCollection<T> sourceFiles) {
        this.sourceFiles = sourceFiles;
    }

    @Override
    public PCollection<T> apply(PBegin input) {
        // Ignore the input and return the PCollection captured at construction.
        return sourceFiles;
    }
}
// usage
PCollection<KV<String, FileData>> data = inputFileDataByFileName();
PCollection<String> fileNames = data.apply("GetFileNames", Keys.create());
PDone writeOp = data.apply(new TransformFileData())
    .apply(TextIO.Write.named("WriteTransformedData")
        .to("myfile"));
writeOp.getPipeline()
    .apply(new Const<>(fileNames))
    .apply(new DeleteInputFiles());
Some other tests that I ran (I used Create.of() instead of Const)
indicated that if the writeOp fails, the following operation
(DeleteInputFiles) will not be run. Is this true in general?
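To make the ordering question above concrete, here is a minimal plain-Java sketch. It is not Beam code and says nothing about what a Beam runner guarantees. The names WriteThenDelete, write and run are hypothetical, and CompletableFuture only stands in for "a step that may fail". It illustrates the general principle that a step chained onto another step's result is skipped when that step fails, whereas two steps with no dependency between them have no such relationship.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class WriteThenDelete {

    // Hypothetical stand-in for the write step: it either completes
    // normally or completes exceptionally.
    static CompletableFuture<Void> write(boolean succeed) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        if (succeed) {
            result.complete(null);
        } else {
            result.completeExceptionally(new RuntimeException("write failed"));
        }
        return result;
    }

    // Chains the delete step onto the write step and returns the names
    // that were "deleted". Because the delete is a dependent stage, it is
    // skipped when the write completes exceptionally.
    static List<String> run(boolean writeSucceeds, List<String> fileNames) {
        List<String> deleted = new ArrayList<>();
        CompletableFuture<Void> chained =
            write(writeSucceeds).thenRun(() -> deleted.addAll(fileNames));
        try {
            chained.join();
        } catch (CompletionException e) {
            // The write failed, so the dependent delete stage never ran.
        }
        return deleted;
    }

    public static void main(String[] args) {
        List<String> names = java.util.Arrays.asList("a.txt", "b.txt");
        System.out.println(run(true, names));   // prints [a.txt, b.txt]
        System.out.println(run(false, names));  // prints []
    }
}
```

In this sketch the guarantee comes from the explicit dependency created by thenRun. The usage code above has no comparable data dependency between the write and DeleteInputFiles, which is why the question of whether the observed behaviour holds in general is worth asking.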
Thanks,
Frank
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com