Re: How to perform this join operation?
Till, An issue with your suggestion is that the job state may grow unbounded. You are managing expiration of data from the cache in the operator, but the state is partitioned by the stream key. That means if we no longer observe a key, the state associated with that key will never be removed. In my data set keys come and go, and many will never be observed again. That will lead to continuous state growth over time. On Mon, May 2, 2016 at 6:06 PM, Elias Levy wrote: > Thanks for the suggestion. I ended up implementing it a different way. > > [...] > > On Wed, Apr 20, 2016 at 10:13 AM, Till Rohrmann > wrote: > >> Sorry for the late reply. You're right that with the windowed join you >> would have to deal with pairs where the timestamp of (x,y) is not >> necessarily earlier than the timestamp of z. Moreover, by using sliding >> windows you would receive duplicates as you've described. Using tumbling >> windows would mean that you lose join matches if (x,y) lives in an earlier >> window. Thus, in order to solve your problem you would have to write a >> custom stream operator. >> >> The stream operator would do the following: collect the inputs from >> (x,y) and z, which are already keyed. Thus, we know that x=z holds true. >> Using a priority queue we order the elements because we don't know how they >> arrive at the operator. Whenever we receive a watermark indicating that no >> earlier events can arrive anymore, we can go through the two priority >> queues to join the elements. The queues are part of the operator's state so >> that we don't lose information in case of a recovery. >> >> I've sketched such an operator here [1]. I hope this helps you to get >> started. >> > > >
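To make the buffering idea concrete, here is a minimal, Flink-agnostic Java sketch of the logic such an operator would run; the actual Flink operator wiring, state snapshotting, and cleanup policy are omitted, and the Event type and emit callback are illustrative:

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

class Event {
    final long timestamp;
    final String payload;
    Event(long timestamp, String payload) { this.timestamp = timestamp; this.payload = payload; }
}

class BufferingJoin {
    // one queue per input, ordered by timestamp, because arrival order is not guaranteed
    private final PriorityQueue<Event> xyInput = new PriorityQueue<>(Comparator.comparingLong(e -> e.timestamp));
    private final PriorityQueue<Event> zInput = new PriorityQueue<>(Comparator.comparingLong(e -> e.timestamp));

    void onElement1(Event xy) { xyInput.add(xy); }
    void onElement2(Event z) { zInput.add(z); }

    // called when a watermark guarantees that no element with a smaller timestamp will arrive
    void onWatermark(long watermark, BiConsumer<Event, Event> emit) {
        while (!zInput.isEmpty() && zInput.peek().timestamp <= watermark) {
            Event z = zInput.poll();
            // join z with every buffered (x,y) whose timestamp is not later than z's
            for (Event xy : xyInput) {
                if (xy.timestamp <= z.timestamp) {
                    emit.accept(xy, z);
                }
            }
        }
        // how long buffered (x,y) elements are retained for future z's is application-specific,
        // which is exactly the unbounded-state concern raised above
    }
}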
Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)
Right now I'm using Flink 1.0.2... to which version should I downgrade? The hardware seems to be OK... how could I detect faulty hardware? These errors appeared in every run of my job after I moved the temporary directory from ssd to hdd and I extended my pipeline with a dataset that grows as the pipeline goes on, accumulating data from intermediate datasets... On 20 May 2016 18:31, "Fabian Hueske" wrote: > The problem seems to occur quite often. > Did you update your Flink version recently? If so, could you try to > downgrade and see if the problem disappears. > > Is it otherwise possible that it is caused by faulty hardware? > > 2016-05-20 18:05 GMT+02:00 Flavio Pompermaier : > >> This time (Europd instead of Europe): >> >> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce >> (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map >> (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread >> 'SortMerger spilling thread' terminated due to an exception: The datetime >> zone id 'Europd/Rome' is not recognised >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456) >> at >> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >> at java.lang.Thread.run(Thread.java:745) >> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: >> Thread 'SortMerger spilling thread' terminated due to an exception: The >> datetime zone id 'Europd/Rome' is not recognised >> at >> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) >> at >> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079) >> at >> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94) >> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) >> ... 
3 more >> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' >> terminated due to an exception: The datetime zone id 'Europd/Rome' is not >> recognised >> at >> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800) >> Caused by: java.lang.IllegalArgumentException: The datetime zone id >> 'Europd/Rome' is not recognised >> at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229) >> at >> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94) >> at >> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74) >> at >> de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) >> at >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) >> at >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242) >> at >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252) >> at >> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75) >> at >> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499) >> at >> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) >> at >> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) >> >> >> >> On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier > > wrote: >> >>> This time another error (rerialization instead of serialization): >>> >>> com.esotericsoftware.kryo.KryoException: Unable to find class: >>> it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo >>> at >>> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) >>> at >>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) >>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) >>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) >>> at >>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) >>> at >>> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) >>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) >>> at >>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) >>> at >>>
Re: inheritance of Program interface in Program.java in org.apache.flink.api.common
Then this.program of the PackagedProgram object is null. In run() of CliFrontend.java [1], executeProgramBlocking(program, client, userParallelism) is invoked [2], and this method invokes client.runBlocking(program, parallelism) [3], and this method invokes the overloaded function runBlocking(prog.getPlanWithJars(), parallelism, prog.getSavepointPath()) [4], and prog.getPlanWithJars() invokes getPlan() of PackagedProgram.java [5], and getPlan() invokes createPlanFromProgram(this.program, this.args) [6]. createPlanFromProgram's source is the following:

private static Plan createPlanFromProgram(Program program, String[] options) throws ProgramInvocationException {
    try {
        return program.getPlan(options);
    } catch (Throwable t) {
        throw new ProgramInvocationException("Error while calling the program: " + t.getMessage(), t);
    }
}

As we checked, this.program is null, so if we invoke program.getPlan(options), an exception should happen. But when I ran the program, the exception didn't occur. I'd appreciate it if you could explain this.

[1] else if (hasMainMethod(mainClass)) { this.program = null;
[2] if (options.getDetachedMode() || (yarnCluster != null && yarnCluster.isDetached())) { exitCode = executeProgramDetached(program, client, userParallelism); } else { exitCode = executeProgramBlocking(program, client, userParallelism); }
[3] protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) { LOG.info("Starting execution of program"); JobSubmissionResult result; try { result = client.runBlocking(program, parallelism); }
[4] public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader()); if (prog.isUsingProgramEntryPoint()) { return runBlocking(prog.getPlanWithJars(), parallelism, prog.getSavepointPath()); }
[5] public JobWithJars getPlanWithJars() throws ProgramInvocationException { if (isUsingProgramEntryPoint()) { return new JobWithJars(getPlan(), getAllLibraries(), classpaths, userCodeClassLoader);
[6] private Plan getPlan() throws ProgramInvocationException { if (this.plan == null) { Thread.currentThread().setContextClassLoader(this.userCodeClassLoader); this.plan = createPlanFromProgram(this.program, this.args); } return this.plan; }

-Original Message- From: "Ufuk Celebi" u...@apache.org To: user@flink.apache.org; "윤형덕" ynoo...@naver.com; Cc: Sent: 2016-05-20 (Fri) 19:30:01 Subject: Re: inheritance of Program interface in Program.java in org.apache.flink.api.common On Thu, May 19, 2016 at 4:46 PM, 윤형덕 ynoo...@naver.com wrote: How can this.mainClass, which doesn't override the getPlan method (the abstract method of the Program interface) and has only a static main method, be instantiated as a Program? This is only called if the class is actually a subclass of Program. That's why there is the `isAssignable` check. Otherwise, we check that there is a main method (else if (hasMainMethod(mainClass))). If this is not the case, a ProgramInvocationException is thrown.
Logging Exceptions
Hello! Using Flink 1.0.2, I noticed that exceptions thrown during a Flink program would show up on the Flink dashboard in the 'Exceptions' tab. That's great! However, I don't think Flink currently logs this same exception. I was hoping there would be an equivalent `log.error` call so that third-party logging frameworks can also act upon such errors. If this is currently the known behavior, would it be troublesome to also make a `log.error` call around the code that is responsible for sending the exception to the dashboard? If there's a misconfiguration on my end, let me know! Thanks! David
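In the meantime, a client-side workaround is to catch and log the failure yourself before rethrowing — a minimal sketch, assuming you drive the job from your own main() and use SLF4J (the pipeline body is a placeholder):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggedJob {
    private static final Logger LOG = LoggerFactory.getLogger(LoggedJob.class);

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).writeAsText("/tmp/logged-job-out"); // placeholder pipeline
        try {
            env.execute("logged job");
        } catch (Exception e) {
            // mirror what the dashboard's 'Exceptions' tab shows into the logging framework
            LOG.error("Job execution failed", e);
            throw e;
        }
    }
}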
Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)
The problem seems to occur quite often. Did you update your Flink version recently? If so, could you try to downgrade and see if the problem disappears. Is it otherwise possible that it is caused by faulty hardware? 2016-05-20 18:05 GMT+02:00 Flavio Pompermaier: > This time (Europd instead of Europe): > > java.lang.Exception: The data preparation for task 'CHAIN GroupReduce > (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map (Key > Extractor)' , caused an error: Error obtaining the sorted input: Thread > 'SortMerger spilling thread' terminated due to an exception: The datetime > zone id 'Europd/Rome' is not recognised > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: > Thread 'SortMerger spilling thread' terminated due to an exception: The > datetime zone id 'Europd/Rome' is not recognised > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) > at > org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079) > at > org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) > ... 3 more > Caused by: java.io.IOException: Thread 'SortMerger spilling thread' > terminated due to an exception: The datetime zone id 'Europd/Rome' is not > recognised > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800) > Caused by: java.lang.IllegalArgumentException: The datetime zone id > 'Europd/Rome' is not recognised > at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229) > at > de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94) > at > de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74) > at > de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252) > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75) > at > org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499) > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) > at > org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) > > > > On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier > wrote: > >> This time another error (rerialization instead of serialization): >> >> com.esotericsoftware.kryo.KryoException: Unable to find class: >> it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo >> at >> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) 
>> at >> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) >> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) >> at >> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) >> at >> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) >> at >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) >> at >> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >> at >> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) >> at >>
Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)
This time (Europd instead of Europe): java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(PassaggioWithComprAndVend.java:132)) -> Map (Key Extractor)' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619) at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079) at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450) ... 3 more Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: The datetime zone id 'Europd/Rome' is not recognised at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800) Caused by: java.lang.IllegalArgumentException: The datetime zone id 'Europd/Rome' is not recognised at org.joda.time.DateTimeZone.forID(DateTimeZone.java:229) at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.readTimeZone(JodaDateTimeSerializer.java:94) at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:74) at de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer.read(JodaDateTimeSerializer.java:59) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556) at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75) at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344) at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796) On Fri, May 20, 2016 at 4:25 PM, Flavio Pompermaier wrote: > This time another error (rerialization instead of serialization): > > com.esotericsoftware.kryo.KryoException: Unable to find class: > it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) > at > com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) > at > 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) > at > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135) > at > org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) > at >
Re: Run jar job in local cluster
Nope - thanks for the advice ;) I will try that tomorrow 2016-05-20 11:24 GMT+02:00 Ufuk Celebi: > I would suggest setting the log level to DEBUG and checking the logs to see why > the client cannot connect to your Elasticsearch cluster. Did you try > that? > > On Mon, May 9, 2016 at 3:54 PM, rafal green > wrote: > > Dear Sir or Madam, > > > > Can you tell me why I have a problem with Elasticsearch in a local cluster? > > > > I analysed this example: > > > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html > > > > My Flink and Elasticsearch configs are the defaults (I only changed node.name > to > > "node-1") > > > > This example runs in my IntelliJ IDEA 15, but on the local cluster I have a > > problem. Of course WordCount and SocketTextStreamWordCount work fine. > > > > > > I spent 2 days trying to find a solution (with uncle Google ;) ) but it's not > > easy > > > > val config = new java.util.HashMap[String, String] > > config.put("bulk.flush.max.actions", "1") > > config.put("cluster.name", "elasticsearch") > > config.put("path.home", > > > "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2") > > > > val transports = new util.ArrayList[InetSocketAddress] > > transports.add(new > > InetSocketAddress(InetAddress.getByName("127.0.0.1"),9300)) > > > > > > > > Error output: > > > > java.lang.RuntimeException: Client is not connected to any Elasticsearch > > nodes! > > at > > > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172) > > at > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) > > at > > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) > > at java.lang.Thread.run(Thread.java:745) > > > > 05/08/2016 22:57:02 Job execution switched to status FAILING. > > java.lang.RuntimeException: Client is not connected to any Elasticsearch > > nodes! > > at > > > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172) > > at > > > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) > > at > > > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) > > at java.lang.Thread.run(Thread.java:745) > > 05/08/2016 22:57:02 Job execution switched to status FAILED. > > > > > > The program finished with the following exception: > > > > org.apache.flink.client.program.ProgramInvocationException: The program > > execution failed: Job execution failed. 
> > at org.apache.flink.client.program.Client.runBlocking(Client.java:381) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:355) > > at > > > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65) > > at > > > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541) > > at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69) > > at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala) > > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > > at > > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > > at > > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > > at java.lang.reflect.Method.invoke(Method.java:498) > > at > > > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > > at > > > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > > at org.apache.flink.client.program.Client.runBlocking(Client.java:248) > > at > > > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860) > > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327) > > at > > > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187) > > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238) > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > > execution failed. > > at > > > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807) > > at > > >
[RichFlatMapFunction] Configuration File
Hi folks, I'm extending a RichFlatMapFunction in order to use state on a keyed stream. Concerning this I have two questions: 1. I have a var state_item: ValueState[Option[String]] as a local variable in this class, initialized with state_item = getRuntimeContext.getState(new ValueStateDescriptor. in the open function. Is the field state_item different for every key? In other words, if I have the keys val1 and val2, will these get two different states? 2. The open function has the signature override def open(conf: Configuration). Is there a way to pass a custom configuration in there? Thanks Simon
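For the first question: yes — keyed state is scoped to the current key, so each distinct key sees its own value. A minimal Java sketch of the pattern (names are illustrative; API as in Flink 1.0):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class LastItemFn extends RichFlatMapFunction<String, String> {
    private transient ValueState<String> stateItem; // one logical value per key

    @Override
    public void open(Configuration conf) {
        stateItem = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state_item", String.class, null));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        String previous = stateItem.value(); // reads the state of the current key only
        stateItem.update(value);             // writes the state of the current key only
        if (previous != null) {
            out.collect(previous + " -> " + value);
        }
    }
}

For the second question: in the DataStream API the Configuration handed to open() is empty; the usual approach is to pass custom settings through constructor parameters of the function, which are serialized along with it.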
Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)
This time another error (rerialization instead of serialization): com.esotericsoftware.kryo.KryoException: Unable to find class: it.okkam.flink.entitons.*rerialization*.pojo.EntitonQuadPojo at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124) at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: it.okkam.flink.entitons.rerialization.pojo.EntitonQuadPojo at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136) ... 20 more On Fri, May 20, 2016 at 12:48 PM, Flavio Pompermaier wrote: > Hi Ufuk, > my records could be quite large POJOs (I think some MB). > The only thing I do to configure Kryo is: > > env.registerTypeWithKryoSerializer(DateTime.class, > JodaDateTimeSerializer.class ); > > Best, > Flavio > > On Fri, May 20, 2016 at 12:38 PM, Ufuk Celebi wrote: > >> @Stefano: the records are serialized anyway for batch jobs. The >> spilling deserializer is only relevant if single records are very >> large. How large are your records? In any case, I don't expect this to >> be the problem. >> >> @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and >> ttil instead of util) look like some kind of data corruption and would >> need further investigation. The other failure you reported might be >> related to this. As a starting point, how do you configure the Kryo >> serializer? 
>> >> On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier >> wrote: >> > Today I've got this other strange error.. Obviously I don't have a >> > VdhicleEvent class, but a VehicleEvent class :( >> > >> > java.lang.RuntimeException: Cannot instantiate class. >> > at >> > >> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407) >> > at >> > >> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) >> > at >> > >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124) >> > at >> > >> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) >> > at >> > >> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) >> > at >> > >> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) >> > at >> > >> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101) >> > at >>
Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type
OK... great to hear that! Thanks to all for the support. On Fri, May 20, 2016 at 3:53 PM, Ufuk Celebi wrote: > On Fri, May 20, 2016 at 3:27 PM, Aljoscha Krettek > wrote: > > I think it might just be a warning. When using Kryo it is in the end a > > GenericTypeInformation but the TypeAnalyzer might still initially try to > > analyze it as a POJO. > > Yes, that's correct. > > To get back to the initial question: it's safe to ignore it then. ;) >
Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type
On Fri, May 20, 2016 at 3:27 PM, Aljoscha Krettek wrote: > I think it might just be a warning. When using Kryo it is in the end a > GenericTypeInformation but the TypeAnalyzer might still initially try to > analyze it as a POJO. Yes, that's correct. To get back to the initial question: it's safe to ignore it then. ;)
Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type
Indeed you can test this problem with:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.joda.time.DateTime;
import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;

public class DateTimeError {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
        env.fromElements(DateTime.now(), DateTime.now()).print();
    }
}

The very first line of the output is: 2016-05-20 15:39:02 INFO TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type So the program actually runs successfully but there's this annoying LOG statement :( On Fri, May 20, 2016 at 3:27 PM, Aljoscha Krettek wrote: > I think it might just be a warning. When using Kryo it is in the end a > GenericTypeInformation but the TypeAnalyzer might still initially try to > analyze it as a POJO. > > On Fri, 20 May 2016 at 12:51 Ufuk Celebi wrote: > >> I tried to reproduce this and if you set up the dependency >> >> <dependency> >> <groupId>de.javakaffee</groupId> >> <artifactId>kryo-serializers</artifactId> >> <version>0.28</version> >> </dependency> >> >> and register the Kryo type as suggested you should not see any log >> messages (e.g. the type should be treated as a generic type and not a >> POJO type). >> >> Does the program successfully execute for you? >> >> – Ufuk >> >> >> On Thu, May 19, 2016 at 5:49 PM, Flavio Pompermaier >> wrote: >> > Hi to all, >> > >> > I'm using Flink 1.0.2 and testing the job I discovered that I have a >> lot of >> > log with this error: >> > >> > TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO >> type >> > >> > initially I thought I forgot to properly migrate my code from 0.10.x to >> 1.0.x >> > as stated in [1] but then I checked my code and I correctly built the >> > ExecutionEnvironment as: >> > >> > env = ExecutionEnvironment.createLocalEnvironment(c); >> > env.registerTypeWithKryoSerializer(DateTime.class, >> > JodaDateTimeSerializer.class ); >> > >> > So, is this log to ignore or do I have a problem? Should it be put to >> debug or >> > not printed at all? >> > >> > [1] >> > >> https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x >> > >> > Best, >> > Flavio >> >
Re: flink async snapshots
That's correct. With the fully async option the checkpoints take longer but you don't impact ongoing processing of elements. With the semi-async method snapshots take a shorter time but during the synchronous part no element processing can happen. On Fri, 20 May 2016 at 15:04 Abhishek Singh wrote: > Yes. Thanks for explaining. > > On Friday, May 20, 2016, Ufuk Celebi wrote: > >> On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh >> wrote: >> > If you can take atomic in-memory copies, then it works (at the cost of >> > doubling your instantaneous memory). For larger state (say rocks DB), >> won’t >> > you have to stop the world (atomic snapshot) and make a copy? Doesn’t >> that >> > make it synchronous, instead of background/async? >> >> Hey Abhishek, >> >> that's correct. There are two variants for RocksDB: >> >> - semi-async (default): the snapshot is taken via the RocksDB backup feature, >> backing up to a directory (sync). This is then copied to the final >> checkpoint location (async, e.g. copy to HDFS). >> >> - fully-async: the snapshot is taken via the RocksDB snapshot feature (sync, >> but no full copy and essentially "free"). With this snapshot we >> iterate over all k/v-pairs and copy them to the final checkpoint >> location (async, e.g. copy to HDFS). >> >> You enable the second variant via: >> rocksDbBackend.enableFullyAsyncSnapshots(); >> >> This is only part of the 1.1-SNAPSHOT version though. >> >> I'm not too familiar with the performance characteristics of both >> variants, but maybe Aljoscha can chime in. >> >> Does this clarify things for you? >> >> – Ufuk >> >
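For reference, a minimal sketch of wiring this up — assuming the 1.1-SNAPSHOT API mentioned above, with an illustrative checkpoint URI:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000); // checkpoint every 60 seconds

RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend("hdfs:///flink/checkpoints");
rocksDbBackend.enableFullyAsyncSnapshots(); // the default is the semi-async mode
env.setStateBackend(rocksDbBackend);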
Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type
I think it might just be a warning. When using Kryo it is in the end a GenericTypeInformation but the TypeAnalyzer might still initially try to analyze it as a POJO. On Fri, 20 May 2016 at 12:51 Ufuk Celebi wrote: > I tried to reproduce this and if you set up the dependency > > <dependency> > <groupId>de.javakaffee</groupId> > <artifactId>kryo-serializers</artifactId> > <version>0.28</version> > </dependency> > > and register the Kryo type as suggested you should not see any log > messages (e.g. the type should be treated as a generic type and not a > POJO type). > > Does the program successfully execute for you? > > – Ufuk > > > On Thu, May 19, 2016 at 5:49 PM, Flavio Pompermaier > wrote: > > Hi to all, > > > > I'm using Flink 1.0.2 and testing the job I discovered that I have a lot > of > > log with this error: > > > > TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO > type > > > > initially I thought I forgot to properly migrate my code from 0.10.x to > 1.0.x > > as stated in [1] but then I checked my code and I correctly built the > > ExecutionEnvironment as: > > > > env = ExecutionEnvironment.createLocalEnvironment(c); > > env.registerTypeWithKryoSerializer(DateTime.class, > > JodaDateTimeSerializer.class ); > > > > So, is this log to ignore or do I have a problem? Should it be put to debug > or > > not printed at all? > > > > [1] > > > https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x > > > > Best, > > Flavio >
Re: join stream with last available element of other stream
Aljoscha answered this in the other thread you started for this ("'Last One' Window"). On Fri, May 20, 2016 at 12:43 PM, Artem Bogachev wrote: > Hi, > > I’ve faced a problem trying to model our platform using Flink Streams. > > Let me describe our model: > > // Stream of data, ex. stocks: (AAPL, 100.0), (GZMP, 100.0) etc. > val realData: DataStream[(K, V)] = env.addSource(…) > > // Stream of forecasts (same format) based on some window aggregates > val forecastedData: DataStream[(K, V)] = > realData.keyBy(1).timeWindow(Time.minutes(FORECAST_INTERVAL)).apply(new > Forecaster(…)) > > I would like to construct a stream of errors, whose values are just the differences > between the realData stream and the latest available forecast for this key in the > forecastedData stream > > // I suppose this solution does not guarantee that all realData values will > have corresponding forecast > val errors: DataStream[(K, V)] = > realData.join(forecastedData).where(0).equal(0)… > > Could you give some advice on how to implement such a pattern? Do I have to > write custom windows? > > Artem
Re: custom sources
On Fri, May 20, 2016 at 3:12 PM, Abhishek Singh wrote: > Thanks. I am still in theory/evaluation mode. Will try to code this up to > see if checkpointing will become an issue. I do have a high rate of ingest and > lots of in-flight data. Hopefully Flink back pressure keeps this nicely > bounded. Sure! Feel free to post any questions that you have during your evaluation. If you are interested in how back pressure is propagated you can have a look at this blog post: http://data-artisans.com/how-flink-handles-backpressure/
Re: custom sources
Thanks. I am still in theory/evaluation mode. Will try to code this up to see if checkpointing will become an issue. I do have a high rate of ingest and lots of in-flight data. Hopefully Flink back pressure keeps this nicely bounded. I doubt it will be a problem for me - because even Spark is writing all in-flight data to disk - because all partitioning goes through disk and is inline - i.e. sync. Flink disk usage is write-only and for the failure case only. Looks pretty compelling so far. On Friday, May 20, 2016, Ufuk Celebi wrote: > On Thu, May 19, 2016 at 7:48 PM, Abhishek R. Singh > > wrote: > > There seems to be some relationship between watermarks, triggers and > > checkpoint that is somehow not being leveraged. > > Checkpointing is independent of this, yes. Did the state size become a > problem for your use case? There are various users running Flink with > very large state sizes without any issues. The recommended state > backend for these use cases is the RocksDB backend. > > The barriers are triggered at the sources and flow with the data > ( > https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html > ). > Everything in-flight after the barrier is not relevant for the > checkpoint. We are only interested in a consistent state snapshot. >
Re: flink async snapshots
Yes. Thanks for explaining. On Friday, May 20, 2016, Ufuk Celebi wrote: > On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh > > wrote: > > If you can take atomic in-memory copies, then it works (at the cost of > > doubling your instantaneous memory). For larger state (say rocks DB), > won’t > > you have to stop the world (atomic snapshot) and make a copy? Doesn’t > that > > make it synchronous, instead of background/async? > > Hey Abhishek, > > that's correct. There are two variants for RocksDB: > > - semi-async (default): the snapshot is taken via the RocksDB backup feature, > backing up to a directory (sync). This is then copied to the final > checkpoint location (async, e.g. copy to HDFS). > > - fully-async: the snapshot is taken via the RocksDB snapshot feature (sync, > but no full copy and essentially "free"). With this snapshot we > iterate over all k/v-pairs and copy them to the final checkpoint > location (async, e.g. copy to HDFS). > > You enable the second variant via: > rocksDbBackend.enableFullyAsyncSnapshots(); > > This is only part of the 1.1-SNAPSHOT version though. > > I'm not too familiar with the performance characteristics of both > variants, but maybe Aljoscha can chime in. > > Does this clarify things for you? > > – Ufuk >
Re: unsubscribe
You have to send this to user-unsubscr...@flink.apache.org. You've sent this to user@f.a.o instead. On Fri, May 20, 2016 at 1:01 PM, Christophe Salperwyck wrote: >
unsubscribe
Re: custom sources
On Thu, May 19, 2016 at 7:48 PM, Abhishek R. Singh wrote: > There seems to be some relationship between watermarks, triggers and > checkpoint that is somehow not being leveraged. Checkpointing is independent of this, yes. Did the state size become a problem for your use case? There are various users running Flink with very large state sizes without any issues. The recommended state backend for these use cases is the RocksDB backend. The barriers are triggered at the sources and flow with the data (https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html). Everything in-flight after the barrier is not relevant for the checkpoint. We are only interested in a consistent state snapshot.
Re: TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type
I tried to reproduce this and if you set up the dependency

<dependency>
  <groupId>de.javakaffee</groupId>
  <artifactId>kryo-serializers</artifactId>
  <version>0.28</version>
</dependency>

and register the Kryo type as suggested you should not see any log messages (e.g. the type should be treated as a generic type and not a POJO type). Does the program successfully execute for you? – Ufuk On Thu, May 19, 2016 at 5:49 PM, Flavio Pompermaier wrote: > Hi to all, > > I'm using Flink 1.0.2 and testing the job I discovered that I have a lot of > log with this error: > > TypeExtractor:1672 - class org.joda.time.DateTime is not a valid POJO type > > initially I thought I forgot to properly migrate my code from 0.10.x to 1.0.x > as stated in [1] but then I checked my code and I correctly built the > ExecutionEnvironment as: > > env = ExecutionEnvironment.createLocalEnvironment(c); > env.registerTypeWithKryoSerializer(DateTime.class, > JodaDateTimeSerializer.class ); > > So, is this log to ignore or do I have a problem? Should it be put to debug or > not printed at all? > > [1] > https://cwiki.apache.org/confluence/display/FLINK/Migration+Guide%3A+0.10.x+to+1.0.x > > Best, > Flavio
Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)
Hi Ufuk, my records could be quite large POJOs (I think some MB). The only thing I do to configure Kryo is: env.registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class ); Best, Flavio On Fri, May 20, 2016 at 12:38 PM, Ufuk Celebi wrote: > @Stefano: the records are serialized anyway for batch jobs. The > spilling deserializer is only relevant if single records are very > large. How large are your records? In any case, I don't expect this to > be the problem. > > @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and > ttil instead of util) look like some kind of data corruption and would > need further investigation. The other failure you reported might be > related to this. As a starting point, how do you configure the Kryo > serializer? > > On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier > wrote: > > Today I've got this other strange error.. Obviously I don't have a > > VdhicleEvent class, but a VehicleEvent class :( > > > > java.lang.RuntimeException: Cannot instantiate class. > > at > > > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407) > > at > > > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > > at > > > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124) > > at > > > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) > > at > > > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) > > at > > > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) > > at > > > org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101) > > at > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480) > > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > > at java.lang.Thread.run(Thread.java:745) > > Caused by: java.lang.ClassNotFoundException: > > it.okkam.flink.test.model.pojo.VdhicleEvent > > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > > at java.lang.Class.forName0(Native Method) > > at java.lang.Class.forName(Class.java:348) > > at > > > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405) > > ... 10 more > > > > > > Thanks in advance, > > Flavio > > > > > > On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli > > wrote: > >> > >> Hi Flavio, Till, > >> > >> do you think this can possibly be related to the serialization problem > >> caused by 'the management' of the Kryo serializer buffer when spilling on > disk? > >> We are definitely going beyond what is managed in memory with this task. > >> > >> saluti, > >> Stefano > >> > >> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier : > >>> > >>> That exception showed just once, but the following happens randomly > (if I > >>> re-run the job after stopping and restarting the cluster it doesn't > show up > >>> usually): > >>> > >>> Caused by: java.io.IOException: Serializer consumed more bytes than the > >>> record had. This indicates broken serialization. 
If you are using > custom > >>> serialization types (Value or Writable), check their serialization > methods. > >>> If you are using a Kryo-serialized type, check the corresponding Kryo > >>> serializer. > >>> at > >>> > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142) > >>> at > >>> > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) > >>> at > >>> > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) > >>> at > >>> > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) > >>> at > >>> > org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101) > >>> at > >>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480) > >>> at > >>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) > >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > >>> at java.lang.Thread.run(Thread.java:745) > >>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2 > >>> at java.util.ArrayList.elementData(ArrayList.java:418) > >>> at java.util.ArrayList.get(ArrayList.java:431) > >>> at >
join stream with last available element of other stream
Hi, I’ve faced a problem trying to model our platform using Flink Streams. Let me describe our model: // Stream of data, ex. stocks: (AAPL, 100.0), (GZMP, 100.0) etc. val realData: DataStream[(K, V)] = env.addSource(…) // Stream of forecasts (same format) based on some window aggregates val forecastedData: DataStream[(K, V)] = realData.keyBy(1).timeWindow(Time.minutes(FORECAST_INTERVAL)).apply(new Forecaster(…)) I would like to construct a stream of errors, whose values are just the differences between the realData stream and the latest available forecast for this key in the forecastedData stream // I suppose this solution does not guarantee that all realData values will have corresponding forecast val errors: DataStream[(K, V)] = realData.join(forecastedData).where(0).equal(0)… Could you give some advice on how to implement such a pattern? Do I have to write custom windows? Artem
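One common way to express this — a minimal Java sketch, not necessarily what was suggested in the other thread; the types and names are illustrative — is to connect the two streams on the key and keep the latest forecast per key in a stateful CoFlatMapFunction:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class LatestForecastJoin
        extends RichCoFlatMapFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple2<String, Double>> {

    private transient ValueState<Double> lastForecast; // latest forecast for the current key

    @Override
    public void open(Configuration conf) {
        lastForecast = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastForecast", Double.class, null));
    }

    @Override
    public void flatMap1(Tuple2<String, Double> real, Collector<Tuple2<String, Double>> out) throws Exception {
        Double forecast = lastForecast.value();
        if (forecast != null) { // nothing is emitted until the first forecast for this key arrives
            out.collect(new Tuple2<>(real.f0, real.f1 - forecast));
        }
    }

    @Override
    public void flatMap2(Tuple2<String, Double> forecast, Collector<Tuple2<String, Double>> out) throws Exception {
        lastForecast.update(forecast.f1); // remember the latest forecast for this key
    }
}

// usage: errors = realData.connect(forecastedData).keyBy(0, 0).flatMap(new LatestForecastJoin());

As the comment in the question already notes, this gives no ordering guarantee between real values and the forecast they are compared against; real values seen before the first forecast for their key are simply dropped here.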
Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)
@Stefano: the records are serialized anyway for batch jobs. The spilling deserializer is only relevant if single records are very large. How large are your records? In any case, I don't expect this to be the problem. @Flavio: The class name "typo" errors (Vdhicle instead of Vehicle and ttil instead of util) look like some kind of data corruption and would need further investigation. The other failure you reported might be related to this. As a starting point, how do you configure the Kryo serializer? On Fri, May 20, 2016 at 10:02 AM, Flavio Pompermaier wrote: > Today I've got this other strange error.. Obviously I don't have a > VdhicleEvent class, but a VehicleEvent class :( > > java.lang.RuntimeException: Cannot instantiate class. > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407) > at > org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) > at > org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124) > at > org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) > at > org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) > at > org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) > at > org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101) > at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480) > at > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ClassNotFoundException: > it.okkam.flink.test.model.pojo.VdhicleEvent > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405) > ... 10 more > > > Thanks in advance, > Flavio > > > On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli > wrote: >> >> Hi Flavio, Till, >> >> do you think this can possibly be related to the serialization problem >> caused by 'the management' of the Kryo serializer buffer when spilling on disk? >> We are definitely going beyond what is managed in memory with this task. >> >> saluti, >> Stefano >> >> 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier : >>> >>> That exception showed just once, but the following happens randomly (if I >>> re-run the job after stopping and restarting the cluster it doesn't show up >>> usually): >>> >>> Caused by: java.io.IOException: Serializer consumed more bytes than the >>> record had. This indicates broken serialization. If you are using custom >>> serialization types (Value or Writable), check their serialization methods. >>> If you are using a Kryo-serialized type, check the corresponding Kryo >>> serializer. 
>>> at >>> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142) >>> at >>> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) >>> at >>> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) >>> at >>> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) >>> at >>> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101) >>> at >>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480) >>> at >>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >>> at java.lang.Thread.run(Thread.java:745) >>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2 >>> at java.util.ArrayList.elementData(ArrayList.java:418) >>> at java.util.ArrayList.get(ArrayList.java:431) >>> at >>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) >>> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) >>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) >>> at >>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) >>> at >>> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431) >>> at >>>
Re: inheritance of Program interface in Program.java in org.apache.flink.api.common
On Thu, May 19, 2016 at 4:46 PM, 윤형덕 wrote: > > How can this.mainClass, which doesn't override the getPlan method (the abstract method of the Program interface) and has only a static main method, be instantiated as a Program? This is only called if the class is actually a subclass of Program. That's why there is the `isAssignable` check. Otherwise, we check that there is a main method (else if (hasMainMethod(mainClass))). If this is not the case, a ProgramInvocationException is thrown.
Re: hot deployment of stream processing(event at a time) jobs
I think you are looking for the savepoints feature: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/savepoints.html The general idea is to trigger a savepoint, start the second job from this savepoint (reading from the same topic), and then eventually cancel the first job. Depending on the sink, the second job needs to produce its result to a different end point though. Does this help? – Ufuk On Thu, May 19, 2016 at 8:18 AM, Igor Berman wrote: > Hi, > I have a simple job that consumes events from a Kafka topic and processes events > with filter/flat-map only (i.e. no aggregation, no windows, no private state) > > The most important constraint in my setup is to continue processing no > matter what (i.e. stopping for a few seconds to cancel the job and restart it with a > new version is not an option because it will take a few seconds) > > I was thinking about the Blue/Green deployment concept and will start a new job > with a new fat jar while the old job is still running and then eventually cancel the old > job > > How will Flink handle such a scenario? What will happen regarding the semantics of > event processing in the transition time? > > I know that core Kafka has a consumer re-balancing mechanism, but I'm not too > familiar with it > > Any thoughts will be highly appreciated > > thanks in advance > Igor > > >
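In CLI terms the sequence looks roughly like this — a sketch, with placeholder job IDs, savepoint paths, and jar names (flag names as in the Flink 1.0 docs linked above):

# trigger a savepoint for the running (old) job
bin/flink savepoint <oldJobId>
# start the new job from the savepoint path printed by the previous command
bin/flink run -s <savepointPath> new-version-fat.jar
# once the new job has caught up, cancel the old one
bin/flink cancel <oldJobId>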
Re: flink-kafka-connector offset management
Hey Arun! How did you configure your Kafka source? If the offset has been committed and you configured the source to read from the latest offset, the message should not be re-processed. – Ufuk On Fri, May 13, 2016 at 2:19 PM, Arun Balan wrote: > Hi, I am trying to use the flink-kafka-connector and I notice that every > time I restart my application it re-reads the last message on the Kafka > topic. So if the latest offset on the topic is 10, then when the application > is restarted, the kafka-connector will re-read message 10. Why is this the > behavior? I would assume that the last message has already been read and > its offset committed. I require that messages that have already been processed from > the topic not be reprocessed. Any insight would be helpful. > > Thanks > Arun Balan
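For illustration, a minimal source configuration that starts from the committed offset of its group and falls back to the newest offset when none exists — a sketch assuming the Kafka 0.8 connector ("largest" is Kafka 0.8's name for "latest"; addresses, group, and topic are placeholders):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("zookeeper.connect", "localhost:2181"); // offsets are committed to ZooKeeper in 0.8
props.setProperty("group.id", "my-consumer-group");
props.setProperty("auto.offset.reset", "largest"); // only used when no committed offset exists

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props));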
Re: Java heap space error
Indeed I can confirm that I resolved this problem by reducing the number of slots per Task Manager (and thus increasing the memory available to each task)! However, from time to time I have serialization issues that I don't know where they come from... it looks like the Pojo serialization has some issue somewhere. Thanks anyway Ufuk! On Fri, May 20, 2016 at 12:04 PM, Ufuk Celebi wrote: > The job is running out of heap memory, probably because a user > function needs a lot of it (the parquet thrift sink?). > > You can try to work around it by reducing the amount of managed memory > in order to leave more heap space available. > > On Thu, May 12, 2016 at 6:55 PM, Flavio Pompermaier > wrote: > > Hi to all, > > running a job that writes parquet-thrift files I had this exception (in a > > Task Manager): > > > > io.netty.channel.nio.NioEventLoop - > Unexpected > > exception in the selector loop. > > java.lang.OutOfMemoryError: Java heap space > > 2016-05-12 18:49:11,302 WARN > > org.jboss.netty.channel.socket.nio.AbstractNioSelector- > Unexpected > > exception in the selector loop. > > java.lang.OutOfMemoryError: Java heap space > > 2016-05-12 18:49:11,302 ERROR > > org.apache.flink.runtime.io.disk.iomanager.IOManager - The > handler > > of the request-complete-callback threw an exception: Java heap space > > java.lang.OutOfMemoryError: Java heap space > > 2016-05-12 18:49:11,303 ERROR > > org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O > reading > > thread encountered an error: segment has been freed > > java.lang.IllegalStateException: segment has been freed > > at > > > org.apache.flink.core.memory.HeapMemorySegment.wrap(HeapMemorySegment.java:85) > > at > > > org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest.read(AsynchronousFileIOChannel.java:310) > > at > > > org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$ReaderThread.run(IOManagerAsync.java:396) > > 2016-05-12 18:49:11,303 ERROR > > org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O > reading > > thread encountered an error: segment has been freed > > > > > > Any idea of what could be the cause? > > > > Best, > > Flavio >
Re: Run jar job in local cluster
I would suggest setting the log level to DEBUG and checking the logs to see why the client cannot connect to your Elasticsearch cluster. Did you try that? On Mon, May 9, 2016 at 3:54 PM, rafal green wrote: > Dear Sir or Madam, > > Can you tell me why I have a problem with Elasticsearch in a local cluster? > > I analysed this example: > https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html > > My Flink and Elasticsearch configs are the defaults (I only changed node.name to > "node-1") > > This example runs in my IntelliJ IDEA 15, but on the local cluster I have a > problem. Of course WordCount and SocketTextStreamWordCount work fine. > > > I spent 2 days trying to find a solution (with uncle Google ;) ) but it's not > easy > > val config = new java.util.HashMap[String, String] > config.put("bulk.flush.max.actions", "1") > config.put("cluster.name", "elasticsearch") > config.put("path.home", > "/media/user/e5e05ab5-28f3-4cee-a57c-444e32b99f04/thesis/elasticsearch-2.3.2") > > val transports = new util.ArrayList[InetSocketAddress] > transports.add(new > InetSocketAddress(InetAddress.getByName("127.0.0.1"),9300)) > > > > Error output: > > java.lang.RuntimeException: Client is not connected to any Elasticsearch > nodes! > at > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) > at java.lang.Thread.run(Thread.java:745) > > 05/08/2016 22:57:02 Job execution switched to status FAILING. > java.lang.RuntimeException: Client is not connected to any Elasticsearch > nodes! > at > org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:579) > at java.lang.Thread.run(Thread.java:745) > 05/08/2016 22:57:02 Job execution switched to status FAILED. > > > The program finished with the following exception: > > org.apache.flink.client.program.ProgramInvocationException: The program > execution failed: Job execution failed. 
> at org.apache.flink.client.program.Client.runBlocking(Client.java:381) > at org.apache.flink.client.program.Client.runBlocking(Client.java:355) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65) > at > org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:541) > at com.pl.greeny.flink.TwitterAnalysis$.main(TwitterAnalysis.scala:69) > at com.pl.greeny.flink.TwitterAnalysis.main(TwitterAnalysis.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at org.apache.flink.client.program.Client.runBlocking(Client.java:248) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:860) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:327) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1187) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1238) > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:807) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753) > at > org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:753) > at > scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) > at >
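A common culprit for "Client is not connected to any Elasticsearch nodes!" is a cluster.name mismatch between the sink config and the running cluster, or a client/server version mismatch, so those are worth double-checking first. For comparison, a minimal but complete wiring of the elasticsearch2 sink; this is a sketch assuming a DataStream[String] named stream, and the index/type names are made up:

import java.net.{InetAddress, InetSocketAddress}
import java.util
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch2.{ElasticsearchSink, ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.client.Requests

val config = new util.HashMap[String, String]
config.put("bulk.flush.max.actions", "1")
// Must match cluster.name in the cluster's elasticsearch.yml
config.put("cluster.name", "elasticsearch")

val transports = new util.ArrayList[InetSocketAddress]
transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))

stream.addSink(new ElasticsearchSink(config, transports,
  new ElasticsearchSinkFunction[String] {
    override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
      val json = new util.HashMap[String, AnyRef]()
      json.put("data", element)
      // "my-index" / "my-type" are placeholder names
      indexer.add(Requests.indexRequest.index("my-index").`type`("my-type").source(json))
    }
  }))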
Re: Weird Kryo exception (Unable to find class: java.ttil.HashSet)
Today I got this other strange error.. Obviously I don't have a VdhicleEvent class, but a VehicleEvent class :( java.lang.RuntimeException: Cannot instantiate class. at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407) at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124) at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:348) at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405) ... 10 more Thanks in advance, Flavio On Mon, May 16, 2016 at 10:08 AM, Stefano Bortoli wrote: > Hi Flavio, Till, > > do you think this could possibly be related to the serialization problem > caused by the management of the Kryo serializer buffer when spilling to disk? > We are definitely going beyond what is managed in memory with this task. > > saluti, > Stefano > > 2016-05-16 9:44 GMT+02:00 Flavio Pompermaier : > >> That exception showed up just once, but the following happens randomly (if I >> re-run the job after stopping and restarting the cluster it usually doesn't show up): >> >> Caused by: java.io.IOException: Serializer consumed more bytes than the >> record had. This indicates broken serialization. If you are using custom >> serialization types (Value or Writable), check their serialization methods. >> If you are using a Kryo-serialized type, check the corresponding Kryo >> serializer. 
>> at >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142) >> at >> org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65) >> at >> org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34) >> at >> org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73) >> at >> org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101) >> at >> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480) >> at >> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) >> at java.lang.Thread.run(Thread.java:745) >> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2 >> at java.util.ArrayList.elementData(ArrayList.java:418) >> at java.util.ArrayList.get(ArrayList.java:431) >> at >> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42) >> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805) >> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759) >> at >> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228) >> at >> org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135) >> at >> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30) >> at >> org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) >> at >> org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124) >> >> >> On Sun, May 15, 2016 at 10:58 AM, Flavio Pompermaier < >> pomperma...@okkam.it> wrote: >> >>> Hi to all, >>> in my last run of a job I received this weird Kryo
Re: flink async snapshots
On Thu, May 19, 2016 at 8:54 PM, Abhishek R. Singh wrote: > If you can take atomic in-memory copies, then it works (at the cost of > doubling your instantaneous memory). For larger state (say RocksDB), won’t > you have to stop the world (atomic snapshot) and make a copy? Doesn’t that > make it synchronous, instead of background/async? Hey Abhishek, that's correct. There are two variants for RocksDB: - semi-async (default): the snapshot is taken via the RocksDB backup feature, backing up to a directory (sync). This is then copied to the final checkpoint location (async, e.g. copy to HDFS). - fully-async: the snapshot is taken via the RocksDB snapshot feature (sync, but no full copy and essentially "free"). With this snapshot we iterate over all k/v pairs and copy them to the final checkpoint location (async, e.g. copy to HDFS). You enable the second variant via: rocksDbBackend.enableFullyAsyncSnapshots(); This is only part of the 1.1-SNAPSHOT version though. I'm not too familiar with the performance characteristics of the two variants, but maybe Aljoscha can chime in. Does this clarify things for you? – Ufuk
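For reference, wiring this up looks roughly like the following sketch (the checkpoint URI is a placeholder, and as noted above enableFullyAsyncSnapshots() requires 1.1-SNAPSHOT):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Checkpoint data ends up under this URI; "hdfs:///flink/checkpoints" is made up.
val rocksDbBackend = new RocksDBStateBackend("hdfs:///flink/checkpoints")
// Switch from the default semi-async (backup-based) variant to fully-async snapshots.
rocksDbBackend.enableFullyAsyncSnapshots()
env.setStateBackend(rocksDbBackend)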