Re: serialization issue in streaming job run with scala Future
Hi Yuval - Here's a brief summary of what we are trying to do.

At the library level we have this:

    def buildExecutionGraph(): Unit

    def executeStreamingQueries(env: StreamExecutionEnvironment): JobExecutionResult = {
      buildExecutionGraph()
      env.execute(s"Executing $streamletRef")
    }

and we do the following:

    // note: this ctx is created outside the Future
    val jobResult = Future(createLogic.executeStreamingQueries(ctx.env))

At the application level we have something like this:

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)          // reads from Kafka
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)       // writes to Kafka
    }

It fails only when we use the Future; otherwise it works.

regards.

On Thu, Sep 19, 2019 at 1:16 PM Yuval Itzchakov wrote:
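For context on the failure discussed further down the thread: the `ExecutionException: Boxed Error` is general scala.concurrent behaviour, not something Flink-specific. Flink's `OptimizerPlanEnvironment$ProgramAbortException` extends `java.lang.Error`, and when a `Promise`/`Future` is completed with a fatal `Throwable`, the Scala standard library wraps it in a `java.util.concurrent.ExecutionException` (the `Promise.resolver` frame in the reported stack trace). A minimal, Flink-free sketch of that boxing — the class names here are made up for illustration:

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.Promise
import scala.util.{Failure, Try}

object BoxedErrorDemo {
  // Stand-in for Flink's ProgramAbortException, which extends Error, not Exception
  class ProgramAbort extends Error("program abort")

  // Complete a Promise with a Throwable and return what the Future actually sees.
  // For fatal Throwables (anything extending Error) the standard library "boxes"
  // them in an ExecutionException before storing the failure.
  def boxed(t: Throwable): Try[Unit] = {
    val p = Promise[Unit]()
    p.complete(Failure(t))
    p.future.value.get
  }
}
```

`BoxedErrorDemo.boxed(new BoxedErrorDemo.ProgramAbort)` yields a `Failure` carrying an `ExecutionException` whose `getCause` is the original `ProgramAbort` — the same shape as the trace in this thread (the wrapper's message, "Boxed Error" in Scala 2.12, varies across Scala versions).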
Re: serialization issue in streaming job run with scala Future
Debasish, could you share an example of before and after of your classes for future reference?

On Thu, 19 Sep 2019, 10:42 Debasish Ghosh wrote:
Re: serialization issue in streaming job run with scala Future
We solved the problem of serialization by making some things transient which were being captured as part of the closure. So we no longer have serialization errors. Everything works properly without the Future.

I realize that, because of the statics, concurrent job submission would be an issue. But we are submitting one job only - the difference is that it goes through a Future. So there is no concurrent submission, unless I am missing something.

regards.

On Thu, Sep 19, 2019 at 12:54 PM Biao Liu wrote:
Re: serialization issue in streaming job run with scala Future
Hi Debasish,

I think there is something critical about your usage that is hidden. It might help if you could provide more details.

It still confuses me how you solved the serialization issue. Why would the non-transient fields only affect serialization in a Future?

WRT this ProgramAbortException issue, do you submit jobs concurrently in one process? Currently job submission is not thread-safe. It relies on some static variables which could be affected by other concurrent submissions in the same process. I am asking this because usually job submission does not go through OptimizerPlanEnvironment, which appears in your exception stack trace.

Thanks,
Biao /'bɪ.aʊ/

On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh wrote:
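A practical consequence of the boxing reported in this thread: when `env.execute` runs inside a `Future`, the interesting Flink exception is the *cause* of the `ExecutionException`, one level down the cause chain. A small helper to unwrap it before logging or pattern-matching — `rootCause` is a name I made up, not an API from Flink or from the code base in this thread:

```scala
import java.util.concurrent.ExecutionException

object FailureUnwrap {
  // Walks the cause chain past any ExecutionException ("Boxed Error")
  // wrappers so callers see the original Throwable, e.g. Flink's
  // ProgramAbortException.
  @annotation.tailrec
  def rootCause(t: Throwable): Throwable = t match {
    case e: ExecutionException if e.getCause != null => rootCause(e.getCause)
    case other => other
  }
}
```

Usage, assuming the `jobResult` future from earlier in the thread: `jobResult.failed.foreach(t => println(FailureUnwrap.rootCause(t)))`.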
Re: serialization issue in streaming job run with scala Future
I think what you are pointing at is asynchronous DataStream operations. In our case we want to submit the entire job in a Future. Something like the following:

    def execute(..) = {
      // this does all data stream manipulation, joins etc.
      buildComputationGraph()

      // submits for execution with StreamExecutionEnvironment
      env.execute(..)
    }

and we want to do:

    val jobExecutionResultFuture = Future(execute(..))

and this gives that exception.

regards.

On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch wrote:
Re: serialization issue in streaming job run with scala Future
Hi Debasish,

Have you taken a look at the Async I/O API for running async operations? I think this is the preferred way of doing it. [1]

So it would look something like this:

    class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

      /** The database-specific client that can issue concurrent requests with callbacks */
      lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

      /** The context used for the future callbacks */
      implicit lazy val executor: ExecutionContext =
        ExecutionContext.fromExecutor(Executors.directExecutor())

      override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete;
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
          case result: String => resultFuture.complete(Iterable((str, result)))
        }
      }
    }

    // create the original stream
    val stream: DataStream[String] = ...

    // apply the async I/O transformation
    val resultStream: DataStream[(String, String)] =
      AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)

Thanks,
Rafi

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api

On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh wrote:
Re: serialization issue in streaming job run with scala Future
ok, the above problem was due to some serialization issues which we fixed by marking some of the things transient. This fixes the serialization issues .. But now when I try to execute in a Future I hit upon this .. *java.util.concurrent.ExecutionException: Boxed Error* at scala.concurrent.impl.Promise$.resolver(Promise.scala:87) at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at scala.concurrent.Promise.complete(Promise.scala:53) at scala.concurrent.Promise.complete$(Promise.scala:52) at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) *Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException* at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280) at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149) at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658) at scala.util.Success.$anonfun$map$1(Try.scala:255) at scala.util.Success.map(Try.scala:213) at 
scala.concurrent.Future.$anonfun$map$1(Future.scala:292) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33) ... 7 more I found this issue in JIRA https://issues.apache.org/jira/browse/FLINK-10381 which is still open and talks about a related issue. But we are not submitting multiple jobs - we are just submitting 1 job but async in a Future. I am not clear why this should create the problem that I see. Can anyone please help with an explanation ? regards. On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh wrote: > I think the issue may not be linked with Future. What happens is when this > piece of code is executed .. > > val rides: DataStream[TaxiRide] = > readStream(inTaxiRide) > .filter { ride ⇒ ride.getIsStart().booleanValue } > .keyBy("rideId") > > val fares: DataStream[TaxiFare] = > readStream(inTaxiFare) > .keyBy("rideId") > > val processed: DataStream[TaxiRideFare] = > rides > .connect(fares) > .flatMap(new EnrichmentFunction) > > somehow the ClosureCleaner gets executed as evident from the following > which tries to serialize Avro data. Is there any way to pass the custom > avro serializer that I am using ? > > org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG > pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt > type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, > startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT > pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not > serializable. The object probably contains or references non serializable > fields. 
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>   at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>   at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>   at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
>   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
>   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
>   at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>   at scala.util.T
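The Boxed Error above is plain scala.concurrent behaviour rather than anything Flink-specific: a Promise refuses to complete with a fatal Throwable (one not matched by scala.util.control.NonFatal), and java.lang.Error is fatal, so it is boxed in java.util.concurrent.ExecutionException("Boxed Error"). Flink's ProgramAbortException extends Error, which is why it shows up boxed when execute() runs inside a Future. A minimal, self-contained sketch of the mechanism (simulating ProgramAbortException with a plain java.lang.Error; all names here are illustrative, not the actual streamlet code):

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object BoxedErrorDemo {
  // Runs a Future whose body throws an Error (standing in for Flink's
  // ProgramAbortException, which extends Error) and reports how the
  // failure surfaces on the awaiting side.
  def run(): (String, String) = {
    val f: Future[Unit] = Future {
      throw new Error("simulated ProgramAbortException")
    }
    Try(Await.result(f, 5.seconds)) match {
      case Failure(e: ExecutionException) =>
        // The Promise boxed the fatal Throwable; the original Error is
        // only reachable through getCause.
        (e.getMessage, e.getCause.getMessage)
      case other =>
        sys.error(s"unexpected result: $other")
    }
  }
}
```

So the real failure in the trace is the ProgramAbortException sitting in getCause, not the ExecutionException itself.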
Re: serialization issue in streaming job run with scala Future
I think the issue may not be linked with Future. What happens is when this piece of code is executed ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

somehow the ClosureCleaner gets executed, as evident from the following, which tries to serialize Avro data. Is there any way to pass the custom Avro serializer that I am using?

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
  at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
  at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
  at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
  at pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
  at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
  at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
  at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
  at scala.util.Try$.apply(Try.scala:213)
  at pipelines.runner.Runner$.run(Runner.scala:43)
  at pipelines.runner.Runner$.main(Runner.scala:30)
  at pipelines.runner.Runner.main(Runner.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
  at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
  at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
  at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
  at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
  at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
  at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
  at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
  at java.util.ArrayList.writeObject(ArrayList.java:766)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
Re: serialization issue in streaming job run with scala Future
Hi Debasish,

I guess the reason is that something is unexpectedly involved in serialization because of a reference from an inner class (an anonymous class or a lambda expression). When Flink serializes such an inner-class instance, it also serializes all referenced objects, for example the outer class instance. If the outer class is not serializable, this error happens.

You could try moving that piece of code to a named, non-inner class.

Thanks,
Biao /'bɪ.aʊ/

On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh wrote:

> My main question is why serialisation kicks in when I try to execute
> within a `Future` and not otherwise.
>
> regards.
>
> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh wrote:
>
>> Yes, they are generated from Avro Schema and implement Serializable ..
>>
>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma wrote:
>>
>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>
>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh wrote:

Hello -

The following piece of code is an example of connected data streams ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

When I execute the above logic using StreamExecutionEnvironment.execute(..) it runs fine. But if I try to execute the above from within a scala.concurrent.Future, I get the following exception ..

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
  at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
  at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
  at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
  ...

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

Any thoughts why this may happen?

regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>
> --
> Sent from my iPhone
>
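Biao's point about inner classes can be reproduced with plain JDK serialization, which is essentially the check Flink's ClosureCleaner performs. A lambda whose body reads a field of its enclosing class captures the whole enclosing instance, so serializing the lambda drags the outer object into serialization; a named, self-contained class carries only its own state. A hedged sketch with hypothetical names (Outer, NamedFn stand in for the streamlet and EnrichmentFunction, and are not the actual code):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical outer class that is not serializable, standing in for an
// enclosing streamlet instance holding e.g. an Avro Schema field.
class Outer {
  val heavyState = new Object // java.lang.Object is not Serializable
  // The lambda body reads a field of `this`, so the lambda captures the
  // whole (non-serializable) Outer instance.
  val capturing: Int => Int = n => heavyState.hashCode + n
}

// Named, self-contained alternative: nothing from any outer scope travels
// with it, only its own serializable state.
class NamedFn extends (Int => Int) with Serializable {
  def apply(n: Int): Int = n + 1
}

object ClosureDemo {
  // Mimics the ClosureCleaner's check: can this object be Java-serialized?
  def serializes(x: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(x)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Here `ClosureDemo.serializes(new Outer().capturing)` fails while `ClosureDemo.serializes(new NamedFn)` succeeds, which is the difference between an anonymous EnrichmentFunction-style closure defined inside a non-serializable class and a named top-level function class.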
Re: serialization issue in streaming job run with scala Future
My main question is why serialisation kicks in when I try to execute within a `Future` and not otherwise.

regards.

On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh wrote:

> Yes, they are generated from Avro Schema and implement Serializable ..
>
> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma wrote:
>
>> Does TaxiRide or TaxiRideFare implement Serializable?
>>
>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh wrote:
>>
>>> Hello -
>>>
>>> The following piece of code is an example of connected data streams ..
>>>
>>> val rides: DataStream[TaxiRide] =
>>>   readStream(inTaxiRide)
>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>     .keyBy("rideId")
>>>
>>> val fares: DataStream[TaxiFare] =
>>>   readStream(inTaxiFare)
>>>     .keyBy("rideId")
>>>
>>> val processed: DataStream[TaxiRideFare] =
>>>   rides
>>>     .connect(fares)
>>>     .flatMap(new EnrichmentFunction)
>>>
>>> When I execute the above logic using
>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>> But if I try to execute the above from within a scala.concurrent.Future,
>>> I get the following exception ..
>>>
>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>> serializable. The object probably contains or references non serializable
>>> fields.
>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>>   at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>>   at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>   ...
>>>
>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>
>>> Any thoughts why this may happen?
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
--
Sent from my iPhone
Re: serialization issue in streaming job run with scala Future
Yes, they are generated from Avro Schema and implement Serializable ..

On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma wrote:

> Does TaxiRide or TaxiRideFare implement Serializable?
>
> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh wrote:
>
>> Hello -
>>
>> The following piece of code is an example of connected data streams ..
>>
>> val rides: DataStream[TaxiRide] =
>>   readStream(inTaxiRide)
>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>     .keyBy("rideId")
>>
>> val fares: DataStream[TaxiFare] =
>>   readStream(inTaxiFare)
>>     .keyBy("rideId")
>>
>> val processed: DataStream[TaxiRideFare] =
>>   rides
>>     .connect(fares)
>>     .flatMap(new EnrichmentFunction)
>>
>> When I execute the above logic using
>> StreamExecutionEnvironment.execute(..) it runs fine.
>> But if I try to execute the above from within a scala.concurrent.Future,
>> I get the following exception ..
>>
>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>> serializable. The object probably contains or references non serializable
>> fields.
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>   at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>   at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>   ...
>>
>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>
>> Any thoughts why this may happen?
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
Re: serialization issue in streaming job run with scala Future
Does TaxiRide or TaxiRideFare implement Serializable?

On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh wrote:

> Hello -
>
> The following piece of code is an example of connected data streams ..
>
> val rides: DataStream[TaxiRide] =
>   readStream(inTaxiRide)
>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>     .keyBy("rideId")
>
> val fares: DataStream[TaxiFare] =
>   readStream(inTaxiFare)
>     .keyBy("rideId")
>
> val processed: DataStream[TaxiRideFare] =
>   rides
>     .connect(fares)
>     .flatMap(new EnrichmentFunction)
>
> When I execute the above logic using
> StreamExecutionEnvironment.execute(..) it runs fine.
> But if I try to execute the above from within a scala.concurrent.Future,
> I get the following exception ..
>
> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
> serializable. The object probably contains or references non serializable
> fields.
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>   at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>   at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>   at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>   at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>   ...
>
> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>
> Any thoughts why this may happen?
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

--
Thanks
Deepak
www.bigdatabig.com
www.keosha.net
serialization issue in streaming job run with scala Future
Hello -

The following piece of code is an example of connected data streams ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

When I execute the above logic using StreamExecutionEnvironment.execute(..) it runs fine. But if I try to execute the above from within a scala.concurrent.Future, I get the following exception ..

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5, startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not serializable. The object probably contains or references non serializable fields.
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
  at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
  at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
  at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
  at org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
  at org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
  ...
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

Any thoughts why this may happen?

regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
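One possible workaround, offered only as an assumption and not something confirmed in this thread: when awaiting the job Future, unwrap the Boxed Error and rethrow its cause on the awaiting thread, so a caller that expects the original fatal Throwable to escape (as Flink's plan extraction expects ProgramAbortException to escape main()) can still see it. awaitUnboxed below is a hypothetical helper, not part of any Flink or pipelines API:

```scala
import java.util.concurrent.ExecutionException
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object UnwrapDemo {
  // Awaits the future; if scala.concurrent boxed a fatal Throwable
  // (e.g. an Error such as ProgramAbortException) in an
  // ExecutionException("Boxed Error"), rethrow the original cause
  // on this thread instead of the wrapper.
  def awaitUnboxed[A](f: Future[A], atMost: Duration = Duration.Inf): A =
    try Await.result(f, atMost)
    catch {
      case e: ExecutionException if e.getMessage == "Boxed Error" =>
        throw e.getCause
    }
}
```

Matching on the "Boxed Error" message is admittedly fragile; checking `e.getCause` for the specific expected Throwable type would be more robust in real code.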