Thanks a bunch, Stephen, this looks like exactly what I need.  I'll post
follow-up questions if I have any.

Thanks again,
Parker Coleman

On Wed, Apr 19, 2017 at 1:41 PM, Stephen Sisk <[email protected]> wrote:

> I believe that GroupIntoBatches[1] may also be of help and it's available
> right now.
>
> Hope that helps,
> Stephen
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
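>
> Roughly, a sketch of how it could be wired up (hedged: the "file-key"
> key, the batch size of 1000, and the WriteBatchFn name below are
> placeholders, not anything from your pipeline):
>
> import org.apache.beam.sdk.transforms.GroupIntoBatches;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.WithKeys;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
>
> // Assuming `lines` is your windowed PCollection<String> to be written.
> PCollection<KV<String, Iterable<String>>> batches =
>     lines
>         .apply(WithKeys.of("file-key"))  // GroupIntoBatches needs KV input
>         .apply(GroupIntoBatches.<String, String>ofSize(1000));
> // Each batch arrives as one element, so one DoFn call sees many records.
> batches.apply(ParDo.of(new WriteBatchFn())); // DoFn<KV<String, Iterable<String>>, Void>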
>
> On Wed, Apr 19, 2017 at 1:07 PM Parker Coleman <[email protected]>
> wrote:
>
>> We're writing text (JSON fields, one per line).
>>
>> We've written the DoFn, but our issue is that dozens of instances per
>> window are being generated for a relatively small amount of data.  I
>> haven't looked into it, but it might be as many as one DoFn object per record.
>>
>> Some pseudocode explaining how we're attempting to write to Azure:
>>
>> class AzureWriter extends DoFn<String, Void> {
>>
>>   private final String azureFileName;
>>
>>   public AzureWriter(String azureFileName) {
>>     this.azureFileName = azureFileName;
>>   }
>>
>>   @ProcessElement
>>   public void processElement(ProcessContext c) {
>>     // Write each string to a temp file, keeping track of a unique
>>     // hash of the file to use as a blockId.
>>   }
>>
>>   @FinishBundle
>>   public void finishBundle(Context c) {
>>     // Upload our temp file(s) as blocks to the block blob.
>>   }
>> }
>>
>> This scheme works great if each DoFn instance gets many elements (which
>> is how I wrote the unit tests; I assumed this would be the case). However,
>> that doesn't appear to be happening, and I'm not sure how, or even whether,
>> I can force Apache Beam to bundle more elements per DoFn instance.
>>
>> So I'm leaning toward option 3, where we would do a GroupBy before
>> calling the DoFn that writes to Azure, which would change the signature from
>> DoFn<String, Void> to DoFn<Iterable<String>, Void>.  I don't know what the
>> consequences of this would be if the amount of data were too large to fit
>> on a single machine, though; would different Iterable<String> objects be
>> created to distribute the load?
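>>
>> A rough sketch of what I mean (hedged: after a GroupByKey the DoFn would
>> actually see KV<String, Iterable<String>> rather than a bare
>> Iterable<String>; fileKeyFor() and AzureGroupWriter are made-up names):
>>
>> import org.apache.beam.sdk.transforms.GroupByKey;
>> import org.apache.beam.sdk.transforms.ParDo;
>> import org.apache.beam.sdk.transforms.WithKeys;
>> import org.apache.beam.sdk.values.KV;
>> import org.apache.beam.sdk.values.PCollection;
>> import org.apache.beam.sdk.values.TypeDescriptors;
>>
>> // Assuming `windowedLines` is our hourly-windowed PCollection<String>.
>> PCollection<KV<String, Iterable<String>>> grouped =
>>     windowedLines
>>         .apply(WithKeys.of((String line) -> fileKeyFor(line)) // hypothetical key fn
>>             .withKeyType(TypeDescriptors.strings()))
>>         .apply(GroupByKey.<String, String>create());
>> // One element per key and window; its Iterable holds that window's records.
>> grouped.apply(ParDo.of(new AzureGroupWriter()));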
>>
>> Can you give me a rough timeline on when the IOChannelFactory refactor
>> will be in the snapshot build?
>>
>> Thanks for the reply.
>>
>> On Wed, Apr 19, 2017 at 12:33 PM, Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>>> Hi Parker,
>>>
>>> What's the format you want to write on Azure?
>>>
>>> Right now, I would say the easiest way is to write your own DoFn that
>>> writes on Azure.
>>>
>>> By the way, we are working on a complete IOChannelFactory refactoring
>>> with Beam filesystems. It means that we will be able to write a Beam
>>> AzureFileSystem with a corresponding scheme that you will be able to use
>>> like (assuming you want to write a text format):
>>>
>>> pipeline.apply(...).apply(TextIO.write().to("azure:/..."))
>>>
>>> Regards
>>> JB
>>>
>>>
>>> On 04/19/2017 06:52 PM, Parker Coleman wrote:
>>>
>>>> Sorry if this is a double-post; I tried using the web client but didn't
>>>> see the post show up, so I've subscribed to the list and am sending
>>>> this via my email client.
>>>>
>>>> We're working on an Apache Beam pipeline that gathers messages from
>>>> various pub/sub topics, decodes them, bundles them into groups by hour
>>>> (using a FixedWindow), and sends them to Azure.  The eventual format of
>>>> the records is line-delimited JSON.
>>>>
>>>> There isn't a sink written for Azure yet, and we want the ability to
>>>> control which file names get created for each window firing (we want the
>>>> date/time appended to the file name; existing sinks don't seem to give
>>>> much control over the filename beyond specifying sharding).
>>>>
>>>> We decided to try writing our data to Azure via a DoFn instead.  In
>>>> processElement it writes a single record to a temp file.  In the
>>>> finishBundle step, we acquire a lease on a blockId based on the hash of
>>>> our temp file, and upload the data.
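>>>>
>>>> For concreteness, a rough sketch of the finishBundle upload using the
>>>> azure-storage Java SDK (hedged: CONN_STR, the container name, the
>>>> hashOf() helper, and tempFiles are placeholders, and our lease handling
>>>> is omitted):
>>>>
>>>> import com.microsoft.azure.storage.CloudStorageAccount;
>>>> import com.microsoft.azure.storage.blob.BlockEntry;
>>>> import com.microsoft.azure.storage.blob.CloudBlobContainer;
>>>> import com.microsoft.azure.storage.blob.CloudBlockBlob;
>>>> import java.io.File;
>>>> import java.io.FileInputStream;
>>>> import java.io.InputStream;
>>>> import java.nio.charset.StandardCharsets;
>>>> import java.util.ArrayList;
>>>> import java.util.Base64;
>>>> import java.util.List;
>>>>
>>>> CloudStorageAccount account = CloudStorageAccount.parse(CONN_STR);
>>>> CloudBlobContainer container =
>>>>     account.createCloudBlobClient().getContainerReference("container");
>>>> CloudBlockBlob blob = container.getBlockBlobReference(azureFileName);
>>>>
>>>> List<BlockEntry> blockList = new ArrayList<>();
>>>> for (File tempFile : tempFiles) {
>>>>   // Block IDs must be Base64-encoded (and the same length per blob).
>>>>   String blockId = Base64.getEncoder().encodeToString(
>>>>       hashOf(tempFile).getBytes(StandardCharsets.UTF_8));
>>>>   try (InputStream in = new FileInputStream(tempFile)) {
>>>>     blob.uploadBlock(blockId, in, tempFile.length());
>>>>   }
>>>>   blockList.add(new BlockEntry(blockId));
>>>> }
>>>> blob.commitBlockList(blockList); // makes the uploaded blocks visible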
>>>>
>>>> This would work fine if each instance of our DoFn were used for more
>>>> than one record.  However, we're running into the same issue described at
>>>> http://stackoverflow.com/questions/42914198/buffer-and-flush-apache-beam-streaming-data
>>>> where each of our records gets its own DoFn instance.
>>>>
>>>> So my question is: What's the best way to proceed?  I see a few options:
>>>>
>>>> 1.) Try implementing the stateful processing/trigger mechanism
>>>> suggested in the stackoverflow post: use this state instead of the temp
>>>> files and trigger writes via timers/triggers (see the sketch after this
>>>> list).
>>>> 2.) Try implementing a FileBasedSink.  I'd prefer the flexibility of
>>>> the Sink
>>>> abstract class, but looking at the Apache Beam Jira project, that might
>>>> be going
>>>> away soon (https://issues.apache.org/jira/browse/BEAM-1897).  I'd like
>>>> to stay
>>>> away from that based on that uncertainty.
>>>>
>>>> I saw in this stack overflow post
>>>> http://stackoverflow.com/questions/41386717/azure-blob-support-in-apache-beam
>>>> that it might be nice to have a dedicated Azure Sink; if I could get a
>>>> good rundown of what needs to be done, I might be able to contribute this.
>>>> 3.) Do a GroupBy when our window fires.  This way instead of having a
>>>> DoFn<String>, it would be a DoFn<Iterable<String>>.  I'm not sure what
>>>> would
>>>> happen if the result of the GroupBy couldn't fit in a single machine's
>>>> memory
>>>> though (say we uploaded a bunch of data during that 1 hour).  Would the
>>>> Iterable<String>'s records be split across multiple machines?  If so, I
>>>> like
>>>> this solution best.
>>>> 4.) Something else?
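>>>>
>>>> For option 1, here's a rough sketch of the stateful approach (hedged:
>>>> this is just my reading of the stackoverflow suggestion, not tested
>>>> code; state and timers need a keyed input, hence KV<String, String>,
>>>> and the names here are made up):
>>>>
>>>> import org.apache.beam.sdk.coders.StringUtf8Coder;
>>>> import org.apache.beam.sdk.state.BagState;
>>>> import org.apache.beam.sdk.state.StateSpec;
>>>> import org.apache.beam.sdk.state.StateSpecs;
>>>> import org.apache.beam.sdk.state.TimeDomain;
>>>> import org.apache.beam.sdk.state.Timer;
>>>> import org.apache.beam.sdk.state.TimerSpec;
>>>> import org.apache.beam.sdk.state.TimerSpecs;
>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
>>>> import org.apache.beam.sdk.values.KV;
>>>>
>>>> class BufferingAzureWriter extends DoFn<KV<String, String>, Void> {
>>>>
>>>>   @StateId("buffer")
>>>>   private final StateSpec<BagState<String>> bufferSpec =
>>>>       StateSpecs.bag(StringUtf8Coder.of());
>>>>
>>>>   @TimerId("flush")
>>>>   private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>>>
>>>>   @ProcessElement
>>>>   public void process(ProcessContext c, BoundedWindow window,
>>>>       @StateId("buffer") BagState<String> buffer,
>>>>       @TimerId("flush") Timer flush) {
>>>>     buffer.add(c.element().getValue());
>>>>     flush.set(window.maxTimestamp()); // fire once, at the end of the window
>>>>   }
>>>>
>>>>   @OnTimer("flush")
>>>>   public void onFlush(OnTimerContext c,
>>>>       @StateId("buffer") BagState<String> buffer) {
>>>>     // Upload everything buffered for this key/window to Azure, then clear.
>>>>     buffer.clear();
>>>>   }
>>>> }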
>>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>>
