How are the .jks files passed to the workers? I think for Kafka these files
have to exist on the local file system (i.e. be accessible via file:///).
That was the tricky part with Dataflow: there does not seem to be an easy
way to ship such files to the workers.

I don't know whether the error you are seeing is caused by missing files,
but if it is, you could work around it, since you are willing to change the
code.
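
If you go that route, one rough sketch of such a workaround (untested; the
resource name and local path below are placeholders) is to bundle the .jks
inside the pipeline jar and have the consumer factory function copy it to a
worker-local path before constructing the consumer:

  import java.io.InputStream;
  import java.nio.file.Files;
  import java.nio.file.Path;
  import java.nio.file.Paths;
  import java.nio.file.StandardCopyOption;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  // Copies a truststore bundled in the pipeline jar onto the worker's local
  // disk, then points the consumer config at that local file path.
  class LocalTruststoreConsumerFactory
      implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

    @Override
    public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
      try (InputStream in = getClass().getClassLoader()
          .getResourceAsStream("client.truststore.jks")) {  // hypothetical resource name
        Path local = Paths.get("/tmp/client.truststore.jks");  // placeholder path
        Files.copy(in, local, StandardCopyOption.REPLACE_EXISTING);
        Map<String, Object> updated = new HashMap<>(config);
        updated.put("ssl.truststore.location", local.toString());
        return new KafkaConsumer<>(updated);
      } catch (Exception e) {
        throw new RuntimeException("Could not write truststore to local disk", e);
      }
    }
  }

You would plug this in through KafkaIO's
withConsumerFactoryFn(new LocalTruststoreConsumerFactory()), so the copy
happens on each worker rather than on the machine that submits the job.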

On Mon, Feb 27, 2017 at 4:56 AM, [TA] Jonas Grabber <
[email protected]> wrote:

> Hello,
>
> I'm currently having issues connecting from Google Dataflow through
> SASL_SSL, with a self-signed certificate, to a broker running Kafka
> 0.10.2.0. I'm using the latest SNAPSHOT versions of both the Beam SDK
> and KafkaIO.
>
> With the DirectRunner everything is fine: I point either the JVM or the
> Kafka consumer config at a truststore (.jks) that holds the self-signed
> certificate's CA, and I see data coming through.
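>
> Roughly what my setup looks like (the broker address matches the excerpts
> below; the topic, paths, password, and SASL mechanism are placeholders
> here, and the JAAS/login config is omitted):
>
>   Map<String, Object> props = new HashMap<>();  // java.util.HashMap / Map
>   props.put("security.protocol", "SASL_SSL");
>   props.put("sasl.mechanism", "PLAIN");  // whichever mechanism the broker uses
>   props.put("ssl.truststore.location", "/path/to/truststore.jks");
>   props.put("ssl.truststore.password", "changeit");
>
>   KafkaIO.<byte[], byte[]>read()
>       .withBootstrapServers("kafkabroker.notgooglecloud.com:9093")
>       .withTopic("events")  // hypothetical topic name
>       .updateConsumerProperties(props);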
>
> Using the Dataflow runner, I see that the workers successfully sign in
> to the Kafka brokers running on non-GCP servers. Shortly afterwards I get
> BUFFER_UNDERFLOWs and SSL-related exceptions (see excerpts at the end).
> No data comes through; the workers are unable even to acquire metadata.
>
> I tried various methods of providing the truststore to the workers,
> overriding SSL security/authentication checks, etc. I put that logic both
> into the Kafka consumer factory function and into KafkaIO.start().
>
>
> Does somebody have experience with this issue, or could someone assist,
> or at least point me in some direction?
>
> If this is the wrong place for this kind of issue, may I ask you to
> point me to the right one?
>
>
> Excerpts:
>
> First, a single statement from
> org.apache.kafka.common.security.authenticator.AbstractLogin:
> > Successfully logged in.
>
> Repeating log statements:
> > SSLHandshake handshakeUnwrap: handshakeStatus NEED_UNWRAP status
> >   BUFFER_UNDERFLOW
> > SSLHandshake NEED_UNWRAP channelId -1, handshakeResult Status =
> >   BUFFER_UNDERFLOW HandshakeStatus = NEED_UNWRAP bytesConsumed = 0
> >   bytesProduced = 0, appReadBuffer pos 0, netReadBuffer pos 0,
> >   netWriteBuffer pos 219
> > SSLHandshake NEED_WRAP channelId -1, handshakeResult Status = OK
> >   HandshakeStatus = NEED_UNWRAP bytesConsumed = 0 bytesProduced = 219,
> >   appReadBuffer pos 0, netReadBuffer pos 0, netWriteBuffer pos 0
>
> Repeating exceptions:
> > message: Connection with kafkabroker.notgooglecloud.com/W.X.Y.Z
> >   disconnected
> > logger: org.apache.kafka.common.network.Selector
>
> > javax.net.ssl.SSLException: Unrecognized SSL message, plaintext
> >   connection?
> >       at ...
> >       at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:412)
> >       at ...
> >       at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:139)
> >       at ...
> >       at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1272)
> >       at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.start(KafkaIO.java:869)
> >       at ...
> >       at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:72)
>
