I've just learned that there are these transforms that should be useful:

p.apply(FileIO.match().filepattern(...))
  .apply(WithKeys.of((Void) null))
  .apply(GroupByKey.create())
  .apply(Values.create())
  .apply(Flatten.iterables())
  .apply(FileIO.readMatches())
  .apply(ParDo.of(new ConsumeFileDescriptors()));
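
A minimal sketch of the last DoFn, assuming you just want each file's contents
as a String (ConsumeFileDescriptors is only the placeholder name from the
pipeline above, not a Beam class):

static class ConsumeFileDescriptors extends DoFn<FileIO.ReadableFile, String> {
  @ProcessElement
  public void process(ProcessContext c) throws IOException {
    // readFullyAsUTF8String() pulls the whole file into memory;
    // c.element().open() gives a ReadableByteChannel if you'd rather stream it.
    c.output(c.element().readFullyAsUTF8String());
  }
}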

-P.

On Mon, Jan 7, 2019 at 5:23 PM Pablo Estrada <pabl...@google.com> wrote:

> Hi Matt,
>
> I am much more familiar with Python, so I usually answer questions using
> that SDK. Also, it's quicker to type a fully detailed pipeline on an email
> and the SDKs are similar enough that it should not be too difficult to
> translate to Java from an IDE.
>
> To your questions:
> 1. Grouping like that should not create a single in-memory iterable.
> Runners provide lazy iterables that load elements as you go through them.
>
> 2. The Java SDK provides the FileIO.match() and FileIO.readMatches()
> transforms, which respectively generate a list of file metadata objects and
> convert them into file descriptors that can be read in a ParDo. I think
> those should do the trick for you.
>
> I guess, in Java you'd do something like so:
>
> p.apply(FileIO.match().filepattern(...))
>   .apply(ParDo.of(new AddSingleKeyDoFn()))
>   .apply(GroupByKey.create())
>   .apply(ParDo.of(new DropKeyDoFn()))
>   .apply(FileIO.readMatches())
>   .apply(ParDo.of(new ConsumeFileDescriptors()));
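>
> Those two helper DoFns would just add and then drop a single key. A rough,
> untested sketch (the class names are the placeholders from the pipeline
> above; the element type is the MatchResult.Metadata that FileIO.match()
> produces):
>
> static class AddSingleKeyDoFn
>     extends DoFn<MatchResult.Metadata, KV<String, MatchResult.Metadata>> {
>   @ProcessElement
>   public void process(ProcessContext c) {
>     // Put every element under the same key so they all group together.
>     c.output(KV.of("singlekey", c.element()));
>   }
> }
>
> static class DropKeyDoFn
>     extends DoFn<KV<String, Iterable<MatchResult.Metadata>>, MatchResult.Metadata> {
>   @ProcessElement
>   public void process(ProcessContext c) {
>     // Throw the key away and re-emit the elements one by one.
>     for (MatchResult.Metadata m : c.element().getValue()) {
>       c.output(m);
>     }
>   }
> }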
>
> You'll have to follow the FileIO documentation [1] to see what the file
> descriptors look like, and how to use the match function.
>
> Best
> -P.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/FileIO.html
>
> On Mon, Jan 7, 2019 at 2:41 PM Matt Casters <mattcast...@gmail.com> wrote:
>
>> Hi Pablo,
>>
>> Apologies, I thought the cases were very simple and clear.  Obviously I
>> should have also mentioned I'm in Java land, not used to the script kiddy
>> stuff :-)
>>
>> On the output side: thanks for the grouping "trick".  However, doesn't
>> that mean that all rows will end up in a single in-memory Iterable?
>>
>> On the input side, reading a JSON or XML file shouldn't be black magic
>> but apparently it's completely undocumented as far as I could tell.  Any
>> tips there?
>>
>> Mind you, if it's not possible in the Beam API, feel free to let me know
>> and I'll dive into the Beam source code to figure something out.
>>
>> Cheers,
>>
>> Matt
>>
>>
>>
>> ---
>> Matt Casters <mattcast...@gmail.com>
>> Senior Solution Architect, Kettle Project Founder
>>
>>
>>
>> On Mon, Jan 7, 2019 at 11:09 PM Pablo Estrada <pabl...@google.com> wrote:
>>
>>> Hi Matt,
>>> is this computation running as part of a larger pipeline that does run
>>> some parallel processing? Otherwise, it's odd that it needs to run on Beam.
>>> Nonetheless, you can certainly do this with a pipeline that has a single
>>> element. Here's what that looks like in python:
>>>
>>> p | beam.Create(['gs://myfile.json']) | beam.ParDo(LoadEachFile()) |
>>> WriteToMyDatabase()
>>>
>>> If, on the other hand, you may have a PCollection with multiple elements
>>> (e.g. filenames), and you want to process them one-by-one, you can group
>>> them all on a single key, like so:
>>>
>>> my_filename_pcoll | GroupIntoSingleThread() |
>>> beam.ParDo(LoadEachFile()) | WriteToMyDatabase()
>>>
>>> Where the GroupIntoSingleThread transform looks basically like so:
>>>
>>> input | beam.Map(lambda x: ('singlekey', x)) | beam.GroupByKey() |
>>> beam.FlatMap(lambda x: x[1])
>>>
>>> In this example, we are adding a single key to all elements, grouping
>>> them all together, and then throwing away the key, to get each of the
>>> elements one-by-one in a single thread. You can do something similar using
>>> side inputs (with AsIter(my_filename_pcoll)).
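>>>
>>> In Java, the rough equivalent of that side input trick would be
>>> View.asIterable() consumed from a one-element "trigger" collection.
>>> Just a sketch, not tested ("filenames" here stands in for your
>>> my_filename_pcoll):
>>>
>>> PCollectionView<Iterable<String>> filenamesView =
>>>     filenames.apply(View.asIterable());
>>>
>>> p.apply(Create.of("single"))
>>>  .apply(ParDo.of(new DoFn<String, String>() {
>>>        @ProcessElement
>>>        public void process(ProcessContext c) {
>>>          // The whole filename PCollection is visible here as one lazy Iterable.
>>>          for (String filename : c.sideInput(filenamesView)) {
>>>            c.output(filename);
>>>          }
>>>        }
>>>      }).withSideInputs(filenamesView));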
>>>
>>> Does that help? Or perhaps you could clarify a bit more about your use
>>> case.
>>> Best
>>> -P.
>>>
>>> On Mon, Jan 7, 2019 at 1:33 PM Matt Casters <mattcast...@gmail.com>
>>> wrote:
>>>
>>>> Hi Beam!
>>>>
>>>> There's a bunch of stuff that I would like to support and it's probably
>>>> something silly but I couldn't find it immediately ... or I'm completely
>>>> dim and making too much of certain things.
>>>>
>>>> The thing is, sometimes you just want to do a single-threaded operation.
>>>> For example, we sometimes need to read generic JSON or XML documents or
>>>> perform single-threaded bulk loads into certain databases.
>>>> There's also simple relational database data you might want to
>>>> side-load or data from some web service somewhere.
>>>>
>>>> So, how can I instruct Beam not to fire up a bunch of readers or
>>>> writers? What is a good alternative to ParDo?
>>>>
>>>> Thanks in advance for any suggestions!
>>>>
>>>> Matt
>>>> ---
>>>> Matt Casters <mattcast...@gmail.com>
>>>> Senior Solution Architect, Kettle Project Founder
>>>>
>>>>
>>>>
