Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Great to hear that it is now working.

Cheers,
Till

On Sat, Jun 13, 2020, 12:58 Felipe Gutierrez wrote:
> Yes, it trapped me again. It was the /etc/hosts file, which I change
> when I am using VMs. Now, even with the INFO message "INFO
> org.apache.flink.api.java.typeutils.TypeExtractor - class
> org.joda.time.DateTime does not contain a getter for field iMillis",
> my program is running. Thanks!
Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Yes, it trapped me again. It was the /etc/hosts file, which I change when I am using VMs. Now, even with the INFO message "INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a getter for field iMillis", my program is running. Thanks!

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com

On Sat, Jun 13, 2020 at 12:40 PM Till Rohrmann wrote:
> Hi Felipe,
>
> the reason you cannot submit a job to the Flink cluster is that the
> client cannot reach the blob server:
>
> Caused by: java.io.IOException: Could not connect to BlobServer at address
> localhost/192.168.56.1:35193
>
> Could you check whether the cluster has been started properly and is
> reachable at 192.168.56.1:35193? You could also share the cluster logs
> with us to debug the problem further.
>
> Cheers,
> Till
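For anyone who hits the same trap: the address in the error, localhost/192.168.56.1, suggests that the name localhost resolved to a VM network address rather than to a loopback address. A minimal sketch for checking this from Java, assuming the client resolves names through the JVM's default resolver (the class and method names here are made up for illustration and are not part of Flink):

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class LocalhostCheck {

    /** Returns true only if the name resolves and every resolved address is a loopback address. */
    static boolean resolvesToLoopback(String host) {
        try {
            InetAddress[] addresses = InetAddress.getAllByName(host);
            for (InetAddress address : addresses) {
                if (!address.isLoopbackAddress()) {
                    return false;
                }
            }
            return addresses.length > 0;
        } catch (UnknownHostException e) {
            // The name does not resolve at all.
            return false;
        }
    }

    public static void main(String[] args) throws UnknownHostException {
        // InetAddress prints as "hostname/ip", the same form that appears in the BlobServer error.
        for (InetAddress address : InetAddress.getAllByName("localhost")) {
            String note = address.isLoopbackAddress() ? "" : "  <-- not loopback, check /etc/hosts";
            System.out.println(address + note);
        }
    }
}
```

If the output shows a non-loopback address for localhost, an edited /etc/hosts entry is a likely cause.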
Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Hi Felipe,

the reason you cannot submit a job to the Flink cluster is that the client cannot reach the blob server:

Caused by: java.io.IOException: Could not connect to BlobServer at address localhost/192.168.56.1:35193

Could you check whether the cluster has been started properly and is reachable at 192.168.56.1:35193? You could also share the cluster logs with us to debug the problem further.

Cheers,
Till

On Sat, Jun 13, 2020 at 12:09 PM Felipe Gutierrez <felipe.o.gutier...@gmail.com> wrote:
> Hi, I tried changing the joda-time Maven version to match the one in
> the flink-training example, and I am getting this error in IntelliJ.
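The reachability check suggested above can be sketched as a plain TCP connect test. This is only an illustration using the JDK socket API: the class name, the method name, and the 2-second timeout are assumptions, and the host and port come from the error message (the blob server port is typically chosen at startup, so the port from your own error message is the one to test).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Tries to open a TCP connection; returns false on refusal, timeout, or an unresolvable host. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults taken from the error message in this thread; override on the command line.
        String host = args.length > 0 ? args[0] : "192.168.56.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 35193;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 2000));
    }
}
```

A result of false only says that the connection could not be opened from this machine; the cluster logs are still needed to tell a stopped cluster apart from a wrong address.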
Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Hi,

I tried changing the joda-time Maven version to match the one in the flink-training example, and I am getting this error in IntelliJ. Maybe this is more precise:

2020-06-13 12:04:27,333 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2020-06-13 12:04:27,333 INFO org.apache.flink.runtime.taskmanager.Task[] - reducer -> flat-output -> Sink: sink (4/4) (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,337 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - reducer -> flat-output -> Sink: sink (4/4) (bb24f17c4869d5de9f684e64ff97cf5b) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,360 INFO org.apache.flink.streaming.runtime.tasks.StreamTask [] - No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
2020-06-13 12:04:27,360 INFO org.apache.flink.runtime.taskmanager.Task[] - reducer -> flat-output -> Sink: sink (3/4) (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,362 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - reducer -> flat-output -> Sink: sink (3/4) (0bd2d20546f4ee5eef1429bf5c80a1b4) switched from DEPLOYING to RUNNING.
2020-06-13 12:04:27,376 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend[] - Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,381 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend[] - Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,381 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend[] - Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,389 INFO org.apache.flink.runtime.state.heap.HeapKeyedStateBackend[] - Initializing heap keyed state backend with stream factory.
2020-06-13 12:04:27,511 WARN org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Falling back to default Kryo serializer because Chill serializer couldn't be found.
java.lang.reflect.InvocationTargetException: null
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.getKryoInstance(KryoSerializer.java:436) [classes/:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:454) [classes/:?]
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:289) [classes/:?]
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175) [classes/:?]
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46) [classes/:?]
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) [classes/:?]
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71) [classes/:?]
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117) [classes/:?]
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60) [classes/:?]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107) [classes/:?]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89) [classes/:?]
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) [classes/:?]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) [classes/:?]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) [classes/:?]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) [classes/:?]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111) [classes/:?]
    at org.apache.flink.streaming.examples.aggregate.util.TaxiRideSource.generateTaxiRideArray(TaxiRideSource.java:114) [classes/:?]
    at
Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Hi Yun,

it logs the INFO message "class org.joda.time.DateTime cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance", but I cannot submit my job. It is strange because I can start and run it in IntelliJ, but not on the standalone cluster on my machine.

2020-06-13 10:50:56,051 INFO org.apache.flink.core.fs.FileSystem - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2020-06-13 10:50:56,143 INFO org.apache.flink.runtime.security.modules.HadoopModuleFactory - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2020-06-13 10:50:56,164 INFO org.apache.flink.runtime.security.modules.JaasModule - Jaas file will be created as /tmp/jaas-837993701496785981.conf.
2020-06-13 10:50:56,169 INFO org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2020-06-13 10:50:56,169 WARN org.apache.flink.runtime.security.SecurityUtils - Unable to install incompatible security context factory org.apache.flink.runtime.security.contexts.HadoopSecurityContextFactory
2020-06-13 10:50:56,171 INFO org.apache.flink.client.cli.CliFrontend - Running 'run' command.
2020-06-13 10:50:56,242 INFO org.apache.flink.client.cli.CliFrontend - Building program from JAR file
2020-06-13 10:50:57,084 INFO org.apache.flink.client.ClientUtils - Starting program (detached: false)
2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a getter for field iMillis
2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a setter for field iMillis
2020-06-13 10:50:59,021 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.joda.time.DateTime cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a getter for field iMillis
2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.joda.time.DateTime does not contain a setter for field iMillis
2020-06-13 10:50:59,028 INFO org.apache.flink.api.java.typeutils.TypeExtractor - Class class org.joda.time.DateTime cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2020-06-13 10:53:19,508 WARN org.apache.flink.util.ExecutorUtils - ExecutorService did not terminate in time. Shutting it down now.
2020-06-13 10:53:19,510 ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:148)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:276)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1764)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:106)
    at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:72)
    at
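The TypeExtractor INFO lines above are informational: they report that a field has no matching getter or setter, so the type is handled as a GenericType instead of a POJO. A simplified sketch of that kind of check, assuming the rule "every non-static, non-transient field is either public or has a public get<field>/set<field> pair" (this is not Flink's actual implementation, and both nested classes are hypothetical stand-ins, not Joda-Time code):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class PojoFieldCheck {

    /** Hypothetical Joda-style class: the field is iMillis, but the accessor is getMillis(). */
    public static class JodaLike {
        private long iMillis;
        public long getMillis() { return iMillis; }
    }

    /** Hypothetical class whose accessors match the field name. */
    public static class BeanLike {
        private long millis;
        public long getMillis() { return millis; }
        public void setMillis(long millis) { this.millis = millis; }
    }

    /** Case-insensitive lookup of a public method by name. */
    static boolean hasPublicMethod(Class<?> type, String name) {
        for (Method method : type.getMethods()) {
            if (method.getName().equalsIgnoreCase(name)) {
                return true;
            }
        }
        return false;
    }

    /** Lists the fields that break the simplified POJO rule described above. */
    static List<String> invalidFields(Class<?> type) {
        List<String> problems = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            int modifiers = field.getModifiers();
            if (Modifier.isStatic(modifiers) || Modifier.isTransient(modifiers) || field.isSynthetic()) {
                continue; // not part of the serialized state in this sketch
            }
            if (Modifier.isPublic(modifiers)) {
                continue; // public fields need no accessors
            }
            if (!hasPublicMethod(type, "get" + field.getName())) {
                problems.add("no getter for field " + field.getName());
            }
            if (!hasPublicMethod(type, "set" + field.getName())) {
                problems.add("no setter for field " + field.getName());
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println("JodaLike: " + invalidFields(JodaLike.class));
        System.out.println("BeanLike: " + invalidFields(BeanLike.class));
    }
}
```

In this sketch JodaLike fails the rule because getMillis() does not match a field named iMillis, which mirrors the shape of the log message. A failed check of this kind only changes which serializer is used; it does not by itself explain the failed job submission.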
Re: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Hi Felipe,

I tested the basic RideCleansingExercise [1] job, which uses the TaxiRide type, locally, and it seems to start up normally. Could you also share the code you are currently running and the full stack trace of the exception?

Best,
Yun

[1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/basics/RideCleansingExercise.java

--Original Mail --
Sender: Felipe Gutierrez
Send Date: Fri Jun 12 23:11:28 2020
Recipients: user
Subject: How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
How to add org.joda.time.DateTime in the StreamExecutionEnvironment to the TaxiRide training example?
Hi,

I am using the Flink training exercise TaxiRide [1] to execute a stream count of events. On the cluster and on my local machine I receive a message that Joda-Time cannot be serialized: "class org.joda.time.LocalDateTime is not a valid POJO type". However, the job starts on the cluster but not on my local machine. I searched the internet and found that the Joda-Time classes should be registered on the environment [2]. I did it like this:

    env.getConfig().registerTypeWithKryoSerializer(DateTime.class, AvroKryoSerializerUtils.JodaDateTimeSerializer.class);
    env.getConfig().registerTypeWithKryoSerializer(LocalDate.class, AvroKryoSerializerUtils.JodaLocalDateSerializer.class);
    env.getConfig().registerTypeWithKryoSerializer(LocalTime.class, AvroKryoSerializerUtils.JodaLocalTimeSerializer.class);

and I added the joda-time and flink-avro dependencies to the pom.xml:

    <dependency>
        <groupId>joda-time</groupId>
        <artifactId>joda-time</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${project.version}</version>
    </dependency>

I also tested addDefaultKryoSerializer, but I got the same error. For some reason, it is still not working. Does anyone have a hint about what could be happening?

Thanks!
Felipe

[1] https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/ververica/flinktraining/exercises/datastream_java/datatypes/TaxiRide.java
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/custom_serializers.html

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com