Using FileIO to read a single input file
I have some side inputs that I would like to add to my pipeline. Some of them are based on a file pattern, so I found that I can collect the contents of those files using a pattern like the following:

    val genotypes = p
      .apply(FileIO.`match`.filepattern(opts.getGenotypesFilePattern()))
      .apply(FileIO.readMatches)
      .apply("ReadGenotypesFile", ParDo.of(new ReadFileAsBytes()))
      .apply("UnmarshalGenotypes", ParDo.of(new UnmarshalGenotypesDoFn()))
      .apply("GenotypesAsMap", Combine.globally[Genotypes, ibd.GenotypesMap](new CombineGenotypesFn))
      .apply("ViewAsGeneticMap", View.asSingleton[ibd.GenotypesMap])

(The code snippet is Scala...)

I have another input - just a single file containing some protobuf. How do I construct a single FileIO.ReadableFile rather than using the "match"? I am also trying to avoid Combine.globally - I assume it would be more correct to let Beam know the expected shape of the data, and perhaps more performant as well.

Thanks in advance,
rdm
Re: Strange 'gzip error' running Beam on Dataflow
The files have no content-encoding set. They are not BigQuery exports but rather crafted by a service of mine. Note that my DoFn gets called for each line of the file - something I don't think would happen if the content were being gunzipped; wouldn't gunzip be applied to the whole content?

On Fri, Oct 12, 2018, 5:04 PM Jose Ignacio Honrado wrote:
> Hi Randal,
>
> You might be experiencing the automatic decompressive transcoding from GCS. Take a look at this to see if it helps:
> https://cloud.google.com/storage/docs/transcoding
>
> It seems like a compressed file is expected (as per the gz extension), but the file is returned decompressed by GCS.
>
> Any chance these files in GCS are exported from BigQuery? I started to "suffer" a similar issue because the exports from BQ tables to GCS started setting new metadata (content-encoding: gzip, content-type: text/csv) on the output files and, as a consequence, GZIP files were automatically decompressed when downloading them (as explained in the previous link).
>
> Best
>
> El vie., 12 oct. 2018 23:40, Randal Moore escribió:
>
>> Using Beam Java SDK 2.6.
>>
>> I have a batch pipeline that has run successfully in its current form several times. Suddenly I am getting strange errors complaining about the format of the input. As far as I know, the pipeline didn't change at all since the last successful run. The error:
>>
>>     java.util.zip.ZipException: Not in GZIP format - Trace: org.apache.beam.sdk.util.UserCodeException
>>
>> indicates that something somewhere thinks the line of text is supposed to be gzipped. I don't know what is setting that expectation nor what code thinks it is supposed to be gzipped.
>>
>> The pipeline uses TextIO to read from a Google Cloud Storage bucket. The content of the bucket object is individual "text" lines (actually each line is JSON encoded). The error occurs in the first DoFn following the TextIO - the one that converts each string to a value object.
>>
>> My log message in the exception handler shows the exact text for the string that I am expecting. I tried logging the call stack to see where the GZIP exception is thrown - it turns out to be a bit hard to follow (with a bunch of Dataflow classes called at the line in the processElement method that first uses the string).
>>
>> - Changing the lines to pure text, like "hello" and "world", gets to the JSON parser, which throws an error (since it isn't JSON any more).
>> - If I base64 encode the lines, I [still] get the GZIP exception.
>> - I was running an older version of Beam, so I upgraded to 2.6. Didn't help.
>> - The bucket object uses application/octet-encoding.
>> - Tried changing the read from the bucket from the default to explicitly uncompressed:
>>       TextIO.read.from(job.inputsPath).withCompression(Compression.UNCOMPRESSED)
>>
>> One other detail is that most of the code is written in Scala even though it uses the Java SDK for Beam.
>>
>> Any help appreciated!
>> rdm
Strange 'gzip error' running Beam on Dataflow
Using Beam Java SDK 2.6.

I have a batch pipeline that has run successfully in its current form several times. Suddenly I am getting strange errors complaining about the format of the input. As far as I know, the pipeline didn't change at all since the last successful run. The error:

    java.util.zip.ZipException: Not in GZIP format - Trace: org.apache.beam.sdk.util.UserCodeException

indicates that something somewhere thinks the line of text is supposed to be gzipped. I don't know what is setting that expectation nor what code thinks it is supposed to be gzipped.

The pipeline uses TextIO to read from a Google Cloud Storage bucket. The content of the bucket object is individual "text" lines (actually each line is JSON encoded). The error occurs in the first DoFn following the TextIO - the one that converts each string to a value object. My log message in the exception handler shows the exact text for the string that I am expecting. I tried logging the call stack to see where the GZIP exception is thrown - it turns out to be a bit hard to follow (with a bunch of Dataflow classes called at the line in the processElement method that first uses the string).

Things I have tried:

- Changing the lines to pure text, like "hello" and "world", gets to the JSON parser, which throws an error (since it isn't JSON any more).
- If I base64 encode the lines, I [still] get the GZIP exception.
- I was running an older version of Beam, so I upgraded to 2.6. Didn't help.
- The bucket object uses application/octet-encoding.
- Tried changing the read from the bucket from the default to explicitly uncompressed:
      TextIO.read.from(job.inputsPath).withCompression(Compression.UNCOMPRESSED)

One other detail is that most of the code is written in Scala even though it uses the Java SDK for Beam.

Any help appreciated!
rdm
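As an aside for anyone debugging a similar error: one way to sanity-check, outside Beam, whether a payload is actually gzip-compressed is to look at the two-byte GZIP magic number (0x1f 0x8b) at the start of the stream; plain JSON text will never start with it. A minimal standalone sketch in plain Java (no Beam dependency; the class name is illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipCheck {
    // GZIP streams always begin with the two magic bytes 0x1f, 0x8b.
    static boolean looksLikeGzip(byte[] data) {
        return data.length >= 2
                && (data[0] & 0xff) == 0x1f
                && (data[1] & 0xff) == 0x8b;
    }

    public static void main(String[] args) throws IOException {
        byte[] plain = "{\"key\": \"value\"}".getBytes(StandardCharsets.UTF_8);

        // Compress the same payload so both cases can be compared.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(plain);
        }
        byte[] compressed = bos.toByteArray();

        System.out.println("plain looks like gzip: " + looksLikeGzip(plain));           // false
        System.out.println("compressed looks like gzip: " + looksLikeGzip(compressed)); // true
    }
}
```

Dumping the first two bytes of the GCS object (e.g. with gsutil) and comparing against this check can tell you whether the stored bytes and the object's metadata disagree.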
Status type mismatch between different API calls
I'm using Dataflow. I found what seems to me to be a "usage problem" with the available APIs.

When I submit a job to the Dataflow runner, I get back a DataflowPipelineJob or its superclass, PipelineResult, which provides me the status of the job as an enumerated type. But if I use a DataflowClient to inquire about the status of my job:

    public Job getJob(@Nonnull String jobId) throws IOException

I get back a string. Why are these different? I would think these should be the same data type. Short of that, is there something that can translate between them? A list of the strings that DataflowClient might return?

I am wrapping Dataflow jobs with a higher-level service that would like to translate each Dataflow client status to its own. The enum is therefore much nicer than the string. Am I overlooking something?

randy
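For what it's worth, the Dataflow service reports job states as strings of the form JOB_STATE_*. A hedged sketch of translating those strings into a local enum for a wrapping service - the handled state names are assumptions based on the v1b3 API documentation, and ServiceStatus is an illustrative type, not a Beam or Dataflow API:

```java
import java.util.Locale;

public class JobStates {
    // Local status enum for the wrapping service; UNRECOGNIZED covers
    // any state string this code does not know about.
    enum ServiceStatus { PENDING, RUNNING, SUCCEEDED, FAILED, CANCELLED, UNRECOGNIZED }

    // Map a Dataflow "JOB_STATE_*" string onto the local enum.
    // The states handled here (PENDING, RUNNING, DONE, FAILED, CANCELLED)
    // should be verified against the version of the API you actually call.
    static ServiceStatus fromDataflowState(String state) {
        if (state == null || !state.startsWith("JOB_STATE_")) {
            return ServiceStatus.UNRECOGNIZED;
        }
        switch (state.substring("JOB_STATE_".length()).toUpperCase(Locale.ROOT)) {
            case "PENDING":   return ServiceStatus.PENDING;
            case "RUNNING":   return ServiceStatus.RUNNING;
            case "DONE":      return ServiceStatus.SUCCEEDED;
            case "FAILED":    return ServiceStatus.FAILED;
            case "CANCELLED": return ServiceStatus.CANCELLED;
            default:          return ServiceStatus.UNRECOGNIZED;
        }
    }

    public static void main(String[] args) {
        System.out.println(fromDataflowState("JOB_STATE_RUNNING")); // RUNNING
        System.out.println(fromDataflowState("JOB_STATE_DONE"));    // SUCCEEDED
        System.out.println(fromDataflowState("something else"));    // UNRECOGNIZED
    }
}
```

Funneling every unknown string into UNRECOGNIZED keeps the wrapper robust if the service adds new states later.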
Re: [VOTE] [DISCUSSION] Remove support for Java 7
+1

On Tue, Oct 17, 2017 at 5:21 PM Raghu Angadi wrote:
> +1.
>
> On Tue, Oct 17, 2017 at 2:11 PM, David McNeill wrote:
>
>> The final version of Beam that supports Java 7 should be clearly stated in the docs, so those stuck on old production infrastructure for other Java app dependencies know where to stop upgrading.
>>
>> David McNeill
>> 021 721 015
>>
>> On 18 October 2017 at 05:16, Ismaël Mejía wrote:
>>
>>> We have discussed recently in the developer mailing list the idea of removing support for Java 7 in Beam. There are multiple reasons for this:
>>>
>>> - Java 7 has not received public updates for almost two years and most companies are moving / have already moved to Java 8.
>>> - A good number of the systems Beam users rely on have decided to drop Java 7 support, e.g. Spark, Flink, Elasticsearch; even Hadoop plans to do so in version 3.
>>> - Most big data distributions and cloud-managed Spark/Hadoop services have already moved to Java 8.
>>> - Recent versions of core libraries Beam uses are moving to be Java 8 only (or mostly), e.g. Guava, Google Auto, etc.
>>> - Java 8 has some nice features that can make Beam code nicer, e.g. lambdas, streams.
>>>
>>> Considering that Beam is a 'recent' project, we expect users to already be using Java 8. However, we wanted first to ask the opinion of the Beam users on this subject. It could be the case that some users are still dealing with an old cluster running Java 7 or have another argument for keeping Java 7 compatibility.
>>>
>>> So, please vote:
>>> +1 Yes, go ahead and move Beam support to Java 8.
>>> 0 Do whatever you want. I don't have a preference.
>>> -1 Please keep Java 7 compatibility (if possible, add your argument for keeping Java 7 support).
Strange errors running on DataFlow
I have a batch pipeline that runs well with small inputs but fails with a larger dataset. Looking at Stackdriver I find a fair number of the following:

    Request failed with code 400, will NOT retry: https://dataflow.googleapis.com/v1b3/projects/cgs-nonprod/locations/us-central1/jobs/2017-08-03_13_06_11-1588537374036956973/workItems:reportStatus

How do I investigate to learn more about the cause? Am I reading this correctly - that it is the reason the pipeline failed? Is this perhaps the result of memory pressure? How would I monitor the running job to determine its memory needs? Is there a better place to ask what is likely a Dataflow-centric question?

Thanks in advance!
rdm
Re: API to query the state of a running dataflow job?
Thanks. I will create a JIRA ticket to try to explain.

I am planning a service running in Kubernetes that will submit Dataflow jobs. It will need to know the status of jobs [across service restarts]. An alternative might be to do some sort of GBK at the end of the job and post the result to Pub/Sub. That seemed complex - my last step is currently a Datastore write, which needs to be finished before claiming the job is done, and DatastoreIO is a "termination", right?

On Sun, Jul 9, 2017 at 10:04 PM Kenneth Knowles <k...@google.com> wrote:
> (Speaking for Java, but I think Python is similar.)
>
> There's nothing in the Beam API right now for querying a job unless you have a handle on the original object returned by the runner. The nature of the result of run() is particular to a runner, though it is easy to imagine a feature whereby you can "attach" to a known running job.
>
> So I think your best option is to use runner-specific APIs for now. For Dataflow that would be the cloud APIs [1]. You can see how it is done by the Beam wrapper DataflowPipelineJob [2] as a reference.
>
> Out of curiosity - what sort of third-party app? It would be super if you could file a JIRA [3] describing your use case with some more details, to help gain visibility.
>
> Kenn
>
> [1] https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/get
> [2] https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java#L441
> [3] https://issues.apache.org/jira/secure/CreateIssue!default.jspa
>
> On Sun, Jul 9, 2017 at 2:54 PM, Randal Moore <rdmoor...@gmail.com> wrote:
>
>> Is this part of the Beam API or something I should look at the Google docs for? Assume a job is running in Dataflow - how can an interested third-party app query the status if it knows the job id?
>>
>> rdm
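The attach-by-job-id approach described above boils down to polling the jobs.get endpoint by job id until a terminal state comes back. A plain-Java sketch of that polling loop, with the actual Dataflow REST call stubbed out behind an interface - StatusFetcher is an illustrative abstraction, not a Beam or Dataflow type, and the terminal state list is an assumption to verify against the API docs:

```java
import java.util.Set;

public class JobPoller {
    // Hypothetical abstraction over the Dataflow jobs.get REST call;
    // a real implementation would call the v1b3 API with the job id.
    interface StatusFetcher {
        String currentState(String jobId);
    }

    // Assumed terminal JOB_STATE_* values; check the API reference.
    static final Set<String> TERMINAL =
            Set.of("JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED");

    // Poll until the job reaches a terminal state, sleeping between calls.
    static String waitForTerminalState(StatusFetcher fetcher, String jobId, long pollMillis)
            throws InterruptedException {
        while (true) {
            String state = fetcher.currentState(jobId);
            if (TERMINAL.contains(state)) {
                return state;
            }
            Thread.sleep(pollMillis);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake fetcher that reports RUNNING twice, then DONE.
        String[] states = {"JOB_STATE_RUNNING", "JOB_STATE_RUNNING", "JOB_STATE_DONE"};
        int[] call = {0};
        StatusFetcher fake = jobId -> states[Math.min(call[0]++, states.length - 1)];
        System.out.println(waitForTerminalState(fake, "some-job-id", 1)); // JOB_STATE_DONE
    }
}
```

Because the loop only needs a job id, a service restarted in Kubernetes can resume watching any job whose id it persisted.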
API to query the state of a running dataflow job?
Is this part of the Beam API or something I should look at the Google docs for? Assume a job is running in Dataflow - how can an interested third-party app query the status if it knows the job id?

rdm
Re: Providing HTTP client to DoFn
Based on my understanding so far, I'm targeting Dataflow with a batch pipeline. Just starting to experiment with setup/teardown with the local runner - that might work fine. Somewhat intrigued with the side inputs, though.

The pipeline might iterate over 1,000,000 tuples of two integers. The integers are indices into a database of data. A given integer will be repeated in the inputs many times. Am I prematurely optimizing by ruling out expanding the tuples to the full data, given that each value might be expanded 100 or more times? As side inputs, it might expand to ~100GB. Expanding the input would be significantly bigger.

#1 How does Dataflow schedule the pipeline with a map side input - does it wait until the whole map is collected?
#2 Can the DoFn specify that it depends on only specific keys of the side input map? Does that affect the scheduling of the DoFn?

Thanks for any pointers...
rdm

On Wed, Jul 5, 2017 at 4:58 PM Lukasz Cwik <lc...@google.com> wrote:
> That should have said:
> ~100s MiBs per window in streaming pipelines
>
> On Wed, Jul 5, 2017 at 2:58 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> #1: Side input supported sizes and performance are specific to a runner. For example, I know that Dataflow supports side inputs which are 1+ TiB (aggregate) in batch pipelines and ~100s MiBs per window because there have been several one-off benchmarks/runs. What kinds of sizes/use cases do you want to support? Some runners will do a much better job with really small side inputs while others will be better with really large side inputs.
>>
>> #2: This depends on which library you're using to perform the REST calls and whether it is thread safe. DoFns can be shared across multiple bundles and can contain methods marked with @Setup/@Teardown which only get invoked once per DoFn instance (which is relatively infrequent), and you could store an instance per DoFn instead of a singleton if the REST library was not thread safe.
>>
>> On Wed, Jul 5, 2017 at 2:45 PM, Randal Moore <rdmoor...@gmail.com> wrote:
>>
>>> I have a step in my Beam pipeline that needs some data from a REST service. The data acquired from the REST service is dependent on the context of the data being processed and relatively large. The REST client I am using isn't serializable - nor is it likely possible to make it so (background threads, etc.).
>>>
>>> #1 What are the practical limits to the size of side inputs (e.g., I could try to gather all the data from the REST service and provide it as a side input)?
>>>
>>> #2 Assuming that using the REST client is the better option, would a singleton instance be a safe way to instantiate the REST client?
>>>
>>> Thanks,
>>> rdm
Providing HTTP client to DoFn
I have a step in my Beam pipeline that needs some data from a REST service. The data acquired from the REST service is dependent on the context of the data being processed and relatively large. The REST client I am using isn't serializable - nor is it likely possible to make it so (background threads, etc.).

#1 What are the practical limits to the size of side inputs (e.g., I could try to gather all the data from the REST service and provide it as a side input)?

#2 Assuming that using the REST client is the better option, would a singleton instance be a safe way to instantiate the REST client?

Thanks,
rdm
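On the singleton question: the usual JVM pattern for a non-serializable client shared by all DoFn instances on a worker is a lazily-initialized, thread-safe singleton via the initialization-on-demand holder idiom. A standalone sketch with the REST client stubbed as a plain class - HttpClientHolder and FakeRestClient are illustrative names, not Beam APIs:

```java
public class HttpClientHolder {
    // Stand-in for a real, non-serializable REST client.
    static class FakeRestClient {
        String get(String path) {
            return "response for " + path;
        }
    }

    // Initialization-on-demand holder idiom: the JVM class loader
    // guarantees CLIENT is created exactly once, on first access,
    // in a thread-safe way, with no explicit locking.
    private static class Holder {
        static final FakeRestClient CLIENT = new FakeRestClient();
    }

    static FakeRestClient client() {
        return Holder.CLIENT;
    }

    public static void main(String[] args) {
        // Every caller (e.g. every processElement invocation) sees the
        // same instance, so the client is never serialized with the DoFn.
        System.out.println(client() == client()); // true
        System.out.println(client().get("/data/42"));
    }
}
```

If the client is not thread safe, the per-DoFn alternative is to create one instance in a @Setup method and close it in @Teardown, accepting one client per DoFn instance instead of one per worker.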
Is this a valid usecase for Apache Beam and Google dataflow
Just started looking at Beam this week as a candidate for executing some fairly CPU-intensive work. I am curious whether the stream-oriented features of Beam are a match for my use case.

My user will submit a large number of computations to the system (as a "job"). Those computations can be expressed as a series of DoFns whose results can be stored (in, for example, Google Datastore). My initial idea was to post the individual messages (anywhere from 1,000 to 1,000,000 per job) to a Google Pub/Sub topic that is watched by a Google Dataflow job. I can write a prototype that performs the appropriate transformation and posts the results.

But... I cannot find any way to capture the notion of the completion of the original "user job". I want to post to Pub/Sub that all individual calculations are complete. I was hoping that I could write a CombineFn that would post a message as each job finishes, but Combine needs a window and I don't see how to define it. Is there a way to define a window that is bounded by the user's job? I know exactly which individual computations are part of the user's job (and exactly how many). But all the grouping that I've discovered so far seems to be well defined at compile time (e.g., number of messages in the window, or number of partitions).

Is this the wrong use case for Dataflow/Beam? Is there a better way to express this problem?

Thanks,
rdm
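The bookkeeping at the heart of this question - detecting when the last of a known number of per-job computations finishes - can be sketched outside Beam as a completion counter keyed by job id. The snippet below is a plain-Java illustration of that idea, not Beam code (in Beam terms it corresponds roughly to grouping elements by job id, which is possible precisely because the per-job count is known up front):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class JobCompletionTracker {
    // Expected number of computations per user job, known at submit time.
    private final Map<String, Integer> expected = new ConcurrentHashMap<>();
    // Completed-so-far count per user job.
    private final Map<String, AtomicInteger> completed = new ConcurrentHashMap<>();

    void register(String jobId, int count) {
        expected.put(jobId, count);
        completed.put(jobId, new AtomicInteger());
    }

    // Record one finished computation; returns true exactly once, when
    // the last computation for the job completes - the point where a
    // "job done" message would be posted to Pub/Sub.
    boolean recordCompletion(String jobId) {
        int done = completed.get(jobId).incrementAndGet();
        return done == expected.get(jobId);
    }

    public static void main(String[] args) {
        JobCompletionTracker tracker = new JobCompletionTracker();
        tracker.register("job-1", 3);
        System.out.println(tracker.recordCompletion("job-1")); // false
        System.out.println(tracker.recordCompletion("job-1")); // false
        System.out.println(tracker.recordCompletion("job-1")); // true
    }
}
```

Whether this counter lives inside the pipeline (as grouped state keyed by job id) or in an external service fed by a final pipeline step depends on the runner and on how the job ids arrive.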