Re: Unable to serialize org.apache.kafka.common.config.types.Password

2018-12-25 Thread tao xiao
Thanks, it works



Re: Unable to serialize org.apache.kafka.common.config.types.Password

2018-12-25 Thread fudian.fd
The exception makes it clear that the SourceFunction must be serializable,
and Password is not serializable. Try setting the Kafka consumer property as
a plain String instead:

props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "LoginModule required subject=\"test\" secret=\"test\";");

The String value will be parsed into a Password object (see the method
org.apache.kafka.common.config.ConfigDef.parseType).
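
To illustrate why the value type matters, here is a minimal sketch that uses
only the JDK. FakePassword is a hypothetical stand-in for
org.apache.kafka.common.config.types.Password (a class that does not implement
java.io.Serializable), and isSerializable is a helper invented for this
example; neither is part of Kafka or Flink. The stack trace shows the failure
inside java.util.Hashtable.writeObject, so the sketch serializes a
java.util.Properties object directly, which is presumably close to what
happens when Flink serializes the consumer together with its properties.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.Properties;

public class PasswordSerializationSketch {

    // Hypothetical stand-in for Kafka's Password type: a plain class
    // that does NOT implement java.io.Serializable.
    static final class FakePassword {
        private final String value;

        FakePassword(String value) {
            this.value = value;
        }

        String value() {
            return value;
        }
    }

    // Returns true if Java serialization of the given Properties succeeds,
    // false if it fails with NotSerializableException.
    static boolean isSerializable(Properties props) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(props);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String jaas = "LoginModule required subject=\"test\" secret=\"test\";";

        // "sasl.jaas.config" is the key behind SaslConfigs.SASL_JAAS_CONFIG.
        Properties withObject = new Properties();
        withObject.put("sasl.jaas.config", new FakePassword(jaas));

        Properties withString = new Properties();
        withString.put("sasl.jaas.config", jaas);

        System.out.println("object value serializable: " + isSerializable(withObject));
        System.out.println("string value serializable: " + isSerializable(withString));
    }
}
```

The first Properties object cannot be serialized because one of its values is
not Serializable, while the second contains only Strings and serializes
without error.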

Regards,
Dian







Unable to serialize org.apache.kafka.common.config.types.Password

2018-12-25 Thread tao xiao
Hi team,

I am passing security-enabled Kafka consumer properties to
FlinkKafkaConsumer but keep getting a java.io.NotSerializableException.
What is the best way to handle this?

I use Flink 1.7.1, and here is the consumer property that produces the
exception:

props.put(SaslConfigs.SASL_JAAS_CONFIG,
    new Password("LoginModule required subject=\"test\" secret=\"test\";"));

Stack trace:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumerBase is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1471)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1415)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1397)
    at org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob.main(KafkaEventsGeneratorJob.java:69)
Caused by: java.io.NotSerializableException: org.apache.kafka.common.config.types.Password
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.Hashtable.writeObject(Hashtable.java:1157)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 5 more