Hi,
Thanks for your reply. The problem is actually with the FlinkKinesisProducer, not the consumer (I did consume from the stream successfully). The keys are valid.
On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl <matth...@ververica.com> wrote:

> Hi Avi,
> thanks for reaching out to the Flink community. I haven't worked with the
> KinesisConsumer. Unfortunately, I cannot judge whether there's something
> missing in your setup. But first of all: Could you confirm that the key
> itself is valid? Did you try to use it in other cases?
>
> Best,
> Matthias
>
> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi <a...@apiseq.ai> wrote:
>
>> Hi,
>> Any help here will be greatly appreciated. I am about to throw in the
>> towel, very frustrating...
>> I am trying to put a record on kinesalite with the following configuration:
>>
>> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
>> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
>> System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true")
>>
>> val producerConfig = new Properties()
>> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>> producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
>> producerConfig.put("VerifyCertificate", "false")
>>
>> However, putting a record on the stream:
>>
>> val producer = new FlinkKinesisProducer(new SimpleStringSchema(), producerConfig)
>> producer.setFailOnError(true)
>> producer.setDefaultStream(outputStreamName)
>> producer.setDefaultPartition("0")
>>
>> val kinesis =
>>   env.addSource(new FlinkKinesisConsumer[String](
>>       inputStreamName, new SimpleStringSchema, consumerConfig))
>>     .addSink(producer)
>>
>> yields:
>>
>> Exception name: UnrecognizedClientException
>> Error message: The security token included in the request is invalid.
>> 6 response headers:
>> connection : close
>> content-length : 107
>> content-type : application/x-amz-json-1.1
>>
>> ➜ ~ cat ~/.aws/credentials
>> [default]
>> aws_access_key_id = x
>> aws_secret_access_key = x
>> region = us-east-1