using FileIO to read a single input file

2021-12-13 Thread Randal Moore
I have some side inputs that I would like to add to my pipeline. Some of
them are based on a file pattern, so I found that I can collect the
contents of those files using a pattern like the following:

val genotypes =
  p.apply(FileIO.`match`.filepattern(opts.getGenotypesFilePattern()))
    .apply(FileIO.readMatches)
    .apply("ReadGenotypesFile", ParDo.of(new ReadFileAsBytes()))
    .apply("UnmarshalGenotypes", ParDo.of(new UnmarshalGenotypesDoFn()))
    .apply("GenotypesAsMap", Combine.globally[Genotypes, ibd.GenotypesMap](new CombineGenotypesFn))
    .apply("ViewAsGeneticMap", View.asSingleton[ibd.GenotypesMap])

(the code snippet is Scala...)

I have another input - just a single file containing some protobuf. How do
I construct a single FileIO.ReadableFile rather than using the "match"? I'm
trying to avoid Combine.globally - I assume that avoiding it would be more
correct (letting Beam know what data to expect) and perhaps more performant.
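
For reference, the closest I have come up with keeps going through `match`,
just with an exact file name as the pattern (a sketch only; getProtoFilePath,
ParseProtoFn and MyProto are placeholders for my actual option, DoFn and
protobuf type):

val protoView =
  p.apply(FileIO.`match`.filepattern(opts.getProtoFilePath()))  // exact path, matches exactly one file
    .apply(FileIO.readMatches)                                  // one FileIO.ReadableFile
    .apply("ReadProtoFile", ParDo.of(new ReadFileAsBytes()))    // same whole-file-as-bytes DoFn as above
    .apply("ParseProto", ParDo.of(new ParseProtoFn()))          // placeholder: bytes -> MyProto
    .apply("ViewAsSingleton", View.asSingleton[MyProto])        // valid since there is exactly one element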

Thanks in advance,
rdm


Re: Strange 'gzip error' running Beam on Dataflow

2018-10-12 Thread Randal Moore
The files have no content-encoding set. They are not BigQuery exports but
rather crafted by a service of mine.

Note that my DoFn gets called for each line of the file, which I wouldn't
expect if decompression were happening - wouldn't gunzip be applied to the
whole content rather than individual lines?
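
For anyone following along, this is the kind of metadata check I mean (a
sketch only; it assumes the google-cloud-storage Java client, and the bucket
and object names are placeholders):

import com.google.cloud.storage.{BlobId, StorageOptions}

object CheckGcsEncoding {
  def main(args: Array[String]): Unit = {
    // Print the metadata GCS reports for one input object; a content-encoding
    // of "gzip" is what triggers decompressive transcoding on download.
    val storage = StorageOptions.getDefaultInstance.getService
    val blob = storage.get(BlobId.of("my-bucket", "path/to/input.gz"))  // placeholder bucket/object
    println(s"contentEncoding=${blob.getContentEncoding} contentType=${blob.getContentType}")
  }
}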

On Fri, Oct 12, 2018, 5:04 PM Jose Ignacio Honrado 
wrote:

> Hi Randal,
>
> You might be experiencing the automatic decompressive transcoding from
> GCS. Take a look at this to see if it helps:
> https://cloud.google.com/storage/docs/transcoding
>
> It seems like a compressed file is expected (given the .gz extension), but
> the file is returned decompressed by GCS.
>
> Any chance these files in GCS are exported from BigQuery? I started to
> "suffer" a similar issue because the exports from BQ tables to GCS started
> setting new metadata (content-encoding: gzip, content-type: text/csv) on
> the output files and, as a consequence, GZIP files were automatically
> decompressed when downloading them (as explained in the previous link).
>
> Best,
>
>


Strange 'gzip error' running Beam on Dataflow

2018-10-12 Thread Randal Moore
Using Beam Java SDK 2.6.

I have a batch pipeline that has run successfully in its current form several
times. Suddenly I am getting strange errors complaining about the format of
the input. As far as I know, the pipeline didn't change at all since the last
successful run. The error:

  java.util.zip.ZipException: Not in GZIP format - Trace:
  org.apache.beam.sdk.util.UserCodeException

indicates that something somewhere thinks the line of text is supposed to be
gzipped. I don't know what is setting that expectation, nor which code
expects the gzipped input.

The pipeline uses TextIO to read from a Google Cloud Storage bucket. The
content of the bucket object is individual "text" lines (each line is
actually JSON encoded). The error occurs in the first DoFn following the
TextIO read - the one that converts each string to a value object.

My log message in the exception handler shows the exact text of the string
that I am expecting. I tried logging the call stack to see where the GZIP
exception is thrown - it turns out to be a bit hard to follow (a bunch of
Dataflow classes are invoked at the line in the processElement method that
first uses the string).
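
For context, the DoFn in question is essentially the following (a simplified
sketch; ParsedRecord stands in for my actual value type):

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.slf4j.LoggerFactory

class ParseJsonLineFn extends DoFn[String, ParsedRecord] {
  // Neither the mapper nor the logger is serializable, hence @transient lazy val.
  @transient private lazy val mapper = new ObjectMapper()
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  @ProcessElement
  def process(c: DoFn[String, ParsedRecord]#ProcessContext): Unit = {
    val line = c.element()
    try {
      c.output(mapper.readValue(line, classOf[ParsedRecord]))
    } catch {
      case e: Exception =>
        // The log shows the expected JSON text here, yet the trace ends in ZipException.
        log.error(s"Failed to parse line: $line", e)
    }
  }
}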


- Changing the lines to pure text, like "hello" and "world", gets to the
  JSON parser, which throws an error (since it isn't JSON any more).
- If I base64 encode the lines, I [still] get the GZIP exception.
- I was running an older version of Beam, so I upgraded to 2.6. Didn't help.
- The bucket object uses *application/octet-encoding*.
- Tried changing the read from the bucket from the default to explicitly
  using uncompressed:
  TextIO.read.from(job.inputsPath).withCompression(Compression.UNCOMPRESSED)

One other detail is that most of the code is written in Scala even though
it uses the Java SDK for Beam.

Any help appreciated!
rdm


Status type mismatch between different API calls

2018-02-02 Thread Randal Moore
I'm using Dataflow. I've found what seems to me to be a "usage problem" with
the available APIs.
When I submit a job to the Dataflow runner, I get back a DataflowPipelineJob
(or its supertype, PipelineResult), which provides the status of the job as
an enumerated type. But if I use a DataflowClient to inquire about the status
of my job:

  public Job getJob(@Nonnull String jobId) throws IOException {

I get back a string. Why are these different? I would think they should be
the same data type. Short of that, is there something that can translate
between them? A list of the strings that DataflowClient might return?

I am wrapping Dataflow jobs with a higher-level service that would like to
translate every DataflowClient status to its own. The enum is therefore much
nicer to work with than the string.
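
To make the issue concrete, this is roughly the translation I would otherwise
have to hand-roll (a sketch only; the JOB_STATE_* strings are the job states
documented for the Dataflow API, and the choices for the in-flight states are
my own guesses):

import org.apache.beam.sdk.PipelineResult.State

// Hand-rolled mapping from the string in the Job resource (job.getCurrentState)
// to Beam's enumerated State.
def toBeamState(dataflowState: String): State = dataflowState match {
  case "JOB_STATE_RUNNING"   => State.RUNNING
  case "JOB_STATE_DONE"      => State.DONE
  case "JOB_STATE_FAILED"    => State.FAILED
  case "JOB_STATE_CANCELLED" => State.CANCELLED
  case "JOB_STATE_UPDATED"   => State.UPDATED
  case "JOB_STATE_STOPPED"   => State.STOPPED
  case "JOB_STATE_DRAINED"   => State.DONE       // no direct Beam equivalent
  case "JOB_STATE_PENDING" | "JOB_STATE_QUEUED" |
       "JOB_STATE_DRAINING" | "JOB_STATE_CANCELLING" => State.RUNNING  // still in flight
  case _                     => State.UNKNOWN
}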

Am I overlooking something?
randy


Re: [VOTE] [DISCUSSION] Remove support for Java 7

2017-10-17 Thread Randal Moore
+1

On Tue, Oct 17, 2017 at 5:21 PM Raghu Angadi  wrote:

> +1.
>
> On Tue, Oct 17, 2017 at 2:11 PM, David McNeill 
> wrote:
>
>> The final version of Beam that supports Java 7 should be clearly stated
>> in the docs, so those stuck on old production infrastructure for other java
>> app dependencies know where to stop upgrading.
>>
>> David McNeill
>> 021 721 015
>>
>>
>>
>> On 18 October 2017 at 05:16, Ismaël Mejía  wrote:
>>
>>> We have discussed recently in the developer mailing list about the
>>> idea of removing support for Java 7 on Beam. There are multiple
>>> reasons for this:
>>>
>>> - Java 7 has not received public updates for almost two years and most
>>> companies are moving / have already moved to Java 8.
>>> - A good amount of the systems Beam users rely on have decided to drop
>>> Java 7 support, e.g. Spark, Flink, Elasticsearch, even Hadoop plans to
>>> do it on version 3.
>>> - Most Big data distributions and Cloud managed Spark/Hadoop services
>>> have already moved to Java 8.
>>> - Recent versions of core libraries Beam uses are moving to be Java 8
>>> only (or mostly), e.g. Guava, Google Auto, etc.
>>> - Java 8 has some nice features that can make Beam code nicer e.g.
>>> lambdas, streams.
>>>
>>> Considering that Beam is a ‘recent’ project we expect users to be
>>> already using Java 8. However we wanted first to ask the opinion of
>>> the Beam users on this subject. It could be the case that some of the
>>> users are still dealing with some old cluster running on Java 7 or
>>> have another argument to keep the Java 7 compatibility.
>>>
>>> So, please vote:
>>> +1 Yes, go ahead and move Beam support to Java 8.
>>>  0 Do whatever you want. I don’t have a preference.
>>> -1 Please keep Java 7 compatibility (if possible add your argument to
>>> keep supporting for Java 7).
>>>
>>
>>
>


Strange errors running on DataFlow

2017-08-03 Thread Randal Moore
I have a batch pipeline that runs well with small inputs but fails with a
larger dataset.
Looking at Stackdriver, I find a fair number of the following:

Request failed with code 400, will NOT retry:
https://dataflow.googleapis.com/v1b3/projects/cgs-nonprod/locations/us-central1/jobs/2017-08-03_13_06_11-1588537374036956973/workItems:reportStatus

How do I investigate to learn more about the cause?
Am I reading this correctly that it is the reason the pipeline failed?
Is this perhaps the result of memory pressure?
How would I monitor the running job to determine its memory needs?
Is there a better place to ask about what is likely a Dataflow-centric
question?

Thanks in advance!
rdm


Re: API to query the state of a running dataflow job?

2017-07-10 Thread Randal Moore
Thanks. I will create a JIRA ticket to try to explain. I am planning a
service running in Kubernetes that will submit Dataflow jobs. It will need
to know the status of jobs [across service restarts]. An alternative might
be to do some sort of GBK at the end of the job and post the result to
Pub/Sub. That seemed complex - my last step is currently a Datastore write,
which needs to be finished before claiming the job is done, and DatastoreIO
is a "termination", right?
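
For the runner-specific route you suggest, this is roughly what I expect to
end up with (a sketch only; it assumes the google-api-services-dataflow
client with application-default credentials, and the project and region
below are placeholders):

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport
import com.google.api.client.json.jackson2.JacksonFactory
import com.google.api.services.dataflow.Dataflow
import com.google.auth.http.HttpCredentialsAdapter
import com.google.auth.oauth2.GoogleCredentials

object JobStatus {
  def main(args: Array[String]): Unit = {
    val jobId = args(0)
    val dataflow = new Dataflow.Builder(
        GoogleNetHttpTransport.newTrustedTransport(),
        JacksonFactory.getDefaultInstance(),
        new HttpCredentialsAdapter(GoogleCredentials.getApplicationDefault()))
      .setApplicationName("job-status-checker")
      .build()
    // Query by job id alone, independent of the process that submitted the job.
    val job = dataflow.projects().locations().jobs()
      .get("my-project", "us-central1", jobId)   // placeholder project and region
      .execute()
    println(job.getCurrentState)                 // e.g. "JOB_STATE_RUNNING"
  }
}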



On Sun, Jul 9, 2017 at 10:04 PM Kenneth Knowles <k...@google.com> wrote:

> (Speaking for Java, but I think Python is similar)
>
> There's nothing in the Beam API right now for querying a job unless you
> have a handle on the original object returned by the runner. The nature of
> the result of run() is particular to a runner, though it is easy to imagine
> a feature whereby you can "attach" to a known running job.
>
> So I think your best option is to use runner-specific APIs for now. For
> Dataflow that would be the cloud APIs [1]. You can see how it is done by
> the Beam wrapper DataflowPipelineJob [2] as a reference.
>
> Out of curiosity - what sort of third-party app? It would be super if you
> could file a JIRA [3] describing your use case with some more details, to
> help gain visibility.
>
> Kenn
>
> [1]
> https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs/get
> [2]
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java#L441
> [3] https://issues.apache.org/jira/secure/CreateIssue!default.jspa
>
> On Sun, Jul 9, 2017 at 2:54 PM, Randal Moore <rdmoor...@gmail.com> wrote:
>
>> Is this part of the Beam API or something I should look at the google
>> docs for help?  Assume a job is running in dataflow - how can an interested
>> third-party app query the status if it knows the job-id?
>>
>> rdm
>>
>
>


API to query the state of a running dataflow job?

2017-07-09 Thread Randal Moore
Is this part of the Beam API or something I should look at the google docs
for help?  Assume a job is running in dataflow - how can an interested
third-party app query the status if it knows the job-id?

rdm


Re: Providing HTTP client to DoFn

2017-07-05 Thread Randal Moore
Based on my understanding so far, I'm targeting Dataflow with a batch
pipeline. I'm just starting to experiment with @Setup/@Teardown on the local
runner - that might work fine.
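
Concretely, the pattern I'm experimenting with looks roughly like this (a
sketch only; it assumes the Apache HttpComponents client, and the
lookup-by-id endpoint is a placeholder):

import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{ProcessElement, Setup, Teardown}
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils

class EnrichViaRestFn(endpoint: String) extends DoFn[String, String] {
  // Not serialized with the DoFn; created once per DoFn instance in @Setup.
  @transient private var client: CloseableHttpClient = _

  @Setup
  def setup(): Unit = {
    client = HttpClients.createDefault()
  }

  @ProcessElement
  def process(c: DoFn[String, String]#ProcessContext): Unit = {
    val get = new HttpGet(s"$endpoint/${c.element()}")   // placeholder lookup-by-id call
    val response = client.execute(get)
    try c.output(EntityUtils.toString(response.getEntity))
    finally response.close()
  }

  @Teardown
  def teardown(): Unit = {
    if (client != null) client.close()
  }
}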

I'm somewhat intrigued by the side inputs, though. The pipeline might
iterate over 1,000,000 tuples of two integers. The integers are indices
into a database of data, and a given integer will be repeated in the inputs
many times. Am I prematurely optimizing by ruling out expanding the tuples
into the full data up front, given that each value might be expanded 100 or
more times? As a side input it might expand to ~100 GB; expanding the main
input would be significantly bigger.

#1 How does Dataflow schedule the pipeline with a map side input - does it
wait until the whole map is collected?
#2 Can the DoFn specify that it depends only on specific keys of the side
input map? Does that affect the scheduling of the DoFn?
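
To make #1 and #2 concrete, the shape I'm contemplating is roughly the
following (a sketch only; tuples and expandedData are placeholders for my
actual PCollections, and the String value is a stand-in for the real data):

import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.{DoFn, ParDo, View}
import org.apache.beam.sdk.values.{KV, PCollectionView}

// tuples:       PCollection[KV[Integer, Integer]]  (the ~1,000,000 index pairs)
// expandedData: PCollection[KV[Integer, String]]   (index -> expanded data)
val indexToData: PCollectionView[java.util.Map[Integer, String]] =
  expandedData.apply("IndexAsMap", View.asMap[Integer, String]())

val results = tuples.apply("ExpandTuples",
  ParDo.of(new DoFn[KV[Integer, Integer], String] {
    @ProcessElement
    def process(c: DoFn[KV[Integer, Integer], String]#ProcessContext): Unit = {
      val data  = c.sideInput(indexToData)        // whole map side input is available...
      val left  = data.get(c.element().getKey)    // ...but each element only reads two keys
      val right = data.get(c.element().getValue)
      c.output(s"$left|$right")                   // stand-in for the real computation
    }
  }).withSideInputs(indexToData))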

Thanks for any pointers...
rdm

On Wed, Jul 5, 2017 at 4:58 PM Lukasz Cwik <lc...@google.com> wrote:

> That should have said:
> ~100s MiBs per window in streaming pipelines
>
> On Wed, Jul 5, 2017 at 2:58 PM, Lukasz Cwik <lc...@google.com> wrote:
>
>> #1, side inputs supported sizes and performance are specific to a runner.
>> For example, I know that Dataflow supports side inputs which are 1+ TiB
>> (aggregate) in batch pipelines and ~100s MiBs per window because there have
>> been several one off benchmarks/runs. What kinds of sizes/use case do you
>> want to support, some runners will do a much better job with really small
>> side inputs while others will be better with really large side inputs?
>>
>> #2, this depends on which library your using to perform the REST calls
>> and whether it is thread safe. DoFns can be shared across multiple bundles
>> and can contain methods marked with @Setup/@Teardown which only get invoked
>> once per DoFn instance (which is relatively infrequently) and you could
>> store an instance per DoFn instead of a singleton if the REST library was
>> not thread safe.
>>
>> On Wed, Jul 5, 2017 at 2:45 PM, Randal Moore <rdmoor...@gmail.com> wrote:
>>
>>> I have a step in my beam pipeline that needs some data from a rest
>>> service. The data acquired from the rest service is dependent on the
>>> context of the data being processed and relatively large. The rest client I
>>> am using isn't serializable - nor is it likely possible to make it so
>>> (background threads, etc.).
>>>
>>> #1 What are the practical limits to the size of side inputs (e.g., I
>>> could try to gather all the data from the rest service and provide it as a
>>> side-input)?
>>>
>>> #2 Assuming that using the rest client is the better option, would a
>>> singleton instance be a safe way to instantiate the rest client?
>>>
>>> Thanks,
>>> rdm
>>>
>>
>>
>


Providing HTTP client to DoFn

2017-07-05 Thread Randal Moore
I have a step in my beam pipeline that needs some data from a rest service.
The data acquired from the rest service is dependent on the context of the
data being processed and relatively large. The rest client I am using isn't
serializable - nor is it likely possible to make it so (background threads,
etc.).

#1 What are the practical limits to the size of side inputs (e.g., I could
try to gather all the data from the rest service and provide it as a
side-input)?

#2 Assuming that using the rest client is the better option, would a
singleton instance be a safe way to instantiate the rest client?

Thanks,
rdm


Is this a valid usecase for Apache Beam and Google dataflow

2017-06-20 Thread Randal Moore
I just started looking at Beam this week as a candidate for executing some
fairly CPU-intensive work. I am curious whether the stream-oriented features
of Beam are a match for my use case. My user will submit a large number of
computations to the system (as a "job"). Those computations can be expressed
as a series of DoFns whose results can be stored (in, for example, Google
Datastore).

My initial idea was to post the individual messages (anywhere from 1,000 to
1,000,000 per job) to a Google Pub/Sub topic that is watched by a Google
Dataflow job. I can write a prototype that performs the appropriate
transformation and posts the results.

But...

I cannot find any way to capture the notion of the completion of the
original "user job". I want to post to Pub/Sub that all individual
calculations are complete. I was hoping that I could write a CombineFn that
would post a message as each job finishes, but Combine needs a window and I
don't see how to define one.

Is there a way to define a window that corresponds to the user's job? I
know exactly which individual computations are part of the user's job (and
exactly how many). But all the grouping that I've discovered so far seems
to be defined at compile time (e.g., number of messages in the window, or
number of partitions).
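
The closest I have come up with in a pure batch pipeline (global window) is
counting per job and comparing against the known total - a sketch only;
results and expectedCounts are placeholders, and it still doesn't give me the
per-job window I was hoping for:

import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.{Count, DoFn, ParDo}
import org.apache.beam.sdk.values.KV

// expectedCounts is known at submission time: jobId -> number of computations.
class EmitJobDone(expected: Map[String, Long]) extends DoFn[KV[String, java.lang.Long], String] {
  @ProcessElement
  def process(c: DoFn[KV[String, java.lang.Long], String]#ProcessContext): Unit = {
    val jobId = c.element().getKey
    if (expected.get(jobId).contains(c.element().getValue.longValue()))
      c.output(jobId)   // every computation for this jobId is done; publish "done" downstream
  }
}

// results: PCollection[KV[String, String]] keyed by jobId (stand-in value type).
val completions =
  results
    .apply("CountPerJob", Count.perKey[String, String]())
    .apply("EmitIfComplete", ParDo.of(new EmitJobDone(expectedCounts)))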

Is this the wrong use case for Dataflow/Beam? Is there a better way to
express this problem?

Thanks,
rdm