How do you want to use the output of the previous stage in SpannerIO.read()? Are you trying to perform a join on a key between two PCollections? If so, please use CoGroupByKey[1]. Are you trying to merge two PCollection<Struct> objects? If so, please use Flatten[2].
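
As a rough sketch of the two options, not a definitive implementation: the example below uses small in-memory KV<String, String> collections as stand-ins for the previous stage's output and for the rows read by SpannerIO.read(), which you would first have to key by the join column. The class name JoinSketch, the method names, the tag ids and the sample data are all made up for illustration. It assumes the Beam Java SDK and the direct runner are on the classpath.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TupleTag;

// Hypothetical sketch class; String values stand in for Spanner Struct rows.
class JoinSketch {

  // Option 1: join two keyed PCollections on their key with CoGroupByKey.
  static PCollection<String> joinByKey(
      PCollection<KV<String, String>> names, PCollection<KV<String, String>> emails) {
    // Tag ids are illustrative; they only need to be distinct.
    final TupleTag<String> namesTag = new TupleTag<>("names");
    final TupleTag<String> emailsTag = new TupleTag<>("emails");

    return KeyedPCollectionTuple.of(namesTag, names)
        .and(emailsTag, emails)
        .apply("JoinOnKey", CoGroupByKey.create())
        .apply("FormatJoin", ParDo.of(new DoFn<KV<String, CoGbkResult>, String>() {
          @ProcessElement
          public void processElement(
              @Element KV<String, CoGbkResult> row, OutputReceiver<String> out) {
            // Collect all values seen for this key on each side of the join.
            List<String> n = new ArrayList<>();
            row.getValue().getAll(namesTag).forEach(n::add);
            List<String> e = new ArrayList<>();
            row.getValue().getAll(emailsTag).forEach(e::add);
            // Sort so the formatted output does not depend on arrival order.
            Collections.sort(n);
            Collections.sort(e);
            out.output(row.getKey() + ": names=" + n + ", emails=" + e);
          }
        }));
  }

  // Option 2: merge two PCollections of the same element type with Flatten.
  static <T> PCollection<T> merge(PCollection<T> first, PCollection<T> second) {
    return PCollectionList.of(first).and(second).apply("Merge", Flatten.pCollections());
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    // Made-up sample data in place of real pipeline stages.
    PCollection<KV<String, String>> names =
        p.apply("Names", Create.of(KV.of("u1", "alice"), KV.of("u2", "bob")));
    PCollection<KV<String, String>> emails =
        p.apply("Emails", Create.of(KV.of("u1", "a@example.com")));

    joinByKey(names, emails); // one output element per key
    merge(names, emails);     // all elements of both inputs
    p.run().waitUntilFinish();
  }
}
```

With the real data you would replace the Create steps with your previous stage and the SpannerIO.read() output, each mapped to KV<joinKey, value> for the CoGroupByKey case. Keys that appear on only one side still show up in the join, with an empty list for the other side.
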

1: https://beam.apache.org/documentation/programming-guide/#cogroupbykey
2: https://beam.apache.org/documentation/programming-guide/#flatten

On Wed, Dec 18, 2019 at 8:44 AM Ajit Soman <ajit.so...@yantriks.com> wrote:

> Hi,
>
> I am creating a pipeline . I want to execute Spanner query once I got data
> from its previous stage.
>
> In java docs, they have given reference for this code.
>
> PCollection<Struct> rows = pipleline.apply(
>     SpannerIO.read()
>         .withInstanceId(instanceId)
>         .withDatabaseId(dbId)
>         .withQuery("SELECT id, name, email FROM users"));
>
> *In the above code they have applied SpannerIO query to pipeline object
> .But i want this to apply in PCollection.apply() method so that i can use
> its previous stage output in my query.*
>
> PCollection<Struct> rows = (PCollection)pCollection.apply(
>     SpannerIO.read()
>         .withInstanceId(instanceId)
>         .withDatabaseId(dbId)
>         .withQuery("SELECT id, name, email FROM users"));
>
> Need your help.
>
> Thanks & Regards,
> Ajit