Hello, I'm currently having issues connecting from Google Dataflow through SASL_SSL with a self-signed certificate to a broker running Kafka 0.10.20. I'm using the latest SNAPSHOT versions of both the Beam SDK and KafkaIO.
With the DirectRunner everything works: I provide a truststore (JKS) that holds the self-signed certificate's CA, either to the JVM or through the Kafka consumer config, and I see data coming through.

With the Dataflow runner I see that the workers successfully sign in to the Kafka brokers, which run on non-GCP servers. Shortly afterwards I get BUFFER_UNDERFLOW statuses and SSL-related exceptions (see excerpts at the end). No data comes through; the workers are unable even to acquire metadata.

I tried various methods of providing the truststore to the workers, overriding SSL security/authentication checks, etc. I put that logic into the Kafka consumer factory function as well as into KafkaIO.start().

Does anybody have experience with this, or could somebody assist or even point me in some direction? If this is the wrong place for this kind of issue, may I ask you to point me to the right place?

Excerpts:

First, a single statement from org.apache.kafka.common.security.authenticator.AbstractLogin:
> Successfully logged in.

Repeating log statements:
> SSLHandshake handshakeUnwrap: handshakeStatus NEED_UNWRAP status BUFFER_UNDERFLOW
> SSLHandshake NEED_UNWRAP channelId -1, handshakeResult Status = BUFFER_UNDERFLOW HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 0, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 219
> SSLHandshake NEED_WRAP channelId -1, handshakeResult Status = OK HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 219, appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0

Repeating exceptions:
> message: Connection with kafkabroker.notgooglecloud.com/W.X.Y.Z disconnected
> logger: org.apache.kafka.common.network.Selector
> javax.net.ssl.SSLException: Unrecognized SSL message, plaintext connection?
> at ...
> at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:412)
> at ...
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:139)
> at ...
> at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1272)
> at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.start(KafkaIO.java:869)
> at ...
> at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72)
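
For illustration, here is a minimal sketch of one way to hand a truststore to a worker, as described earlier. It is not the pipeline's actual code. The class name, method names, temp-file prefix and password are hypothetical placeholders, and the assumption is that the truststore bytes reach the worker somehow (for example as a classpath resource) and must be written to local disk. The Kafka client reads `ssl.truststore.location` as a local file path, so the file has to exist on the worker. Only the configuration keys themselves are standard Kafka client settings.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, names chosen for illustration only.
public class TruststoreConfigSketch {

    // Writes the truststore bytes to a temp file on the local disk,
    // because the Kafka client expects a local file path.
    static Path materializeTruststore(InputStream in) throws IOException {
        Path target = Files.createTempFile("kafka-truststore", ".jks");
        Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        target.toFile().deleteOnExit();
        return target;
    }

    // Builds the SSL-related part of the consumer configuration.
    // SASL settings (mechanism, JAAS login) are deliberately left out here.
    static Map<String, Object> sslConsumerConfig(Path truststore, String password) {
        Map<String, Object> config = new HashMap<>();
        config.put("security.protocol", "SASL_SSL");
        config.put("ssl.truststore.type", "JKS");
        config.put("ssl.truststore.location", truststore.toAbsolutePath().toString());
        config.put("ssl.truststore.password", password);
        return config;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder bytes stand in for a real JKS file; in a pipeline these
        // might come from getResourceAsStream(...) on a bundled resource.
        InputStream fake = new ByteArrayInputStream(new byte[] {1, 2, 3});
        Path truststore = materializeTruststore(fake);
        // "changeit" is a placeholder password.
        sslConsumerConfig(truststore, "changeit")
            .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

In a Beam pipeline, a map like this would be merged into the properties passed to the Kafka consumer inside the consumer factory function, so that the file is created on the worker rather than on the machine that submits the job.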
