Hello, Kinesalite does not support EFO, so unfortunately you will need to hit the real service for any end to end test.
Thanks, Danny On Tue, 25 Apr 2023, 20:10 Charles Tan, <ctangu...@gmail.com> wrote: > Hi all, > > I’ve tried a simple Flink application which uses FlinkKinesisConsumer. I > noticed that when trying to consume from Kinesalite using the > FlinkKinesisConsumer with EFO enabled, I run into SSL handshake errors. > This is despite disabling certificate validation. Has anybody successfully > tested FlinkKinesisConsumer with EFO enabled in this way or have any > insights into what may be causing this? > > Below I provide a code snippet to reproduce this error and the stack trace > I'm seeing. Note that when I comment out the lines that set the properties > enabling EFO, the code snippet works successfully. Any help would be much > appreciated. > > Thanks, > Charles > > > Code snippet: > public static void main(String[] args) throws Exception { > > System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, > "true"); > > System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, > "true"); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > > final Properties props = new Properties(); > props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, > "TRIM_HORIZON"); > props.put(ConsumerConfigConstants.AWS_ENDPOINT, " > https://localhost:4567"); > props.put(ConsumerConfigConstants.AWS_REGION, > Region.US_EAST_1.toString()); > props.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC"); > props.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, > "FAKE_ACCESS_KEY_ID"); > props.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, > "FAKE_SECRET_KEY"); > > // Enhanced Fan-Out properties > props.put( > ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, > ConsumerConfigConstants.RecordPublisherType.EFO.name()); > props.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, > "ABCDEFGHIJKLMNOP"); > > FlinkKinesisConsumer < String > source = > new FlinkKinesisConsumer("flinkkinesistest", new > SimpleStringSchema(), props); > > DataStream < String > sourceStream = env.addSource(source); > > System.out.println(String.format("kinesis props: \n%s", props)); > > sourceStream.print(); > env.execute("tester"); > } > > Stack trace: > org.apache.flink.kinesis.shaded.io.netty.handler.codec.DecoderException: > javax.net.ssl.SSLHandshakeException: PKIX path building failed: > sun.security.provider.certpath.SunCertPathBuilderException: unable to find > valid certification path to requested target > at > org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477) > at > org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) > at > org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) > at > org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) > at > org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) > at > org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) > at > org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) > at > org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) > at > org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) > at > org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) > at > org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) > at > org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) > at java.base/java.lang.Thread.run(Thread.java:832) > Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: > sun.security.provider.certpath.SunCertPathBuilderException: unable to find > valid certification path to requested target > at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131) > at > java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:369) > at > java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:312) > at > java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:307) > at > java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357) > at > java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232) > at > java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175) > at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396) > at > java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480) > at > java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1267) > at > java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1254) > at > java.base/java.security.AccessController.doPrivileged(AccessController.java:691) > at > java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1199) > at > org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1548) > at > org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1394) > at > org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235) > at > org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284) > at > org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507) > at > org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446) > ... 16 more > Caused by: sun.security.validator.ValidatorException: PKIX path building > failed: sun.security.provider.certpath.SunCertPathBuilderException: unable > to find valid certification path to requested target > at > java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439) > at > java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306) > at java.base/sun.security.validator.Validator.validate(Validator.java:264) > at > java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285) > at > java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144) > at > java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335) > ... 30 more > Caused by: sun.security.provider.certpath.SunCertPathBuilderException: > unable to find valid certification path to requested target > at > java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141) > at > java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126) > at > java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297) > at > java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434) > ... 35 more >