Hi,
Thanks for your reply, The problem is actually with the
FlinkKinesisProducer and not the consumer ( i did consume from the stream
successfully ). the keys are valid

On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl <matth...@ververica.com>
wrote:

> Hi Avi,
> thanks for reaching out to the Flink community. I haven't worked with the
> KinesisConsumer. Unfortenately, I cannot judge whether there's something
> missing in your setup. But first of all: Could you confirm that the key
> itself is valid? Did you try to use it in other cases?
>
> Best,
> Matthias
>
> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi <a...@apiseq.ai> wrote:
>
>> Hi ,
>> Any help here will be greatly appreciated I am about to throw the towel, 
>> very frustrating...
>> I am trying to put record on kinesalite with the following configuration :
>>
>> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
>>  "true")
>>   
>> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, 
>> "true")
>>   System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>>   
>> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true")
>>
>>   val producerConfig = new Properties()
>>   producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>   producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>>   producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>>   producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, 
>> "http://localhost:4567";)
>>   producerConfig.put( "VerifyCertificate", "false")
>>
>> However putting a record on the stream :
>>
>>   val producer = new FlinkKinesisProducer(new SimpleStringSchema(), 
>> producerConfig)
>>   producer.setFailOnError(true)
>>   producer.setDefaultStream(outputStreamName)
>>   producer.setDefaultPartition("0")
>>
>>   val kinesis =
>>     env.addSource(new FlinkKinesisConsumer[String](
>>       inputStreamName,new SimpleStringSchema, consumerConfig))
>>       .addSink(producer)
>>
>> yields:
>>
>> Exception name: UnrecognizedClientExceptionError message: The security token 
>> included in the request is invalid.6 response headers:
>> connection : close
>> content-length : 107
>> content-type : application/x-amz-json-1.1
>>
>>
>> ➜  ~ cat ~/.aws/credentials
>> [default]
>> aws_access_key_id = x
>> aws_secret_access_key = x
>> region = us-east-1
>>
>

Reply via email to