True, I got this wrong. Do you have any reason to assume that it's a Flink issue? The configuration looks correct (relying on the Flink docs [1] here). Have you considered asking in the AWS community for help?
Best,
Matthias

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#kinesis-producer

On Thu, Dec 10, 2020 at 6:31 PM Avi Levi <a...@neosec.com> wrote:

> Hi,
> Thanks for your reply. The problem is actually with the
> FlinkKinesisProducer and not the consumer (I did consume from the stream
> successfully). The keys are valid.
>
> On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl <matth...@ververica.com>
> wrote:
>
>> Hi Avi,
>> thanks for reaching out to the Flink community. I haven't worked with the
>> KinesisConsumer. Unfortunately, I cannot judge whether there's something
>> missing in your setup. But first of all: Could you confirm that the key
>> itself is valid? Did you try to use it in other cases?
>>
>> Best,
>> Matthias
>>
>> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi <a...@apiseq.ai> wrote:
>>
>>> Hi,
>>> Any help here will be greatly appreciated. I am about to throw in the
>>> towel, very frustrating...
>>> I am trying to put a record on kinesalite with the following
>>> configuration:
>>>
>>> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
>>> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
>>> System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
>>> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true")
>>>
>>> val producerConfig = new Properties()
>>> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
>>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
>>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
>>> producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
>>> producerConfig.put("VerifyCertificate", "false")
>>>
>>> However, putting a record on the stream:
>>>
>>> val producer = new FlinkKinesisProducer(new SimpleStringSchema(), producerConfig)
>>> producer.setFailOnError(true)
>>> producer.setDefaultStream(outputStreamName)
>>> producer.setDefaultPartition("0")
>>>
>>> val kinesis =
>>>   env.addSource(new FlinkKinesisConsumer[String](
>>>     inputStreamName, new SimpleStringSchema, consumerConfig))
>>>   .addSink(producer)
>>>
>>> yields:
>>>
>>> Exception name: UnrecognizedClientException
>>> Error message: The security token included in the request is invalid.
>>> 6 response headers:
>>> connection : close
>>> content-length : 107
>>> content-type : application/x-amz-json-1.1
>>>
>>>
>>> ➜ ~ cat ~/.aws/credentials
>>> [default]
>>> aws_access_key_id = x
>>> aws_secret_access_key = x
>>> region = us-east-1
>>>
>>