I believe that GroupIntoBatches[1] may also be of help and it's available
right now.
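
A rough sketch of how it could slot in (the String key, the batch size of
500, and the AzureBatchWriter name are all placeholders here;
GroupIntoBatches works on a keyed PCollection, so key your records however
makes sense for your data, e.g. by hour):

PCollection<KV<String, String>> keyed = ...;  // e.g. hour -> JSON line

keyed
    .apply(GroupIntoBatches.<String, String>ofSize(500))
    // downstream, each element is a KV<String, Iterable<String>> batch
    .apply(ParDo.of(new AzureBatchWriter()));

Batches are built per key and window, so using more than one key also lets
the work spread across workers.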

Hope that helps,
Stephen

[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java

On Wed, Apr 19, 2017 at 1:07 PM Parker Coleman <[email protected]> wrote:

> We're writing text (JSON fields, one per line).
>
> We've written the DoFn, but our issue is that dozens of instances per
> window are being generated for a relatively small amount of data.  I
> haven't looked into it, but it might be as many as one DoFn object per record.
>
> Some pseudocode explaining how we're attempting to write to Azure:
>
> public class AzureWriter extends DoFn<String, Void> {
>
>   private final String azureFileName;
>
>   public AzureWriter(String azureFileName) {
>     this.azureFileName = azureFileName;
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     // write each string to a temp file, keeping track of a unique hash
>     // of the file to use as a blockId
>   }
>
>   @FinishBundle
>   public void finishBundle() {
>     // upload our temp file(s) as blocks to the block blob
>   }
> }
>
> This scheme works great if each DoFn instance gets many elements (which is
> how I wrote the unit tests; I assumed this would be the case). However,
> that doesn't appear to be happening, and I'm not sure how, or if, I can
> force Apache Beam to bundle more elements per DoFn instance.
>
> So I'm leaning toward option 3, where we would do a GroupBy before calling
> the DoFn that writes to Azure, which would change the signature from
> DoFn<String, Void> to DoFn<Iterable<String>, Void>.  I don't know what the
> consequences of this would be if the amount of data were too large to fit
> on a single machine, though; would different Iterable<String> objects be
> created to distribute the load?
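>
> Roughly, the shape I have in mind (the windowedJson name, the constant
> key, and the inline DoFn are just placeholders; keying by hour or shard
> would spread the data better than a single key):
>
> windowedJson
>     .apply(WithKeys.<String, String>of("all"))
>     .apply(GroupByKey.<String, String>create())
>     .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         // iterate c.element().getValue() and upload the batch to Azure
>       }
>     }));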
>
> Can you give me a rough timeline on when the IOChannelFactory refactor
> will be in the snapshot build?
>
> Thanks for the reply.
>
> On Wed, Apr 19, 2017 at 12:33 PM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
>> Hi Parker,
>>
>> What's the format you want to write on Azure?
>>
>> Right now, I would say the easiest way is to write your own DoFn that
>> writes on Azure.
>>
>> By the way, we are working on a complete IOChannelFactory refactoring
>> with Beam filesystems. It means that we will be able to write a Beam
>> AzureFileSystem with a corresponding scheme that you will be able to use
>> like this (assuming you want to write a text format):
>>
>> pipeline.apply(...)...apply(TextIO.write().to("azure:/..."));
>>
>> Regards
>> JB
>>
>>
>> On 04/19/2017 06:52 PM, Parker Coleman wrote:
>>
>>> Sorry if this is a double post; I tried using the web client but didn't
>>> see the post show up, so instead I've subscribed to the list and am
>>> sending this via my email client.
>>>
>>> We're working on an Apache Beam pipeline that gathers messages from
>>> various pub/sub topics, decodes them, bundles them into groups by hour
>>> (using a FixedWindow), and sends them to Azure.  The eventual format of
>>> the records is line-delimited JSON.
>>>
>>> There isn't a sink written for Azure yet, and we want the ability to
>>> control which file names get created for each window firing (we want the
>>> date/time appended to the file name; it looks like the existing sinks
>>> don't give you much control over the filename beyond specifying
>>> sharding).
>>>
>>> We decided to try writing our data to Azure via a DoFn instead.  In
>>> processElement it writes a single record to a temp file.  In the
>>> finishBundle step, we acquire a lease on a blockId based on the hash of
>>> our temp file, and upload the data.
>>>
>>> This would work fine if each instance of our DoFn were used for more
>>> than one record.  However, we're running into the same issue described
>>> here:
>>> http://stackoverflow.com/questions/42914198/buffer-and-flush-apache-beam-streaming-data
>>> Each of our records is getting its own DoFn instance.
>>>
>>> So my question is: What's the best way to proceed?  I see a few options:
>>>
>>> 1.) Try implementing the stateful processing/trigger mechanism suggested
>>> in the Stack Overflow post.  Use this state instead of the temp files
>>> and trigger the writes via triggers (see the rough sketch after this
>>> list).
>>> 2.) Try implementing a FileBasedSink.  I'd prefer the flexibility of the
>>> Sink abstract class, but looking at the Apache Beam Jira project, that
>>> might be going away soon
>>> (https://issues.apache.org/jira/browse/BEAM-1897), so I'd like to stay
>>> away from it given that uncertainty.
>>>
>>> I saw in this Stack Overflow post
>>> http://stackoverflow.com/questions/41386717/azure-blob-support-in-apache-beam
>>> that it might be nice to have a dedicated Azure sink; if I could get a
>>> good rundown of what needs to be done, I might be able to contribute
>>> this.
>>> 3.) Do a GroupBy when our window fires.  This way, instead of having a
>>> DoFn<String>, it would be a DoFn<Iterable<String>>.  I'm not sure what
>>> would happen if the result of the GroupBy couldn't fit in a single
>>> machine's memory, though (say we uploaded a bunch of data during that
>>> one-hour window).  Would the Iterable<String>'s records be split across
>>> multiple machines?  If so, I like this solution best.
>>> 4.) Something else?
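>>>
>>> For option 1, the rough shape I'm picturing (untested; the 1000-element
>>> threshold and the class name are arbitrary, and it assumes the input
>>> has already been keyed, since state is kept per key and window):
>>>
>>> class BufferingAzureWriter extends DoFn<KV<String, String>, Void> {
>>>
>>>   @StateId("buffer")
>>>   private final StateSpec<BagState<String>> bufferSpec =
>>>       StateSpecs.bag(StringUtf8Coder.of());
>>>
>>>   @StateId("count")
>>>   private final StateSpec<ValueState<Integer>> countSpec =
>>>       StateSpecs.value(VarIntCoder.of());
>>>
>>>   @ProcessElement
>>>   public void processElement(
>>>       ProcessContext c,
>>>       @StateId("buffer") BagState<String> buffer,
>>>       @StateId("count") ValueState<Integer> count) {
>>>     buffer.add(c.element().getValue());
>>>     Integer stored = count.read();
>>>     int n = (stored == null ? 0 : stored) + 1;
>>>     if (n >= 1000) {
>>>       // upload everything in buffer.read() as one block to Azure,
>>>       // then reset the state
>>>       buffer.clear();
>>>       count.clear();
>>>     } else {
>>>       count.write(n);
>>>     }
>>>   }
>>>
>>>   // an event-time timer set to the end of the window could flush
>>>   // whatever is still buffered when the window closes
>>> }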
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>
