Hi,
It seems that the issue is related to MFA.
I think this is a Flink issue because sending commands (e.g. put record)
from the CLI while MFA is enabled works fine, which means the credentials
and security token themselves are valid.
Update:
After disabling MFA, the FlinkKinesisConsumer/Producer works fine, which
also makes me think that this is a Flink issue.
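
For anyone hitting the same thing, here is a minimal sketch of a producer
configuration that reads credentials from a profile instead of static keys.
It rests on an assumption I have not verified: that the failure comes from
the static access key / secret key settings carrying no session token, while
an MFA session uses temporary credentials that include one. The profile name
"mfa" and the object name are hypothetical; the AWSConfigConstants keys are
the connector's credentials provider settings.

```scala
import java.util.Properties
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants

// Sketch only, not a confirmed fix for the error in this thread.
object ProducerConfigSketch {
  def build(): Properties = {
    val producerConfig = new Properties()
    producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
    // "PROFILE" selects the profile-based credentials provider
    // instead of a static access key / secret key pair.
    producerConfig.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "PROFILE")
    // Hypothetical profile in ~/.aws/credentials holding the temporary
    // credentials of the MFA session (access key, secret key, session token).
    producerConfig.put(AWSConfigConstants.AWS_PROFILE_NAME, "mfa")
    producerConfig
  }
}
```

The result of ProducerConfigSketch.build() would be passed to
FlinkKinesisProducer in place of the producerConfig from my first message.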

Best
Avi

On Fri, Dec 11, 2020 at 2:03 PM Matthias Pohl <matth...@ververica.com>
wrote:

> True, I got this wrong. Do you have any reason to assume that it's a Flink
> issue? The configuration looks correct (relying on the Flink docs [1]
> here). Have you considered asking in the AWS community for help?
>
> Best,
> Matthias
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#kinesis-producer
>
> On Thu, Dec 10, 2020 at 6:31 PM Avi Levi <a...@neosec.com> wrote:
>
>> Hi,
>> Thanks for your reply. The problem is actually with the
>> FlinkKinesisProducer and not the consumer (I did consume from the
>> stream successfully). The keys are valid.
>>
>> On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl <matth...@ververica.com>
>> wrote:
>>
>>> Hi Avi,
>>> thanks for reaching out to the Flink community. I haven't worked with
>>> the KinesisConsumer. Unfortunately, I cannot judge whether there's
>>> something missing in your setup. But first of all: Could you confirm that
>>> the key itself is valid? Did you try to use it in other cases?
>>>
>>> Best,
>>> Matthias
>>>
>>> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi <a...@apiseq.ai> wrote:
>>>
>>>> Hi,
>>>> Any help here will be greatly appreciated. I am about to throw in the
>>>> towel; this is very frustrating...
>>>> I am trying to put a record on kinesalite with the following configuration:
>>>>
>>>>   System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
>>>>   System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>>>>   System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true")
>>>>
>>>>   val producerConfig = new Properties()
>>>>   producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>>>   producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>>>>   producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>>>>   producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
>>>>   producerConfig.put("VerifyCertificate", "false")
>>>>
>>>> However, putting a record on the stream:
>>>>
>>>>   val producer = new FlinkKinesisProducer(new SimpleStringSchema(), producerConfig)
>>>>   producer.setFailOnError(true)
>>>>   producer.setDefaultStream(outputStreamName)
>>>>   producer.setDefaultPartition("0")
>>>>
>>>>   val kinesis =
>>>>     env.addSource(new FlinkKinesisConsumer[String](
>>>>       inputStreamName, new SimpleStringSchema, consumerConfig))
>>>>       .addSink(producer)
>>>>
>>>> yields:
>>>>
>>>> Exception name: UnrecognizedClientException
>>>> Error message: The security token included in the request is invalid.
>>>> 6 response headers:
>>>> connection : close
>>>> content-length : 107
>>>> content-type : application/x-amz-json-1.1
>>>>
>>>>
>>>> ➜  ~ cat ~/.aws/credentials
>>>> [default]
>>>> aws_access_key_id = x
>>>> aws_secret_access_key = x
>>>> region = us-east-1
>>>>
>>>
