[jira] [Created] (FLINK-9686) Flink Kinesis Producer: Enable Kinesis authentication via AssumeRole

2018-06-28 Thread Franz Thoma (JIRA)
Franz Thoma created FLINK-9686:
--

 Summary: Flink Kinesis Producer: Enable Kinesis authentication via 
AssumeRole
 Key: FLINK-9686
 URL: https://issues.apache.org/jira/browse/FLINK-9686
 Project: Flink
  Issue Type: Improvement
  Components: Kinesis Connector
Reporter: Franz Thoma


h2. Current situation:

FlinkKinesisProducer can authenticate with Kinesis by retrieving credentials 
via one of the following mechanisms:
 * Environment variables
 * System properties
 * An AWS profile
 * Directly provided credentials ({{BASIC}})
 * AWS's own default heuristic ({{AUTO}})

For streaming across AWS accounts, it is considered good practice to grant 
access to the remote Kinesis stream via a role, rather than passing credentials 
for the remote account.
h2. Proposed change:

Add a new credentials provider specifying a role ARN, session name, and an 
additional credentials provider supplying the credentials for assuming the role.

Config example for assuming a role with auto-detected credentials:
{code:java}
aws.credentials.provider: ASSUME_ROLE
aws.credentials.provider.role.arn: 
aws.credentials.provider.role.sessionName: my-session-name
aws.credentials.provider.role.provider: AUTO
{code}
{{ASSUME_ROLE}} credentials providers can be nested, i.e. it is possible to 
assume a role which in turn is allowed to assume another role:
{code:java}
aws.credentials.provider: ASSUME_ROLE
aws.credentials.provider.role.arn: 
aws.credentials.provider.role.sessionName: my-session-name
aws.credentials.provider.role.provider: ASSUME_ROLE
aws.credentials.provider.role.provider.role.arn: 
aws.credentials.provider.role.provider.role.sessionName: my-nested-session-name
aws.credentials.provider.role.provider.role.provider: AUTO
{code}
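The nested provider chain above can be resolved by walking the 
{{.role.provider}} suffix recursively. The following is an illustrative sketch 
of such a lookup over a plain key-value config; the {{CredentialsSpec}} type 
and {{parse}} method are hypothetical and not part of the connector:
{code:java}
import java.util.Map;

public class NestedProviderConfig {

    // Illustrative holder for a (possibly nested) provider configuration.
    record CredentialsSpec(String type, String roleArn, String sessionName,
                           CredentialsSpec inner) {}

    // Recursively follow "<prefix>.role.provider" keys until a provider type
    // other than ASSUME_ROLE terminates the chain. A missing key defaults to
    // AUTO, mirroring the connector's default credential detection.
    static CredentialsSpec parse(Map<String, String> config, String prefix) {
        String type = config.getOrDefault(prefix, "AUTO");
        if (!"ASSUME_ROLE".equals(type)) {
            return new CredentialsSpec(type, null, null, null);
        }
        return new CredentialsSpec(
                type,
                config.get(prefix + ".role.arn"),
                config.get(prefix + ".role.sessionName"),
                parse(config, prefix + ".role.provider"));
    }

    public static void main(String[] args) {
        Map<String, String> config = Map.of(
                "aws.credentials.provider", "ASSUME_ROLE",
                "aws.credentials.provider.role.arn",
                        "arn:aws:iam::123456789012:role/example",
                "aws.credentials.provider.role.sessionName", "my-session-name",
                "aws.credentials.provider.role.provider", "AUTO");
        CredentialsSpec spec = parse(config, "aws.credentials.provider");
        System.out.println(spec.type() + " -> " + spec.inner().type());
        // prints: ASSUME_ROLE -> AUTO
    }
}
{code}
Each {{ASSUME_ROLE}} level contributes one {{.role.provider}} segment to the 
key prefix, so arbitrarily deep role chains need no special casing.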



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9374) Flink Kinesis Producer does not backpressure

2018-05-16 Thread Franz Thoma (JIRA)
Franz Thoma created FLINK-9374:
--

 Summary: Flink Kinesis Producer does not backpressure
 Key: FLINK-9374
 URL: https://issues.apache.org/jira/browse/FLINK-9374
 Project: Flink
  Issue Type: Bug
Reporter: Franz Thoma
 Attachments: after.png, before.png

The {{FlinkKinesisProducer}} simply accepts records and forwards them to a 
{{KinesisProducer}} from the Amazon Kinesis Producer Library (KPL). The KPL 
internally holds an unbounded queue of records that have not yet been sent.

Since Kinesis is rate-limited to 1MB per second per shard, this queue may grow 
indefinitely if Flink sends records faster than the KPL can forward them to 
Kinesis.

One way to circumvent this problem is to set a record TTL, so that queued 
records are dropped after a certain amount of time, but this will lead to data 
loss under high loads.

Currently the only time the queue is flushed is during checkpointing: 
{{FlinkKinesisProducer}} consumes records at an arbitrary rate, either until a 
checkpoint is reached (at which point it waits until the queue is flushed), or 
until it runs out of memory, whichever comes first. (This is made worse by the 
fact that the Java KPL is only a thin wrapper around a C++ process, so it is 
not even the Java process that runs out of memory, but the C++ process.) The 
implicit rate limit due to checkpointing leads to a ragged throughput graph 
like this (the periods with zero throughput are the wait times before a 
checkpoint):

!before.png! Throughput limited by checkpointing only

My proposed solution is to add a config option {{queueLimit}} to set a maximum 
number of records that may be waiting in the KPL queue. If this limit is 
reached, the {{FlinkKinesisProducer}} should trigger a {{flush()}} and wait 
(blocking) until the queue length is below the limit again. This automatically 
leads to backpressuring, since the {{FlinkKinesisProducer}} cannot accept 
records while waiting. For compatibility, {{queueLimit}} is set to 
{{Integer.MAX_VALUE}} by default, so the behavior is unchanged unless a client 
explicitly sets the value. Unfortunately, setting a "sane" default value is not 
possible, since sensible values for the limit depend on the record size (the 
limit should be chosen so that about 10–100MB of records per shard are 
accumulated before flushing, otherwise the maximum Kinesis throughput may not 
be reached).
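The proposed blocking behavior can be sketched in plain Java as follows; the 
class and method names here are illustrative only and do not reflect the actual 
{{FlinkKinesisProducer}} or KPL API:
{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Hedged sketch: block the producing thread once the number of outstanding
// (not yet acknowledged) records reaches queueLimit. Blocking the caller is
// what propagates backpressure to the upstream Flink operators.
public class BoundedQueueProducer {
    private final int queueLimit;
    private final AtomicInteger outstanding = new AtomicInteger();

    public BoundedQueueProducer(int queueLimit) {
        this.queueLimit = queueLimit;
    }

    // Called for every incoming record; waits while the queue is full.
    public synchronized void addRecord(Object record) throws InterruptedException {
        while (outstanding.get() >= queueLimit) {
            flush();    // ask the underlying producer to drain its queue
            wait(100);  // re-check the queue length periodically
        }
        outstanding.incrementAndGet();
        // hand the record to the underlying producer here
    }

    // Invoked when the underlying producer acknowledges a record.
    public synchronized void recordCompleted() {
        outstanding.decrementAndGet();
        notifyAll();  // wake up a blocked addRecord() call
    }

    protected void flush() {
        // In the real producer this would trigger a KPL flush.
    }

    public int outstandingCount() {
        return outstanding.get();
    }
}
{code}
With {{queueLimit = Integer.MAX_VALUE}} the {{while}} condition never holds, so 
the default behavior is identical to the current unbounded queue.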

!after.png! Throughput with a queue limit of 10 records (the spikes are 
checkpoints, where the queue is still flushed completely)


