Re: Strange 'gzip error' running Beam on Dataflow

2018-10-12 Thread Randal Moore
The files have no content-encoding set. They are not BigQuery exports but
rather are crafted by a service of mine.

Note that my DoFn gets called for each line of the file, something that I
don't think would happen if the input were gzipped - wouldn't gunzip apply to the whole content?
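For what it's worth, gunzip does operate on the whole stream rather than line by line. A quick JDK-only sketch (unrelated to Beam internals) round-tripping two JSON lines through GZIP at once:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class WholeStreamGzip {
    public static void main(String[] args) throws Exception {
        String twoLines = "{\"a\":1}\n{\"b\":2}\n";

        // Compress both lines as one stream, the way a .gz object is written.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(twoLines.getBytes(StandardCharsets.UTF_8));
        }

        // Decompression consumes the whole stream, not individual lines.
        GZIPInputStream gin =
                new GZIPInputStream(new ByteArrayInputStream(bos.toByteArray()));
        String restored = new String(gin.readAllBytes(), StandardCharsets.UTF_8);
        System.out.println(restored.equals(twoLines)); // true
    }
}
```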

On Fri, Oct 12, 2018, 5:04 PM Jose Ignacio Honrado 
wrote:

> Hi Randal,
>
> You might be experiencing the automatic decompressive transcoding from
> GCS. Take a look at this to see if it helps:
> https://cloud.google.com/storage/docs/transcoding
>
> It seems like a compressed file is expected (given the .gz extension), but
> the file is returned decompressed by GCS.
>
> Any chance these files in GCS are exported from BigQuery? I started to
> "suffer" a similar issue because the exports from BQ tables to GCS started
> setting new metadata (content-encoding: gzip, content-type: text/csv) on
> the output files and, as a consequence, GZIP files were automatically
> decompressed when downloading them (as explained in the previous link).
>
> Best,
>
>
> On Fri, Oct 12, 2018 at 23:40, Randal Moore wrote:
>
>> Using Beam Java SDK 2.6.
>>
>> I have a batch pipeline that has run successfully in its current form several
>> times. Suddenly I am getting strange errors complaining about the format of
>> the input. As far as I know, the pipeline didn't change at all since the
>> last successful run. The error:
>> java.util.zip.ZipException: Not in GZIP format - Trace:
>> org.apache.beam.sdk.util.UserCodeException
>> indicates that something somewhere thinks the line of text is supposed to
>> be gzipped. I don't know what is setting that expectation nor what code is
>> thinking that it is supposed to be gzipped.
>>
>> The pipeline uses TextIO to read from a Google Cloud Storage bucket. The
>> content of the bucket object is individual "text" lines (actually each line
>> is JSON encoded). This error is in the first DoFn following the TextIO -
>> one that converts each string to a value object.
>>
>> My log message in the exception handler shows the exact text for the
>> string that I am expecting. I tried logging the callstack to see where the
>> GZIP exception is thrown - turns out to be a bit hard to follow (with a
>> bunch of dataflow classes called at the line in the processElement method
>> that first uses the string).
>>
>>
>>- Changing the lines to pure text, like "hello" and "world", gets to
>>the JSON parser, which throws an error (since it isn't JSON any more).
>>- If I base64 encode the lines, I [still] get the GZIP exception.
>>- I was running an older version of Beam so I upgraded to 2.6. Didn't
>>help
>>- The bucket object uses *application/octet-encoding*
>>- Tried changing the read from the bucket from the default to
>>explicitly using uncompressed.
>>TextIO.read.from(job.inputsPath).withCompression(Compression.
>>UNCOMPRESSED)
>>
>> One other detail is that most of the code is written in Scala even
>> though it uses the Java SDK for Beam.
>>
>> Any help appreciated!
>> rdm
>>
>>
>>


Re: Strange 'gzip error' running Beam on Dataflow

2018-10-12 Thread Jose Ignacio Honrado
Hi Randal,

You might be experiencing the automatic decompressive transcoding from GCS.
Take a look at this to see if it helps:
https://cloud.google.com/storage/docs/transcoding

It seems like a compressed file is expected (given the .gz extension), but
the file is returned decompressed by GCS.
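One way to check what GCS is actually returning is to compare the first two bytes of the downloaded content against the gzip magic number (0x1f 0x8b); `gsutil stat gs://bucket/object` will also show the Content-Encoding metadata. A minimal JDK-only sketch (the class and helper names are illustrative, not from the thread):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class GzipMagicCheck {
    // Returns true if the buffer starts with the gzip magic number 0x1f 0x8b.
    static boolean looksGzipped(byte[] data) {
        return data.length >= 2
                && (data[0] & 0xff) == 0x1f
                && (data[1] & 0xff) == 0x8b;
    }

    public static void main(String[] args) throws Exception {
        byte[] plain = "{\"msg\":\"hello\"}".getBytes(StandardCharsets.UTF_8);

        // Compress the same bytes to simulate what a .gz object should contain.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(plain);
        }
        byte[] gzipped = bos.toByteArray();

        System.out.println("plain looks gzipped:   " + looksGzipped(plain));   // false
        System.out.println("gzipped looks gzipped: " + looksGzipped(gzipped)); // true
    }
}
```

If the bytes handed to the pipeline fail this check while the reader is configured for GZIP (or infers it from the .gz extension), decompressive transcoding is the likely culprit.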

Any chance these files in GCS are exported from BigQuery? I started to
"suffer" a similar issue because the exports from BQ tables to GCS started
setting new metadata (content-encoding: gzip, content-type: text/csv) on
the output files and, as a consequence, GZIP files were automatically
decompressed when downloading them (as explained in the previous link).

Best,

On Fri, Oct 12, 2018 at 23:40, Randal Moore wrote:

> Using Beam Java SDK 2.6.
>
> I have a batch pipeline that has run successfully in its current form several
> times. Suddenly I am getting strange errors complaining about the format of
> the input. As far as I know, the pipeline didn't change at all since the
> last successful run. The error:
> java.util.zip.ZipException: Not in GZIP format - Trace:
> org.apache.beam.sdk.util.UserCodeException
> indicates that something somewhere thinks the line of text is supposed to
> be gzipped. I don't know what is setting that expectation nor what code is
> thinking that it is supposed to be gzipped.
>
> The pipeline uses TextIO to read from a Google Cloud Storage bucket. The
> content of the bucket object is individual "text" lines (actually each line
> is JSON encoded). This error is in the first DoFn following the TextIO -
> one that converts each string to a value object.
>
> My log message in the exception handler shows the exact text for the
> string that I am expecting. I tried logging the callstack to see where the
> GZIP exception is thrown - turns out to be a bit hard to follow (with a
> bunch of dataflow classes called at the line in the processElement method
> that first uses the string).
>
>
>- Changing the lines to pure text, like "hello" and "world", gets to
>the JSON parser, which throws an error (since it isn't JSON any more).
>- If I base64 encode the lines, I [still] get the GZIP exception.
>- I was running an older version of Beam so I upgraded to 2.6. Didn't
>help
>- The bucket object uses *application/octet-encoding*
>- Tried changing the read from the bucket from the default to
>explicitly using uncompressed.
>TextIO.read.from(job.inputsPath).withCompression(Compression.
>UNCOMPRESSED)
>
> One other detail is that most of the code is written in Scala even though
> it uses the Java SDK for Beam.
>
> Any help appreciated!
> rdm
>
>
>


Strange 'gzip error' running Beam on Dataflow

2018-10-12 Thread Randal Moore
Using Beam Java SDK 2.6.

I have a batch pipeline that has run successfully in its current form several
times. Suddenly I am getting strange errors complaining about the format of
the input. As far as I know, the pipeline didn't change at all since the
last successful run. The error:
java.util.zip.ZipException: Not in GZIP format - Trace:
org.apache.beam.sdk.util.UserCodeException
indicates that something somewhere thinks the line of text is supposed to
be gzipped. I don't know what is setting that expectation nor what code is
thinking that it is supposed to be gzipped.

The pipeline uses TextIO to read from a Google Cloud Storage bucket. The
content of the bucket object is individual "text" lines (actually each line
is JSON encoded). This error is in the first DoFn following the TextIO -
one that converts each string to a value object.

My log message in the exception handler shows the exact text for the string
that I am expecting. I tried logging the callstack to see where the GZIP
exception is thrown - turns out to be a bit hard to follow (with a bunch of
dataflow classes called at the line in the processElement method that first
uses the string).


   - Changing the lines to pure text, like "hello" and "world", gets to the
   JSON parser, which throws an error (since it isn't JSON any more).
   - If I base64 encode the lines, I [still] get the GZIP exception.
   - I was running an older version of Beam so I upgraded to 2.6. Didn't
   help
   - The bucket object uses *application/octet-encoding*
   - Tried changing the read from the bucket from the default to explicitly
   using uncompressed.
   TextIO.read.from(job.inputsPath).withCompression(Compression.UNCOMPRESSED
   )
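For reference, the exception itself is easy to reproduce outside Beam: java.util.zip.GZIPInputStream reads the gzip header eagerly in its constructor and throws ZipException("Not in GZIP format") when the input doesn't start with the magic bytes - e.g. a plain JSON line. A standalone sketch (not Dataflow code):

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;

public class NotInGzipFormatRepro {
    public static void main(String[] args) throws Exception {
        // A plain, uncompressed JSON line - like one line of the bucket object.
        byte[] jsonLine = "{\"msg\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
        try {
            // The constructor reads the gzip header immediately, so plain
            // text fails here rather than on the first read() call.
            new GZIPInputStream(new ByteArrayInputStream(jsonLine));
            System.out.println("no exception");
        } catch (ZipException e) {
            System.out.println("ZipException: " + e.getMessage()); // "Not in GZIP format"
        }
    }
}
```

This is why base64-encoding the lines makes no difference: any content that lacks the two-byte gzip header fails the same way the moment something tries to gunzip it.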

One other detail is that most of the code is written in Scala even though
it uses the Java SDK for Beam.

Any help appreciated!
rdm


Re: Spark storageLevel not taking effect

2018-10-12 Thread Juan Carlos Garcia
Hi Mike,

From the documentation on
https://beam.apache.org/documentation/runners/spark/#pipeline-options-for-the-spark-runner

storageLevel: The StorageLevel to use when caching RDDs in batch pipelines.
The Spark Runner automatically caches RDDs that are evaluated repeatedly.
This is a batch-only property, as streaming pipelines in Beam are stateful,
which requires Spark DStream's StorageLevel to be MEMORY_ONLY (the default).

So I think you are out of luck here.


On Thu, Oct 11, 2018 at 10:05 PM Mike Kaplinskiy 
wrote:

> Hey folks,
>
> Admittedly I may be a bit on the bleeding edge here, but I'm attempting to
> run a Beam pipeline on Spark which is running on top of Kubernetes.
> Specifically Beam 2.6.0 with Spark 2.4.0-rc2 running in client mode with a
> Kubernetes (1.11) driver. It's actually pretty cool - from a Kubernetes
> perspective, I start a pod which starts a ton of workers to do the parallel
> stuff and then cleans up after itself.
>
> One thing I can't seem to get working is setting the storage level for
> Spark RDDs via Beam. Specifically passing --storageLevel=MEMORY_AND_DISK
> seems to not work - the rdd still shows up as "Memory Deserialized 1x
> Replicated" in the Spark UI. I would expect it to be something closer to
> "Disk Memory Deserialized 1x Replicated." It *seems* to be serialized only
> in the sense that less memory is used (I assume it gets encoded).
>
> I even tried hardcoding storageLevel in BoundedDataset.java (based on the
> line number in the DAG viz). Unfortunately it still shows up as memory-only.
>
> Am I missing something that would let me spill data to disk?
>
> For reference, here's my exact command line:
> /opt/spark/bin/spark-submit
> --master 'k8s://https://kubernetes:443'
> --deploy-mode client
> --name $(MY_POD_NAME)
> --conf spark.executor.instances=20
> --conf spark.driver.host=$(MY_POD_IP)
> --conf spark.driver.port=7077
> --conf spark.kubernetes.container.image=$(MY_IMAGE)
> --conf spark.kubernetes.driver.pod.name=$(MY_POD_NAME)
> --conf spark.kubernetes.executor.podNamePrefix=$(MY_POD_NAME)
> --conf spark.executor.memory=5500m
> --conf spark.executor.memoryOverhead=1300m
> --conf spark.memory.fraction=0.45
> --conf spark.executor.cores=3
> --conf spark.kubernetes.executor.limit.cores=3
> --conf spark.default.parallelism=60
> --conf spark.kubernetes.allocation.batch.size=20
> --conf spark.kubernetes.driver.label.app=beam-datomic-smoketest
> --conf spark.kubernetes.node.selector.node.ladderlife.com/group=etl
> --conf
> spark.kubernetes.executor.annotation.iam.amazonaws.com/role=etl-role
> --conf spark.kubernetes.executor.secrets.google-cloud=/google-cloud-secrets
> --conf spark.kubernetes.executor.secretKeyRef.SENTRY_DSN=sentry-secrets:dsn
> --conf spark.executorEnv.STATSD_HOST=169.254.168.253
> --class ladder.my_beam_job
> local:///srv/beam_job.jar
> --runner=SparkRunner
> --storageLevel=MEMORY_AND_DISK
>
> Thanks,
> Mike.
>
> Ladder. The smart, modern way to insure your life.
>


-- 

JC


Re: ElasticIO retry configuration exception

2018-10-12 Thread Tim
Great! Thank you.

Feel free to add me as reviewer if you open a PR.

Tim

> On 12 Oct 2018, at 08:28, Wout Scheepers  
> wrote:
> 
> Hey Tim, Romain,
>  
> I created the ticket (BEAM-5725). I’ll try to fix it, as it’s time I made my 
> first PR.
> First I will focus on getting a reproducible case in a unit test.
> 
> Thanks!
> Wout
> 
> 
>  
> From: Tim Robertson 
> Reply-To: "user@beam.apache.org" 
> Date: Thursday, 11 October 2018 at 20:25
> To: "user@beam.apache.org" 
> Subject: Re: ElasticIO retry configuration exception
>  
> I took a super quick look at the code and I think Romain is correct.
>  
> 1. In a retry scenario it calls handleRetry()
> 2. Within handleRetry() it gets the DefaultRetryPredicate and calls 
> test(response) - this reads the response stream as JSON
> 3. When the retry is successful (no 429 code) the response is returned
> 4. The response is then passed in to checkForErrors(...)
> 5. This then tries to parse the response by reading the response stream. It 
> was already read in step 2 
>  
> Can you please open a Jira for this Wout? 
> https://issues.apache.org/jira/projects/BEAM/issues
> If you don't have an account I'll create it.
>  
> This will not make 2.8.0 (just missed) so it will likely be 7 weeks or so 
> before released in 2.9.0. 
> However as soon as it is fixed it is fairly easy to bring into your own 
> project, by copying in the single ElasticsearchIO.java declared in the same 
> package.
>  
> Thank you for reporting the issue,
> Tim
>  
>  
>  
>  
> On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau  
> wrote:
> It looks more like a client issue where the stream is already read. Maybe 
> try to reproduce it in a unit test in the Beam ES module? This will enable 
> us to help you more accurately.
> 
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>  
>  
> On Thu, Oct 11, 2018 at 16:18, Wout Scheepers wrote:
> Hey Romain,
>  
> I’ve checked and am using the same HTTP client as Beam 2.7.0.
> Just to be sure, I’ve created a minimal reproducible case in a fresh project 
> with only the following dependencies in my build.gradle:
> dependencies {
>     compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
>     compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
>     compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
>     compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')
> 
>     testCompile 'org.hamcrest:hamcrest-all:1.3'
>     testCompile 'org.assertj:assertj-core:3.4.1'
>     testCompile 'junit:junit:4.12'
> }
>  
> However, the problem still persists when writing a document to elastic with 
> the retryConfiguration set.
> I guess the problem lies with my Elasticsearch version, as JB implies?
>  
> Anyway, thanks for the suggestion.
>  
> Wout
>  
> From: Romain Manni-Bucau 
> Reply-To: "user@beam.apache.org" 
> Date: Wednesday, 10 October 2018 at 16:53
> To: "user@beam.apache.org" 
> Subject: Re: ElasticIO retry configuration exception
>  
> Hi Wout,
>  
> Maybe check your classpath HTTP client versions (against 
> https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
>  for instance).
> 
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>  
>  
> On Wed, Oct 10, 2018 at 15:37, Wout Scheepers wrote:
> Hey JB,
> 
> Thanks for your fast reply.
> The elastic version we're using is 5.6.2.
> 
> "version": {
> "number": "5.6.2",
> "build_hash": "57e20f3",
> "build_date": "2017-09-23T13:16:45.703Z",
> "build_snapshot": false,
> "lucene_version": "6.6.1"
> }
> 
> 
> Wout
> 
> 
> 
> On 10/10/2018, 15:34, "Jean-Baptiste Onofré"  wrote:
> 
> Hi Wout,
> 
> what's the elasticsearch version ? (just to try to reproduce)
> 
> Thanks,
> Regards
> JB
> 
> On 10/10/2018 15:31, Wout Scheepers wrote:
> > Hey all,
> > 
> >  
> > 
> > When using .withRetryConfiguration()for ElasticsearchIO, I get the
> > following stacktrace:
> > 
> >  
> > 
> > Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> > No content to map due to end-of-input
> > 
> > at [Source: (org.apache.http.nio.entity.ContentInputStream); line: 1,
> > column: 0]
> > 
> >at
> > 
> com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
> > 
> >at
> > 
> com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4133)
> > 
> >at
> > 

Re: ElasticIO retry configuration exception

2018-10-12 Thread Wout Scheepers
Hey Tim, Romain,

I created the ticket (BEAM-5725). I’ll try to fix it, as it’s time I made my 
first PR.
First I will focus on getting a reproducible case in a unit test.

Thanks!
Wout



From: Tim Robertson 
Reply-To: "user@beam.apache.org" 
Date: Thursday, 11 October 2018 at 20:25
To: "user@beam.apache.org" 
Subject: Re: ElasticIO retry configuration exception

I took a super quick look at the code and I think Romain is correct.

1. In a retry scenario it calls handleRetry()
2. Within handleRetry() it gets the DefaultRetryPredicate and calls 
test(response) - this reads the response stream as JSON
3. When the retry is successful (no 429 code) the response is returned
4. The response is then passed in to checkForErrors(...)
5. This then tries to parse the response by reading the response stream. It was 
already read in step 2
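The failure mode in step 5 can be shown with any InputStream: once drained in step 2, a second reader sees end-of-input, which Jackson then reports as "No content to map due to end-of-input". A stdlib-only sketch (no Elasticsearch client or Jackson involved):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class StreamReadTwice {
    // Drains the stream and returns how many bytes were read.
    static int drain(InputStream in) throws Exception {
        int count = 0;
        while (in.read() != -1) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) throws Exception {
        // Stands in for the HTTP response entity's content stream.
        InputStream response = new ByteArrayInputStream("{\"ok\":true}".getBytes());

        System.out.println("first read:  " + drain(response) + " bytes"); // 11 bytes
        System.out.println("second read: " + drain(response) + " bytes"); // 0 bytes: already consumed
    }
}
```

The usual fixes are to buffer the entity bytes once and hand each consumer its own stream, or to make the entity repeatable before testing and parsing it.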

Can you please open a Jira for this Wout? 
https://issues.apache.org/jira/projects/BEAM/issues
If you don't have an account I'll create it.

This will not make 2.8.0 (just missed) so it will likely be 7 weeks or so 
before released in 2.9.0.
However as soon as it is fixed it is fairly easy to bring into your own 
project, by copying in the single ElasticsearchIO.java declared in the same 
package.

Thank you for reporting the issue,
Tim




On Thu, Oct 11, 2018 at 4:19 PM Romain Manni-Bucau wrote:
It looks more like a client issue where the stream is already read. Maybe try 
to reproduce it in a unit test in the Beam ES module? This will enable us to 
help you more accurately.

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book


On Thu, Oct 11, 2018 at 16:18, Wout Scheepers wrote:
Hey Romain,

I’ve checked and am using the same HTTP client as Beam 2.7.0.

Just to be sure, I’ve created a minimal reproducible case in a fresh project with 
only the following dependencies in my build.gradle:
dependencies {
    compile ('org.apache.beam:beam-sdks-java-io-elasticsearch:2.7.0')
    compile ('org.apache.beam:beam-runners-direct-java:2.7.0')
    compile ('org.apache.beam:beam-runners-google-cloud-dataflow-java:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-protobuf:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-google-cloud-platform-core:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-google-cloud-platform:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-common:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-extensions-json-jackson:2.7.0')
    compile ('org.apache.beam:beam-sdks-java-io-jdbc:2.7.0')

    testCompile 'org.hamcrest:hamcrest-all:1.3'
    testCompile 'org.assertj:assertj-core:3.4.1'
    testCompile 'junit:junit:4.12'
}


However, the problem still persists when writing a document to elastic with the 
retryConfiguration set.
I guess the problem lies with my Elasticsearch version, as JB implies?

Anyway, thanks for the suggestion.

Wout

From: Romain Manni-Bucau 
Reply-To: "user@beam.apache.org" 
Date: Wednesday, 10 October 2018 at 16:53
To: "user@beam.apache.org" 
Subject: Re: ElasticIO retry configuration exception

Hi Wout,

Maybe check your classpath HTTP client versions (against 
https://github.com/apache/beam/blob/v2.7.0/sdks/java/io/elasticsearch/build.gradle
 for instance).

Romain Manni-Bucau
@rmannibucau | Blog | Old Blog | Github | LinkedIn | Book


On Wed, Oct 10, 2018 at 15:37, Wout Scheepers wrote:
Hey JB,

Thanks for your fast reply.
The elastic version we're using is 5.6.2.

"version": {
"number": "5.6.2",
"build_hash": "57e20f3",
"build_date": "2017-09-23T13:16:45.703Z",
"build_snapshot": false,
"lucene_version": "6.6.1"
}


Wout



On 10/10/2018, 15:34, "Jean-Baptiste Onofré" wrote:

Hi Wout,

what's the elasticsearch version ? (just to try to reproduce)

Thanks,
Regards
JB

On 10/10/2018 15:31, Wout Scheepers wrote:
> Hey all,
>
>
>
> When using .withRetryConfiguration()for ElasticsearchIO, I get the
> following stacktrace:
>
>
>
> Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException:
> No content to map due to end-of-input
>
> at [Source: