Re: Flink kafka connector with JAAS configurations crashed

2018-03-19 Thread Nico Kruber
Hi,
I'm no expert on Kafka here, but as the tasks are run on the worker
nodes (where the TaskManagers are run), please double-check whether the
file under /data/apps/spark/kafka_client_jaas.conf on these nodes also
contains the same configuration as on the node running the JobManager,
i.e. an appropriate entry for 'KafkaClient'.
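
A minimal sketch of such a check, using only JDK classes so it can be run on each node without Kafka on the classpath. The class name JaasEntryCheck and the hasEntry helper are made up for illustration; the check only verifies that the file parses and contains the named section, not that the credentials are accepted by the broker:

```java
import java.io.File;
import java.security.URIParameter;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;

public class JaasEntryCheck {

  /** Returns true if the JAAS file at path defines a section called entryName. */
  static boolean hasEntry(String path, String entryName) throws Exception {
    // "JavaLoginConfig" is the JDK's built-in file-based JAAS configuration type.
    Configuration config = Configuration.getInstance(
        "JavaLoginConfig", new URIParameter(new File(path).toURI()));
    AppConfigurationEntry[] entries = config.getAppConfigurationEntry(entryName);
    return entries != null && entries.length > 0;
  }

  public static void main(String[] args) throws Exception {
    // Default path taken from the thread; pass another path as the first argument.
    String path = args.length > 0 ? args[0] : "/data/apps/spark/kafka_client_jaas.conf";
    System.out.println(path + " has KafkaClient entry: " + hasEntry(path, "KafkaClient"));
  }
}
```

Running it on every TaskManager host with the same path should show quickly whether one of the nodes has a missing or different file.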


Regards
Nico

On 13/03/18 08:42, sundy wrote:
> 
> Hi all,
> 
> I use the code below to set the Kafka JAAS config. serverConfig.jasspath
> is /data/apps/spark/kafka_client_jaas.conf. On a Flink standalone
> deployment the job crashes. I am sure kafka_client_jaas.conf is valid,
> because other applications (Spark Streaming) still work fine with it, so
> I don't think the problem is caused by the Kafka 0.10 client.
> 
> System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
> properties.setProperty("security.protocol", "SASL_PLAINTEXT");
> properties.setProperty("sasl.mechanism", "PLAIN");
> 
> 
> The exception messages are:
> 
> org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
>   at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
>   at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
>   at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
>   at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
>   at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
>   ... 11 more
> Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
>   at org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
>   at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
>   at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
>   ... 15 more
> 
> 
> 
> The file content looks like this:
> 
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="xxx";
> };
> 
> It seems the kafka_client_jaas.conf file has been read, but the
> KafkaClient entry could not be resolved. That's very strange, since other
> applications with the same config work fine. I also wrote a simple Java
> program to test the file, and it works fine too.
> 
> 
> public static void main(String[] args) {
>   Map maps = new HashMap<>();
>   System.setProperty("java.security.auth.login.config",
> "/data/apps/spark/kafka_client_jaas.conf");
>   Configuration jassConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
>   AppConfigurationEntry object[] =
> jassConfig.getAppConfigurationEntry("KafkaClient");
>   for(AppConfigurationEntry entry : object){
>     System.out.println(entry.getOptions());
>   }
> }


Flink kafka connector with JAAS configurations crashed

2018-03-13 Thread sundy

Hi all,

I use the code below to set the Kafka JAAS config. serverConfig.jasspath is
/data/apps/spark/kafka_client_jaas.conf. On a Flink standalone deployment the
job crashes. I am sure kafka_client_jaas.conf is valid, because other
applications (Spark Streaming) still work fine with it, so I don't think the
problem is caused by the Kafka 0.10 client.

System.setProperty("java.security.auth.login.config", serverConfig.jasspath);
properties.setProperty("security.protocol", "SASL_PLAINTEXT");
properties.setProperty("sasl.mechanism", "PLAIN");
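
As a possible alternative to the JVM-wide system property (an editorial sketch, not something confirmed in this thread): Kafka clients from 0.10.2 onward accept a sasl.jaas.config client property that carries the login module line directly, so nothing has to be read from a file on the worker nodes. Whether the client bundled with the Flink connector in use is new enough is an assumption to verify; the class and method names below are hypothetical.

```java
import java.util.Properties;

public class KafkaSaslProps {

  /** Builds client properties with the PLAIN login module configured inline. */
  static Properties saslPlainProps(String username, String password) {
    Properties properties = new Properties();
    properties.setProperty("security.protocol", "SASL_PLAINTEXT");
    properties.setProperty("sasl.mechanism", "PLAIN");
    // sasl.jaas.config holds only the body of the KafkaClient section:
    // no section name and no surrounding braces, but the trailing semicolon is required.
    properties.setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + username + "\" password=\"" + password + "\";");
    return properties;
  }

  public static void main(String[] args) {
    // Placeholder credentials copied from the JAAS file shown in the thread.
    System.out.println(saslPlainProps("admin", "xxx").getProperty("sasl.jaas.config"));
  }
}
```

The resulting Properties object would be passed to the Kafka consumer in place of the one built above; in that case the System.setProperty call is no longer needed for the consumer.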

The exception messages are:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:597)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:579)
  at org.apache.flink.streaming.connectors.kafka.internal.Kafka09PartitionDiscoverer.initializeConnections(Kafka09PartitionDiscoverer.java:56)
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:91)
  at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:422)
  at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
  at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
  at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:94)
  at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:93)
  at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:51)
  at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:84)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:657)
  ... 11 more
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is /data/apps/spark/kafka_client_jaas.conf
  at org.apache.kafka.common.security.JaasUtils.defaultJaasConfig(JaasUtils.java:85)
  at org.apache.kafka.common.security.JaasUtils.jaasConfig(JaasUtils.java:67)
  at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:85)
  ... 15 more


The file content looks like this:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="xxx";
};

It seems the kafka_client_jaas.conf file has been read, but the KafkaClient
entry could not be resolved. That's very strange, since other applications with
the same config work fine. I also wrote a simple Java program to test the file,
and it works fine too.


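import java.util.HashMap;
import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import org.apache.kafka.common.network.LoginType;
import org.apache.kafka.common.security.JaasUtils;

public class JaasTest {
  public static void main(String[] args) {
    Map<String, Object> maps = new HashMap<>();
    // Point the JVM at the JAAS file before Kafka resolves the configuration.
    System.setProperty("java.security.auth.login.config",
        "/data/apps/spark/kafka_client_jaas.conf");
    Configuration jassConfig = JaasUtils.jaasConfig(LoginType.CLIENT, maps);
    AppConfigurationEntry[] object =
        jassConfig.getAppConfigurationEntry("KafkaClient");
    // Print the options (username, password) of each KafkaClient login module.
    for (AppConfigurationEntry entry : object) {
      System.out.println(entry.getOptions());
    }
  }
}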