Hi all, I’ve tried a simple Flink application which uses FlinkKinesisConsumer. I noticed that when trying to consume from Kinesalite using the FlinkKinesisConsumer with EFO enabled, I run into SSL handshake errors. This is despite disabling certificate validation. Has anybody successfully tested FlinkKinesisConsumer with EFO enabled in this way or have any insights into what may be causing this?
Below I provide a code snippet to reproduce this error and the stack trace I'm seeing. Note that when I comment out the lines that set the properties enabling EFO, the code snippet works successfully. Any help would be much appreciated. Thanks, Charles Code snippet: public static void main(String[] args) throws Exception { System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true"); System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); final Properties props = new Properties(); props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); props.put(ConsumerConfigConstants.AWS_ENDPOINT, "https://localhost:4567 "); props.put(ConsumerConfigConstants.AWS_REGION, Region.US_EAST_1.toString()); props.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); props.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY_ID"); props.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_KEY"); // Enhanced Fan-Out properties props.put( ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, ConsumerConfigConstants.RecordPublisherType.EFO.name()); props.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "ABCDEFGHIJKLMNOP"); FlinkKinesisConsumer < String > source = new FlinkKinesisConsumer("flinkkinesistest", new SimpleStringSchema(), props); DataStream < String > sourceStream = env.addSource(source); System.out.println(String.format("kinesis props: \n%s", props)); sourceStream.print(); env.execute("tester"); } Stack trace: org.apache.flink.kinesis.shaded.io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477) at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at java.base/java.lang.Thread.run(Thread.java:832) Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:369) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:312) at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:307) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175) at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396) at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1267) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1254) at java.base/java.security.AccessController.doPrivileged(AccessController.java:691) at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1199) at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1548) at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1394) at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235) at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284) at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) ... 16 more Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439) at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) at java.base/sun.security.validator.Validator.validate(Validator.java:264) at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285) at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144) at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335) ... 30 more Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141) at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126) at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297) at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434) ... 35 more