Re: S3 for state backend in Flink 1.4.0

2018-06-01 Thread Stephan Ewen
A heads up on this front:

  - For state backends during checkpointing, I would suggest using
flink-s3-fs-presto, which is quite a bit faster than flink-s3-fs-hadoop
because it avoids a bunch of unnecessary metadata operations (see the
sketch below).

  - We have started work on rewriting the Bucketing Sink to make it work
with the shaded S3 filesystems (like flink-s3-fs-presto). We are also
adding a more powerful internal abstraction that uses multipart uploads for
faster incremental persistence of result chunks on checkpoints. This should
be in 1.6; happy to share more as soon as it is out...
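
To make the first suggestion concrete, here is a minimal sketch of wiring
checkpoints through flink-s3-fs-presto. The bucket and checkpoint path are
illustrative, and it assumes the presto jar has been copied from opt/ to
lib/ as described later in this thread:

// Prerequisite (shell, not Java): cp opt/flink-s3-fs-presto-*.jar lib/
// and set s3.access-key / s3.secret-key (plus s3.endpoint for non-AWS
// stores) in flink-conf.yaml.
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointToS3 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // With flink-s3-fs-presto in lib/, the s3:// scheme resolves to the
        // Presto-based filesystem for checkpoints and savepoints.
        env.setStateBackend(new FsStateBackend("s3://my-bucket/flink/checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every 60 seconds

        env.fromElements(1, 2, 3).print(); // placeholder pipeline
        env.execute("checkpoint-to-s3");
    }
}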


On Wed, Feb 7, 2018 at 3:56 PM, Marchant, Hayden wrote:

> We actually got it working. Essentially, it's an implementation of Hadoop's
> FileSystem, and was written with the idea that it can be used with Spark
> (since it has broader adoption than Flink as of now). We managed to get it
> configured, and found the latency to be much lower than with the S3
> connector. There are far fewer copy operations happening under the hood
> when using this native API, which explains the better performance.
>
> Happy to provide assistance offline if you're interested.
>
> Thanks
> Hayden
>
> -----Original Message-----
> From: Edward Rojas [mailto:edward.roja...@gmail.com]
> Sent: Thursday, February 01, 2018 6:09 PM
> To: user@flink.apache.org
> Subject: RE: S3 for state backend in Flink 1.4.0
>
> Hi Hayden,
>
> It seems like a good alternative, but I see it's intended to work with
> Spark. Did you manage to get it working with Flink?
>
> I ran some tests, but I get several errors when trying to create a file,
> either for checkpointing or for saving data.
>
> Thanks in advance,
> Regards,
> Edward
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


RE: S3 for state backend in Flink 1.4.0

2018-02-07 Thread Marchant, Hayden
We actually got it working. Essentially, it's an implementation of Hadoop's
FileSystem, and was written with the idea that it can be used with Spark
(since it has broader adoption than Flink as of now). We managed to get it
configured, and found the latency to be much lower than with the S3
connector. There are far fewer copy operations happening under the hood when
using this native API, which explains the better performance.

Happy to provide assistance offline if you're interested.

Thanks
Hayden

-----Original Message-----
From: Edward Rojas [mailto:edward.roja...@gmail.com] 
Sent: Thursday, February 01, 2018 6:09 PM
To: user@flink.apache.org
Subject: RE: S3 for state backend in Flink 1.4.0

Hi Hayden,

It seems like a good alternative, but I see it's intended to work with Spark.
Did you manage to get it working with Flink?

I ran some tests, but I get several errors when trying to create a file,
either for checkpointing or for saving data.

Thanks in advance,
Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RE: S3 for state backend in Flink 1.4.0

2018-02-01 Thread Edward Rojas
Hi Hayden,

It seems like a good alternative, but I see it's intended to work with
Spark. Did you manage to get it working with Flink?

I ran some tests, but I get several errors when trying to create a file,
either for checkpointing or for saving data.

Thanks in advance,
Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RE: S3 for state backend in Flink 1.4.0

2018-02-01 Thread Marchant, Hayden
Edward,

We are using Object Storage for checkpointing. I'd like to point out that we
were seeing performance problems using the S3 protocol. By the way, we had
quite a few problems using the flink-s3-fs-hadoop jar with Object Storage and
had to do some ugly hacking to get it working. We recently discovered an
alternative connector developed by IBM Research called stocator. It's a
streaming writer and performs better than using the S3 protocol.

Here is a link to the library - https://github.com/SparkTC/stocator - and a
blog post explaining it:
http://www.spark.tc/stocator-the-fast-lane-connecting-object-stores-to-spark/
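
For anyone who wants to try it, here is a rough hdfs-site.xml sketch based on
the stocator README at the time. The property names, the cos scheme, and the
service name "myservice" are assumptions to verify against the project
documentation; the endpoint and credentials are placeholders:

<!-- Sketch only: registers stocator for URIs like cos://bucket.myservice/path -->
<configuration>
  <property>
    <name>fs.stocator.scheme.list</name>
    <value>cos</value>
  </property>
  <property>
    <name>fs.cos.impl</name>
    <value>com.ibm.stocator.fs.ObjectStoreFileSystem</value>
  </property>
  <property>
    <name>fs.stocator.cos.impl</name>
    <value>com.ibm.stocator.fs.cos.COSAPIClient</value>
  </property>
  <property>
    <name>fs.stocator.cos.scheme</name>
    <value>cos</value>
  </property>
  <property>
    <name>fs.cos.myservice.endpoint</name>
    <value>https://s3.us-south.objectstorage.softlayer.net</value>
  </property>
  <property>
    <name>fs.cos.myservice.access.key</name>
    <value>ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.cos.myservice.secret.key</name>
    <value>SECRET_KEY</value>
  </property>
</configuration>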

Good luck!!

-----Original Message-----
From: Edward Rojas [mailto:edward.roja...@gmail.com] 
Sent: Wednesday, January 31, 2018 3:02 PM
To: user@flink.apache.org
Subject: RE: S3 for state backend in Flink 1.4.0

Hi,

We are having a similar problem when trying to use Flink 1.4.0 with IBM Object 
Storage for reading and writing data. 

We followed
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html
and the suggestion on https://issues.apache.org/jira/browse/FLINK-851.

We copied the flink-s3-fs-hadoop jar from the opt/ folder to the lib/ folder
and added the configuration to flink-conf.yaml:

s3.access-key: 
s3.secret-key: 
s3.endpoint: s3.us-south.objectstorage.softlayer.net 

With this we can read from IBM Object Storage without any problem when using 
env.readTextFile("s3://flink-test/flink-test.txt");

But we are having problems when trying to write.
We are using a Kafka consumer to read from the bus, doing some processing,
and then saving some data to Object Storage.

When using stream.writeAsText("s3://flink-test/data.txt").setParallelism(1);
the file is created, but only when the job finishes (or we stop it). We need
to save the data without stopping the job, so we are trying to use a Sink.

But when using a BucketingSink, we get the error:

java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)


Do you have any idea how we could make it work using a Sink?

Thanks,
Regards,

Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 for state backend in Flink 1.4.0

2018-01-31 Thread Edward Rojas
Hi Aljoscha,

Thinking a little bit more about this: although IBM Object Storage is
compatible with Amazon's S3, it's not an eventually consistent file system,
but rather an immediately consistent one.

So we won't need the support for eventually consistent file systems for our
use case to work; we would only need the BucketingSink to use the Flink
FileSystem abstraction instead of using the Hadoop FileSystem abstraction
directly.

Is this something that could be released earlier? Or do you have any idea
how we could achieve it?

Regards,
Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 for state backend in Flink 1.4.0

2018-01-31 Thread Aljoscha Krettek
Hi,

Unfortunately not yet, though it's high on my personal list of things that I
want to get resolved. It won't make it into 1.5.0, but I think it will land
in 1.6.0.

Best,
Aljoscha

> On 31. Jan 2018, at 16:31, Edward Rojas  wrote:
> 
> Thanks Aljoscha, that makes sense.
> Do you have a more specific date for the changes on the BucketingSink
> and/or for the PR to be released?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: S3 for state backend in Flink 1.4.0

2018-01-31 Thread Edward Rojas
Thanks Aljoscha, that makes sense.
Do you have a more specific date for the changes on the BucketingSink and/or
for the PR to be released?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: S3 for state backend in Flink 1.4.0

2018-01-31 Thread Aljoscha Krettek
Hi Edward,

The problem here is that readTextFile() and writeAsText() use the Flink
FileSystem abstraction underneath, which will pick up the S3 filesystem from
opt/. The BucketingSink, on the other hand, uses the Hadoop FileSystem
abstraction directly, meaning that there has to be a Hadoop FileSystem
implementation for s3 on the classpath for this to work.
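
For example, a rough sketch of pointing the BucketingSink at an s3a:// path,
assuming hadoop-aws and the matching AWS SDK jars are on the classpath so
that a Hadoop FileSystem for that scheme exists (endpoint, bucket, and keys
are placeholders):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.conf.Configuration;

// 'stream' is the DataStream<String> from the Kafka pipeline in the
// quoted message below.
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.s3a.endpoint", "s3.us-south.objectstorage.softlayer.net");
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY");
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY");

BucketingSink<String> sink = new BucketingSink<String>("s3a://flink-test/output")
        .setFSConfig(hadoopConf)         // hand the Hadoop config to the sink
        .setBatchSize(1024 * 1024 * 64); // roll part files at ~64 MB

stream.addSink(sink);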

Also, the BucketingSink currently has some shortcomings when used with
eventually consistent file systems such as S3. We are planning to solve those
problems after releasing 1.5, and there is also an open PR that provides an
alternative sink that works with those kinds of file systems:
https://github.com/apache/flink/pull/4607
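
That alternative eventually shipped as the StreamingFileSink in Flink 1.6. A
minimal sketch against the API as it landed in that release (not the PR as it
stood); note the s3:// path assumes the new sink's S3 support, which may only
arrive in a later release:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Rolls part files on checkpoints; rows are newline-delimited strings.
StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("s3://flink-test/output"),
                      new SimpleStringEncoder<String>("UTF-8"))
        .build();

stream.addSink(sink);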


Best,
Aljoscha

> On 31. Jan 2018, at 14:01, Edward Rojas  wrote:
> 
> Hi,
> 
> We are having a similar problem when trying to use Flink 1.4.0 with IBM
> Object Storage for reading and writing data. 
> 
> We followed
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html
> and the suggestion on https://issues.apache.org/jira/browse/FLINK-851.
> 
> We copied the flink-s3-fs-hadoop jar from the opt/ folder to the lib/ folder
> and added the configuration to flink-conf.yaml:
> 
> s3.access-key: 
> s3.secret-key: 
> s3.endpoint: s3.us-south.objectstorage.softlayer.net 
> 
> With this we can read from IBM Object Storage without any problem when using
> env.readTextFile("s3://flink-test/flink-test.txt");
> 
> But we are having problems when trying to write.
> We are using a Kafka consumer to read from the bus, doing some processing,
> and then saving some data to Object Storage.
> 
> When using stream.writeAsText("s3://flink-test/data.txt").setParallelism(1);
> the file is created, but only when the job finishes (or we stop it). We
> need to save the data without stopping the job, so we are trying to use a
> Sink.
> 
> But when using a BucketingSink, we get the error:
> 
> java.io.IOException: No FileSystem for scheme: s3
>     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>     at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> 
> 
> Do you have any idea how we could make it work using a Sink?
> 
> Thanks,
> Regards,
> 
> Edward
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



RE: S3 for state backend in Flink 1.4.0

2018-01-31 Thread Edward Rojas
Hi,

We are having a similar problem when trying to use Flink 1.4.0 with IBM
Object Storage for reading and writing data. 

We followed
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html
and the suggestion on https://issues.apache.org/jira/browse/FLINK-851.

We copied the flink-s3-fs-hadoop jar from the opt/ folder to the lib/ folder
and added the configuration to flink-conf.yaml:

s3.access-key: 
s3.secret-key: 
s3.endpoint: s3.us-south.objectstorage.softlayer.net 

With this we can read from IBM Object Storage without any problem when using
env.readTextFile("s3://flink-test/flink-test.txt");

But we are having problems when trying to write.
We are using a Kafka consumer to read from the bus, doing some processing,
and then saving some data to Object Storage.

When using stream.writeAsText("s3://flink-test/data.txt").setParallelism(1);
the file is created, but only when the job finishes (or we stop it). We need
to save the data without stopping the job, so we are trying to use a Sink.

But when using a BucketingSink, we get the error:

java.io.IOException: No FileSystem for scheme: s3
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2798)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1196)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)


Do you have any idea how we could make it work using a Sink?

Thanks,
Regards,

Edward



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


RE: S3 for state backend in Flink 1.4.0

2018-01-28 Thread Marchant, Hayden
I see that we can still use the other implementation, but we were hoping that
we'd benefit from the bug fix in Flink 1.4.0 around the 'repeated' loading of
configuration. I'll check with the old implementation and see if it still
works.

We have also seen discussions of stocator, a more native connector that
interfaces directly with IBM Object Storage and can be configured through
hdfs-site.xml; it might speed things up.

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Thursday, January 25, 2018 6:30 PM
To: Marchant, Hayden [ICG-IT] <hm97...@imceu.eu.ssmb.com>
Cc: user@flink.apache.org
Subject: Re: S3 for state backend in Flink 1.4.0

Hi,

Did you try overriding that config, and it didn't work? That dependency is in
fact still using the Hadoop S3 FS implementation, but it shades everything
into our own namespace so that there can't be any version conflicts. If that
doesn't work, then we need to look into this further.

The way you usually use this is by copying the flink-s3-fs-hadoop jar from
the opt/ folder to the lib/ folder. I'm not sure including it as a dependency
will work, but it might. You also don't have to use the flink-s3-fs-hadoop
dependency if the regular Hadoop S3 support worked for you before; it's only
an additional option.
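
For the "regular Hadoop S3 support" route, the usual shape is to point Flink
at a Hadoop configuration directory and put the endpoint override there. A
sketch, with illustrative paths and placeholder values:

# flink-conf.yaml: point Flink at a Hadoop config directory
fs.hdfs.hadoopconf: /etc/hadoop/conf

<!-- /etc/hadoop/conf/core-site.xml: classic (unshaded) s3a setup -->
<configuration>
  <property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
  </property>
  <property>
    <name>fs.s3a.endpoint</name>
    <value>s3.us-south.objectstorage.softlayer.net</value>
  </property>
  <property>
    <name>fs.s3a.access.key</name>
    <value>ACCESS_KEY</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>SECRET_KEY</value>
  </property>
</configuration>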

Best,
Aljoscha

> On 24. Jan 2018, at 16:33, Marchant, Hayden <hayden.march...@citi.com> wrote:
> 
> Hi,
> 
> We have a Flink Streaming application that uses S3 for storing checkpoints.
> We are not using 'regular' S3, but rather IBM Object Storage, which has an
> S3-compatible connector. We had quite a few challenges in overriding the
> endpoint from the default s3.amazonaws.com to our internal IBM Object
> Storage endpoint. In 1.3.2, we managed to get this working by providing our
> own jets3t.properties file that overrode s3service.s3-endpoint
> (https://jets3t.s3.amazonaws.com/toolkit/configuration.html).
> 
> When upgrading to 1.4.0, we added a dependency on the flink-s3-fs-hadoop
> artifact. It seems that our jets3t.properties override is no longer
> relevant, since it does not use the Hadoop implementation anymore.
> 
> Is there a way to override this default endpoint, or can we do this with
> the presto variant? Please note that if we provide the endpoint in the URL
> for the state backend, it simply appends s3.amazonaws.com to the URL, for
> example s3://myobjectstorageendpoint.s3.amazonaws.com.
> 
> Are there any other solutions, such as 'rolling back' to the Hadoop
> implementation of S3?
> 
> Thanks,
> Hayden
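
For reference, a jets3t.properties endpoint override of the kind described
above looks roughly like this; s3service.s3-endpoint is the property named in
the message, and the companion settings are common for S3-compatible stores
(verify them against the JetS3t configuration page linked above):

# jets3t.properties sketch: endpoint override for an S3-compatible store
s3service.s3-endpoint=s3.us-south.objectstorage.softlayer.net
s3service.https-only=true
s3service.disable-dns-buckets=true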