Hello,

I'm currently having issues connecting from Google Dataflow through
SASL_SSL with a self-signed certificate to a broker running Kafka
0.10.20. I'm using the latest SNAPSHOT versions of both the Beam SDK and
KafkaIO.

With the DirectRunner everything's fine. I provide either the JVM or the
Kafka consumer config with a truststore (jks) that holds the self-signed
certificate's CA. I see data coming through.
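
For illustration, a consumer config of the kind described above could look like the sketch below. The class name, path and password are placeholders, not the real values; the keys are the standard Kafka client settings for an SSL truststore. The JVM route would instead set the javax.net.ssl.trustStore and javax.net.ssl.trustStorePassword system properties.

```java
import java.util.HashMap;
import java.util.Map;

public class TruststoreConsumerConfig {

    // Builds the SSL-related consumer settings. Both arguments are
    // placeholders supplied by the caller (hypothetical values).
    static Map<String, Object> build(String truststorePath, String truststorePassword) {
        Map<String, Object> config = new HashMap<>();
        // Encrypt the connection and authenticate via SASL.
        config.put("security.protocol", "SASL_SSL");
        // The truststore holds the CA of the self-signed certificate.
        config.put("ssl.truststore.type", "JKS");
        config.put("ssl.truststore.location", truststorePath);
        config.put("ssl.truststore.password", truststorePassword);
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> config = build("/path/to/truststore.jks", "changeit");
        System.out.println(config);
    }
}
```

Note that ssl.truststore.location is resolved on whichever machine creates the consumer, so a path that exists locally will not necessarily exist on a remote worker.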

With the Dataflow runner, I see the workers successfully log in to the
Kafka brokers, which run on non-GCP servers. Shortly afterwards I get
repeated BUFFER_UNDERFLOW statuses and SSL-related exceptions (see the
excerpts at the end). No data comes through; the workers cannot even
acquire metadata.

I tried various methods of providing the truststore to the workers,
overriding SSL security/authentication checks etc. I put that logic into
the Kafka Consumer Factory function as well as in KafkaIO.start().
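
One way of getting a truststore onto a worker, sketched here as an assumption rather than as the exact code that was tried: copy the JKS bytes (for example from a resource bundled in the pipeline jar) to a temporary file on the worker, then point ssl.truststore.location at that file before the consumer is created. The class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class WorkerTruststore {

    // Copies the truststore bytes to a temp file on the local disk and
    // returns its path, which can then be used as ssl.truststore.location.
    static Path materialize(InputStream jksBytes) throws IOException {
        Path local = Files.createTempFile("kafka-truststore", ".jks");
        // createTempFile already created the file, so overwrite it.
        Files.copy(jksBytes, local, StandardCopyOption.REPLACE_EXISTING);
        return local;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in bytes; a real pipeline would open the bundled JKS
        // with something like getResourceAsStream (name is a placeholder).
        InputStream in = new ByteArrayInputStream(new byte[] {1, 2, 3});
        Path path = materialize(in);
        System.out.println(path + " (" + Files.size(path) + " bytes)");
    }
}
```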


Does anybody have experience with this issue, or could someone assist
or point me in the right direction?

If this is the wrong place for this kind of issue, may I ask you to
point me to the right place?


Excerpts:

First a single statement from:
org.apache.kafka.common.security.authenticator.AbstractLogin
> Successfully logged in.

Repeating log statements:
> SSLHandshake handshakeUnwrap: handshakeStatus NEED_UNWRAP status
BUFFER_UNDERFLOW
> SSLHandshake NEED_UNWRAP channelId -1, handshakeResult Status =
BUFFER_UNDERFLOW HandshakeStatus = NEED_UNWRAP
bytesConsumed = 0 bytesProduced = 0, appReadBuffer pos 0, netReadBuffer
pos 0, netWriteBuffer pos 219
> SSLHandshake NEED_WRAP channelId -1, handshakeResult Status = OK
HandshakeStatus = NEED_UNWRAP
bytesConsumed = 0 bytesProduced = 219, appReadBuffer pos 0,
netReadBuffer pos 0, netWriteBuffer pos 0

Repeating exceptions:
> message: Connection with kafkabroker.notgooglecloud.com/W.X.Y.Z
disconnected
> logger: org.apache.kafka.common.network.Selector

> javax.net.ssl.SSLException: Unrecognized SSL message, plaintext
connection?
>       at ...
>       at
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:412)
>       at ...
>       at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:139)
>       at ...
>       at
org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1272)
>       at
org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.start(KafkaIO.java:869)
>       at ...
>       at
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72)
