Hello,

Kinesalite does not support EFO, so unfortunately you will need to run any
end-to-end test against the real Kinesis service.
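
For illustration, here is a minimal sketch of the consumer properties for a
run against the real service, assuming default AWS credentials are available
on the machine. It uses plain string keys that are assumed to match the
ConsumerConfigConstants fields in the snippet below, so it compiles without
Flink on the classpath. The class name, region, and consumer name are
placeholders, not part of the connector.

```java
import java.util.Properties;

public class EfoRealServiceProps {

    // Builds consumer properties for the real Kinesis service with EFO enabled.
    // ASSUMPTION: these string keys mirror the ConsumerConfigConstants values;
    // in a real job, use the constants themselves.
    static Properties build(String region, String consumerName) {
        Properties props = new Properties();
        props.setProperty("flink.stream.initpos", "TRIM_HORIZON");
        props.setProperty("aws.region", region);
        // No "aws.endpoint" override: the connector resolves the real
        // regional endpoint, so no self-signed certificate is involved.
        // "AUTO" asks the connector to use the default credentials chain.
        props.setProperty("aws.credentials.provider", "AUTO");
        // Enhanced Fan-Out settings, same as in the quoted snippet.
        props.setProperty("flink.stream.recordpublisher", "EFO");
        props.setProperty("flink.stream.efo.consumername", consumerName);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder region and consumer name for illustration only.
        Properties props = build("us-east-1", "my-efo-consumer");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object would be passed to FlinkKinesisConsumer in
place of the props in the quoted snippet, and the two System.setProperty
calls can be dropped since certificate checking no longer needs disabling.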

Thanks,
Danny

On Tue, 25 Apr 2023, 20:10 Charles Tan, <ctangu...@gmail.com> wrote:

> Hi all,
>
> I’ve tried a simple Flink application which uses FlinkKinesisConsumer. When
> consuming from Kinesalite using the FlinkKinesisConsumer with EFO enabled,
> I run into SSL handshake errors, despite having disabled certificate
> validation. Has anybody successfully tested FlinkKinesisConsumer with EFO
> enabled in this way, or does anyone have any insight into what may be
> causing this?
>
> Below I provide a code snippet to reproduce this error and the stack trace
> I'm seeing. Note that when I comment out the lines that set the properties
> enabling EFO, the code snippet works successfully. Any help would be much
> appreciated.
>
> Thanks,
> Charles
>
>
> Code snippet:
> public static void main(String[] args) throws Exception {
>     System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
>     System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     final Properties props = new Properties();
>     props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
>     props.put(ConsumerConfigConstants.AWS_ENDPOINT, "https://localhost:4567");
>     props.put(ConsumerConfigConstants.AWS_REGION, Region.US_EAST_1.toString());
>     props.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
>     props.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY_ID");
>     props.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_KEY");
>
>     // Enhanced Fan-Out properties
>     props.put(
>         ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
>         ConsumerConfigConstants.RecordPublisherType.EFO.name());
>     props.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "ABCDEFGHIJKLMNOP");
>
>     FlinkKinesisConsumer<String> source =
>         new FlinkKinesisConsumer<>("flinkkinesistest", new SimpleStringSchema(), props);
>
>     DataStream<String> sourceStream = env.addSource(source);
>
>     System.out.println(String.format("kinesis props: \n%s", props));
>
>     sourceStream.print();
>     env.execute("tester");
> }
>
> Stack trace:
> org.apache.flink.kinesis.shaded.io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>     at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>     at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>     at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.base/java.lang.Thread.run(Thread.java:832)
> Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
>     at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:369)
>     at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:312)
>     at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:307)
>     at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357)
>     at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
>     at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
>     at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
>     at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480)
>     at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1267)
>     at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1254)
>     at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
>     at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1199)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1548)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1394)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
>     at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
>     ... 16 more
> Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
>     at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
>     at java.base/sun.security.validator.Validator.validate(Validator.java:264)
>     at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)
>     at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
>     at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
>     ... 30 more
> Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
>     at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
>     at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
>     at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
>     at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
>     ... 35 more
>
