Hi all,

I have a simple Flink application that uses FlinkKinesisConsumer. When I
consume from Kinesalite with EFO (Enhanced Fan-Out) enabled, I run into SSL
handshake errors, even though I have disabled certificate validation
through the AWS SDK system property. Has anybody successfully tested
FlinkKinesisConsumer with EFO enabled in this way, or does anyone have
insight into what may be causing this?

Below I provide a code snippet to reproduce this error and the stack trace
I'm seeing. Note that when I comment out the lines that set the properties
enabling EFO, the code snippet works successfully. Any help would be much
appreciated.

Thanks,
Charles


Code snippet:
public static void main(String[] args) throws Exception {
    // Disable certificate checking and CBOR in the AWS SDK for the local endpoint
    System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
    System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // Connection settings for the local Kinesalite endpoint
    final Properties props = new Properties();
    props.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
    props.put(ConsumerConfigConstants.AWS_ENDPOINT, "https://localhost:4567");
    props.put(ConsumerConfigConstants.AWS_REGION, Region.US_EAST_1.toString());
    props.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "BASIC");
    props.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "FAKE_ACCESS_KEY_ID");
    props.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "FAKE_SECRET_KEY");

    // Enhanced Fan-Out properties (commenting these out makes the job work)
    props.put(
        ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
        ConsumerConfigConstants.RecordPublisherType.EFO.name());
    props.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "ABCDEFGHIJKLMNOP");

    FlinkKinesisConsumer<String> source =
        new FlinkKinesisConsumer<>("flinkkinesistest", new SimpleStringSchema(), props);

    DataStream<String> sourceStream = env.addSource(source);

    System.out.println(String.format("kinesis props: \n%s", props));

    sourceStream.print();
    env.execute("tester");
}
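
To make the comparison between the working and failing cases easier to
reproduce, here is a minimal sketch of a helper that adds the EFO properties
behind a flag. The class name EfoToggle and the method withEfo are
hypothetical, and the two key strings are my assumption of what
ConsumerConfigConstants.RECORD_PUBLISHER_TYPE and
ConsumerConfigConstants.EFO_CONSUMER_NAME resolve to; in a real job the
constants themselves should be used.

```java
import java.util.Properties;

public class EfoToggle {
    // Assumed string values of ConsumerConfigConstants.RECORD_PUBLISHER_TYPE and
    // ConsumerConfigConstants.EFO_CONSUMER_NAME; prefer the constants in a real job.
    static final String RECORD_PUBLISHER_TYPE = "flink.stream.recordpublisher";
    static final String EFO_CONSUMER_NAME = "flink.stream.efo.consumername";

    // Returns a copy of base, adding the two EFO properties only when enableEfo is true.
    static Properties withEfo(Properties base, boolean enableEfo, String consumerName) {
        Properties copy = new Properties();
        copy.putAll(base);
        if (enableEfo) {
            copy.setProperty(RECORD_PUBLISHER_TYPE, "EFO");
            copy.setProperty(EFO_CONSUMER_NAME, consumerName);
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("example.key", "example-value"); // placeholder entry
        System.out.println("polling: " + withEfo(base, false, "ABCDEFGHIJKLMNOP"));
        System.out.println("efo:     " + withEfo(base, true, "ABCDEFGHIJKLMNOP"));
    }
}
```

With enableEfo set to false the returned properties match the case that
works for me; with true they match the case that fails with the stack trace
below.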

Stack trace:
org.apache.flink.kinesis.shaded.io.netty.handler.codec.DecoderException: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:477)
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at org.apache.flink.kinesis.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    at org.apache.flink.kinesis.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at org.apache.flink.kinesis.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at org.apache.flink.kinesis.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at org.apache.flink.kinesis.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:369)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:312)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:307)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1267)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1254)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1199)
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.runDelegatedTasks(SslHandler.java:1548)
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1394)
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
    at org.apache.flink.kinesis.shaded.io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    at org.apache.flink.kinesis.shaded.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    ... 16 more
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
    at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
    at java.base/sun.security.validator.Validator.validate(Validator.java:264)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
    ... 30 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
    at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
    at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
    at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
    at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
    ... 35 more
