Or perhaps you have a PCollection<String> or something like that, and you want to use those strings to issue queries to Spanner?

PCollection<String> myStrings = p.apply(.....);
PCollection<Struct> rows = myStrings.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withQuery("SELECT id, name, email FROM users WHERE column = %input%"));

Something like that perhaps? If that's the case, it looks like you can use SpannerIO.readAll(), where the input PCollection contains ReadOperations. Something like this:

myStrings.apply(MapElements.into(TypeDescriptor.of(ReadOperation.class))
        .via(myString -> ReadOperation.with...()))
    .apply(SpannerIO.readAll()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId));

You'd have to convert your upstream PCollection into a PCollection of ReadOperations, and then pass that to SpannerIO.readAll().

On Wed, Dec 18, 2019 at 8:53 AM Luke Cwik <lc...@google.com> wrote:

> How do you want to use the previous data in the SpannerIO.read()?
>
> Are you trying to perform a join on a key between two PCollections? If so,
> please use CoGroupByKey[1].
> Are you trying to merge two PCollection<Struct> objects? If so, please use
> Flatten[2].
>
> 1: https://beam.apache.org/documentation/programming-guide/#cogroupbykey
> 2: https://beam.apache.org/documentation/programming-guide/#flatten
>
> On Wed, Dec 18, 2019 at 8:44 AM Ajit Soman <ajit.so...@yantriks.com>
> wrote:
>
>> Hi,
>>
>> I am creating a pipeline. I want to execute a Spanner query once I get
>> data from the previous stage.
>>
>> In the Javadocs, they give this code as a reference.
>>
>> PCollection<Struct> rows = pipeline.apply(
>>     SpannerIO.read()
>>         .withInstanceId(instanceId)
>>         .withDatabaseId(dbId)
>>         .withQuery("SELECT id, name, email FROM users"));
>>
>> *In the above code, they have applied the SpannerIO query to the pipeline
>> object, but I want to apply it in the PCollection.apply() method so that I
>> can use the previous stage's output in my query.*
>>
>> PCollection<Struct> rows = (PCollection)pCollection.apply(
>>     SpannerIO.read()
>>         .withInstanceId(instanceId)
>>         .withDatabaseId(dbId)
>>         .withQuery("SELECT id, name, email FROM users"));
>>
>> Need your help.
>>
>> Thanks & Regards,
>> Ajit
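
To make the SpannerIO.readAll() suggestion from the top of this message a bit more concrete, here is a rough, untested sketch of the whole flow. It assumes the Beam Java SDK, its Google Cloud Platform IO module, and the Cloud Spanner client are on the classpath. The class name, the Create.of(...) test input, the "my-instance" / "my-database" IDs, and the choice to filter on the email column are all placeholders I made up for illustration; swap in whatever your previous stage really produces. The project ID is not set here, so it would have to come from the pipeline options or an extra builder call.

```java
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;

// Hypothetical class name, for illustration only.
public class SpannerReadAllSketch {

  // Turns one upstream element into one parameterized Spanner query.
  // Filtering on the "email" column is an assumption; the original thread
  // does not say which column the upstream values should match.
  public static ReadOperation toReadOperation(String email) {
    Statement statement =
        Statement.newBuilder("SELECT id, name, email FROM users WHERE email = @email")
            .bind("email")
            .to(email)
            .build();
    return ReadOperation.create().withQuery(statement);
  }

  public static void main(String[] args) {
    String instanceId = "my-instance"; // placeholder
    String dbId = "my-database"; // placeholder

    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Stand-in for the output of the previous pipeline stage.
    PCollection<String> myStrings = p.apply(Create.of("a@example.com", "b@example.com"));

    // Convert each string into a ReadOperation, then let readAll() run them.
    PCollection<Struct> rows =
        myStrings
            .apply(
                MapElements.into(TypeDescriptor.of(ReadOperation.class))
                    .via((String email) -> toReadOperation(email)))
            .apply(SpannerIO.readAll().withInstanceId(instanceId).withDatabaseId(dbId));

    p.run().waitUntilFinish();
  }
}
```

Binding the value through a Statement parameter (@email) rather than pasting it into the SQL string keeps the upstream data from being interpreted as SQL, which is why the sketch does not use a "%input%"-style substitution.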