Hi Aljoscha, I have tried different version of Flink V 1.0.0 and 1.0.3 and Kafka version 0.10.0.0. Ahmad
On Wed, Jun 1, 2016 at 10:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote: > This is unrelated to joda time or Kryo, that's just an info message in the > log. > > What version of Flink and Kafka are you using? > > > > On Wed, 1 Jun 2016 at 07:02 arpit srivastava <arpit8...@gmail.com> wrote: > >> Flink uses kryo serialization which doesn't support joda time object >> serialization. >> >> Use java.util.date or you have to change kryo. >> >> Thanks, >> Arpit >> >> On Tue, May 31, 2016 at 11:18 PM, ahmad Sa P <aspp...@gmail.com> wrote: >> >>> Hi >>> I have a problem at running a sample code from the hands-in examples of >>> Apache Flink, >>> I used the following code to send output of a stream to already running >>> Apache Kafka, and get the below error. Could anyone tell me what is going >>> wrong? >>> >>> Best regards >>> Ahmad >>> >>> public class RideCleansing { >>> >>> private static final String LOCAL_KAFKA_BROKER = "localhost:9092"; >>> public static final String CLEANSED_RIDES_TOPIC = "mytopic"; >>> >>> >>> public static void main(String[] args) throws Exception { >>> >>> StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> >>> DataStream<TaxiRide> rides = env.addSource(new >>> TaxiRideGenerator("C://data/nycTaxiRides.gz", 1000.0f)); >>> >>> DataStream<TaxiRide> filteredRides = rides.filter(new NYCFilter()); >>> >>> filteredRides.addSink(new FlinkKafkaProducer<>(LOCAL_KAFKA_BROKER, >>> CLEANSED_RIDES_TOPIC, >>> new TaxiRideSchema())); >>> >>> env.execute("Taxi Ride Cleansing"); >>> } >>> >>> Error: >>> 18:43:15,734 INFO org.apache.flink.api.java.typeutils.TypeExtractor >>> - class org.joda.time.DateTime is not a valid POJO type >>> Exception in thread "main" java.lang.NoClassDefFoundError: >>> kafka/producer/Partitioner >>> at java.lang.ClassLoader.defineClass1(Native Method) >>> at java.lang.ClassLoader.defineClass(ClassLoader.java:760) >>> at >>> java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) >>> at java.net.URLClassLoader.defineClass(URLClassLoader.java:455) >>> at java.net.URLClassLoader.access$100(URLClassLoader.java:73) >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:367) >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:361) >>> at java.security.AccessController.doPrivileged(Native Method) >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:360) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>> at >>> com.dataArtisans.flinkTraining.exercises.dataStreamJava.rideCleansing.RideCleansing.main(RideCleansing.java:51) >>> Caused by: java.lang.ClassNotFoundException: kafka.producer.Partitioner >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:372) >>> at java.net.URLClassLoader$1.run(URLClassLoader.java:361) >>> at java.security.AccessController.doPrivileged(Native Method) >>> at java.net.URLClassLoader.findClass(URLClassLoader.java:360) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) >>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>> ... 13 more >>> >>> >>