Re: Flink SQL client always cost 2 minutes to submit job to a local cluster

2019-01-03 Thread Hequn Cheng
Hi yinhua,

Could you share the steps to reproduce the problem? I can help to figure
out the root cause.

Best, Hequn


On Fri, Jan 4, 2019 at 11:37 AM yinhua.dai  wrote:

> Hi Fabian,
>
> It's the submission of the jar file that takes too long.
> And yes, Hequn's and your suggestion works, but I'm just curious why a 100M
> jar file takes so long to submit. Is it related to some upload
> parameter settings of the web layer?
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


The way to write a UDF with generic type

2019-01-03 Thread yinhua.dai
Hi Community,

I tried to write a UDF with a generic type, but it seems that Flink
complains about not recognizing the type information when I use it in SQL.

I checked the implementation of the native function "MAX" and realized that
it's not using the same API (e.g. AggregateFunction) as user-defined
functions. Is that the reason why "MAX" doesn't have the generic type issue?

How can I implement my own "MAX" function that supports all types?
Thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
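
A minimal sketch of one way around the type issue above, assuming the Table
API's AggregateFunction (Flink 1.x): have the caller hand in the
TypeInformation and return it from getResultType(), so the planner never has
to infer the generic parameter reflectively. The class below is illustrative,
not Flink's built-in MAX.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.functions.AggregateFunction;

public class MyGenericMax<T extends Comparable<T>>
        extends AggregateFunction<T, MyGenericMax.MaxAcc<T>> {

    public static class MaxAcc<T> {
        public T max = null; // running maximum, null until the first value
    }

    private final TypeInformation<T> resultType;

    public MyGenericMax(TypeInformation<T> resultType) {
        this.resultType = resultType;
    }

    @Override
    public MaxAcc<T> createAccumulator() {
        return new MaxAcc<>();
    }

    @Override
    public T getValue(MaxAcc<T> acc) {
        return acc.max;
    }

    // Invoked via reflection by the generated code for every input row.
    public void accumulate(MaxAcc<T> acc, T value) {
        if (value != null && (acc.max == null || value.compareTo(acc.max) > 0)) {
            acc.max = value;
        }
    }

    // Supplying the result type explicitly avoids the reflective extraction
    // that fails for generic type parameters.
    @Override
    public TypeInformation<T> getResultType() {
        return resultType;
    }
}

Registration would then pass the concrete type at the call site, e.g.
new MyGenericMax<>(Types.LONG); getAccumulatorType() can be overridden the
same way if the accumulator type is not extractable either.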


Re: Flink SQL client always cost 2 minutes to submit job to a local cluster

2019-01-03 Thread yinhua.dai
Hi Fabian,

It's the submission of the jar file that takes too long.
And yes, Hequn's and your suggestion works, but I'm just curious why a 100M
jar file takes so long to submit. Is it related to some upload
parameter settings of the web layer?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Is there an example of flink cluster "as a job" deployment on k8s ?

2019-01-03 Thread Chesnay Schepler

I believe we're targeting late February.

On 03.01.2019 20:10, Vishal Santoshi wrote:
This is interesting https://issues.apache.org/jira/browse/FLINK-9953. 
When would 1.8 be out?


On Wed, Dec 12, 2018 at 12:45 PM Vishal Santoshi
<vishal.santo...@gmail.com> wrote:


thanks

On Thu, Dec 6, 2018 at 5:39 AM Dawid Wysakowicz
<dwysakow...@apache.org> wrote:

Hi Vishal,

You might want to have a look at the
flink-container/kubernetes module:
https://github.com/apache/flink/tree/master/flink-container/kubernetes

Best,

Dawid

On 05/12/2018 22:50, Vishal Santoshi wrote:
>





Re: using updating shared data

2019-01-03 Thread Avi Levi
Thanks for the tip Elias!

On Wed, Jan 2, 2019 at 9:44 PM Elias Levy 
wrote:

> One thing you must be careful of: if you are using event time
> processing and the control stream only receives messages sporadically,
> event time will stop moving forward in the operator joining the streams
> while the control stream is idle. You can get around this by using a
> periodic watermark extractor on the control stream that bounds the event
> time delay to processing time (a sketch of such an extractor follows at the
> end of this thread), or by defining your own low-level operator that
> ignores watermarks from the control stream.
>
> On Wed, Jan 2, 2019 at 8:42 AM Avi Levi  wrote:
>
>> Thanks Till, I will definitely check it out. Just to make sure that I
>> got you correctly: you are suggesting that the list that I want to broadcast
>> will be broadcast via the control stream and will then be kept in the
>> relevant operator state, correct? And updates (CRUD) on that list will be
>> performed via the control stream, correct?
>> BR
>> Avi
>>
>> On Wed, Jan 2, 2019 at 4:28 PM Till Rohrmann 
>> wrote:
>>
>>> Hi Avi,
>>>
>>> you could use Flink's broadcast state pattern [1]. You would need to use
>>> the DataStream API, but it allows you to have two streams (input and control
>>> stream) where the control stream is broadcast to all subtasks. So by
>>> ingesting messages into the control stream you can send model updates to
>>> all subtasks (a sketch of this pattern also follows at the end of this thread).
>>>
>>> [1]
>>> 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
>>> 
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Jan 1, 2019 at 6:49 PM miki haiat  wrote:
>>>
 I'm trying to understand your use case.
 What is the source of the data? FS, Kafka, or something else?


 On Tue, Jan 1, 2019 at 6:29 PM Avi Levi 
 wrote:

> Hi,
> I have a list (a couple of thousand text lines) that I need to use in
> my map function. I read this article about broadcasting variables, or
> using the distributed cache;
> however, I need to update this list from time to time, and if I understood
> correctly that is not possible with broadcast or cache without restarting the
> job. Is there an idiomatic way to achieve this? A DB seems to be overkill
> for that, and I want to be as cheap on IO/network calls as
> possible.
>
> Cheers
> Avi
>
>
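
A minimal sketch of the periodic watermark extractor Elias suggests for the
control stream, assuming the AssignerWithPeriodicWatermarks API of Flink 1.x;
maxDelayMillis and the reuse of the upstream-assigned timestamp are
illustrative choices, not Elias's exact code.

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class BoundedDelayWatermarks<T> implements AssignerWithPeriodicWatermarks<T> {

    private final long maxDelayMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    public BoundedDelayWatermarks(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    @Override
    public long extractTimestamp(T element, long previousTimestamp) {
        // Assumes timestamps were already assigned upstream (e.g. by the
        // Kafka source); we just track the maximum we have seen.
        maxTimestampSeen = Math.max(maxTimestampSeen, previousTimestamp);
        return previousTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Never let the watermark lag processing time by more than the bound,
        // so an idle control stream cannot stall event time in the join.
        long bound = System.currentTimeMillis() - maxDelayMillis;
        return new Watermark(Math.max(maxTimestampSeen, bound));
    }
}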
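
And a minimal sketch of the broadcast state pattern Till describes [1],
assuming the Flink 1.5+ DataStream API; the String element types, the
descriptor name, and the bare put-on-update logic are illustrative.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastListExample {

    // Descriptor for the broadcast state holding the list entries.
    static final MapStateDescriptor<String, String> LIST_DESCRIPTOR =
        new MapStateDescriptor<>("list-entries", Types.STRING, Types.STRING);

    public static DataStream<String> wire(DataStream<String> input,
                                          DataStream<String> control) {
        // Every element of the control stream reaches every subtask.
        BroadcastStream<String> broadcastControl = control.broadcast(LIST_DESCRIPTOR);

        return input
            .connect(broadcastControl)
            .process(new BroadcastProcessFunction<String, String, String>() {

                @Override
                public void processElement(String value, ReadOnlyContext ctx,
                                           Collector<String> out) throws Exception {
                    // Data path: read-only view of the broadcast state.
                    if (ctx.getBroadcastState(LIST_DESCRIPTOR).contains(value)) {
                        out.collect(value);
                    }
                }

                @Override
                public void processBroadcastElement(String entry, Context ctx,
                                                    Collector<String> out) throws Exception {
                    // Control path: apply the update on every subtask. A real
                    // job would carry an op code (add/remove) in the message.
                    ctx.getBroadcastState(LIST_DESCRIPTOR).put(entry, entry);
                }
            });
    }
}

Updates arriving on the control stream mutate the broadcast state, so every
subtask sees the new list without a job restart.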


Re: Is there an example of flink cluster "as a job" deployment on k8s ?

2019-01-03 Thread Vishal Santoshi
This is interesting https://issues.apache.org/jira/browse/FLINK-9953.  When
would 1.8 be out?

On Wed, Dec 12, 2018 at 12:45 PM Vishal Santoshi 
wrote:

> thanks
>
> On Thu, Dec 6, 2018 at 5:39 AM Dawid Wysakowicz 
> wrote:
>
>> Hi Vishal,
>>
>> You might want to have a look at the flink-container/kubernetes module:
>> https://github.com/apache/flink/tree/master/flink-container/kubernetes
>>
>> Best,
>>
>> Dawid
>>
>> On 05/12/2018 22:50, Vishal Santoshi wrote:
>> >
>>
>>


Re: Problem building 1.7.1 with scala-2.12

2019-01-03 Thread Cliff Resnick
Thanks, that works. I was passing -Pscala-2.12 (i.e., as a Maven profile).

On Thu, Jan 3, 2019 at 4:45 AM Chesnay Schepler  wrote:

> When building Flink for scala 2.12 you have to pass "-Dscala-2.12" to
> maven; see the flink-connector-kafka-0.9 pom for details. (look for the
> scala-2.11 profile)
>
> On 02.01.2019 17:48, Cliff Resnick wrote:
> > The build fails at flink-connector-kafka-0.9 because _2.12 libraries
> > apparently do not exist for kafka < 0.10. Any help appreciated!
> >
> >
> > -Cliff
>
>
>


Re: Flink SQL client always cost 2 minutes to submit job to a local cluster

2019-01-03 Thread Fabian Hueske
Hi,

You can try to build a JAR file with all runtime dependencies of Flink SQL
(Calcite, Janino, +transitive dependencies), add it to the lib folder, and
exclude the dependencies from the JAR file that is sent to the cluster when
you submit a job.

It would also be good to figure out what takes so long.
Is it actually the submission of the jar file (in that case Hequn's and my
suggestions would help) or the time to plan a query?

Best, Fabian

On Sun, Dec 30, 2018 at 22:06, yinhua.dai <
yinhua.2...@outlook.com> wrote:

> I have to do that for now. However, I have to find another way, because the
> jar sometimes gets updated, and the Flink cluster will be remote in the
> future.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Is there a better way to debug ClassNotFoundException from FlinkUserCodeClassLoaders?

2019-01-03 Thread Hao Sun
I am on Flink 1.7.1 and K8S.
I said "suddenly" because my program worked fine until I added a new
MapFunction.
I do not know the details, but I think I know what is causing it.

=== Start of Program ===
val stream: DataStream[MaxwellEvent] = ...

// stream.map(new ProblemFunction(...)) will cause the issue
class ProblemFunction(stringParam: String)(implicit datadog: DatadogClient)
  extends MapFunction[MaxwellEvent, MaxwellEvent]
=== End of Program ===

I think the class taking curried parameters caused the issue; after I gave
up on the curried format, the error disappeared.

I am using https://github.com/sbt/sbt-assembly to assemble the fat jar.
There might be some issue, or a config issue, with that as well.

I am reading this article; it is a good start for me as well:
https://heapanalytics.com/blog/engineering/missing-scala-class-noclassdeffounderror


Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Thu, Jan 3, 2019 at 1:08 AM Timo Walther  wrote:

> Hi Hao,
>
> which Flink version are you using? What do you mean by "suddenly"? Did
> it work before?
>
> Regards,
> Timo
>
>
> On 03.01.19 at 07:13, Hao Sun wrote:
>
> Yep, javap shows the class is there, but FlinkUserCodeClassLoaders somehow
> could not find it suddenly
>
> javap -cp /opt/flink/lib/zendesk-fps-core-assembly-0.1.0.jar
> 'com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45'
> Compiled from "ConnectedStreams.scala"
> public final class
> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45
> extends
> org.apache.flink.api.scala.typeutils.CaseClassSerializer
> {
> public com.zendesk.fraudprevention.datatypes.MaxwellEvent
> createInstance(java.lang.Object[]);
> public
> org.apache.flink.api.scala.typeutils.CaseClassSerializer
> createSerializerInstance(java.lang.Class,
> org.apache.flink.api.common.typeutils.TypeSerializer[]);
> public org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase
> createSerializerInstance(java.lang.Class,
> org.apache.flink.api.common.typeutils.TypeSerializer[]);
> public java.lang.Object createInstance(java.lang.Object[]);
> public
> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45(com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90,
> org.apache.flink.api.common.typeutils.TypeSerializer[]);
> }
>
> Hao Sun
> Team Lead
> 1019 Market St. 7F
> San Francisco, CA 94103
>
>
> On Wed, Jan 2, 2019 at 6:04 PM qi luo  wrote:
>
>> Hi Hao,
>>
>> Since Flink uses a child-first class loader, you may try searching for the
>> class
>> "com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45"
>> in your fat JAR. Is that an inner class?
>>
>> Best,
>> Qi
>>
>> On Jan 3, 2019, at 7:01 AM, Hao Sun  wrote:
>>
>> Hi,
>>
>> I am wondering if there are any protips to figure out what class is not
>> found?
>>
>> = Logs 
>> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not
>> instantiate chained outputs.
>> at
>> org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:324)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:292)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.ClassNotFoundException:
>> com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at
>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:77)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
>> at
>> 
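
To act on Qi's suggestion above, one quick check is whether the class
actually made it into the fat JAR as an entry; a minimal sketch, using the
JAR path from the earlier javap command:

import java.util.jar.JarFile;

public class FindClassInJar {
    public static void main(String[] args) throws Exception {
        // A class name as a JAR entry: dots become slashes, ".class" is appended.
        String entry =
            "com/zendesk/fraudprevention/examples/ConnectedStreams$$anon$90$$anon$45.class";
        try (JarFile jar =
                 new JarFile("/opt/flink/lib/zendesk-fps-core-assembly-0.1.0.jar")) {
            System.out.println(
                jar.getEntry(entry) != null ? "found: " + entry : "missing: " + entry);
        }
    }
}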

ConnectTimeoutException when createPartitionRequestClient

2019-01-03 Thread Wenrui Meng
Hi,

I consistently get a connection timeout when creating the
partitionRequestClient in Flink 1.4. I tried to ping from the connecting
host to the connected host, but the ping latency is consistently less than
0.1 ms, so it's probably not due to the cluster status. I also tried
increasing the max backoff, the network timeout, and some other settings; it
doesn't help.

I enabled Flink's debug logging but did not find any suspicious or useful
information to help me fix the issue. Here is the link

of the jobManager and taskManager logs. The connecting host is the host
that throws the exception; the connected host is the host the connecting
host tries to request a partition from.

Since our platform is not up to date yet, the Flink version I used here is
1.4. But I noticed that this code has not changed much on the
master branch. Any help will be appreciated.

Here is the stack trace of the exception (from the task switching from
RUNNING to FAILED):
java.io.IOException: Connecting the channel failed: Connecting to remote
task manager + 'athena485-sjc1/10.70.132.8:34185' has failed. This might
indicate that the remote task manager has been lost.
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.waitForChannel(PartitionRequestClientFactory.java:197)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.access$000(PartitionRequestClientFactory.java:132)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory.createPartitionRequestClient(PartitionRequestClientFactory.java:84)
at
org.apache.flink.runtime.io.network.netty.NettyConnectionManager.createPartitionRequestClient(NettyConnectionManager.java:59)
at
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.requestSubpartition(RemoteInputChannel.java:156)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:480)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502)
at
org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:93)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:214)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Connecting to remote task manager + 'athena485-sjc1/10.70.132.8:34185' has
failed. This might indicate that the remote task manager has been lost.
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:220)
at
org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory$ConnectingChannel.operationComplete(PartitionRequestClientFactory.java:132)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:214)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:120)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
... 1 common frames omitted
Caused by:
org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException:
connection timed out: athena485-sjc1/10.70.132.8:34185
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:212)
... 6 common frames omitted

Thanks,
Wenrui


Re: Question about Flink optimizer on Stream API

2019-01-03 Thread Till Rohrmann
Hi Felipe,

for streaming, Flink currently does not optimize the dataflow graph. I
think the best reference is actually going through the code, as you've done
for the batch case.

Cheers,
Till

On Wed, Dec 19, 2018 at 3:14 PM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I was reading some FLIP documents related to the new design of the Flink
> Schedule [1] and unification of batch and stream [2]. Then I created two
> different programs to learn how Flink optimizes the Query Plan in Batch and
> in Stream mode (and how much further it goes). One using batch [3] and one
> using Stream [4]. During the code debugging and also as it is depicted on
> the document [2], the batch program uses the
> org.apache.flink.optimizer.Optimizer class which generates a
> "org.apache.flink.optimizer.plan.OptimizedPlan" while stream program uses
> the "org.apache.flink.streaming.api.graph.StreamGraph" and every
> transformation inside the packet
> "org.apache.flink.streaming.api.transformations".
>
> When I show the execution plan with "env.getExecutionPlan()", I see
> exactly what I have written in the Flink program (which is expected).
> However, I was looking for where I can see the optimized plan, i.e.,
> decisions on operator reordering based on cost or statistics. For batch I
> could find "org.apache.flink.optimizer.costs.CostEstimator" and
> "org.apache.flink.optimizer.DataStatistics", but for streaming I only found
> the creation of the plan. How can I debug that, or get a better
> understanding of what Flink is doing? Do you advise me to read some other
> reference about this?
>
> Kind Regards,
> Felipe
>
> [1] Group-aware scheduling for Flink -
> https://docs.google.com/document/d/1q7NOqt05HIN-PlKEEPB36JiuU1Iu9fnxxVGJzylhsxU/edit#heading=h.k15nfgsa5bnk
> [2] Unified Core API for Streaming and Batch -
> https://docs.google.com/document/d/1G0NUIaaNJvT6CMrNCP6dRXGv88xNhDQqZFrQEuJ0rVU/edit#
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/batch/MatrixMultiplication.java
> [4]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/SensorsReadingMqttJoinQEP.java
>
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
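
For completeness, a minimal sketch of inspecting the streaming plan discussed
above; this prints the StreamGraph as JSON, which for the DataStream API is
the program exactly as written, with no cost-based reordering applied. The
tiny pipeline is illustrative.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ShowStreamPlan {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();

        // The JSON mirrors the transformations as declared; compare this with
        // the batch OptimizedPlan, which may differ from the written program.
        System.out.println(env.getExecutionPlan());
    }
}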


Re: Change Window Size during runtime

2019-01-03 Thread Chesnay Schepler

You can't change the window size at runtime.

On 03.01.2019 00:54, Rad Rad wrote:

Hi All,

I have one stream, consumed by a FlinkKafkaConsumer, which will be joined
with another stream over a defined window size such as
Time.milliseconds(1). How can I change the window size during runtime to
Time.milliseconds(2)?


Stream1.join(Stream2)
    .where(new SingleValueSensorKeySelector())
    .equalTo(new GPSKeySelector())
    .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
    .apply(joinStreamFunc)
    .addSink(kafkaProducer);




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
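
Given the answer above, the usual workaround is to make the window size a
submission-time parameter and resubmit the job to change it. A minimal
sketch, reusing the key selectors, join function, and sink from the snippet
above (user-defined names from the question, not Flink's):

// Inside the main method, before building the topology:
long windowMillis = org.apache.flink.api.java.utils.ParameterTool
    .fromArgs(args).getLong("windowMillis", 1);

Stream1.join(Stream2)
    .where(new SingleValueSensorKeySelector())
    .equalTo(new GPSKeySelector())
    .window(TumblingEventTimeWindows.of(Time.milliseconds(windowMillis)))
    .apply(joinStreamFunc)
    .addSink(kafkaProducer);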





Re: Flink error reading file over network (Windows)

2019-01-03 Thread Chesnay Schepler

Yes, you'll need to create your own InputFormat that understands SMB.

On 03.01.2019 08:26, miki haiat wrote:

Hi,

I'm trying to read a CSV file from a Windows shared drive.
I tried numerous options, but I failed.

I can't find an option to use the SMB protocol,
so I'm assuming that creating my own input format is the way to achieve
that?


What is the correct way to read a file from a Windows network share?

Thanks,

Miki
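
A minimal sketch of the custom InputFormat suggested above. SmbClient.openFile
is a hypothetical placeholder for whatever SMB library is chosen (e.g.
jcifs); only the GenericInputFormat shape is the point here.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;

public class SmbCsvInputFormat extends GenericInputFormat<String> {

    private final String smbUrl;
    private transient BufferedReader reader;
    private transient String nextLine;

    public SmbCsvInputFormat(String smbUrl) {
        this.smbUrl = smbUrl;
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        super.open(split);
        // Hypothetical SMB client; substitute the real library call here.
        // Note: with parallelism > 1 every subtask would read the whole file,
        // so run this source with parallelism 1 or split the work per subtask.
        reader = new BufferedReader(new InputStreamReader(SmbClient.openFile(smbUrl)));
        nextLine = reader.readLine();
    }

    @Override
    public boolean reachedEnd() {
        return nextLine == null;
    }

    @Override
    public String nextRecord(String reuse) throws IOException {
        String current = nextLine; // emit one CSV line per record
        nextLine = reader.readLine();
        return current;
    }

    @Override
    public void close() throws IOException {
        if (reader != null) {
            reader.close();
        }
    }
}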






Re: Problem building 1.7.1 with scala-2.12

2019-01-03 Thread Chesnay Schepler
When building Flink for scala 2.12 you have to pass "-Dscala-2.12" to 
maven; see the flink-connector-kafka-0.9 pom for details. (look for the 
scala-2.11 profile)


On 02.01.2019 17:48, Cliff Resnick wrote:
The build fails at flink-connector-kafka-0.9 because _2.12 libraries 
apparently do not exist for kafka < 0.10. Any help appreciated!



-Cliff





Re: Is there a better way to debug ClassNotFoundException from FlinkUserCodeClassLoaders?

2019-01-03 Thread Timo Walther

Hi Hao,

which Flink version are you using? What do you mean by "suddenly"? Did
it work before?


Regards,
Timo


On 03.01.19 at 07:13, Hao Sun wrote:
Yep, javap shows the class is there, but FlinkUserCodeClassLoaders 
somehow could not find it suddenly


javap -cp /opt/flink/lib/zendesk-fps-core-assembly-0.1.0.jar 'com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45'
Compiled from "ConnectedStreams.scala"
public final class com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45
    extends org.apache.flink.api.scala.typeutils.CaseClassSerializer {
  public com.zendesk.fraudprevention.datatypes.MaxwellEvent createInstance(java.lang.Object[]);
  public org.apache.flink.api.scala.typeutils.CaseClassSerializer createSerializerInstance(java.lang.Class, org.apache.flink.api.common.typeutils.TypeSerializer[]);
  public org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase createSerializerInstance(java.lang.Class, org.apache.flink.api.common.typeutils.TypeSerializer[]);
  public java.lang.Object createInstance(java.lang.Object[]);
  public com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45(com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90, org.apache.flink.api.common.typeutils.TypeSerializer[]);
}

Hao Sun
Team Lead
1019 Market St. 7F
San Francisco, CA 94103


On Wed, Jan 2, 2019 at 6:04 PM qi luo wrote:


Hi Hao,

Since Flink uses a child-first class loader, you may try searching
for the class
"com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45"
in your fat JAR. Is that an inner class?

Best,
Qi


On Jan 3, 2019, at 7:01 AM, Hao Sun <ha...@zendesk.com> wrote:

Hi,

I am wondering if there are any protips to figure out what class
is not found?

= Logs 
org.apache.flink.streaming.runtime.tasks.StreamTaskException:
Could not instantiate chained outputs.
at

org.apache.flink.streaming.api.graph.StreamConfig.getChainedOutputs(StreamConfig.java:324)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:292)
at

org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
at

org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException:
com.zendesk.fraudprevention.examples.ConnectedStreams$$anon$90$$anon$45
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at

org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:129)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at

org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:77)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1866)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1749)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2040)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at java.util.ArrayList.readObject(ArrayList.java:797)
at sun.reflect.GeneratedMethodAccessor26.invoke(Unknown Source)
at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at