I believe that GroupIntoBatches[1] may also be of help and it's available right now.
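A minimal sketch of how that could be wired in; the constant key, the batch size of 500, and the AzureBatchWriter DoFn below are illustrative placeholders, not something already in your pipeline:

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.GroupIntoBatches;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.transforms.WithKeys;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PCollection;

  // Hypothetical DoFn that uploads one block per batch.
  class AzureBatchWriter extends DoFn<KV<String, Iterable<String>>, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // c.element().getValue() is a whole batch of lines; upload it as a single block here.
    }
  }

  // lines is the windowed PCollection<String> of JSON records from the existing pipeline.
  PCollection<String> lines = ...;
  lines
      .apply(WithKeys.of("batch"))                          // arbitrary constant key so elements can be grouped
      .apply(GroupIntoBatches.<String, String>ofSize(500))  // batches of up to 500 elements per key and window
      .apply(ParDo.of(new AzureBatchWriter()));

Each element downstream of GroupIntoBatches is a KV<String, Iterable<String>>, so the upload DoFn sees many records per call instead of one.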
Hope that helps,
Stephen

[1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java

On Wed, Apr 19, 2017 at 1:07 PM Parker Coleman <[email protected]> wrote:

> We're writing text (JSON fields, one per line).
>
> We've written the DoFn, but our issue is that dozens of instances per window are being generated for a relatively small amount of data. I haven't looked into it, but it might be as many as one DoFn object per record.
>
> Some pseudo code explaining how we're attempting to write to Azure:
>
> class AzureWriter extends DoFn<String, Void> {
>
>   public AzureWriter(String azureFileName) { ... }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     // write each string to a temp file, keeping track of a unique hash of the file to use as a blockId
>   }
>
>   @FinishBundle
>   public void finishBundle() {
>     // Upload our temp file(s) as blocks to the block blob
>   }
> }
>
> This scheme works great if each DoFn instance gets many elements (which is how I wrote the unit tests; I assumed this would be the case). However, that doesn't appear to be happening, and I'm not sure how, or if, I can force Apache Beam to bundle more elements per DoFn instance.
>
> So I'm leaning toward option 3, where we would do a GroupBy before calling the DoFn that writes to Azure, which would change the signature from DoFn<String, Void> to DoFn<Iterable<String>, Void>. I don't know what the consequences of this would be if the amount of data were too large to fit on a single machine, though. Would different Iterable<String> objects be created to distribute the load?
>
> Can you give me a rough timeline on when the IOChannelFactory refactor will be in the snapshot build?
>
> Thanks for the reply.
>
> On Wed, Apr 19, 2017 at 12:33 PM, Jean-Baptiste Onofré <[email protected]> wrote:
>
>> Hi Parker,
>>
>> What's the format you want to write on Azure?
>>
>> Right now, I would say the easiest way is to write your own DoFn that writes to Azure.
>>
>> By the way, we are working on a complete IOChannelFactory refactoring with Beam filesystems. It means that we will be able to write a Beam AzureFileSystem with a corresponding scheme that you will be able to use like (assuming you want to write a text format):
>>
>> pipeline.apply()...apply(TextIO.write().to("azure:/..."))
>>
>> Regards
>> JB
>>
>> On 04/19/2017 06:52 PM, Parker Coleman wrote:
>>
>>> Sorry if this is a double post; I tried using the web client but didn't see the post show up, so instead I've subscribed to the list and am sending this via my email client.
>>>
>>> We're working on an Apache Beam pipeline that gathers messages from various pub/sub topics, decodes them, bundles them into groups by hour (using a FixedWindow), and sends them to Azure. The eventual format of the records is line-delimited JSON.
>>>
>>> There isn't a sink written for Azure yet, and we want the ability to control which file names get created for each window firing (we want the date/time appended to the file name; it looks like existing sinks don't give you much control over the filename outside of specifying sharding).
>>>
>>> We decided to try writing our data to Azure via a DoFn instead. In processElement it writes a single record to a temp file. In the finishBundle step, we acquire a lease on a blockId based off the hash of our temp file, and upload the data.
>>>
>>> This would work fine if each instance of our DoFn were used for more than one record. However, we're running into the same issue from here: http://stackoverflow.com/questions/42914198/buffer-and-flush-apache-beam-streaming-data: each of our records is getting its own DoFn instance.
>>>
>>> So my question is: What's the best way to proceed? I see a few options:
>>>
>>> 1.) Try implementing the stateful processing/trigger mechanism suggested in the Stack Overflow post. Use this state instead of the temp files and trigger writes via triggers.
>>> 2.) Try implementing a FileBasedSink. I'd prefer the flexibility of the Sink abstract class, but looking at the Apache Beam Jira project, that might be going away soon (https://issues.apache.org/jira/browse/BEAM-1897). I'd like to stay away from that based on that uncertainty.
>>>
>>> I saw in this Stack Overflow post http://stackoverflow.com/questions/41386717/azure-blob-support-in-apache-beam that it might be nice to have a dedicated Azure Sink; if I could get a good rundown of what needs to be done, I might be able to contribute this.
>>>
>>> 3.) Do a GroupBy when our window fires. This way, instead of having a DoFn<String>, it would be a DoFn<Iterable<String>>. I'm not sure what would happen if the result of the GroupBy couldn't fit in a single machine's memory, though (say we uploaded a bunch of data during that 1 hour). Would the Iterable<String>'s records be split across multiple machines? If so, I like this solution best.
>>>
>>> 4.) Something else?
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
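For comparison, option (3) from the quoted thread above would look roughly like this, reusing the lines collection from the sketch further up; the hourly window and single constant key are again just placeholders, and whether a very large Iterable<String> streams lazily or has to fit in one worker's memory depends on the runner:

  lines
      .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))))
      .apply(WithKeys.of("all"))                   // one arbitrary key, so each window firing yields a single group
      .apply(GroupByKey.<String, String>create())
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // c.element().getValue() holds every record in the window; upload it to Azure here.
        }
      }));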
