Hi Avi, thanks for reaching out to the Flink community. I haven't worked with the KinesisConsumer, so unfortunately I cannot judge whether something is missing in your setup. But first of all: could you confirm that the key itself is valid? Have you tried using it anywhere else?

Best,
Matthias

On Thu, Dec 10, 2020 at 12:48 PM Avi Levi <a...@apiseq.ai> wrote:

> Hi,
> Any help here will be greatly appreciated I am about to throw the towel, very
> frustrating...
> I am trying to put record on kinesalite with the following configuration :
>
> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
> "true")
> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY,
> "true")
> System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>
> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true")
>
> val producerConfig = new Properties()
> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
> producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
> producerConfig.put( "VerifyCertificate", "false")
>
> However putting a record on the stream :
>
> val producer = new FlinkKinesisProducer(new SimpleStringSchema(),
> producerConfig)
> producer.setFailOnError(true)
> producer.setDefaultStream(outputStreamName)
> producer.setDefaultPartition("0")
>
> val kinesis =
> env.addSource(new FlinkKinesisConsumer[String](
> inputStreamName,new SimpleStringSchema, consumerConfig))
> .addSink(producer)
>
> yields:
>
> Exception name: UnrecognizedClientException
> Error message: The security token included in the request is invalid.
> 6 response headers:
> connection : close
> content-length : 107
> content-type : application/x-amz-json-1.1
>
>
> ➜ ~ cat ~/.aws/credentials
> [default]
> aws_access_key_id = x
> aws_secret_access_key = x
> region = us-east-1
>