RE: Many S3V4AuthErrorRetryStrategy warn logs while reading/writing from S3

2021-09-24 Thread Hailu, Andreas
Thanks, Robert.

// ah

From: Robert Metzger 
Sent: Wednesday, September 22, 2021 1:49 PM
To: Hailu, Andreas [Engineering] 
Cc: user@flink.apache.org
Subject: Re: Many S3V4AuthErrorRetryStrategy warn logs while reading/writing 
from S3

Hey Andreas,

This could be related too 
https://github.com/apache/hadoop/pull/110/files#diff-0a2e55a2f79ea4079eb7b77b0dc3ee562b383076fa0ac168894d50c80a95131dR950

I guess in Flink this would be

s3.endpoint: your-endpoint-hostname
Where your-endpoint-hostname is a region-specific endpoint, which you can 
probably look up from the S3 docs.


On Wed, Sep 22, 2021 at 7:07 PM Hailu, Andreas <andreas.ha...@gs.com> wrote:
Hi,

When reading/writing to and from S3 using the flink-fs-s3-hadoop plugin on 
1.11.2, we observe a lot of these WARN log statements in the logs:

WARN  S3V4AuthErrorRetryStrategy - Attempting to re-send the request to 
s3.amazonaws.com with AWS V4 authentication. To avoid this warning in the future, please use 
region-specific endpoint to access buckets located in regions that require V4 
signing.

The applications complete successfully which is great, but I’m not sure what 
the root of the error is and I’m hesitant to silence it through our logging 
configurations. I saw something that looks similar here[1]. Is there a way for 
us to similarly have Flink’s AWS S3 client to use V4 strategy to begin with?

[1] 
https://stackoverflow.com/questions/39513518/aws-emr-writing-to-kms-encrypted-s3-parquet-files



Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices





Re: Many S3V4AuthErrorRetryStrategy warn logs while reading/writing from S3

2021-09-22 Thread Robert Metzger
Hey Andreas,

This could be related too
https://github.com/apache/hadoop/pull/110/files#diff-0a2e55a2f79ea4079eb7b77b0dc3ee562b383076fa0ac168894d50c80a95131dR950

I guess in Flink this would be

s3.endpoint: your-endpoint-hostname

Where your-endpoint-hostname is a region-specific endpoint, which you can
probably look up from the S3 docs.
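
For example, if the bucket lives in eu-west-1 (the region here is only an
illustration; use whatever region your buckets are in), the flink-conf.yaml
entry would be something like:

s3.endpoint: s3.eu-west-1.amazonaws.com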


On Wed, Sep 22, 2021 at 7:07 PM Hailu, Andreas  wrote:

> Hi,
>
>
>
> When reading/writing to and from S3 using the flink-fs-s3-hadoop plugin on
> 1.11.2, we observe a lot of these WARN log statements in the logs:
>
>
>
> *WARN  S3V4AuthErrorRetryStrategy - Attempting to re-send the request to
> s3.amazonaws.com  with AWS V4 authentication. To
> avoid this warning in the future, please use region-specific endpoint to
> access buckets located in regions that require V4 signing.*
>
>
>
> The applications complete successfully which is great, but I’m not sure
> what the root of the error is and I’m hesitant to silence it through our
> logging configurations. I saw something that looks similar here[1]. Is
> there a way for us to similarly have Flink’s AWS S3 client to use V4
> strategy to begin with?
>
>
>
> [1]
> https://stackoverflow.com/questions/39513518/aws-emr-writing-to-kms-encrypted-s3-parquet-files
>
>
>
> 
>
>
>
> *Andreas Hailu*
>
> *Data Lake Engineering *| Goldman Sachs & Co.
>
>
>
> --
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>


Many S3V4AuthErrorRetryStrategy warn logs while reading/writing from S3

2021-09-22 Thread Hailu, Andreas
Hi,

When reading/writing to and from S3 using the flink-fs-s3-hadoop plugin on 
1.11.2, we observe a lot of these WARN log statements in the logs:

WARN  S3V4AuthErrorRetryStrategy - Attempting to re-send the request to 
s3.amazonaws.com with AWS V4 authentication. To avoid this warning in the 
future, please use region-specific endpoint to access buckets located in 
regions that require V4 signing.

The applications complete successfully, which is great, but I'm not sure what 
the root of the error is, and I'm hesitant to silence it through our logging 
configuration. I saw something that looks similar here [1]. Is there a way for 
us to similarly have Flink's AWS S3 client use the V4 strategy to begin with?

[1] 
https://stackoverflow.com/questions/39513518/aws-emr-writing-to-kms-encrypted-s3-parquet-files



Andreas Hailu
Data Lake Engineering | Goldman Sachs & Co.




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-15 Thread Kathula, Sandeep
Hi Jan,

  Thanks for the reply. To answer your questions:


  1.  We are using RocksDB as backend.
  2.  We are using 10 minutes checkpointing interval.
  3.  We are getting at most 5,000 records per second from Kafka, each around 
5 KB in size (about 25 MB/sec), which we are trying to write to S3. But since we 
write to S3 in Parquet format as 5 files every 5 minutes, the data is compressed 
and we estimate each file to be around 100-150 MB.


   We even tried 6 pods, each with 4 CPUs and 64 GB of memory (32 GB 
going to off-heap for RocksDB), but were still not able to write bigger files.


Thanks,
Sandeep

From: Jan Lukavský 
Date: Tuesday, September 14, 2021 at 10:47 AM
To: "u...@beam.apache.org" 
Cc: user 
Subject: Re: Beam with Flink runner - Issues when writing to S3 in Parquet 
Format

This email is from an external sender.


Hi Sandeep,
a few questions:
 a) which state backend do you use for Flink?
 b) what is your checkpointingInterval set for FlinkRunner?
 c) how much data is there in your input Kafka topic(s)?

FileIO has to buffer all elements per window (by default) into state, so this 
might create a high pressure on state backend and/or heap, which could result 
in suboptimal performance. Due to the "connection loss" and timeout exceptions 
you describe I'd suppose there might be a lot of GC pressure.

 Jan
On 9/14/21 5:20 PM, Kathula, Sandeep wrote:
Hi,
   We have a simple Beam application which reads from Kafka, converts to 
parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We have a 
fixed window of 5 minutes after conversion to PCollection and 
then writing to S3. We have around 320 columns in our data. Our intention is to 
write large files of size 128MB or more so that it won’t have a small file 
problem when reading back from Hive. But from what we observed it is taking too 
much memory to write to S3 (giving memory of 8GB to heap is not enough to write 
50 MB files and it is going OOM). When I increase memory for heap to 32GB then 
it take lot of time to write records to s3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection parquetRecord = …….

parquetRecord.apply(FileIO.write()
.via(ParquetIO.sink(getOutput_schema()))
.to(outputPath.isEmpty() ? outputPath() : outputPath)
.withNumShards(5)
.withNaming(new CustomFileNaming("snappy.parquet")));


We are also getting different exceptions like:


  1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processE

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread David Morávek
Hi Sandeep,

Jan has already provided pretty good guidelines for getting more context on
the issue ;)

Because this is not the first time, I would like to raise awareness
that it's not OK to send a user-related question to four Apache mailing
lists (that I know of). Namely:

- user@flink.apache.org
- d...@flink.apache.org
- u...@beam.apache.org
- d...@beam.apache.org

Community focus is a very precious resource that should be used wisely.
All of these mailing lists answer many complex questions each day,
and it's very unfortunate if any of this work needs to be duplicated. Next
time please direct Beam-related user questions solely to u...@beam.apache.org.

Thanks for your understanding. You can consult the community guidelines [1][2]
if you are not sure where a particular question belongs.

[1] https://flink.apache.org/community.html#mailing-lists
[2] https://beam.apache.org/community/contact-us/

Best,
D.

On Tue, Sep 14, 2021 at 5:47 PM Jan Lukavský  wrote:

> Hi Sandeep,
> a few questions:
>  a) which state backend do you use for Flink?
>  b) what is your checkpointingInterval set for FlinkRunner?
>  c) how much data is there in your input Kafka topic(s)?
>
> FileIO has to buffer all elements per window (by default) into state, so
> this might create a high pressure on state backend and/or heap, which could
> result in suboptimal performance. Due to the "connection loss" and timeout
> exceptions you describe I'd suppose there might be a lot of GC pressure.
>
>  Jan
> On 9/14/21 5:20 PM, Kathula, Sandeep wrote:
>
> Hi,
>
>We have a simple Beam application which reads from Kafka, converts to
> parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). We
> have a fixed window of 5 minutes after conversion to
> PCollection and then writing to S3. We have around 320
> columns in our data. Our intention is to write large files of size 128MB or
> more so that it won’t have a small file problem when reading back from
> Hive. But from what we observed it is taking too much memory to write to S3
> (giving memory of 8GB to heap is not enough to write 50 MB files and it is
> going OOM). When I increase memory for heap to 32GB then it take lot of
> time to write records to s3.
>
> For instance it takes:
>
>
>
> 20 MB file - 30 sec
>
> 50 MB file - 1 min 16 sec
>
> 75 MB file - 2 min 15 sec
>
> 83 MB file - 2 min 40 sec
>
>
>
> Code block to write to S3:
>
> PCollection parquetRecord = …….
>
>
>
> parquetRecord.apply(FileIO.*write*()
> .via(ParquetIO.*sink*(getOutput_schema()))
> .to(outputPath.isEmpty() ? outputPath() : outputPath)
> .withNumShards(5)
> .withNaming(new CustomFileNaming("snappy.parquet")));
>
>
>
>
>
> We are also getting different exceptions like:
>
>
>
>1. *UserCodeException*:
>
>
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
>
> at
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>
> at
> com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
>
> at
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
>
> at
> org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>
> at
> org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
>
> at
>

Re: Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread Jan Lukavský

Hi Sandeep,
a few questions:
 a) which state backend do you use for Flink?
 b) what is your checkpointingInterval set for FlinkRunner?
 c) how much data is there in your input Kafka topic(s)?

FileIO has to buffer all elements per window (by default) into state, so 
this might create high pressure on the state backend and/or heap, which 
could result in suboptimal performance. Due to the "connection loss" and 
timeout exceptions you describe, I'd suspect there might be a lot of GC 
pressure.
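
For reference, the interval in (b) is the FlinkRunner pipeline option; a rough 
sketch of where it is usually set (the value below is only a placeholder, not a 
recommendation):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Sketch only: configuring the FlinkRunner checkpointing interval.
public class FlinkRunnerOptionsSketch {
    public static Pipeline createPipeline(String[] args) {
        FlinkPipelineOptions options =
                PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkRunner.class);
        options.setCheckpointingInterval(60_000L); // milliseconds, placeholder value
        return Pipeline.create(options);
    }
}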


 Jan

On 9/14/21 5:20 PM, Kathula, Sandeep wrote:


Hi,

We have a simple Beam application which reads from Kafka, converts to 
parquet and write to S3 with Flink runner (Beam 2.29 and Flink 1.12). 
We have a fixed window of 5 minutes after conversion to 
PCollection and then writing to S3. We have around 320 
columns in our data. Our intention is to write large files of size 
128MB or more so that it won’t have a small file problem when reading 
back from Hive. But from what we observed it is taking too much memory 
to write to S3 (giving memory of 8GB to heap is not enough to write 50 
MB files and it is going OOM). When I increase memory for heap to 32GB 
then it take lot of time to write records to s3.


For instance it takes:

20 MB file - 30 sec

50 MB file - 1 min 16 sec

75 MB file - 2 min 15 sec

83 MB file - 2 min 40 sec

Code block to write to S3:

PCollection parquetRecord = …….

parquetRecord.apply(FileIO./write/()
    .via(ParquetIO./sink/(getOutput_schema()))
    .to(outputPath.isEmpty() ? outputPath() : outputPath)
    .withNumShards(5)
    .withNaming(new CustomFileNaming("snappy.parquet")));

We are also getting different exceptions like:

 1. *UserCodeException*:

**

Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator


at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)


at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown 
Source)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)


at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)


at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)


at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)


at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)


at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)


at java.lang.Iterable.forEach(Iterable.java:75)

at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)


at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown 
Source)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)


at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)


at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)


at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)


at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)


at 
org.apache.flink.streaming.runtime

Beam with Flink runner - Issues when writing to S3 in Parquet Format

2021-09-14 Thread Kathula, Sandeep
Hi,
   We have a simple Beam application which reads from Kafka, converts to 
Parquet, and writes to S3 with the Flink runner (Beam 2.29 and Flink 1.12). We have a 
fixed window of 5 minutes after conversion to PCollection, and 
then we write to S3. We have around 320 columns in our data. Our intention is to 
write large files of 128 MB or more so that we don't have a small-file 
problem when reading back from Hive. But from what we observed, it takes too 
much memory to write to S3 (giving 8 GB to the heap is not enough to write 
50 MB files, and it goes OOM). When I increase the heap to 32 GB, it then 
takes a lot of time to write records to S3.
For instance it takes:

20 MB file - 30 sec
50 MB file - 1 min 16 sec
75 MB file - 2 min 15 sec
83 MB file - 2 min 40 sec

Code block to write to S3:
PCollection parquetRecord = …….

parquetRecord.apply(FileIO.write()
.via(ParquetIO.sink(getOutput_schema()))
.to(outputPath.isEmpty() ? outputPath() : outputPath)
.withNumShards(5)
.withNaming(new CustomFileNaming("snappy.parquet")));


We are also getting different exceptions like:


  1.  UserCodeException:

Caused by: org.apache.beam.sdk.util.UserCodeException: 
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: 
Could not forward element to next operator
at 
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at 
com.intuit.data.platform.process.thrivev2.parquet.ParquetWriter$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.emit(DoFnOperator.java:1068)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$BufferedOutputManager.output(DoFnOperator.java:1022)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
at 
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.lambda$processElement$0(JsonFlattener.java:36)
at java.lang.Iterable.forEach(Iterable.java:75)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener.processElement(JsonFlattener.java:34)
at 
com.intuit.data.platform.process.thrivev2.flattener.JsonFlattener$DoFnInvoker.invokeProcessElement(Unknown
 Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:227)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
at 
org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at 
org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:601)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutp

Re: Writing to S3 parquet files in Blink batch mode. Flink 1.10

2020-06-17 Thread Dmytro Dragan
Hi Jingsong,

Thank you for detailed clarification.

Best regards,
Dmytro Dragan | dd...@softserveinc.com | Lead Big Data Engineer | Big Data & 
Analytics | SoftServe


From: Jingsong Li 
Sent: Thursday, June 18, 2020 4:58:22 AM
To: Dmytro Dragan 
Cc: user@flink.apache.org 
Subject: Re: Writing to S3 parquet files in Blink batch mode. Flink 1.10

Hi Dmytro,

Yes, Batch mode must disabled checkpoint, So StreamingFileSink can not be used 
in batch mode (StreamingFileSink requires checkpoint whatever formats), we are 
refactoring it to more generic, and can be used in batch mode, but this is a 
future topic.
Currently, in batch mode, for sink, we must use `OutputFormat` with 
`FinalizeOnMaster` instead of `SinkFunction`.  We should implement the file 
committing in the method of `FinalizeOnMaster`. If you have enough time, you 
can implement a custom `OutputFormat`, it is complicated.

Now the status quo is:
- For 1.10, blink batch support writing to the hive table, if you can convert 
your table to a hive table with parquet and S3, it can be. [1]
- For 1.11, there is a new connector named `filesystem connector`, [2], you can 
define a table with parquet and S3, and writing to the table by SQL.
- For 1.11, moreover, both hive and filesystem connector support streaming 
writing by built-in reusing StreamingFileSink.

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html#writing-to-hive
[2]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html

Best,
Jingsong

On Tue, Jun 16, 2020 at 10:50 PM Dmytro Dragan <dd...@softserveinc.com> wrote:
Hi guys,

In our use case we consider to write data to AWS S3 in parquet format using 
Blink Batch mode.
As far as I see from one side to write parquet file valid approach is to use 
StreamingFileSink with Parquet bulk-encoded format, but
Based to documentation and tests it works only with OnCheckpointRollingPolicy.

While Blink Batch mode requires disabled checkpoint.

Has anyone faced with similar issue?



--
Best, Jingsong Lee


Re: Writing to S3 parquet files in Blink batch mode. Flink 1.10

2020-06-17 Thread Jingsong Li
Hi Dmytro,

Yes, batch mode must have checkpointing disabled, so StreamingFileSink cannot be
used in batch mode (StreamingFileSink requires checkpoints regardless of the
format). We are refactoring it to be more generic so that it can be used in batch
mode, but this is a future topic.
Currently, in batch mode, the sink must be an `OutputFormat` with
`FinalizeOnMaster` instead of a `SinkFunction`, and the file committing should be
implemented in the `FinalizeOnMaster` method. If you have enough time,
you can implement a custom `OutputFormat`, but it is complicated.
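
To make that concrete, a very rough, untested sketch of such a custom 
`OutputFormat` (the commit step below only writes a _SUCCESS marker as an 
illustration; a real implementation would move or rename the produced files):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Sketch only: a batch sink whose global commit runs once on the master
// after all subtasks have finished. Paths and the commit action are placeholders.
public class CommittingTextOutputFormat extends FileOutputFormat<String>
        implements FinalizeOnMaster {

    public CommittingTextOutputFormat(Path outputPath) {
        super(outputPath);
    }

    @Override
    public void writeRecord(String record) throws IOException {
        // Each parallel subtask writes its own part file via the inherited stream.
        this.stream.write((record + "\n").getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        // Runs once on the JobManager after the job finished: this is where the
        // "file committing" would go, e.g. renaming temp files to final names.
        FileSystem fs = getOutputFilePath().getFileSystem();
        fs.create(new Path(getOutputFilePath(), "_SUCCESS"),
                FileSystem.WriteMode.OVERWRITE).close();
    }
}

The DataSet would then be written with something like 
dataSet.output(new CommittingTextOutputFormat(new Path("s3://bucket/out"))), 
where the path is again only a placeholder.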

Now the status quo is:
- For 1.10, Blink batch supports writing to Hive tables; if you can
convert your table to a Hive table with Parquet on S3, that works. [1]
- For 1.11, there is a new connector named `filesystem` [2]: you
can define a table with Parquet on S3 and write to it with SQL (see the
sketch after the links below).
- For 1.11, moreover, both the Hive and filesystem connectors support streaming
writes by reusing the built-in StreamingFileSink.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html#writing-to-hive
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html
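
A minimal sketch of what [2] looks like in practice for Parquet on S3 (1.11+; 
the table name, columns, bucket and path below are made up):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch only: a filesystem table with Parquet format on S3, written via SQL.
public class FilesystemParquetSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());

        tEnv.executeSql(
                "CREATE TABLE parquet_sink (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 's3://my-bucket/output'," +
                "  'format' = 'parquet'" +
                ")");

        tEnv.executeSql("INSERT INTO parquet_sink SELECT id, name FROM some_source_table");
    }
}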

Best,
Jingsong

On Tue, Jun 16, 2020 at 10:50 PM Dmytro Dragan 
wrote:

> Hi guys,
>
>
>
> In our use case we consider to write data to AWS S3 in parquet format
> using Blink Batch mode.
>
> As far as I see from one side to write parquet file valid approach is to
> use *StreamingFileSink* with Parquet bulk-encoded format, but
>
> Based to documentation and tests it works only with
> OnCheckpointRollingPolicy.
>
>
>
> While Blink Batch mode requires disabled checkpoint.
>
>
>
> Has anyone faced with similar issue?
>
>
>


-- 
Best, Jingsong Lee


Writing to S3 parquet files in Blink batch mode. Flink 1.10

2020-06-16 Thread Dmytro Dragan
Hi guys,

In our use case we are considering writing data to AWS S3 in Parquet format using 
Blink batch mode.
As far as I can see, the valid approach to writing Parquet files is to use 
StreamingFileSink with a Parquet bulk-encoded format, but based on the 
documentation and tests it works only with OnCheckpointRollingPolicy.

Meanwhile, Blink batch mode requires checkpointing to be disabled.

Has anyone faced a similar issue?



Re: AvroParquetWriter issues writing to S3

2020-04-17 Thread Arvid Heise
Hi Diogo,

I saw similar issues already. The root cause is always users actually not
using any Flink specific stuff, but going to the Parquet Writer of Hadoop
directly. As you can see in your stacktrace, there is not one reference to
any Flink class.

The solution usually is to use the respective Flink sink instead of
bypassing them [1].
If you opt to implement it manually nonetheless, it's probably easier to
bundle Hadoop from a non-Flink dependency.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
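
For completeness, a rough sketch of what [1] looks like for Parquet on S3 
(untested; the Avro schema, input stream and bucket path are placeholders):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch only: let Flink's own sink handle the Parquet/S3 writing instead of
// calling Hadoop's ParquetWriter directly.
public class ParquetS3SinkSketch {
    public static void attach(DataStream<GenericRecord> records, Schema schema) {
        StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(new Path("s3://my-bucket/output"), // placeholder path
                        ParquetAvroWriters.forGenericRecord(schema))
                .build();
        records.addSink(sink); // part files are rolled and committed on checkpoints
    }
}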

On Thu, Apr 16, 2020 at 5:36 PM Diogo Santos 
wrote:

> Hi Till,
>
> definitely seems to be a strange issue. The first time the job is loaded
> (with a clean instance of the Cluster) the job goes well, but if it is
> canceled or started again the issue came.
>
> I built an example here https://github.com/congd123/flink-s3-example
>
> You can generate the artifact of the Flink Job and start the cluster with
> the configuration on the docker-compose.
>
> Thanks for helping
>
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: AvroParquetWriter issues writing to S3

2020-04-16 Thread Diogo Santos
Hi Till,

definitely seems to be a strange issue. The first time the job is loaded
(with a clean instance of the Cluster) the job goes well, but if it is
canceled or started again the issue came. 

I built an example here https://github.com/congd123/flink-s3-example

You can generate the artifact of the Flink Job and start the cluster with
the configuration on the docker-compose.

Thanks for helping







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: AvroParquetWriter issues writing to S3

2020-04-16 Thread Till Rohrmann
For future reference, here is the stack trace in an easier to read format:

Caused by: java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeParserBucket
 at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
 at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:196)
 at com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:88)
 at com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:121)
 at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
 at com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
 at com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1356)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
 at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
 at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
 at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
 at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5052)
 at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4998)
 at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
 at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1309)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
 at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
 at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
 at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
 at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
 at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
 ...


On Thu, Apr 16, 2020 at 9:26 AM Till Rohrmann  wrote:

> Hi Diogo,
>
> thanks for reporting this issue. It looks quite strange to be honest.
> flink-s3-fs-hadoop-1.10.0.jar contains the DateTimeParserBucket class. So
> either this class wasn't loaded when starting the application from scratch
> or there could be a problem with the plugin mechanism on restarts. I'm
> pulling in Arvid who worked on the plugin mechanism and might be able to
> tell us more. In the meantime, could you provide us with the logs? They
> might tell us a bit more what happened.
>
> Cheers,
> Till
>
> On Wed, Apr 15, 2020 at 5:54 PM Diogo Santos 
> wrote:
>
>> Hi guys,
>>
>> I'm using AvroParquetWriter to write parquet files into S3 and when I
>> setup the cluster (starting fresh instances jobmanager/taskmanager etc),
>> the scheduled job starts executing without problems and could write the
>> files into S3 but if the job is canceled and starts again the job throws
>> the exception java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeParserBucket
>>
>> *Caused by: java.lang.NoClassDefFoundError:
>> org/joda/time/format/DateTimeParserBucket at 
>> *org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:196) at
>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:88)
>> at
>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:121)
>> at
>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>> at
>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>> at
>> com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
>> at
>> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
>> at
>> 

Re: AvroParquetWriter issues writing to S3

2020-04-16 Thread Till Rohrmann
Hi Diogo,

thanks for reporting this issue. It looks quite strange to be honest.
flink-s3-fs-hadoop-1.10.0.jar contains the DateTimeParserBucket class. So
either this class wasn't loaded when starting the application from scratch
or there could be a problem with the plugin mechanism on restarts. I'm
pulling in Arvid who worked on the plugin mechanism and might be able to
tell us more. In the meantime, could you provide us with the logs? They
might tell us a bit more what happened.

Cheers,
Till

On Wed, Apr 15, 2020 at 5:54 PM Diogo Santos 
wrote:

> Hi guys,
>
> I'm using AvroParquetWriter to write parquet files into S3 and when I
> setup the cluster (starting fresh instances jobmanager/taskmanager etc),
> the scheduled job starts executing without problems and could write the
> files into S3 but if the job is canceled and starts again the job throws
> the exception java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeParserBucket
>
> *Caused by: java.lang.NoClassDefFoundError:
> org/joda/time/format/DateTimeParserBucket at 
> *org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:196) at
> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:88)
> at
> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:121)
> at
> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
> at
> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
> at
> com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1356)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
> at
> com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5052)
> at
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4998)
> at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
> at
> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1309)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
> at
> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555) at
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at
> org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at
> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
> at
> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
> at
> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
> at
> 
>
>
> Environment configuration:
> - apache flink 1.10
> - scala 2.12
> - the uber jar is in the application classloader (/lib)
> flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
> - in plugins folder exists the folder s3-fs-hadoop with the jar
> flink-s3-fs-hadoop-1.10.0.jar
>
> I can fix this issue adding the dependency joda-time to the flink lib
> folder and excluding the dependency joda-time from the hadoop-aws that is
> required by the application code.
>
> Do you know what is the root cause of this? Or if I could do another
> thing than adding the joda-time dependency on the flink lib folder?
>
> Thanks
>
> --
> cumprimentos,
> Diogo Santos
>


AvroParquetWriter issues writing to S3

2020-04-15 Thread Diogo Santos
Hi guys,

I'm using AvroParquetWriter to write Parquet files to S3. When I set up
the cluster (starting fresh jobmanager/taskmanager instances, etc.), the
scheduled job starts executing without problems and can write the files
to S3, but if the job is canceled and started again, it throws the
exception java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeParserBucket

*Caused by: java.lang.NoClassDefFoundError:
org/joda/time/format/DateTimeParserBucket at
*org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:196) at
com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:88)
at
com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:121)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
at
com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
at
com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1356)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
at
com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5052)
at
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4998)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
at
com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1309)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
at
org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555) at
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929) at
org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910) at
org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
at
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
at
org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
at



Environment configuration:
- apache flink 1.10
- scala 2.12
- the uber jar is in the application classloader (/lib)
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
- in plugins folder exists the folder s3-fs-hadoop with the jar
flink-s3-fs-hadoop-1.10.0.jar

I can fix this issue by adding the joda-time dependency to the Flink lib
folder and excluding joda-time from the hadoop-aws dependency that is
required by the application code.

Do you know what the root cause of this is? Or is there something I could
do other than adding the joda-time dependency to the Flink lib folder?

Thanks

-- 
cumprimentos,
Diogo Santos


Re: Writing to S3

2018-11-15 Thread Steve Bistline
Hi Ken,

Thank you for the link... I had just found this and when I removed the
Hadoop dependencies ( not using in this project anyway ) things worked
fine.

Now just trying to figure out the credentials.

Thanks,

Steve

On Thu, Nov 15, 2018 at 7:12 PM Ken Krugler 
wrote:

> Hi Steve,
>
> This looks similar to
> https://stackoverflow.com/questions/52009823/flink-shaded-hadoop-s3-filesystems-still-requires-hdfs-default-and-hdfs-site-con
>
> I see that you have classes
> like org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration
> in your jar.
>
> Which makes me think you’re not excluding those properly.
>
> — Ken
>
>
> On Nov 15, 2018, at 3:58 PM, Steve Bistline 
> wrote:
>
> I am trying to write out to S3 from Flink with the following code and
> getting the error below. Tried adding the parser as a dependency, etc.
>
> Any help would be appreciated
>
> Table result = tableEnv.sql("SELECT 'Alert ',t_sKey, t_deviceID, t_sValue 
> FROM SENSORS WHERE t_sKey='TEMPERATURE' AND t_sValue > " + 
> TEMPERATURE_THRESHOLD);
> tableEnv.toAppendStream(result, Row.class).print();
> // Write to S3 bucket
> DataStream dsRow = tableEnv.toAppendStream(result, Row.class);
> dsRow.writeAsText("s3://csv-lifeai-ai/flink-alerts");
>
>
> ===
>
> avax.xml.parsers.FactoryConfigurationError: Provider for class 
> javax.xml.parsers.DocumentBuilderFactory cannot be created
>   at 
> javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
>   at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
>   at 
> javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2565)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2541)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2424)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.get(Configuration.java:1238)
>   at 
> org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:98)
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>   at 
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:218)
>   at 
> org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
>   at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:64)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:420)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:296)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)
>
>
> --
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>


Re: Writing to S3

2018-11-15 Thread Ken Krugler
Hi Steve,

This looks similar to 
https://stackoverflow.com/questions/52009823/flink-shaded-hadoop-s3-filesystems-still-requires-hdfs-default-and-hdfs-site-con
 


I see that you have classes like 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration in 
your jar.

Which makes me think you’re not excluding those properly.

— Ken


> On Nov 15, 2018, at 3:58 PM, Steve Bistline  wrote:
> 
> I am trying to write out to S3 from Flink with the following code and getting 
> the error below. Tried adding the parser as a dependency, etc.
> 
> Any help would be appreciated
> 
> Table result = tableEnv.sql("SELECT 'Alert ',t_sKey, t_deviceID, t_sValue 
> FROM SENSORS WHERE t_sKey='TEMPERATURE' AND t_sValue > " + 
> TEMPERATURE_THRESHOLD);
> tableEnv.toAppendStream(result, Row.class).print();
> // Write to S3 bucket
> DataStream dsRow = tableEnv.toAppendStream(result, Row.class);
> dsRow.writeAsText("s3://csv-lifeai-ai/flink-alerts");
> 
> ===
> 
> avax.xml.parsers.FactoryConfigurationError: Provider for class 
> javax.xml.parsers.DocumentBuilderFactory cannot be created
>   at 
> javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
>   at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
>   at 
> javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2565)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2541)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2424)
>   at 
> org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.get(Configuration.java:1238)
>   at 
> org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:98)
>   at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
>   at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
>   at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>   at 
> org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:218)
>   at 
> org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
>   at 
> org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:64)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:420)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:296)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>   at java.lang.Thread.run(Thread.java:748)

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Writing to S3

2018-11-15 Thread Steve Bistline
I am trying to write out to S3 from Flink with the following code and
getting the error below. Tried adding the parser as a dependency, etc.

Any help would be appreciated

Table result = tableEnv.sql("SELECT 'Alert ',t_sKey, t_deviceID,
t_sValue FROM SENSORS WHERE t_sKey='TEMPERATURE' AND t_sValue > " +
TEMPERATURE_THRESHOLD);
tableEnv.toAppendStream(result, Row.class).print();
// Write to S3 bucket
DataStream dsRow = tableEnv.toAppendStream(result, Row.class);
dsRow.writeAsText("s3://csv-lifeai-ai/flink-alerts");


===

avax.xml.parsers.FactoryConfigurationError: Provider for class
javax.xml.parsers.DocumentBuilderFactory cannot be created
at 
javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
at 
javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2565)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2541)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2424)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.get(Configuration.java:1238)
at 
org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:98)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at 
org.apache.flink.api.common.io.FileOutputFormat.open(FileOutputFormat.java:218)
at 
org.apache.flink.api.java.io.TextOutputFormat.open(TextOutputFormat.java:88)
at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.open(OutputFormatSinkFunction.java:64)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:420)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:296)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
at java.lang.Thread.run(Thread.java:748)


Adding headers to tuples before writing to S3

2017-10-23 Thread ShB
Hi,

I'm working with Flink for data analytics and reporting. The use case is
that, when a user requests a report, a Flink cluster does some computations
on the data, generates the final report(a DataSet of tuples) and uploads the
report to S3, post which an email is sent to the corresponding email id. So
I need the uploaded report to be the final, complete one that is sent to the
user. 

I'm struggling with adding a header to the final tuple DataSet that I get,
before writing it to S3. The header will be a tuple of the same arity as the
final dataset, but with all Strings, whereas my final report tuple dataset
has Long, Double, etc.

I've been trying to write my own writeToS3 function, which creates a CSV
file with the header and the Dataset tuple and then uploads to S3, but I'm
having trouble scaling to larger dataset sizes.

Is there any other recommended way to do this? Is there any way I can extend
upon the Flink writeAsCsv method to do this?
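
For reference, one direction for the "extend writeAsCsv" idea is a custom 
FileOutputFormat that writes the header line when each part file is opened. A 
rough, untested sketch (class name, header and separator are made up; note that 
every part file gets its own header):

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.fs.Path;

// Sketch only: CSV output with a header line per part file.
public class CsvWithHeaderOutputFormat<T extends Tuple> extends FileOutputFormat<T> {

    private final String header;
    private transient Writer writer;

    public CsvWithHeaderOutputFormat(Path outputPath, String header) {
        super(outputPath);
        this.header = header;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        writer = new OutputStreamWriter(
                new BufferedOutputStream(this.stream), StandardCharsets.UTF_8);
        writer.write(header);   // header goes in first, before any records
        writer.write('\n');
    }

    @Override
    public void writeRecord(T record) throws IOException {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < record.getArity(); i++) {
            if (i > 0) {
                line.append(',');
            }
            Object field = record.getField(i);
            line.append(field);   // Long, Double, etc. are written via toString()
        }
        line.append('\n');
        writer.write(line.toString());
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.flush();   // push buffered data to the underlying stream
        }
        super.close();        // FileOutputFormat closes the stream itself
    }
}

It would then be used like 
finalReport.output(new CsvWithHeaderOutputFormat<>(new Path("s3://bucket/report.csv"), "col_a,col_b,col_c")), 
with a hypothetical path and header.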

Thanks!



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: NPE while writing to s3://

2017-03-02 Thread Till Rohrmann
Hi Sathi,

which version of Flink are you using? Since Flink 1.2 the RollingSink is
deprecated. It is now recommend to use the BucketingSink. Maybe this
problem is resolved with the newer sink.
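
For reference, the same configuration translated to BucketingSink would look 
roughly like this (untested sketch; it reuses your outStream, and the date 
format string is just a typical example value):

import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// Sketch only: BucketingSink instead of the deprecated RollingSink.
BucketingSink<String> s3Sink = new BucketingSink<>("s3://sc-sink1/");
s3Sink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm")); // example format
s3Sink.setWriter(new StringWriter<>());
s3Sink.setBatchSize(200);
s3Sink.setPendingPrefix("file-");
s3Sink.setPendingSuffix(".txt");
outStream.addSink(s3Sink).setParallelism(1);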

Cheers,
Till

On Thu, Mar 2, 2017 at 9:44 AM, Sathi Chowdhury <
sathi.chowdh...@elliemae.com> wrote:

> I get the NPE from  the below code
>
> I am running this from my mac in a local flink cluster.
>
>
>
>
>
> RollingSink s3Sink = new RollingSink("s3://sc-sink1/");
> s3Sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
> s3Sink.setWriter(new StringWriter());
> s3Sink.setBatchSize(200);
> s3Sink.setPendingPrefix("file-");
> s3Sink.setPendingSuffix(".txt");
> outStream.addSink(s3Sink).setParallelism(1);
>
> causes
>
>
>
> java.lang.NullPointerException
> at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:463)
> at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:410)
> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> I even tried changing s3:// path to a local path, same issue
>
> The below works and spits out the stream, and the stream has data.
>
> outStream.writeAsText(*"/Users/schowdhury/flink/kinesisread"*+System.*currentTimeMillis*());
>
>
>
> Am I missing something obvious? It looks like it is trying to create a folder.
>
>
>
>
>
>


NPE while writing to s3://

2017-03-02 Thread Sathi Chowdhury
I get the NPE from  the below code
I am running this from my mac in a local flink cluster.


RollingSink s3Sink = new RollingSink("s3://sc-sink1/");
s3Sink.setBucketer(new DateTimeBucketer("-MM-dd--HHmm"));
s3Sink.setWriter(new StringWriter());
s3Sink.setBatchSize(200);
s3Sink.setPendingPrefix("file-");
s3Sink.setPendingSuffix(".txt");
outStream.addSink(s3Sink).setParallelism(1);
causes

java.lang.NullPointerException
at org.apache.flink.streaming.connectors.fs.RollingSink.openNewPartFile(RollingSink.java:463)
at org.apache.flink.streaming.connectors.fs.RollingSink.invoke(RollingSink.java:410)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
at java.lang.Thread.run(Thread.java:745)

I even tried changing s3:// path to a local path, same issue
The below works and spits out the stream, and the stream has data.

outStream.writeAsText("/Users/schowdhury/flink/kinesisread"+System.currentTimeMillis());



Am I missing something obvious? It looks like it is trying to create a folder.






Re: Reading and Writing to S3

2017-01-11 Thread M. Dale
tting two different errors
for the projects relating to, I think, how the core-site.xml file is being
read. I am running the project locally in IntelliJ. I have the environment
variable in run configurations set to HADOOP_HOME=path/to/dir-with-core-site.xml.
I have also tried saving the core-site.xml in the src/main/resources folder but
get the same errors. I want to know if my core-site.xml file is configured
correctly for using s3a and how to have IntelliJ read the core-site.xml file?
Also, are the core-site.xml configurations different for reading versus writing
to s3?

This is my code for reading data from s3:

public class DesktopWriter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        DataSet data = env.readTextFile("s3://flink-test/flink-test.txt");
        data.print();
    }
}

I get the error: Caused by: java.io.IOException: Cannot determine access key
to Amazon S3. Please make sure to configure it by setting the configuration
key 'fs.s3.accessKey'.

This is my code for writing to S3:

public class S3Sink {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream messageStream = env
                .addSource(new FlinkKafkaConsumer09(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}

I get the error: Caused by: java.io.IOException: The given file URI
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test, but
the File System could not be initialized with that address: Unable to load AWS
credentials from any provider in the chain

This is my core-site.xml:

    fs.defaultFS = hdfs://localhost:9000
    fs.s3.impl = org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.buffer.dir = /tmp
    fs.s3a.awsAccessKeyId = *
    fs.s3a.awsSecretAccessKey = *

This is my pom.xml:

    org.apache.flink:flink-java:1.1.4
    org.apache.flink:flink-streaming-java_2.10:1.1.4
    org.apache.flink:flink-clients_2.10:1.1.4
    org.apache.flink:flink-connector-kafka-0.9_2.10:1.1.4
    com.amazonaws:aws-java-sdk:1.7.4
    org.apache.hadoop:hadoop-aws:2.7.2
    org.apache.httpcomponents:httpclient:4.2.5
    org.apache.httpcomponents:httpcore:4.2.5

Thanks!
Sam

--
Samra Kasim
Technologist
HUMANgEO
Virginia Office
4350 N Fairfax Drive
Suite 950
Arlington, VA 22203
E-Mail: samra.ka...@thehumangeo.com
Web: http://www.thehumangeo.com/

Re: Reading and Writing to S3

2017-01-11 Thread Samra Kasim
h data to s3. However, I am getting two different
> errors for the projects relating to, I think, how the core-site.xml file is
> being read. I am running the project locally in IntelliJ. I have the
> environment variable in run configurations set to
> HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the
> core-site.xml in the src/main/resources folder but get the same errors. I
> want to know if my core-site.xml file is configured correctly for using s3a
> and how to have IntelliJ read the core-site.xml file? Also, are the
> core-site.xml configurations different for reading versus writing to s3?
>
> This is my code for reading data from s3:
>
> public class DesktopWriter {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
>         DataSet data = env.readTextFile("s3://flink-test/flink-test.txt");
>         data.print();
>     }
> }
>
> I get the error: Caused by: java.io.IOException: Cannot determine access
> key to Amazon S3. Please make sure to configure it by setting the
> configuration key 'fs.s3.accessKey'.
>
> This is my code for writing to S3:
>
> public class S3Sink {
>     public static void main(String[] args) throws Exception {
>         Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");
>
>         final ParameterTool parameterTool = ParameterTool.fromMap(configs);
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableSysoutLogging();
>         env.getConfig().setGlobalJobParameters(parameterTool);
>
>         DataStream messageStream = env
>                 .addSource(new FlinkKafkaConsumer09(
>                         parameterTool.getRequired("kafka.topic"),
>                         new SimpleStringSchema(),
>                         parameterTool.getProperties()));
>
>         messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);
>
>         env.execute();
>     }
> }
>
> I get the error: Caused by: java.io.IOException: The given file URI
> (s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test,
> but the File System could not be initialized with that address: Unable to
> load AWS credentials from any provider in the chain
>
> This is my core-site.xml:
>
>     fs.defaultFS = hdfs://localhost:9000
>     fs.s3.impl = org.apache.hadoop.fs.s3a.S3AFileSystem
>     fs.s3a.buffer.dir = /tmp
>     fs.s3a.awsAccessKeyId = *
>     fs.s3a.awsSecretAccessKey = *
>
> This is my pom.xml:
>
>     org.apache.flink:flink-java:1.1.4
>     org.apache.flink:flink-streaming-java_2.10:1.1.4
>     org.apache.flink:flink-clients_2.10:1.1.4
>     org.apache.flink:flink-connector-kafka-0.9_2.10:1.1.4
>     com.amazonaws:aws-java-sdk:1.7.4
>     org.apache.hadoop:hadoop-aws:2.7.2
>     org.apache.httpcomponents:httpclient:4.2.5
>     org.apache.httpcomponents:httpcore:4.2.5
>
> Thanks!
> Sam


--
Samra Kasim
Technologist
HUMANgEO
Virginia Office
4350 N Fairfax Drive
Suite 950
Arlington, VA 22203
E-Mail: samra.ka...@thehumangeo.com
Web: http://www.thehumangeo.com/


Re: Reading and Writing to S3

2017-01-11 Thread M. Dale
Sam,
  Don't point the variables at files, point them at the directories
containing the files. Do you have fs.s3.impl property defined?

Concrete example:

The /home/markus/hadoop-config directory has one file "core-site.xml" with
the following content:

<configuration>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <value>YOUR_ACCESS_KEY</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <value>YOUR_SECRET_KEY</value>
    </property>
</configuration>

The /home/markus/flink-config directory has one file "flink-conf.yaml" with
the following content, pointing hadoopconf to the DIRECTORY containing
core-site.xml:

fs.hdfs.hadoopconf: /home/markus/hadoop-config

In IntelliJ, go to Run - Edit Configurations - and set the FLINK_CONF_DIR
environment variable to point to the directory containing flink-conf.yaml
(i.e. in my case /home/markus/flink-config). So everything is pointing to
directories where the code looks for well-known filenames.

With that, the following works to write to S3 (maybe load events from a
collection at first):

events.writeAsText("s3:///")

env.execute
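
A quick way to sanity-check that this core-site.xml is actually found and
parsed before running the Flink job (a minimal sketch, not from this thread;
it only assumes a Hadoop client dependency on the classpath and reuses the
example path above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class HadoopConfCheck {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Same directory as in the example above; adjust to your own layout.
        conf.addResource(new Path("/home/markus/hadoop-config/core-site.xml"));

        // Print the properties the s3a filesystem needs; secrets are not echoed.
        System.out.println("fs.s3.impl        = " + conf.get("fs.s3.impl"));
        System.out.println("fs.s3a.access.key = " + (conf.get("fs.s3a.access.key") == null ? "MISSING" : "set"));
        System.out.println("fs.s3a.secret.key = " + (conf.get("fs.s3a.secret.key") == null ? "MISSING" : "set"));
    }
}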

On Wednesday, January 11, 2017 10:44 AM, Samra Kasim 
<samra.ka...@thehumangeo.com> wrote:
 

 Hi Markus,
Thanks for your help. I created an environment variable in IntelliJ for 
FLINK_CONF_DIR to point to the flink-conf.yaml and in it defined 
fs.hdfs.hadoopconf to point to the core-site.xml, but when I do that, I get the 
error: java.io.IOException: No file system found with scheme s3, referenced in 
file URI 's3://flink-test/ flinkoutputtest.txt'.
I have been able to get it to work by using the environment variable 
HADOOP_HOME to point directly to the core-site.xml, but when I do that and I 
push data from Kafka, I can see the message stream printed to my terminal, but 
no file gets saved to s3. I also don't see any errors. I have the correct AWS 
access id and key because i am able to read from files on s3 using Flink.
My code is below:    public static voidmain(String[] args) throws Exception {   
 Map<String,String> configs = ConfigUtils.loadConfigs("/ 
path/to/src/main/resources/ error-queue.yaml"); finalParameterTool 
parameterTool = ParameterTool.fromMap(configs) ;
StreamExecutionEnvironment env = StreamExecutionEnvironment. 
getExecutionEnvironment();   env.getConfig(). disableSysoutLogging();   
env.getConfig(). setGlobalJobParameters( parameterTool)
DataStream messageStream = env   .addSource(new 
FlinkKafkaConsumer09(   parameterTool.getRequired(" 
kafka.topic"),   new SimpleStringSchema(),  
 parameterTool.getProperties()) );messageStream.print();    
messageStream.writeAsText("s3: //flink-test/flinkoutputtest. 
txt").setParallelism(1); env.execute();
On Tue, Jan 10, 2017 at 4:06 PM, M. Dale <medal...@yahoo.com> wrote:

Sam,
  I just happened to answer a similar question on Stackoverflow at "Does
Apache Flink AWS S3 Sink require Hadoop for local testing?". I also submitted
a PR to make that (for me) a little clearer on the Apache Flink documentation
(https://github.com/apache/flink/pull/3054/files).

Let me know if that works for you.

Thanks,
Markus

On Tuesday, January 10, 2017 3:17 PM, Samra Kasim
<samra.ka...@thehumangeo.com> wrote:
 

Hi,

I am new to Flink and I've written two small test projects: 1) to read data
from s3 and 2) to push data to s3. However, I am getting two different errors
for the projects relating to, I think, how the core-site.xml file is being
read. I am running the project locally in IntelliJ. I have the environment
variable in run configurations set to HADOOP_HOME=path/to/dir-with-core-site.xml.
I have also tried saving the core-site.xml in the src/main/resources folder
but get the same errors. I want to know if my core-site.xml file is configured
correctly for using s3a and how to have IntelliJ read the core-site.xml file?
Also, are the core-site.xml configurations different for reading versus
writing to s3?

This is my code for reading data from s3:

public class DesktopWriter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        DataSet data = env.readTextFile("s3://flink-test/flink-test.txt");
        data.print();
    }
}

I get the error: Caused by: java.io.IOException: Cannot determine access key
to Amazon S3. Please make sure to configure it by setting the configuration
key 'fs.s3.accessKey'.

This is my code for writing to S3:

public class S3Sink {
    public static void main(S

Re: Reading and Writing to S3

2017-01-11 Thread Samra Kasim
Hi Markus,

Thanks for your help. I created an environment variable in IntelliJ for
FLINK_CONF_DIR to point to the flink-conf.yaml and in it defined
fs.hdfs.hadoopconf to point to the core-site.xml, but when I do that, I get
the error: java.io.IOException: No file system found with scheme s3,
referenced in file URI 's3://flink-test/flinkoutputtest.txt'.

I have been able to get it to work by using the environment variable
HADOOP_HOME to point directly to the core-site.xml, but when I do that and
I push data from Kafka, I can see the message stream printed to my
terminal, but no file gets saved to s3. I also don't see any errors. I have
the correct AWS access id and key because I am able to read from files on
s3 using Flink.

My code is below:

public static void main(String[] args) throws Exception {

Map<String, String> configs = ConfigUtils.loadConfigs("/
path/to/src/main/resources/error-queue.yaml");



final ParameterTool parameterTool = ParameterTool.fromMap(configs);



StreamExecutionEnvironment env = StreamExecutionEnvironment.
getExecutionEnvironment();

env.getConfig().disableSysoutLogging();

env.getConfig().setGlobalJobParameters(parameterTool);



DataStream messageStream = env

.addSource(new FlinkKafkaConsumer09(

parameterTool.getRequired("kafka.topic"),

new SimpleStringSchema(),

parameterTool.getProperties()));



messageStream.print();

messageStream.writeAsText("s3://flink-test/flinkoutputtest.
txt").setParallelism(1);



env.execute();

On Tue, Jan 10, 2017 at 4:06 PM, M. Dale <medal...@yahoo.com> wrote:

> Sam,
>   I just happened to answer a similar question on Stackoverflow at Does
> Apache Flink AWS S3 Sink require Hadoop for local testing?
> <http://stackoverflow.com/questions/41388003/does-apache-flink-aws-s3-sink-require-hadoop-for-local-testing>.
> I also submitted a PR to make that (for me) a little clearer on the Apache
> Flink documentation (https://github.com/apache/flink/pull/3054/files).
> Does Apache Flink AWS S3 Sink require Hadoop for local testing?
> I am relatively new to Apache Flink and I am trying to create a simple
> project that produces a file to an AWS S3...
>
> <http://stackoverflow.com/questions/41388003/does-apache-flink-aws-s3-sink-require-hadoop-for-local-testing>
>
> Let me know if that works for you.
>
> Thanks,
> Markus
>
>
> On Tuesday, January 10, 2017 3:17 PM, Samra Kasim <
> samra.ka...@thehumangeo.com> wrote:
>
>
> Hi,
>
> I am new to Flink and I've written two small test projects: 1) to read
> data from s3 and 2) to push data to s3. However, I am getting two different
> errors for the projects relating to, i think, how the core-site.xml file is
> being read. I am running the project locally in IntelliJ. I have the
> environment variable in run configurations set to
> HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the
> core-site.xml in the src/main/resources folder but get the same errors. I
> want to know if my core-site.xml file is configured correctly for using s3a
> and how to have IntelliJ read the core-site.xml file? Also, are the
> core-site.xml configurations different for reading versus writing to s3?
>
> This is my code for reading data from s3:
>
> public class DesktopWriter {
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
>         DataSet data = env.readTextFile("s3://flink-test/flink-test.txt");
>         data.print();
>     }
> }
>
> I get the error: Caused by: java.io.IOException: Cannot determine access
> key to Amazon S3. Please make sure to configure it by setting the
> configuration key 'fs.s3.accessKey'.
>
> This is my code for writing to S3:
>
> public class S3Sink {
>     public static void main(String[] args) throws Exception {
>         Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");
>
>         final ParameterTool parameterTool = ParameterTool.fromMap(configs);
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.getConfig().disableSysoutLogging();
>         env.getConfig().setGlobalJobParameters(parameterTool);
>
>         DataStream messageStream = env
>                 .addSource(new FlinkKafkaConsumer09(
>                         parameterTool.getRequired("kafka.topic"),
>                         new SimpleStringSchema(),
>                         parameterTool.getProperties()));
>
>         messageStream.writeAsT

Re: Reading and Writing to S3

2017-01-10 Thread M. Dale
Sam,
  I just happened to answer a similar question on Stackoverflow at "Does
Apache Flink AWS S3 Sink require Hadoop for local testing?". I also submitted
a PR to make that (for me) a little clearer on the Apache Flink documentation
(https://github.com/apache/flink/pull/3054/files).

Let me know if that works for you.

Thanks,
Markus

On Tuesday, January 10, 2017 3:17 PM, Samra Kasim
<samra.ka...@thehumangeo.com> wrote:
 

Hi,

I am new to Flink and I've written two small test projects: 1) to read data
from s3 and 2) to push data to s3. However, I am getting two different errors
for the projects relating to, I think, how the core-site.xml file is being
read. I am running the project locally in IntelliJ. I have the environment
variable in run configurations set to HADOOP_HOME=path/to/dir-with-core-site.xml.
I have also tried saving the core-site.xml in the src/main/resources folder
but get the same errors. I want to know if my core-site.xml file is configured
correctly for using s3a and how to have IntelliJ read the core-site.xml file?
Also, are the core-site.xml configurations different for reading versus
writing to s3?

This is my code for reading data from s3:

public class DesktopWriter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        DataSet data = env.readTextFile("s3://flink-test/flink-test.txt");
        data.print();
    }
}

I get the error: Caused by: java.io.IOException: Cannot determine access key
to Amazon S3. Please make sure to configure it by setting the configuration
key 'fs.s3.accessKey'.

This is my code for writing to S3:

public class S3Sink {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream messageStream = env
                .addSource(new FlinkKafkaConsumer09(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}

I get the error: Caused by: java.io.IOException: The given file URI
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test, but
the File System could not be initialized with that address: Unable to load AWS
credentials from any provider in the chain

This is my core-site.xml:

    fs.defaultFS = hdfs://localhost:9000
    fs.s3.impl = org.apache.hadoop.fs.s3a.S3AFileSystem
    fs.s3a.buffer.dir = /tmp
    fs.s3a.awsAccessKeyId = *
    fs.s3a.awsSecretAccessKey = *

This is my pom.xml:

    org.apache.flink:flink-java:1.1.4
    org.apache.flink:flink-streaming-java_2.10:1.1.4
    org.apache.flink:flink-clients_2.10:1.1.4
    org.apache.flink:flink-connector-kafka-0.9_2.10:1.1.4
    com.amazonaws:aws-java-sdk:1.7.4
    org.apache.hadoop:hadoop-aws:2.7.2
    org.apache.httpcomponents:httpclient:4.2.5
    org.apache.httpcomponents:httpcore:4.2.5

Thanks!
Sam

Reading and Writing to S3

2017-01-10 Thread Samra Kasim
Hi,

I am new to Flink and I've written two small test projects: 1) to read data
from s3 and 2) to push data to s3. However, I am getting two different
errors for the projects relating to, I think, how the core-site.xml file is
being read. I am running the project locally in IntelliJ. I have the
environment variable in run configurations set to
HADOOP_HOME=path/to/dir-with-core-site.xml. I have also tried saving the
core-site.xml in the src/main/resources folder but get the same errors. I
want to know if my core-site.xml file is configured correctly for using s3a
and how to have IntelliJ read the core-site.xml file? Also, are the
core-site.xml configurations different for reading versus writing to s3?

This is my code for reading data from s3:

public class DesktopWriter {

    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        DataSet data = env.readTextFile("s3://flink-test/flink-test.txt");

        data.print();
    }
}

I get the error: Caused by: java.io.IOException: Cannot determine access
key to Amazon S3. Please make sure to configure it by setting the
configuration key 'fs.s3.accessKey'.

This is my code for writing to S3:

public class S3Sink {
    public static void main(String[] args) throws Exception {
        Map<String, String> configs = ConfigUtils.loadConfigs("path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream messageStream = env
                .addSource(new FlinkKafkaConsumer09(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        messageStream.writeAsText("s3a://flink-test/flinktest.txt").setParallelism(1);

        env.execute();
    }
}

I get the error: Caused by: java.io.IOException: The given file URI
(s3://flink-test/flinktest.txt) points to the HDFS NameNode at flink-test,
but the File System could not be initialized with that address: Unable to
load AWS credentials from any provider in the chain

This is my core-site.xml:





<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>
    <property>
        <name>fs.s3a.awsAccessKeyId</name>
        <value>*</value>
    </property>
    <property>
        <name>fs.s3a.awsSecretAccessKey</name>
        <value>*</value>
    </property>
</configuration>
This is my pom.xml:





<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>
</dependencies>

Thanks!
Sam