True, I got this wrong. Do you have any reason to assume that it's a Flink
issue? The configuration looks correct (relying on the Flink docs [1]
here). Have you considered asking in the AWS community for help?

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#kinesis-producer

On Thu, Dec 10, 2020 at 6:31 PM Avi Levi <a...@neosec.com> wrote:

> Hi,
> Thanks for your reply, The problem is actually with the
> FlinkKinesisProducer and not the consumer ( i did consume from the stream
> successfully ). the keys are valid
>
> On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Avi,
>> thanks for reaching out to the Flink community. I haven't worked with the
>> KinesisConsumer. Unfortenately, I cannot judge whether there's something
>> missing in your setup. But first of all: Could you confirm that the key
>> itself is valid? Did you try to use it in other cases?
>>
>> Best,
>> Matthias
>>
>> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi <a...@apiseq.ai> wrote:
>>
>>> Hi ,
>>> Any help here will be greatly appreciated I am about to throw the towel, 
>>> very frustrating...
>>> I am trying to put record on kinesalite with the following configuration :
>>>
>>> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
>>>  "true")
>>>   
>>> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, 
>>> "true")
>>>   System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>>>   
>>> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true")
>>>
>>>   val producerConfig = new Properties()
>>>   producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>>   producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>>>   producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>>>   producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, 
>>> "http://localhost:4567";)
>>>   producerConfig.put( "VerifyCertificate", "false")
>>>
>>> However putting a record on the stream :
>>>
>>>   val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
>>> producerConfig)
>>>   producer.setFailOnError(true)
>>>   producer.setDefaultStream(outputStreamName)
>>>   producer.setDefaultPartition("0")
>>>
>>>   val kinesis =
>>>     env.addSource(new FlinkKinesisConsumer[String](
>>>       inputStreamName,new SimpleStringSchema, consumerConfig))
>>>       .addSink(producer)
>>>
>>> yields:
>>>
>>> Exception name: UnrecognizedClientExceptionError message: The security 
>>> token included in the request is invalid.6 response headers:
>>> connection : close
>>> content-length : 107
>>> content-type : application/x-amz-json-1.1
>>>
>>>
>>> ➜  ~ cat ~/.aws/credentials
>>> [default]
>>> aws_access_key_id = x
>>> aws_secret_access_key = x
>>> region = us-east-1
>>>
>>

Reply via email to