Thanks a bunch, Stephen; this looks like exactly what I need. I'll post follow-up questions if I have any.
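
In case it helps anyone searching the archives later, here is roughly how I'm planning to wire GroupIntoBatches into our pipeline. This is an untested sketch: fileNameFor and AzureBatchWriter stand in for our own code, and the batch size is just a guess.

PCollection<String> jsonLines = ...; // our decoded pub/sub messages

jsonLines
    .apply(Window.<String>into(FixedWindows.of(Duration.standardHours(1))))
    // Key each record by its target Azure file name, so batches line
    // up with output files (fileNameFor is our own helper, not shown).
    .apply(WithKeys.of((String line) -> fileNameFor(line))
        .withKeyType(TypeDescriptors.strings()))
    // GroupIntoBatches buffers per key and window; 1000 is a guess.
    .apply(GroupIntoBatches.<String, String>ofSize(1000))
    // The writer now receives KV<String, Iterable<String>> batches
    // instead of one element per call.
    .apply(ParDo.of(new AzureBatchWriter()));

If that behaves the way I expect, the writer sees whole batches per call, which is what our temp-file/block-upload scheme needs.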
Thanks again,
Parker Coleman

On Wed, Apr 19, 2017 at 1:41 PM, Stephen Sisk <[email protected]> wrote:

> I believe that GroupIntoBatches [1] may also be of help, and it's
> available right now.
>
> Hope that helps,
> Stephen
>
> [1] https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
>
> On Wed, Apr 19, 2017 at 1:07 PM Parker Coleman <[email protected]> wrote:
>
>> We're writing text (JSON fields, one per line).
>>
>> We've written the DoFn, but our issue is that dozens of instances per
>> window are being generated for a relatively small amount of data. I
>> haven't looked into it, but it might be as many as one DoFn instance
>> per record.
>>
>> Some pseudocode explaining how we're attempting to write to Azure:
>>
>> class AzureWriter extends DoFn<String, Void> {
>>
>>   public AzureWriter(String azureFileName) { ... }
>>
>>   @ProcessElement
>>   public void processElement(ProcessContext c) {
>>     // Write each string to a temp file, keeping track of a unique
>>     // hash of the file to use as a blockId.
>>   }
>>
>>   @FinishBundle
>>   public void finishBundle(Context c) {
>>     // Upload our temp file(s) as blocks to the block blob.
>>   }
>> }
>>
>> This scheme works great if each DoFn instance gets many elements
>> (which is how I wrote the unit tests; I assumed this would be the
>> case). However, that doesn't appear to be happening, and I'm not sure
>> how, or whether, I can force Apache Beam to bundle more elements per
>> DoFn instance.
>>
>> So I'm leaning toward option 3, where we would do a GroupBy before
>> calling the DoFn that writes to Azure, which would change the
>> signature from DoFn<String, Void> to DoFn<Iterable<String>, Void>. I
>> don't know what the consequences of this would be if the amount of
>> data were too large to fit on a single machine, though. Would
>> different Iterable<String> objects be created to distribute the load?
>>
>> Can you give me a rough timeline on when the IOChannelFactory
>> refactor will be in the snapshot build?
>>
>> Thanks for the reply.
>>
>> On Wed, Apr 19, 2017 at 12:33 PM, Jean-Baptiste Onofré <[email protected]> wrote:
>>
>>> Hi Parker,
>>>
>>> What's the format you want to write on Azure?
>>>
>>> Right now, I would say the easiest way is to write your own DoFn
>>> that writes on Azure.
>>>
>>> By the way, we are working on a complete IOChannelFactory
>>> refactoring with Beam filesystems. It means that we will be able to
>>> write a Beam AzureFileSystem with a corresponding scheme that you
>>> will be able to use like this (assuming you want to write a text
>>> format):
>>>
>>> pipeline.apply(...)...apply(TextIO.write().to("azure:/..."))
>>>
>>> Regards
>>> JB
>>>
>>> On 04/19/2017 06:52 PM, Parker Coleman wrote:
>>>
>>>> Sorry if this is a double-post; I tried using the web client but
>>>> didn't see the post show up, so instead I've subscribed to the
>>>> list and am sending this via my email client.
>>>>
>>>> We're working on an Apache Beam pipeline that gathers messages
>>>> from various pub/sub topics, decodes them, bundles them into
>>>> groups by hour (using a FixedWindow), and sends them to Azure. The
>>>> eventual format of the records is line-delimited JSON.
>>>>
>>>> There isn't a sink written for Azure yet, and we want the ability
>>>> to control which file names get created for each window firing (we
>>>> want the date/time appended to the file name; it looks like
>>>> existing sinks don't give you much control over the filename
>>>> outside of specifying sharding).
>>>>
>>>> We decided to try writing our data to Azure via a DoFn instead.
>>>> For processElement, it writes a single record to a temp file. In
>>>> the finishBundle step, we acquire a lease on a blockId based off
>>>> the hash of our temp file, and upload the data.
>>>>
>>>> This would work fine if each instance of our DoFn were used for
>>>> more than one record. However, we're running into the same issue
>>>> as here (each of our records is getting its own DoFn instance):
>>>> http://stackoverflow.com/questions/42914198/buffer-and-flush-apache-beam-streaming-data
>>>>
>>>> So my question is: What's the best way to proceed? I see a few
>>>> options:
>>>>
>>>> 1.) Try implementing the stateful processing/trigger mechanism
>>>> suggested in the stackoverflow post. Use this state instead of the
>>>> temp files, and trigger writes via triggers.
>>>> 2.) Try implementing a FileBasedSink. I'd prefer the flexibility
>>>> of the Sink abstract class, but looking at the Apache Beam Jira
>>>> project, that might be going away soon
>>>> (https://issues.apache.org/jira/browse/BEAM-1897), so I'd like to
>>>> stay away from it given that uncertainty. I saw in this
>>>> stackoverflow post
>>>> http://stackoverflow.com/questions/41386717/azure-blob-support-in-apache-beam
>>>> that it might be nice to have a dedicated Azure sink; if I could
>>>> get a good rundown of what needs to be done, I might be able to
>>>> contribute this.
>>>> 3.) Do a GroupBy when our window fires. This way, instead of
>>>> having a DoFn<String>, it would be a DoFn<Iterable<String>>. I'm
>>>> not sure what would happen if the result of the GroupBy couldn't
>>>> fit in a single machine's memory, though (say we uploaded a lot of
>>>> data during that one hour). Would the Iterable<String>'s records
>>>> be split across multiple machines? If so, I like this solution
>>>> best.
>>>> 4.) Something else?
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
