Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
Hi Yuval -

Here's a brief summary of what we are trying to do ..

At the library level we have this ..

def buildExecutionGraph(): Unit

def executeStreamingQueries(env: StreamExecutionEnvironment): JobExecutionResult = {
  buildExecutionGraph()
  env.execute(s"Executing $streamletRef")
}

and we do the following ..

// note this ctx is created outside the Future
val jobResult = Future(createLogic.executeStreamingQueries(ctx.env))

and at the application level we have something like this ..

override def buildExecutionGraph = {
  val rides: DataStream[TaxiRide] =
    readStream(inTaxiRide) // reads from Kafka
      .filter { ride ⇒ ride.getIsStart().booleanValue }
      .keyBy("rideId")

  val fares: DataStream[TaxiFare] =
    readStream(inTaxiFare)
      .keyBy("rideId")

  val processed: DataStream[TaxiRideFare] =
    rides
      .connect(fares)
      .flatMap(new EnrichmentFunction)

  writeStream(out, processed) // writes to Kafka
}

It fails only when we use the Future, otherwise it works ..
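
A sketch of one possible workaround, assuming the failure is the "Boxed Error"
shown further down this thread: ProgramAbortException extends Error, which
scala.concurrent treats as fatal and boxes, so the caller can unwrap and
rethrow it to let it propagate out of main() the way Flink's plan extraction
expects. The names (createLogic, ctx) are the ones used earlier in this message.

import java.util.concurrent.ExecutionException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

val jobResult = Future(createLogic.executeStreamingQueries(ctx.env))

try Await.result(jobResult, Duration.Inf)
catch {
  // scala.concurrent wraps a fatal Throwable as ExecutionException("Boxed Error", cause);
  // rethrowing the cause lets ProgramAbortException reach Flink's OptimizerPlanEnvironment
  case e: ExecutionException if e.getCause.isInstanceOf[Error] => throw e.getCause
}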

regards.

On Thu, Sep 19, 2019 at 1:16 PM Yuval Itzchakov  wrote:

> Debasish, could you share a before-and-after example of your classes, for
> future reference?
>
> On Thu, 19 Sep 2019, 10:42 Debasish Ghosh, 
> wrote:
>
>> We solved the problem of serialization by making some things transient
>> which were being captured as part of the closure. So we no longer have
>> serialization errors. Everything works properly without the future.
>>
>> I realize that, because of the static variables, concurrent job submission
>> would be an issue. But we are submitting only one job - the difference is
>> that it's through a Future. So there is no concurrent submission, unless I
>> am missing something.
>>
>> regards.
>>
>> On Thu, Sep 19, 2019 at 12:54 PM Biao Liu  wrote:
>>
>>> Hi Debasish,
>>>
>>> I think something critical about your usage is hidden. It might help
>>> if you could provide more details.
>>>
>>> It still confuses me how you solved the serialization issue. Why do the
>>> non-transient fields affect serialization only in a Future?
>>>
>>> WRT this ProgramAbortException issue, do you submit jobs concurrently in
>>> one process?
>>> Currently job submission is not thread-safe. It relies on some static
>>> variables which could be affected by other concurrent submissions in the
>>> same process.
>>> I ask because job submission usually does not go through
>>> OptimizerPlanEnvironment, which appears in your exception stack trace.
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh 
>>> wrote:
>>>
 I think what you are pointing at is asynchronous datastream operations.
 In our case we want to submit the entire job in a Future. Something like
 the following ..

 def execute(..) = {
   // this does all data stream manipulation, joins etc.
   buildComputationGraph()

   // submits for execution with StreamExecutionEnvironment
   env.execute(..)
 }

 and we want to do ..

 val jobExecutionResultFuture = Future(execute(..))

 and this gives that exception.

 regards.

 On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch 
 wrote:

> Hi Debasish,
>
> Have you taken a look at the AsyncIO API for running async operations?
> I think this is the preferred way of doing it. [1]
> So it would look something like this:
>
> class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>
>   /** The database-specific client that can issue concurrent requests with callbacks */
>   lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
>
>   /** The context used for the future callbacks */
>   implicit lazy val executor: ExecutionContext =
>     ExecutionContext.fromExecutor(Executors.directExecutor())
>
>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>
>     // issue the asynchronous request, receive a future for the result
>     val resultFutureRequested: Future[String] = client.query(str)
>
>     // set the callback to be executed once the request by the client is complete
>     // the callback simply forwards the result to the result future
>     resultFutureRequested.onSuccess {
>       case result: String => resultFuture.complete(Iterable((str, result)))
>     }
>   }
> }
>
> // create the original stream
> val stream: DataStream[String] = ...
>
> // apply the async I/O transformation
> val resultStream: DataStream[(String, String)] =
>   AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
>
>
> Thanks,
> Rafi
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Yuval Itzchakov
Debasish, could you share a before-and-after example of your classes, for
future reference?

On Thu, 19 Sep 2019, 10:42 Debasish Ghosh,  wrote:

> We solved the problem of serialization by making some things transient
> which were being captured as part of the closure. So we no longer have
> serialization errors. Everything works properly without the future.
>
> I realize that, because of the static variables, concurrent job submission
> would be an issue. But we are submitting only one job - the difference is
> that it's through a Future. So there is no concurrent submission, unless I
> am missing something.
>
> regards.
>
> On Thu, Sep 19, 2019 at 12:54 PM Biao Liu  wrote:
>
>> Hi Debasish,
>>
>> I think something critical about your usage is hidden. It might help if
>> you could provide more details.
>>
>> It still confuses me how you solved the serialization issue. Why do the
>> non-transient fields affect serialization only in a Future?
>>
>> WRT this ProgramAbortException issue, do you submit jobs concurrently in
>> one process?
>> Currently job submission is not thread-safe. It relies on some static
>> variables which could be affected by other concurrent submissions in the
>> same process.
>> I ask because job submission usually does not go through
>> OptimizerPlanEnvironment, which appears in your exception stack trace.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh 
>> wrote:
>>
>>> I think what you are pointing at is asynchronous datastream operations.
>>> In our case we want to submit the entire job in a Future. Something like
>>> the following ..
>>>
>>> def execute(..) = {
>>>   // this does all data stream manipulation, joins etc.
>>>   buildComputationGraph()
>>>
>>>   // submits for execution with StreamExecutionEnvironment
>>>   env.execute(..)
>>> }
>>>
>>> and we want to do ..
>>>
>>> val jobExecutionResultFuture = Future(execute(..))
>>>
>>> and this gives that exception.
>>>
>>> regards.
>>>
>>> On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch 
>>> wrote:
>>>
 Hi Debasish,

 Have you taken a look at the AsyncIO API for running async operations?
 I think this is the preferred way of doing it. [1]
 So it would look something like this:

 class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

   /** The database-specific client that can issue concurrent requests with callbacks */
   lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

   /** The context used for the future callbacks */
   implicit lazy val executor: ExecutionContext =
     ExecutionContext.fromExecutor(Executors.directExecutor())

   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

     // issue the asynchronous request, receive a future for the result
     val resultFutureRequested: Future[String] = client.query(str)

     // set the callback to be executed once the request by the client is complete
     // the callback simply forwards the result to the result future
     resultFutureRequested.onSuccess {
       case result: String => resultFuture.complete(Iterable((str, result)))
     }
   }
 }

 // create the original stream
 val stream: DataStream[String] = ...

 // apply the async I/O transformation
 val resultStream: DataStream[(String, String)] =
   AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)


 Thanks,
 Rafi

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api

 On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh <
 ghosh.debas...@gmail.com> wrote:

> ok, the above problem was due to some serialization issues which we
> fixed by marking some of the things transient. This fixes the 
> serialization
> issues .. But now when I try to execute in a Future I hit upon this ..
>
>
> *java.util.concurrent.ExecutionException: Boxed Error* at
> scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
> at
> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at scala.concurrent.Promise.complete(Promise.scala:53)
> at scala.concurrent.Promise.complete$(Promise.scala:52)
> at
> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at
> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
We solved the problem of serialization by making some things transient
which were being captured as part of the closure. So we no longer have
serialization errors. Everything works properly without the future.
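
As a concrete illustration of the transient fix (a sketch only, with made-up
names): a field whose type is not java.io.Serializable, such as an Avro
Schema, can be kept out of the serialized closure by marking it @transient,
and lazy so it is re-created after deserialization.

import org.apache.avro.Schema

class RideLogic extends Serializable {
  // org.apache.avro.Schema is not java.io.Serializable; @transient keeps it
  // out of the serialized closure, lazy re-creates it on first use after
  // deserialization
  @transient lazy val rideSchema: Schema =
    new Schema.Parser().parse(
      """{"type":"record","name":"TaxiRide","fields":[{"name":"rideId","type":"long"}]}""")
}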

I realize that, because of the static variables, concurrent job submission
would be an issue. But we are submitting only one job - the difference is
that it's through a Future. So there is no concurrent submission, unless I
am missing something.

regards.

On Thu, Sep 19, 2019 at 12:54 PM Biao Liu  wrote:

> Hi Debasish,
>
> I think something critical about your usage is hidden. It might help if
> you could provide more details.
>
> It still confuses me how you solved the serialization issue. Why do the
> non-transient fields affect serialization only in a Future?
>
> WRT this ProgramAbortException issue, do you submit jobs concurrently in
> one process?
> Currently job submission is not thread-safe. It relies on some static
> variables which could be affected by other concurrent submissions in the
> same process.
> I ask because job submission usually does not go through
> OptimizerPlanEnvironment, which appears in your exception stack trace.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh 
> wrote:
>
>> I think what you are pointing at is asynchronous datastream operations.
>> In our case we want to submit the entire job in a Future. Something like
>> the following ..
>>
>> def execute(..) = {
>>   // this does all data stream manipulation, joins etc.
>>   buildComputationGraph()
>>
>>   // submits for execution with StreamExecutionEnvironment
>>   env.execute(..)
>> }
>>
>> and we want to do ..
>>
>> val jobExecutionResultFuture = Future(execute(..))
>>
>> and this gives that exception.
>>
>> regards.
>>
>> On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch  wrote:
>>
>>> Hi Debasish,
>>>
>>> Have you taken a look at the AsyncIO API for running async operations? I
>>> think this is the preferred way of doing it. [1]
>>> So it would look something like this:
>>>
>>> class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>>>
>>>   /** The database-specific client that can issue concurrent requests with callbacks */
>>>   lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
>>>
>>>   /** The context used for the future callbacks */
>>>   implicit lazy val executor: ExecutionContext =
>>>     ExecutionContext.fromExecutor(Executors.directExecutor())
>>>
>>>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>>>
>>>     // issue the asynchronous request, receive a future for the result
>>>     val resultFutureRequested: Future[String] = client.query(str)
>>>
>>>     // set the callback to be executed once the request by the client is complete
>>>     // the callback simply forwards the result to the result future
>>>     resultFutureRequested.onSuccess {
>>>       case result: String => resultFuture.complete(Iterable((str, result)))
>>>     }
>>>   }
>>> }
>>>
>>> // create the original stream
>>> val stream: DataStream[String] = ...
>>>
>>> // apply the async I/O transformation
>>> val resultStream: DataStream[(String, String)] =
>>>   AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
>>>
>>>
>>> Thanks,
>>> Rafi
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
>>>
>>> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh 
>>> wrote:
>>>
 ok, the above problem was due to some serialization issues which we
 fixed by marking some of the things transient. This fixes the serialization
 issues .. But now when I try to execute in a Future I hit upon this ..


 *java.util.concurrent.ExecutionException: Boxed Error* at
 scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
 at
 scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
 at
 scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
 at scala.concurrent.Promise.complete(Promise.scala:53)
 at scala.concurrent.Promise.complete$(Promise.scala:52)
 at
 scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
 at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
 at
 java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
 at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
 at
 java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
 at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
 at
 java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

 *Caused by:
 org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Biao Liu
Hi Debasish,

I think something critical about your usage is hidden. It might help if
you could provide more details.

It still confuses me how you solved the serialization issue. Why do the
non-transient fields affect serialization only in a Future?

WRT this ProgramAbortException issue, do you submit jobs concurrently in
one process?
Currently job submission is not thread-safe. It relies on some static
variables which could be affected by other concurrent submissions in the
same process.
I ask because job submission usually does not go through
OptimizerPlanEnvironment, which appears in your exception stack trace.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Sep 2019 at 15:03, Debasish Ghosh 
wrote:

> I think what you are pointing at is asynchronous datastream operations. In
> our case we want to submit the entire job in a Future. Something like the
> following ..
>
> def execute(..) = {
>   // this does all data stream manipulation, joins etc.
>   buildComputationGraph()
>
>   // submits for execution with StreamExecutionEnvironment
>   env.execute(..)
> }
>
> and we want to do ..
>
> val jobExecutionResultFuture = Future(execute(..))
>
> and this gives that exception.
>
> regards.
>
> On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch  wrote:
>
>> Hi Debasish,
>>
>> Have you taken a look at the AsyncIO API for running async operations? I
>> think this is the preferred way of doing it. [1]
>> So it would look something like this:
>>
>> class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>>
>>   /** The database-specific client that can issue concurrent requests with callbacks */
>>   lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
>>
>>   /** The context used for the future callbacks */
>>   implicit lazy val executor: ExecutionContext =
>>     ExecutionContext.fromExecutor(Executors.directExecutor())
>>
>>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>>
>>     // issue the asynchronous request, receive a future for the result
>>     val resultFutureRequested: Future[String] = client.query(str)
>>
>>     // set the callback to be executed once the request by the client is complete
>>     // the callback simply forwards the result to the result future
>>     resultFutureRequested.onSuccess {
>>       case result: String => resultFuture.complete(Iterable((str, result)))
>>     }
>>   }
>> }
>>
>> // create the original stream
>> val stream: DataStream[String] = ...
>>
>> // apply the async I/O transformation
>> val resultStream: DataStream[(String, String)] =
>>   AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
>>
>>
>> Thanks,
>> Rafi
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
>>
>> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh 
>> wrote:
>>
>>> ok, the above problem was due to some serialization issues which we
>>> fixed by marking some of the things transient. This fixes the serialization
>>> issues .. But now when I try to execute in a Future I hit upon this ..
>>>
>>>
>>> *java.util.concurrent.ExecutionException: Boxed Error* at
>>> scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>> at
>>> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>> at
>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>> at scala.concurrent.Promise.complete(Promise.scala:53)
>>> at scala.concurrent.Promise.complete$(Promise.scala:52)
>>> at
>>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
>>> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>>> at
>>> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
>>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>>> at
>>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>>> at
>>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>>>
>>> *Caused by:
>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
>>> at
>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>> at
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>> at
>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
>>> at
>>> pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
>>> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
>>> at scala.util.Success.$anonfun$map$1(Try.scala:255)

Re: serialization issue in streaming job run with scala Future

2019-09-19 Thread Debasish Ghosh
I think what you are pointing at is asynchronous datastream operations. In
our case we want to submit the entire job in a Future. Something like the
following ..

def execute(..) = {
  // this does all data stream manipulation, joins etc.
  buildComputationGraph()

  // submits for execution with StreamExecutionEnvironment
  env.execute(..)
}

and we want to do ..

val jobExecutionResultFuture = Future(execute(..))

and this gives that exception.

regards.

On Thu, Sep 19, 2019 at 11:00 AM Rafi Aroch  wrote:

> Hi Debasish,
>
> Have you taken a look at the AsyncIO API for running async operations? I
> think this is the preferred way of doing it. [1]
> So it would look something like this:
>
> class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
>
>   /** The database-specific client that can issue concurrent requests with callbacks */
>   lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)
>
>   /** The context used for the future callbacks */
>   implicit lazy val executor: ExecutionContext =
>     ExecutionContext.fromExecutor(Executors.directExecutor())
>
>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
>
>     // issue the asynchronous request, receive a future for the result
>     val resultFutureRequested: Future[String] = client.query(str)
>
>     // set the callback to be executed once the request by the client is complete
>     // the callback simply forwards the result to the result future
>     resultFutureRequested.onSuccess {
>       case result: String => resultFuture.complete(Iterable((str, result)))
>     }
>   }
> }
>
> // create the original stream
> val stream: DataStream[String] = ...
>
> // apply the async I/O transformation
> val resultStream: DataStream[(String, String)] =
>   AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
>
>
> Thanks,
> Rafi
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api
>
> On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh 
> wrote:
>
>> ok, the above problem was due to some serialization issues which we fixed
>> by marking some of the things transient. This fixes the serialization
>> issues .. But now when I try to execute in a Future I hit upon this ..
>>
>>
>> *java.util.concurrent.ExecutionException: Boxed Error* at
>> scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>> at
>> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>> at scala.concurrent.Promise.complete(Promise.scala:53)
>> at scala.concurrent.Promise.complete$(Promise.scala:52)
>> at
>> scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
>> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
>> at
>> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
>> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
>> at
>> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
>> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
>> at
>> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>>
>> *Caused by:
>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
>> at
>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>> at
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>> at
>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
>> at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
>> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
>> at scala.util.Success.$anonfun$map$1(Try.scala:255)
>> at scala.util.Success.map(Try.scala:213)
>> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
>> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
>> ... 7 more
>>
>> I found this issue in JIRA
>> https://issues.apache.org/jira/browse/FLINK-10381 which is still open
>> and talks about a related issue. But we are not submitting multiple jobs -
>> we are just submitting 1 job but async in a Future. I am not clear why this
>> should create the problem that I see.
>>
>> Can anyone please help with an explanation ?
>>
>> regards.
>>
>> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh 
>> wrote:
>>
>>> I think the issue may not be linked with Future. What happens is when
>>> this piece of code is executed ..
>>>
>>> val rides: DataStream[TaxiRide] =
>>>   readStream(inTaxiRide)
>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>     .keyBy("rideId")

Re: serialization issue in streaming job run with scala Future

2019-09-18 Thread Rafi Aroch
Hi Debasish,

Have you taken a look at the AsyncIO API for running async operations? I
think this is the preferred way of doing it. [1]
So it would look something like this:

class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

  /** The database-specific client that can issue concurrent requests with callbacks */
  lazy val client: DatabaseClient = new DatabaseClient(host, port, credentials)

  /** The context used for the future callbacks */
  implicit lazy val executor: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.directExecutor())

  override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {

    // issue the asynchronous request, receive a future for the result
    val resultFutureRequested: Future[String] = client.query(str)

    // set the callback to be executed once the request by the client is complete
    // the callback simply forwards the result to the result future
    resultFutureRequested.onSuccess {
      case result: String => resultFuture.complete(Iterable((str, result)))
    }
  }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
  AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
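
If output order must match input order, the ordered variant takes the same
parameters, at the cost of extra latency under load:

// drop-in alternative to the unorderedWait call above
val orderedResultStream: DataStream[(String, String)] =
  AsyncDataStream.orderedWait(stream, new AsyncDatabaseRequest(),
    1000, TimeUnit.MILLISECONDS, 100)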


Thanks,
Rafi

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/asyncio.html#async-io-api

On Wed, Sep 18, 2019 at 8:26 PM Debasish Ghosh 
wrote:

> ok, the above problem was due to some serialization issues which we fixed
> by marking some of the things transient. This fixes the serialization
> issues .. But now when I try to execute in a Future I hit upon this ..
>
>
> *java.util.concurrent.ExecutionException: Boxed Error* at
> scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
> at
> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at scala.concurrent.Promise.complete(Promise.scala:53)
> at scala.concurrent.Promise.complete$(Promise.scala:52)
> at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at
> java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
>
> *Caused by:
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
> at
> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> at
> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
> at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
> at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
> at scala.util.Success.$anonfun$map$1(Try.scala:255)
> at scala.util.Success.map(Try.scala:213)
> at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
> ... 7 more
>
> I found this issue in JIRA
> https://issues.apache.org/jira/browse/FLINK-10381 which is still open and
> talks about a related issue. But we are not submitting multiple jobs - we
> are just submitting 1 job but async in a Future. I am not clear why this
> should create the problem that I see.
>
> Can anyone please help with an explanation ?
>
> regards.
>
> On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh 
> wrote:
>
>> I think the issue may not be linked with Future. What happens is when
>> this piece of code is executed ..
>>
>> val rides: DataStream[TaxiRide] =
>>   readStream(inTaxiRide)
>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>     .keyBy("rideId")
>>
>> val fares: DataStream[TaxiFare] =
>>   readStream(inTaxiFare)
>>     .keyBy("rideId")
>>
>> val processed: DataStream[TaxiRideFare] =
>>   rides
>>     .connect(fares)
>>     .flatMap(new EnrichmentFunction)
>>
>> somehow the ClosureCleaner gets executed as evident from the following
>> which tries to serialize Avro data. Is there any way to pass the custom
>> avro serializer that I am using ?
>>
>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>> serializable. The object probably contains or references non serializable
>> fields.

Re: serialization issue in streaming job run with scala Future

2019-09-18 Thread Debasish Ghosh
ok, the above problem was due to some serialization issues which we fixed
by marking some of the things transient. This fixes the serialization
issues .. But now when I try to execute in a Future I hit upon this ..


*java.util.concurrent.ExecutionException: Boxed Error* at
scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at
scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.complete(Promise.scala:53)
at scala.concurrent.Promise.complete$(Promise.scala:52)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at
java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

*Caused by:
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException*
at
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at
pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:280)
at pipelines.flink.FlinkStreamlet.$anonfun$run$2(FlinkStreamlet.scala:149)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
... 7 more

I found this issue in JIRA
https://issues.apache.org/jira/browse/FLINK-10381 which
is still open and talks about a related issue. But we are not submitting
multiple jobs - we are just submitting 1 job but async in a Future. I am
not clear why this should create the problem that I see.
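
For what it's worth, the "Boxed Error" wrapping itself can be reproduced
without Flink at all. Assuming ProgramAbortException extends Error (as the
Promise.resolver frame in the trace suggests), scala.concurrent treats it as
fatal and boxes it. A minimal standalone sketch:

import java.util.concurrent.ExecutionException
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// stand-in for ProgramAbortException, which is an Error rather than an Exception
val f = Future[Unit] { throw new Error("simulated abort") }

try Await.result(f, 5.seconds)
catch {
  // prints: java.util.concurrent.ExecutionException: Boxed Error
  case e: ExecutionException => println(e)
}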

Can anyone please help with an explanation ?

regards.

On Wed, Sep 18, 2019 at 12:22 AM Debasish Ghosh 
wrote:

> I think the issue may not be linked with Future. What happens is when this
> piece of code is executed ..
>
> val rides: DataStream[TaxiRide] =
>   readStream(inTaxiRide)
>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>     .keyBy("rideId")
>
> val fares: DataStream[TaxiFare] =
>   readStream(inTaxiFare)
>     .keyBy("rideId")
>
> val processed: DataStream[TaxiRideFare] =
>   rides
>     .connect(fares)
>     .flatMap(new EnrichmentFunction)
>
> somehow the ClosureCleaner gets executed as evident from the following
> which tries to serialize Avro data. Is there any way to pass the custom
> avro serializer that I am using ?
>
> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
> serializable. The object probably contains or references non serializable
> fields.
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
> at
> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
> at
> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
> at
> pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
> at
> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
> at scala.util.Try$.apply(Try.scala:213)

Re: serialization issue in streaming job run with scala Future

2019-09-17 Thread Debasish Ghosh
I think the issue may not be linked with Future. What happens is when this
piece of code is executed ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

somehow the ClosureCleaner gets executed as evident from the following
which tries to serialize Avro data. Is there any way to pass the custom
avro serializer that I am using ?

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
serializable. The object probably contains or references non serializable
fields.
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at
org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
at
org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
at
pipelines.examples.processor.TaxiRideProcessor$$anon$1.buildExecutionGraph(TaxiRideProcessor.scala:47)
at
pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:278)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:149)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

Re: serialization issue in streaming job run with scala Future

2019-09-16 Thread Biao Liu
Hi Debasish,

I guess the reason is that something is unexpectedly pulled into serialization
through a reference from an inner class (an anonymous class or lambda
expression). When Flink serializes such an inner-class instance, it also
serializes all referenced objects, including the outer class instance. If the
outer class is not serializable, this error happens.

You could try moving that piece of code to a named non-inner class.
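
A sketch of that suggestion, with the element types simplified to String
(the real types are not shown here): a named top-level class has no hidden
reference to an outer instance, so only its own fields need to be serializable.

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// top-level named class: no $outer field is captured, unlike an anonymous
// subclass or a lambda defined inside another class
class EnrichmentFunction extends CoFlatMapFunction[String, String, (String, String)] {
  override def flatMap1(ride: String, out: Collector[(String, String)]): Unit =
    out.collect((ride, ""))
  override def flatMap2(fare: String, out: Collector[(String, String)]): Unit =
    out.collect(("", fare))
}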

Thanks,
Biao /'bɪ.aʊ/



On Tue, 17 Sep 2019 at 02:06, Debasish Ghosh 
wrote:

> My main question is why serialisation kicks in when I try to execute
> within a `Future` and not otherwise.
>
> regards.
>
> On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh 
> wrote:
>
>> Yes, they are generated from an Avro Schema and implement Serializable ..
>>
>> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma 
>> wrote:
>>
>>> Does TaxiRide or TaxiRideFare implement Serializable?
>>>
>>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh 
>>> wrote:
>>>
 Hello -

 The following piece of code is an example of a connected data streams ..

 val rides: DataStream[TaxiRide] =
   readStream(inTaxiRide)
     .filter { ride ⇒ ride.getIsStart().booleanValue }
     .keyBy("rideId")

 val fares: DataStream[TaxiFare] =
   readStream(inTaxiFare)
     .keyBy("rideId")

 val processed: DataStream[TaxiRideFare] =
   rides
     .connect(fares)
     .flatMap(new EnrichmentFunction)

 When I execute the above logic using
 StreamExecutionEnvironment.execute(..) it runs fine.
 But if I try to execute the above from within a scala.concurrent.Future,
 I get the following exception ..

 org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
 pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
 type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
 startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
 pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
 serializable. The object probably contains or references non serializable
 fields.
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
 at
 org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
 at
 org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
 at
 org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
 at
 org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
 at
 org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
   ...

 Caused by: java.io.NotSerializableException:
 org.apache.avro.Schema$Field
 at
 java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)

 Any thoughts why this may happen ?

 regards.

 --
 Debasish Ghosh
 http://manning.com/ghosh2
 http://manning.com/ghosh

 Twttr: @debasishg
 Blog: http://debasishg.blogspot.com
 Code: http://github.com/debasishg

>>>
>>>
>>> --
>>> Thanks
>>> Deepak
>>> www.bigdatabig.com
>>> www.keosha.net
>>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
> --
> Sent from my iPhone
>


Re: serialization issue in streaming job run with scala Future

2019-09-16 Thread Debasish Ghosh
My main question is why serialisation kicks in when I try to execute within
a `Future` and not otherwise.

regards.

On Mon, 16 Sep 2019 at 4:46 PM, Debasish Ghosh 
wrote:

> Yes, they are generated from an Avro Schema and implement Serializable ..
>
> On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma 
> wrote:
>
>> Does TaxiRide or TaxiRideFare implement Serializable?
>>
>> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh 
>> wrote:
>>
>>> Hello -
>>>
>>> The following piece of code is an example of a connected data streams ..
>>>
>>> val rides: DataStream[TaxiRide] =
>>>   readStream(inTaxiRide)
>>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>     .keyBy("rideId")
>>>
>>> val fares: DataStream[TaxiFare] =
>>>   readStream(inTaxiFare)
>>>     .keyBy("rideId")
>>>
>>> val processed: DataStream[TaxiRideFare] =
>>>   rides
>>>     .connect(fares)
>>>     .flatMap(new EnrichmentFunction)
>>>
>>> When I execute the above logic using
>>> StreamExecutionEnvironment.execute(..) it runs fine.
>>> But if I try to execute the above from within a scala.concurrent.Future,
>>> I get the following exception ..
>>>
>>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>>> serializable. The object probably contains or references non serializable
>>> fields.
>>> at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>>> at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>> at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>> at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>> at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>>> at
>>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>>> at
>>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>>> at
>>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>>> at
>>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>>   ...
>>>
>>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>>> at
>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>
>>> Any thoughts why this may happen ?
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>>
>> --
>> Thanks
>> Deepak
>> www.bigdatabig.com
>> www.keosha.net
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
-- 
Sent from my iPhone


Re: serialization issue in streaming job run with scala Future

2019-09-16 Thread Debasish Ghosh
Yes, they are generated from an Avro Schema and implement Serializable ..

On Mon, Sep 16, 2019 at 4:40 PM Deepak Sharma  wrote:

> Does TaxiRide or TaxiRideFare implement Serializable?
>
> On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh 
> wrote:
>
>> Hello -
>>
>> The following piece of code is an example of a connected data streams ..
>>
>> val rides: DataStream[TaxiRide] =
>>   readStream(inTaxiRide)
>>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>>     .keyBy("rideId")
>>
>> val fares: DataStream[TaxiFare] =
>>   readStream(inTaxiFare)
>>     .keyBy("rideId")
>>
>> val processed: DataStream[TaxiRideFare] =
>>   rides
>>     .connect(fares)
>>     .flatMap(new EnrichmentFunction)
>>
>> When I execute the above logic using
>> StreamExecutionEnvironment.execute(..) it runs fine.
>> But if I try to execute the above from within a scala.concurrent.Future,
>> I get the following exception ..
>>
>> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
>> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
>> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
>> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
>> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
>> serializable. The object probably contains or references non serializable
>> fields.
>> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
>> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
>> at
>> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
>> at
>> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
>> at
>> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
>> at
>> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>>   ...
>>
>> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>
>> Any thoughts why this may happen ?
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: serialization issue in streaming job run with scala Future

2019-09-16 Thread Deepak Sharma
Does TaxiRide or TaxiRideFare implement Serializable?

On Mon, Sep 16, 2019 at 3:47 PM Debasish Ghosh 
wrote:

> Hello -
>
> The following piece of code is an example of a connected data streams ..
>
> val rides: DataStream[TaxiRide] =
>   readStream(inTaxiRide)
>     .filter { ride ⇒ ride.getIsStart().booleanValue }
>     .keyBy("rideId")
>
> val fares: DataStream[TaxiFare] =
>   readStream(inTaxiFare)
>     .keyBy("rideId")
>
> val processed: DataStream[TaxiRideFare] =
>   rides
>     .connect(fares)
>     .flatMap(new EnrichmentFunction)
>
> When I execute the above logic using
> StreamExecutionEnvironment.execute(..) it runs fine.
> But if I try to execute the above from within a scala.concurrent.Future,
> I get the following exception ..
>
> org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
> pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
> type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
> startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
> pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
> serializable. The object probably contains or references non serializable
> fields.
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
> at
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
> at
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
> at
> org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
> at
> org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
>   ...
>
> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>
> Any thoughts why this may happen ?
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


serialization issue in streaming job run with scala Future

2019-09-16 Thread Debasish Ghosh
Hello -

The following piece of code is an example of a connected data streams ..

val rides: DataStream[TaxiRide] =
  readStream(inTaxiRide)
    .filter { ride ⇒ ride.getIsStart().booleanValue }
    .keyBy("rideId")

val fares: DataStream[TaxiFare] =
  readStream(inTaxiFare)
    .keyBy("rideId")

val processed: DataStream[TaxiRideFare] =
  rides
    .connect(fares)
    .flatMap(new EnrichmentFunction)

When I execute the above logic using StreamExecutionEnvironment.execute(..)
it runs fine.
But if I try to execute the above from within a scala.concurrent.Future, I
get the following exception ..

org.apache.flink.api.common.InvalidProgramException: [rideId type:LONG
pos:0, isStart type:BOOLEAN pos:1, taxiId type:LONG pos:2, passengerCnt
type:INT pos:3, driverId type:LONG pos:4, startLon type:FLOAT pos:5,
startLat type:FLOAT pos:6, endLon type:FLOAT pos:7, endLat type:FLOAT
pos:8, startTime type:LONG pos:9, endTime type:LONG pos:10] is not
serializable. The object probably contains or references non serializable
fields.
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
at
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at
org.apache.flink.streaming.api.datastream.ConnectedStreams.flatMap(ConnectedStreams.java:274)
at
org.apache.flink.streaming.api.scala.ConnectedStreams.flatMap(ConnectedStreams.scala:179)
  ...

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
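
A debugging sketch that may help narrow this down: the ClosureCleaner
ultimately Java-serializes the function object, so the same
NotSerializableException can be reproduced directly on the suspect object,
outside Flink.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// throws java.io.NotSerializableException naming the first offending object
def checkSerializable(obj: AnyRef): Unit = {
  val out = new ObjectOutputStream(new ByteArrayOutputStream())
  try out.writeObject(obj) finally out.close()
}

checkSerializable(new EnrichmentFunction) // the co-flatMap function used above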

Any thoughts why this may happen ?

regards.

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg