Or perhaps you have a PCollection<String> or something like that, and you
want to use those strings to issue queries to Spanner?

PCollection<String> myStrings = p.apply(.....)

PCollection<Struct> rows = myStrings.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(dbId)
        .withQuery("SELECT id, name, email FROM users WHERE column = %input%"));


Something like that perhaps?


If that's the case, it looks like you can use SpanerIO.readAll(),
where the input PCollection contains ReadOperations. Something like
this:


myStrings.apply(MapElements.into(TypeDescriptor.of(ReadOperation).via(myString
-> ReadOperation.with...())

         .apply(SpannerIO.readAll()
                                    .withInstanceId(instanceId)
                                    .withDatabaseId(dbId));


You'd have to convert your upstream PCollection into a PCollection of
ReadOperations, and then pass that to SpanerIO.readAll().


On Wed, Dec 18, 2019 at 8:53 AM Luke Cwik <lc...@google.com> wrote:

> How do you want to use the previous data in the SpannerIO.read()?
>
> Are you trying to perform a join on a key between two PCollections? If so,
> please use CoGroupByKey[1].
> Are you trying to merge two PCollection<Struct> objects? If so, please use
> Flatten[2].
>
> 1: https://beam.apache.org/documentation/programming-guide/#cogroupbykey
> 2: https://beam.apache.org/documentation/programming-guide/#flatten
>
> On Wed, Dec 18, 2019 at 8:44 AM Ajit Soman <ajit.so...@yantriks.com>
> wrote:
>
>> Hi,
>>
>> I am creating a pipeline . I want to execute Spanner query once I got
>> data from its previous stage.
>>
>> In java docs, they have given reference for this code.
>>
>> PCollection<Struct> rows = pipleline.apply(
>>     SpannerIO.read()
>>         .withInstanceId(instanceId)
>>         .withDatabaseId(dbId)
>>         .withQuery("SELECT id, name, email FROM users"));
>>
>>
>> *In the above code they have applied SpannerIO query to pipeline object
>> .But i want this to apply in PCollection.apply() method so that i can use
>> its previous stage output in my query.*
>>
>> PCollection<Struct> rows = (PCollection)pCollection.apply(
>>     SpannerIO.read()
>>         .withInstanceId(instanceId)
>>         .withDatabaseId(dbId)
>>         .withQuery("SELECT id, name, email FROM users"));
>>
>>
>> Need your help.
>>
>> Thanks & Regards,
>> Ajit
>>
>>
>>
>> ------------------------------
>> DISCLAIMER: This email message along with any attachments may contain
>> information that is confidential or privileged. If you are not the intended
>> recipient or responsible for delivering any of this transmission to an
>> intended recipient, you are hereby notified that any dissemination,
>> distribution, retention, copying or other use of this message or its
>> attachments is prohibited. If you received this message in error, please
>> notify the sender immediately and permanently delete all copies of this
>> message and attachments. No representation is made that this email is
>> free of viruses. Virus scanning is recommended and is the responsibility of
>> the recipient. Thank you.
>
>

Reply via email to