Hi, it seems that the issue is with mfa involved . I think that this is a flink issue because when sending commands (e.g put record etc') from the cli (while mfa is activated of course), it works fine (meaning credentials and security token works fine). update: after disabling the mfa the FlinkKinesisConsumer/Producer works fine, which also kind of makes me think that this is a Flink issue.
Best Avi On Fri, Dec 11, 2020 at 2:03 PM Matthias Pohl <matth...@ververica.com> wrote: > True, I got this wrong. Do you have any reason to assume that it's a Flink > issue? The configuration looks correct (relying on the Flink docs [1] > here). Have you considered asking in the AWS community for help? > > Best, > Matthias > > [1] > https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#kinesis-producer > > On Thu, Dec 10, 2020 at 6:31 PM Avi Levi <a...@neosec.com> wrote: > >> Hi, >> Thanks for your reply, The problem is actually with the >> FlinkKinesisProducer and not the consumer ( i did consume from the >> stream successfully ). the keys are valid >> >> On Thu, Dec 10, 2020 at 6:53 PM Matthias Pohl <matth...@ververica.com> >> wrote: >> >>> Hi Avi, >>> thanks for reaching out to the Flink community. I haven't worked with >>> the KinesisConsumer. Unfortenately, I cannot judge whether there's >>> something missing in your setup. But first of all: Could you confirm that >>> the key itself is valid? Did you try to use it in other cases? >>> >>> Best, >>> Matthias >>> >>> On Thu, Dec 10, 2020 at 12:48 PM Avi Levi <a...@apiseq.ai> wrote: >>> >>>> Hi , >>>> Any help here will be greatly appreciated I am about to throw the towel, >>>> very frustrating... >>>> I am trying to put record on kinesalite with the following configuration : >>>> >>>> System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, >>>> "true") >>>> >>>> System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, >>>> "true") >>>> System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false") >>>> >>>> System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY,"true") >>>> >>>> val producerConfig = new Properties() >>>> producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") >>>> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x") >>>> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x") >>>> producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, >>>> "http://localhost:4567") >>>> producerConfig.put( "VerifyCertificate", "false") >>>> >>>> However putting a record on the stream : >>>> >>>> val producer = new FlinkKinesisProducer(new SimpleStringSchema(), >>>> producerConfig) >>>> producer.setFailOnError(true) >>>> producer.setDefaultStream(outputStreamName) >>>> producer.setDefaultPartition("0") >>>> >>>> val kinesis = >>>> env.addSource(new FlinkKinesisConsumer[String]( >>>> inputStreamName,new SimpleStringSchema, consumerConfig)) >>>> .addSink(producer) >>>> >>>> yields: >>>> >>>> Exception name: UnrecognizedClientExceptionError message: The security >>>> token included in the request is invalid.6 response headers: >>>> connection : close >>>> content-length : 107 >>>> content-type : application/x-amz-json-1.1 >>>> >>>> >>>> ➜ ~ cat ~/.aws/credentials >>>> [default] >>>> aws_access_key_id = x >>>> aws_secret_access_key = x >>>> region = us-east-1 >>>> >>>