Assistance configuring access to Google Cloud Storage for row format streaming file sink

2020-11-13 Thread orionemail
Hi,

I am running Flink 1.10.1, initially on my local development machine (a MacBook 
Pro). I'm struggling to understand how to write to Google Cloud Storage using 
the StreamingFileSink (S3 works fine).

The error I am seeing:

"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find 
a file system implementation for scheme 'gs'. The scheme is not directly 
supported by Flink and no Hadoop file system to support this scheme could be 
loaded.
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at 
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:117)"

I have put the gcs-connector-hadoop2-latest.jar in a subdirectory of plugins/:

plugins
├── gcs-connector
│ └── gcs-connector-hadoop2-latest.jar

In flink-conf.yaml I have added:

fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
google.cloud.auth.service.account.enable: true
google.cloud.auth.service.account.json.keyfile: ~/key.json

This mirrors the setup I used for S3 storage.

My implementation is a simple test that reads data from a Kinesis stream and 
writes it out to GCS.

DataStream<String> input = getKinesisSource(env, kinesisStream);

final StreamingFileSink<String> sink = StreamingFileSink
    .forRowFormat(new Path("gs://some-gcp-bucket"), new SimpleStringEncoder<>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(TimeUnit.MINUTES.toMillis(2))
            .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
            .withMaxPartSize(1024 * 1024 * 1024)
            .build())
    .build();

// input.print();
input.addSink(sink);

Not sure what else to try. Any pointers appreciated.

Sent with [ProtonMail](https://protonmail.com) Secure Email.

Re: Using S3 as a streaming File source

2020-09-02 Thread orionemail
OK thanks for the notice on the cost point. I will check the cost calculations.

This already has SNS enabled for another solution to this problem, but 
I'm trying to use the minimal number of different software components at this 
stage of the pipeline. My preferred approach would have been for them to send this 
data directly to a Kinesis/Kafka stream, but that is not an option at this time.

Thanks for the assistance.

Sent with [ProtonMail](https://protonmail.com) Secure Email.

‐‐‐ Original Message ‐‐‐
On Tuesday, 1 September 2020 17:53, Ayush Verma  wrote:

> Word of caution: streaming from S3 is really cost-prohibitive, as the only way 
> to detect new files is to continuously spam the S3 List API.
>
> On Tue, Sep 1, 2020 at 4:50 PM Jörn Franke  wrote:
>
>> Why don’t you get an S3 notification on SQS and do the actions from there?
>>
>> You will probably need to write the content of the files to a NoSQL 
>> database.
>>
>> Alternatively, send the S3 notification to Kafka and have Flink read from there.
>>
>> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html
>>
>>> On 01.09.2020 at 16:46, orionemail  wrote:
>>
>>> 
>>> Hi,
>>>
>>> I have a S3 bucket that is continuously written to by millions of devices. 
>>> These upload small compressed archives.
>>>
>>> What I want to do is treat the tar gzipped (.tgz) files as a streaming 
>>> source and process each archive. The archive contains three files that each 
>>> might need to be processed.
>>>
>>> I see that
>>>
>>> env.readFile(f, bucket, FileProcessingMode.PROCESS_CONTINUOUSLY, 1L).print();
>>>
>>> might do what I need, but I am unsure how best to implement 'f' - the 
>>> FileInputFormat. Is there a similar example for me to reference?
>>>
>>> Or is this idea not workable with this method? I need to ensure exactly 
>>> once, and also trigger removal of the files after processing.
>>>
>>> Thanks,
>>>
>>> Sent with [ProtonMail](https://protonmail.com) Secure Email.

Using S3 as a streaming File source

2020-09-01 Thread orionemail
Hi,

I have a S3 bucket that is continuously written to by millions of devices. 
These upload small compressed archives.

What I want to do is treat the tar gzipped (.tgz) files as a streaming source 
and process each archive. The archive contains three files that each might need 
to be processed.

I see that

env.readFile(f, bucket, FileProcessingMode.PROCESS_CONTINUOUSLY, 1L).print();

might do what I need, but I am unsure how best to implement 'f' - the 
FileInputFormat. Is there a similar example for me to reference?

Or is this idea not workable with this method? I need to ensure exactly once, 
and also trigger removal of the files after processing.
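
For what it's worth, here is a minimal, hedged sketch of a custom FileInputFormat 
along these lines. It assumes Apache Commons Compress is on the classpath, that 
every archive entry fits in memory, and that emitting each entry's content as a 
single String record is acceptable; the class name is purely illustrative.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;

// Emits the content of each entry inside a .tgz archive as one String record.
public class TgzEntryInputFormat extends FileInputFormat<String> {

    private transient TarArchiveInputStream tarStream;
    private transient TarArchiveEntry currentEntry;

    public TgzEntryInputFormat(Path filePath) {
        super(filePath);
        unsplittable = true; // an archive cannot be split mid-stream
    }

    @Override
    public void open(FileInputSplit fileSplit) throws IOException {
        super.open(fileSplit); // opens this.stream on the underlying file system (s3a://, gs://, ...)
        tarStream = new TarArchiveInputStream(new GzipCompressorInputStream(stream));
        advance();
    }

    @Override
    public boolean reachedEnd() {
        return currentEntry == null;
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        byte[] content = new byte[(int) currentEntry.getSize()];
        IOUtils.readFully(tarStream, content);
        advance();
        return new String(content, StandardCharsets.UTF_8);
    }

    @Override
    public void close() throws IOException {
        if (tarStream != null) {
            tarStream.close();
        }
        super.close();
    }

    // Move to the next regular file entry, skipping directories.
    private void advance() throws IOException {
        do {
            currentEntry = tarStream.getNextTarEntry();
        } while (currentEntry != null && currentEntry.isDirectory());
    }
}

It would then be passed as the 'f' argument, e.g. env.readFile(new 
TgzEntryInputFormat(new Path(bucket)), bucket, FileProcessingMode.PROCESS_CONTINUOUSLY, 1L). 
Exactly-once guarantees and deleting the files after processing are separate 
concerns that this sketch does not address.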

Thanks,

Sent with [ProtonMail](https://protonmail.com) Secure Email.

Re: Please help, I need to bootstrap keyed state into a stream

2020-08-10 Thread orionemail
I was recently in the same situation as Marco. The docs do explain what you 
need to do, but without experience with Flink it might still not be obvious.

What I did initially:

Set up the job to run in a 'write a savepoint' mode by implementing a command-line 
switch I could use when running the job:

flink run somejob.jar -d /some/path

When run with this switch, the job ran *only* the code required to build a 
version of the state and write it out as a savepoint.
This worked and I was on my way.

However, I then decided to split this out into a new Flink jar with the sole 
purpose of creating a savepoint.  This is a cleaner approach in my case and 
also removes dependencies (my state was loaded from DynamoDB) that were only 
required in this one instance.

Rebuilding the state from this application is intended to be done only once, 
with checkpoints/savepoints being the main approach going forward.

Just remember to give your operators the same uid/name in both jobs to make sure 
the savepoint is compatible.
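
For anyone following the same route, a minimal sketch of such a state-writing job 
using the State Processor API (the DataSet-based flavour available in Flink 
1.9/1.10) could look like the following. All names are hypothetical, and the 
mappings are hard-coded here rather than loaded from DynamoDB.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class WriteSavepointJob {

    // Writes one ValueState entry per (externalId, internalId) pair.
    static class IdMappingBootstrapper
            extends KeyedStateBootstrapFunction<String, Tuple2<String, String>> {

        private transient ValueState<String> mappedId;

        @Override
        public void open(Configuration parameters) {
            mappedId = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("mappedId", String.class));
        }

        @Override
        public void processElement(Tuple2<String, String> value, Context ctx) throws Exception {
            mappedId.update(value.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // In the real job this DataSet would be read from DynamoDB.
        DataSet<Tuple2<String, String>> mappings =
                env.fromElements(Tuple2.of("device-1", "internal-1"));

        BootstrapTransformation<Tuple2<String, String>> transformation =
                OperatorTransformation.bootstrapWith(mappings)
                        .keyBy(new KeySelector<Tuple2<String, String>, String>() {
                            @Override
                            public String getKey(Tuple2<String, String> value) {
                                return value.f0;
                            }
                        })
                        .transform(new IdMappingBootstrapper());

        Savepoint.create(new MemoryStateBackend(), 128)
                // must match the uid of the keyed operator in the streaming job
                .withOperator("id-mapping-operator", transformation)
                .write(args[0]); // the savepoint path passed on the command line

        env.execute("Bootstrap id-mapping state");
    }
}

The streaming job can then be started with -s / --fromSavepoint pointing at that 
path, as long as the operator that reads this state uses the same uid and state 
descriptor.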

Sent with ProtonMail Secure Email.

‐‐‐ Original Message ‐‐‐
On Monday, 10 August 2020 07:27, Tzu-Li Tai  wrote:

> Hi,
>
> For the NullPointerException, what seems to be happening is that you are
> setting NULL values in your MapState, which is not allowed by the API.
>
> Otherwise, the code that you showed for bootstrapping state seems to be
> fine.
>
> > I have yet to find a working example that shows how to do both
> > (bootstrapping state and start a streaming application with that state)
>
> Not entirely sure what you mean here by "doing both".
> The savepoint written using the State Processor API (what you are doing in
> the bootstrap() method) is a savepoint that may be restored from as you
> would with a typical Flink streaming job restore.
> So, usually the bootstrapping part happens as a batch "offline" job, while
> you keep your streaming job as a separate job. What are you trying to
> achieve with having both written within the same job?
>
> Cheers,
> Gordon
>
>
> -
>
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/




Re: Is there a way to access the number of 'keys' that have been used for partitioning?

2020-07-16 Thread orionemail
Thanks for the response, I thought as much.

Sent with [ProtonMail](https://protonmail.com) Secure Email.

‐‐‐ Original Message ‐‐‐
On Wednesday, 15 July 2020 17:12, Chesnay Schepler  wrote:

> This information is not readily available; in fact Flink itself doesn't know 
> how many keys there are at any point.
> You'd have to calculate it yourself.
>
> On 15/07/2020 17:11, orionemail wrote:
>
>> Hi,
>>
>> I need to query the number of keys that a stream has been split by; is there 
>> a way to do this?
>>
>> Thanks,
>>
>> O
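
For the record, one way to "calculate it yourself" is to emit a marker the first 
time each key is seen and aggregate those markers downstream. A rough sketch only, 
with illustrative names:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Emits 1L only the first time its key is seen; summing the output gives a
// running count of distinct keys.
public class FirstSeen<K, T> extends KeyedProcessFunction<K, T, Long> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("seen", Boolean.class));
    }

    @Override
    public void processElement(T value, Context ctx, Collector<Long> out) throws Exception {
        if (seen.value() == null) {
            seen.update(true);
            out.collect(1L);
        }
    }
}

Wired up as keyedStream.process(new FirstSeen<>()).keyBy(x -> 0).reduce((a, b) -> a + b), 
the last operator keeps a running total of distinct keys on a single task.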

Is there a way to access the number of 'keys' that have been used for partitioning?

2020-07-15 Thread orionemail
Hi,

I need to query the number of keys that a stream has been split by; is there a 
way to do this?

Thanks,

O

Internal state and external stores conditional usage advice sought (dynamodb asyncIO)

2020-06-08 Thread orionemail
Hi,

Following on from an earlier email, my approach has changed, but I am still 
unsure how best to achieve my goal.

I have records coming through a Kinesis stream into Flink:

{ id: 
  var1: 
  ...
}

'id' needs to be replaced with a value from a DB store or, if it is not present in 
the DB, a new ID generated in Flink, cached, and then stored back in the DB.  This 
is essentially a basic ID mapping service.

Currently, for each record I use AsyncIO to get a value from DynamoDB, or generate 
one and write the new value back to the DB.

This is unnecessary, as I should be able to cache this value after the first 
time it is seen/generated.

What I want to do is cache the value from the DB after the first fetch in some form 
of local state, but also update the DB.

My confusion is over which of the APIs I should use to do this.

Currently my code looks something like:

KeyedStream source = getKinesisSource().keyBy(pojo -> pojo.id);

SingleOutputStreamOperator ps = AsyncDataStream.unorderedWait(source,
    new DynoProcessingCode(),
    ..
    ..).process(new processFunction());

class processFunction extends ProcessFunction {
    ..
}

If I insert a KeyedProcessFunction after the keyBy and before the AsyncIO, I 
could skip the async call if the ID has already been read from the cache, 
but if I do need to fetch from the DB, how do I store the result in the keyed cache 
from within the AsyncIO process?  It seems that maybe that is not possible and I 
should use operator state instead?
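
For what it's worth, a hedged sketch of the keyed-cache part (with Tuple2<String, String> 
standing in for the real POJO, f0 being the original ID and f1 the payload; all names 
illustrative) might look like:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed cache in front of the async DynamoDB lookup: records whose ID is already
// cached are rewritten locally; everything else is forwarded unchanged so the
// existing AsyncIO stage can resolve it against DynamoDB.
public class CachedIdMapper
        extends KeyedProcessFunction<String, Tuple2<String, String>, Tuple2<String, String>> {

    private transient ValueState<String> cachedMapping;

    @Override
    public void open(Configuration parameters) {
        cachedMapping = getRuntimeContext().getState(
                new ValueStateDescriptor<>("cachedMapping", String.class));
    }

    @Override
    public void processElement(Tuple2<String, String> record,
                               Context ctx,
                               Collector<Tuple2<String, String>> out) throws Exception {
        String mapped = cachedMapping.value();
        if (mapped != null) {
            out.collect(Tuple2.of(mapped, record.f1));
        } else {
            // Not cached yet: forward as-is for the AsyncIO lookup. Keyed state cannot
            // be written from inside an AsyncFunction, so the resolved mapping has to be
            // routed back through a keyed operator that updates this cache.
            out.collect(record);
        }
    }
}

One way to close the loop is to route the async results back, keyed on the original 
ID again, so that a keyed operator (rather than the AsyncFunction itself) writes 
cachedMapping; keyed state fits better than operator state here because the cache is 
naturally per-ID.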

Any help appreciated.

Thanks,

O

Sent with [ProtonMail](https://protonmail.com) Secure Email.

Re: Suggestions for using both broadcast sync and conditional async-io

2020-06-04 Thread orionemail
Thanks for the response, I had not seen the state processor API, somehow I 
missed that.

Regarding your second point, this is basically an ID mapping service, so I need 
the IDs persisted in DynamoDB (or indeed any other external store) so that 
other applications may also use the 'mapped' ID value (and also so that any new 
mappings are generated/stored back should the Flink job be restarted or 
redeployed).  Maybe I do not need to use AsyncIO and this could be implemented 
as a side-output sink, provided this data is always keyed on the original ID, I 
suppose?

Thanks for your response this is certainly food for thought.

O.

Sent with [ProtonMail](https://protonmail.com) Secure Email.

‐‐‐ Original Message ‐‐‐
On Thursday, 4 June 2020 07:03, Tzu-Li (Gordon) Tai  wrote:

> Hi,
>
> For the initial DB fetch and state bootstrapping:
> That's exactly what the State Processor API is for, have you looked at that 
> already?
> It currently does support bootstrapping broadcast state [1], so that should 
> be good news for you.
>
> As a side note, I may be missing something, is broadcast state really 
> necessary in your use case?
> If in your current application, for each record, you look up DynamoDB with the 
> current key of the record,
> then in the new architecture where you move the DynamoDB database into the 
> application as Flink state, you should co-partition the entries with the 
> input record stream.
> If for each record you need to do cross-key lookups, then of course broadcast 
> state is required.
>
> As for the AsyncIO process -
> From my understanding, in the new architecture, you should no longer need the 
> AsyncIO process / lookup to DynamoDB to generate the new mapping, as all 
> information is locally available in Flink state after the bootstrap.
> So, when a record is processed, you check Flink state for existing mapping 
> and proceed, or generate a new mapping and write it to Flink state.
> Essentially, in this scenario Flink state replaces DynamoDB and all lookups 
> are local.
>
> Cheers,
> Gordon
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1
>
> On Wed, Jun 3, 2020 at 10:15 PM orionemail  wrote:
>
>> Hi,
>>
>> My current application makes use of a DynamoDB database to map a key to a 
>> value. As each record enters the system the async-io calls this db and 
>> requests a value for the key but if that value doesn't exist a new value is 
>> generated and inserted.  I have managed to do all this in one update 
>> operation to the dynamodb so performance isn't too bad.  This is usable for 
>> our current load, but our load will increase considerably in the near future 
>> and as writes are expensive (each update even if it actually returns the 
>> existing value is classed as a write) this could be a cost factor going 
>> forward.
>>
>> Looking at broadcast state seems like it might be the answer.  DynamoDB 
>> allows 'streams' of table modification events to be output to what is 
>> essentially a kinesis stream, so it might be possible to avoid the majority 
>> of write calls by storing local copies of the mapping.  I should also point 
>> out that these mappings are essentially capped.  The majority of events that 
>> come through will have an existing mapping.
>>
>> My idea is to try the following:
>>
>> 1. On application startup, request the entire dataset from the DB (this is ~5m 
>> key:value pairs)
>> 2. Inject this data into flink state somehow, possibly via broadcast state?
>> 3. Subscribe to the DynamoDB stream via broadcast state to capture updates 
>> to this table and update the Flink state
>> 4. When a record is processed, check flink state for existing mapping and 
>> proceed if found.  If not, then AsyncIO process as before to generate a new 
>> mapping
>> 5. DynamoDB writes the new value to the stream so all operators get the new 
>> value via broadcast state
>>
>> Is this idea workable?  I am unsure about the initial DB fetch and the 
>> AsyncIO process should a new value need to be inserted.
>>
>> Any thoughts appreciated.
>>
>> Thanks
>>
>> O

Suggestions for using both broadcast sync and conditional async-io

2020-06-03 Thread orionemail
Hi,

My current application makes use of a DynamoDB database to map a key to a 
value. As each record enters the system the async-io calls this db and requests 
a value for the key but if that value doesn't exist a new value is generated 
and inserted.  I have managed to do all this in one update operation to the 
dynamodb so performance isn't too bad.  This is usable for our current load, 
but our load will increase considerably in the near future and as writes are 
expensive (each update even if it actually returns the existing value is 
classed as a write) this could be a cost factor going forward.

Looking at broadcast state seems like it might be the answer.  DynamoDB allows 
'streams' of table modification events to be output to what is essentially a 
kinesis stream, so it might be possible to avoid the majority of write calls by 
storing local copies of the mapping.  I should also point out that these 
mappings are essentially capped.  The majority of events that come through will 
have an existing mapping.

My idea is to try the following:

1. On application startup, request the entire dataset from the DB (this is ~5m 
key:value pairs)
2. Inject this data into flink state somehow, possibly via broadcast state?
3. Subscribe to the DynamoDB stream via broadcast state to capture updates to 
this table and update the Flink state
4. When a record is processed, check flink state for existing mapping and 
proceed if found.  If not, then AsyncIO process as before to generate a new 
mapping
5. DynamoDB writes the new value to the stream so all operators get the new 
value via broadcast state

Is this idea workable?  I am unsure about the initial DB fetch and the AsyncIO 
process should a new value need to be inserted.
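
A hedged sketch of steps 3-5 of the plan above, using a KeyedBroadcastProcessFunction. 
Tuple2<String, String> stands in both for the records (externalId, payload) and for the 
mapping updates (externalId, internalId); all names are illustrative.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class MappingJoin {

    // Broadcast state holding externalId -> internalId mappings (step 3).
    public static final MapStateDescriptor<String, String> MAPPINGS =
            new MapStateDescriptor<>("id-mappings",
                    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    public static class MapIds extends KeyedBroadcastProcessFunction<
            String, Tuple2<String, String>, Tuple2<String, String>, Tuple2<String, String>> {

        @Override
        public void processElement(Tuple2<String, String> record,
                                   ReadOnlyContext ctx,
                                   Collector<Tuple2<String, String>> out) throws Exception {
            // Step 4: check the locally held mapping first.
            String mapped = ctx.getBroadcastState(MAPPINGS).get(record.f0);
            if (mapped != null) {
                out.collect(Tuple2.of(mapped, record.f1));
            } else {
                // No mapping yet: forward unchanged so the existing AsyncIO path can ask
                // DynamoDB to generate one; the DynamoDB stream will broadcast it back.
                out.collect(record);
            }
        }

        @Override
        public void processBroadcastElement(Tuple2<String, String> update,
                                            Context ctx,
                                            Collector<Tuple2<String, String>> out) throws Exception {
            // Step 5: every parallel instance receives new mappings from the DynamoDB stream.
            ctx.getBroadcastState(MAPPINGS).put(update.f0, update.f1);
        }
    }
}

Wiring would then be roughly records.keyBy(r -> r.f0).connect(mappingUpdates.broadcast(MappingJoin.MAPPINGS)).process(new MappingJoin.MapIds()). 
The initial bulk load (steps 1-2) could be written into the same broadcast state up 
front with the State Processor API, as Gordon suggests above.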

Any thoughts appreciated.

Thanks

O

Re: How to subscribe to a Kinesis Stream using enhanced fan-out?

2020-05-15 Thread orionemail
Hi,

We also recently needed this functionality; unfortunately we were unable to 
implement it ourselves, so we changed our plan accordingly.

However, we very much see the benefit of this feature and would be interested 
in following the JIRA ticket.

Thanks

‐‐‐ Original Message ‐‐‐
On Thursday, 14 May 2020 11:34, Xiaolong Wang  
wrote:

> Thanks, I'll check it out.
>
> On Thu, May 14, 2020 at 6:26 PM Tzu-Li (Gordon) Tai  
> wrote:
>
>> Hi Xiaolong,
>>
>> You are right, the way the Kinesis connector is implemented / the way the 
>> AWS APIs are used, does not allow it to consume Kinesis streams with 
>> enhanced fan-out enabled consumers [1].
>> Could you open a JIRA ticket for this?
>> As far as I can tell, this could be a valuable contribution to the connector 
>> for Kinesis users who require dedicated throughput isolated from other 
>> running consumers.
>>
>> Cheers,
>> Gordon
>>
>> [1] 
>> https://docs.aws.amazon.com/streams/latest/dev/building-enhanced-consumers-api.html
>>
>> On Wed, May 13, 2020 at 1:44 PM Xiaolong Wang  
>> wrote:
>>
>>> Hello Flink Community!
>>>
>>>   I'm currently working on a project relying on AWS Kinesis. With the 
>>> provided connector (flink-connector-kinesis_2.11:1.10.0), I can consume the 
>>> messages.
>>>
>>>  But as the main stream is used among several other teams, I was 
>>> required to use the enhanced fan-out feature of Kinesis. I checked the connector code 
>>> and found no implementation of it.
>>>
>>>  Has this issue occurred to anyone before?
>>>
>>> Thanks for your help.

StreamingFileSink to an S3 Bucket on a remote account using STS

2020-04-20 Thread orionemail
Hi,

I am new to both AWS and Flink but currently need to write incoming data into 
an S3 bucket accessed via AWS temporary credentials.

I am unable to get this to work, but I am not entirely sure of the steps 
needed.  I can write fine to S3 buckets that are not 'remote' and not managed by 
STS temporary credentials.

I am using Flink 1.9.1, as this is what the job will run on when deployed live in EMR.

My flink-conf.yaml contains the following entries:

fs.s3a.bucket.sky-rdk-telemetry.aws.credentials.provider: org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
fs.s3a.bucket.sky-rdk-telemetry.assumed.role.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
fs.s3a.bucket.sky-rdk-telemetry.access-key: x
fs.s3a.bucket.sky-rdk-telemetry.secret-key: 
fs.s3a.bucket.sky-rdk-telemetry.assumed.role.arn: 
fs.s3a.bucket.sky-rdk-telemetry.assumed.role.session.name: 

And my POM contains:

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-bom</artifactId>
      <version>1.11.700</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-sts</artifactId>
    <version>1.11.700</version>
  </dependency>
</dependencies>

I have put the jar flink-s3-fs-hadoop-1.9.1.jar into the plugins directory.

Running my test jar I am getting ClassNotFound exceptions for 
org/apache/flink/fs/s3base/shaded/com/amazonaws/services/securitytoken/model/AWSSecurityTokenServiceException

and poking around I see this class is shaded into a package in the Kinesis 
connector.  I have added some relocation rules to the Maven Shade plugin to 
rewrite the package as needed, but this still doesn't help.

Am I heading in the correct direction?  Searching has not turned up much 
information that I have been able to make use of.

Thanks for your time,

J

Sent with [ProtonMail](https://protonmail.com) Secure Email.