Re: REST Interface to JobManager

2016-09-22 Thread Dominique Rondé


Hi Curtis,
we implemented this today, but without a REST interface. We transfer our 
artifacts and a script with an scp call from our Bamboo server and execute the 
script. This script kills the YARN application, starts a new Flink application 
in YARN and submits all routes to the cluster.
Works pretty well... ;) Just in case...
Greets
Dominique


Sent from my Samsung device.

-------- Original message --------
From: Curtis Wilde
Date: 22.09.16 18:49 (GMT+01:00)
To: user@flink.apache.org
Subject: REST Interface to JobManager



I would like to be able to use Jenkins to deploy jobs to Flink.
I’ve seen talk of a REST interface that might allow me to do this
https://issues.apache.org/jira/browse/FLINK-1228
Is there any documentation around this feature?
 



Rich Window Function - When does the close (tear down) method execute?

2016-09-22 Thread Swapnil Chougule
I am using a rich window function in my streaming project. I want the "close"
method to get triggered after each window interval.
In my case, open gets executed only once for the lifetime of the operator, and the
close method doesn't get executed at all.

Can anybody help to sort this out? I want a tear-down method after each
window interval.
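
A note on the mechanics: open() and close() on a RichFunction are operator lifecycle hooks that run once per parallel task, not once per window, so any per-window tear-down has to happen inside the window function itself. A minimal sketch (assuming Flink 1.1.x APIs; the class and names are illustrative, not from the job above):

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountPerWindow extends RichWindowFunction<Long, Long, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Long> values, Collector<Long> out) {
        long count = 0;
        for (Long ignored : values) {
            count++;
        }
        out.collect(count);
        // per-window "tear down" goes here, after this window's elements are processed;
        // close() only runs once, when the operator shuts down
    }
}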

Thanks,
Swapnil


Re: How can I prove ....

2016-09-22 Thread amir bahmanyari
Hi again, the following is from the dashboard while everything is supposedly 
running. No real-time change in sent/received/# of records... but one node is 
definitely producing a *.out file... And all TMs are reporting in their *.log 
files. And the process will eventually end, but very slowly. Thanks again 
Aljoscha.



  From: amir bahmanyari 
 To: Aljoscha Krettek ; User  
 Sent: Thursday, September 22, 2016 9:16 AM
 Subject: Re: How can I prove 
   
Thanks Aljoscha, that's why I am wondering about this. I don't see the send/receive 
columns change at all... just 0's all the time. The only thing that changes is the 
timestamp. Is this an indication that the nodes in the cluster are not 
participating in execution of the data? Thanks again.
Amir-

  From: Aljoscha Krettek 
 To: amir bahmanyari ; User  
 Sent: Thursday, September 22, 2016 5:01 AM
 Subject: Re: How can I prove 
  
Hi,
depending on the data source you might not be able to stress CPU/MEM because 
the source might be too slow. As long as you see the numbers increasing in the 
Flink Dashboard for all operators you should be good.
Cheers,
Aljoscha
On Thu, 22 Sep 2016 at 00:26 amir bahmanyari  wrote:

That all nodes in a Flink Cluster are involved simultaneously in processing the 
data? Programmatically, graphically... I need to stress CPU, MEM and all 
resources to their max. How can I guarantee this is happening in the Flink 
Cluster? Out of 4 nodes, this is the highest resource usage I see from 
"top"... Everything else is not even close...

top - 22:22:45 up 41 days,  2:39,  1 user,  load average: 1.76, 1.55, 1.28
Tasks: 344 total,   1 running, 343 sleeping,   0 stopped,   0 zombie
%Cpu(s):  5.4 us,  1.0 sy,  0.0 ni, 93.5 id,  0.1 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  11551564+total, 65702020 used, 49813632 free,   115072 buffers
KiB Swap:        0 total,        0 used,        0 free.  3148420 cached Mem

I am pretty sure I can push FlinkRunner to a much greater extent than this... and 
that's where true realistic perf numbers start showing up.
Thanks+regards,
Amir-


   

   

REST Interface to JobManager

2016-09-22 Thread Curtis Wilde
I would like to be able to use Jenkins to deploy jobs to Flink.
I’ve seen talk of a REST interface that might allow me to do this 
https://issues.apache.org/jira/browse/FLINK-1228
Is there any documentation around this feature?



Re: How can I prove ....

2016-09-22 Thread amir bahmanyari
Thanks Aljoscha, that's why I am wondering about this. I don't see the send/receive 
columns change at all... just 0's all the time. The only thing that changes is the 
timestamp. Is this an indication that the nodes in the cluster are not 
participating in execution of the data? Thanks again.
Amir-

  From: Aljoscha Krettek 
 To: amir bahmanyari ; User  
 Sent: Thursday, September 22, 2016 5:01 AM
 Subject: Re: How can I prove 
   
Hi,
depending on the data source you might not be able to stress CPU/MEM because 
the source might be too slow. As long as you see the numbers increasing in the 
Flink Dashboard for all operators you should be good.
Cheers,
Aljoscha
On Thu, 22 Sep 2016 at 00:26 amir bahmanyari  wrote:

That all nodes in a Flink Cluster are involved simultaneously in processing the 
data? Programmatically, graphically... I need to stress CPU, MEM and all 
resources to their max. How can I guarantee this is happening in the Flink 
Cluster? Out of 4 nodes, this is the highest resource usage I see from 
"top"... Everything else is not even close...

top - 22:22:45 up 41 days,  2:39,  1 user,  load average: 1.76, 1.55, 1.28
Tasks: 344 total,   1 running, 343 sleeping,   0 stopped,   0 zombie
%Cpu(s):  5.4 us,  1.0 sy,  0.0 ni, 93.5 id,  0.1 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  11551564+total, 65702020 used, 49813632 free,   115072 buffers
KiB Swap:        0 total,        0 used,        0 free.  3148420 cached Mem

I am pretty sure I can push FlinkRunner to a much greater extent than this... and 
that's where true realistic perf numbers start showing up.
Thanks+regards,
Amir-


   

Re: Sharing Java Collections within Flink Cluster

2016-09-22 Thread Chakravarthy varaga
Hi Aljoscha & Fabian,

    Finally I got this working. Thanks for your help. In terms of persisting
the state (for S2), I tried to checkpoint every 10 secs using a
FsStateBackend... What I notice is that the checkpoint duration is almost
2 minutes in many cases, while in the other cases it varies from 100 ms
to 1.5 minutes frequently.
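
For reference, a minimal sketch of that setup, i.e. a 10-second checkpoint interval plus an FsStateBackend as described above (assuming Flink 1.1.x APIs; the checkpoint path is illustrative):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointedJobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // take a checkpoint every 10 seconds
        env.enableCheckpointing(10000);
        // keep state snapshots on a file system (path is illustrative)
        env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));

        // ... build ks1 / ks2 and connect them as in the pseudocode below ...
        // env.execute("checkpointed job");  // execute once the topology is defined
    }
}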

The pseudocode is as below:

 KeyedStream ks1 = ds1.keyBy("*") ;
 KeyedStream, String> ks2 = ds2.flatMap(split T into
k-v pairs).keyBy(0);

 ks1.connect(ks2).flatMap(X);
 //X is a CoFlatMapFunction that inserts and removes elements from ks2
into a key-value state member. Elements from ks1 are matched against that
state.

 //ks1 is streaming about 100K events/sec from kafka topic
 //ks2 is streaming about 1 event every 10 minutes... Precisely when
the 1st event is consumed from this stream, checkpoint takes 2 minutes
straightaway.

The version of flink is 1.1.2

 Best Regards
CVP

On Tue, Sep 13, 2016 at 7:29 AM, Aljoscha Krettek 
wrote:

> Hi,
> you don't need the BlockedEventState class, you should be able to just do
> this:
>
> private transient ValueState<BlockedRoadInfo> blockedRoads;
>
> @Override
> public void open(final org.apache.flink.configuration.Configuration parameters) throws Exception {
>     final ValueStateDescriptor<BlockedRoadInfo> blockedStateDesc =
>         new ValueStateDescriptor<>("BlockedEventStates",
>             TypeInformation.of(BlockedRoadInfo.class), null);
>     blockedRoads = getRuntimeContext().getState(blockedStateDesc);
> }
>
> Cheers,
> Aljoscha
>
>
> On Mon, 12 Sep 2016 at 16:24 Chakravarthy varaga 
> wrote:
>
>> Hi Fabian,
>>
>> I'm coding to check if your proposal works and hit with an issue with
>> ClassCastException.
>>
>>
>> // Here is my Value that has state information.an implementation
>> of my value state... where the key is a Double value... on connected stream
>> ks2
>>
>> public class BlockedEventState implements ValueState<BlockedRoadInfo>
>> {
>>
>> public BlockedRoadInfo blockedRoad;
>>
>> @Override
>> public void clear() {
>> blockedRoad = null;
>>
>> }
>>
>> @Override
>> public BlockedRoadInfo value() throws IOException {
>> return blockedRoad;
>> }
>>
>> @Override
>> public void update(final BlockedRoadInfo value) throws IOException {
>> blockedRoad = value;
>> }
>> }
>>
>>//BlockedRoadInfo class...
>> public class BlockedRoadInfo {
>> long maxLink;
>> long minLink;
>> double blockedEventId;
>> setters & ... getters
>> }
>>
>> /// new RichCoFlatMapFunction() {
>>
>> private transient BlockedEventState blockedRoads;
>>  
>>   @Override
>> public void open(final org.apache.flink.configuration.Configuration
>> parameters) throws Exception {
>> final ValueStateDescriptor blockedStateDesc =
>> new ValueStateDescriptor("BlockedEventStates",
>> TypeInformation.of(BlockedRoadInfo.class), null);
>> blockedRoads = (BlockedEventState) getRuntimeContext().getState(
>> blockedStateDesc); // FAILS HERE WITH CLASSCAST
>>
>> };
>>
>>   }
>>
>>
>>
>>
>> Caused by: java.lang.ClassCastException:
>> org.apache.flink.runtime.state.filesystem.FsValueState cannot be cast to
>> com.ericsson.components.aia.iot.volvo.state.BlockedEventState
>>
>>
>>
>> I have tried to set the state backend to both MemState and
>> FsState... streamEnv.setStateBackend(new
>> FsStateBackend("file:///tmp/flink/checkpoints"));
>>
>>
>>
>> On Thu, Sep 8, 2016 at 10:10 PM, Fabian Hueske  wrote:
>>
>>> Not sure if I got your requirements right, but would this work?
>>>
>>> KeyedStream ks1 = ds1.keyBy("*") ;
>>> KeyedStream, String> ks2 = ds2.flatMap(split T into
>>> k-v pairs).keyBy(0);
>>>
>>> ks1.connect(ks2).flatMap(X)
>>>
>>> X is a CoFlatMapFunction that inserts and removes elements from ks2 into
>>> a key-value state member. Elements from ks1 are matched against that state.
>>>
>>> Best, Fabian
>>>
>>> 2016-09-08 20:24 GMT+02:00 Chakravarthy varaga >> >:
>>>
 Hi Fabian,

  First of all thanks for all your prompt responses. With regards to
 2) Multiple looks ups, I have to clarify what I mean by that...

  DS1 elementKeyStream  = stream1.map(String<>); this maps
 each of the streaming elements into string mapped value...
  DS2  = stream2.xxx(); // where stream2 is a kafka source
 stream, as you proposed.. xxx() should be my function() which splits the
 string and generates key1:, key2:, key3:
 keyN:

  Now,
 I wish to map elementKeyStream with look ups within (key1,
 key2...keyN) where key1, key2.. keyN and their respective values should be
 available across the cluster...
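
For what it's worth, a skeletal sketch of the "X" CoFlatMapFunction from Fabian's proposal above: flatMap2 maintains the keyed lookup state fed by ks2, and flatMap1 matches ks1 elements against it (assuming Flink 1.1.x APIs; the element types and names are placeholders, not the actual job's classes):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class MatchingCoFlatMap
        extends RichCoFlatMapFunction<String, Tuple2<String, String>, String> {

    private transient ValueState<String> lookup;

    @Override
    public void open(Configuration parameters) {
        lookup = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lookup", String.class, null));
    }

    @Override
    public void flatMap1(String event, Collector<String> out) throws Exception {
        // elements of ks1: match against the state for the current key
        String value = lookup.value();
        if (value != null) {
            out.collect(event + " -> " + value);
        }
    }

    @Override
    public void flatMap2(Tuple2<String, String> kv, Collector<String> out) throws Exception {
        // elements of ks2: insert / refresh the lookup entry for this key
        lookup.update(kv.f1);
    }
}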

Re: RawSchema as deserialization schema

2016-09-22 Thread Stephan Ewen
/cc Robert, he is looking into extending the Kafka Connectors to support
more of Kafka's direct utilities

On Thu, Sep 22, 2016 at 3:17 PM, Swapnil Chougule 
wrote:

> It will be good to have RawSchema as one of the deserialization schemas in the
> streaming framework (like SimpleStringSchema).
> Many use cases need data in byte array format after reading from a source
> like Kafka.
>
> Any inputs for same ?
>
> On Mon, Sep 12, 2016 at 11:42 AM, Swapnil Chougule <
> the.swapni...@gmail.com> wrote:
>
>> Thanks Maximilian. I implemented the same & it worked for me. I was under the
>> impression that RawSchema is available in Flink.
>>
>> Regards,
>> Swapnil
>>
>> On Mon, Sep 5, 2016 at 8:48 PM, Maximilian Michels 
>> wrote:
>>
>>> Just implement DeserializationSchema and return the byte array from
>>> Kafka. Byte array serialization poses no problem to the Flink
>>> serialization.
>>>
>>> On Mon, Sep 5, 2016 at 3:50 PM, Swapnil Chougule
>>>  wrote:
>>> > I am using the Kafka consumer in Flink 1.1.1 with Kafka 0.8.2. I want to
>>> > read the byte array as it is in the datastream. I tried to use RawSchema as
>>> > the deserialization schema but couldn't find it in 1.1.1.
>>> > I want to know whether I have to write my own custom implementation for it.
>>> > Can somebody help me to sort this out?
>>> >
>>> > Also, is passing byte[] to the next operator supported as far as
>>> > serialization is concerned?
>>>
>>
>>
>
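
For reference, a minimal sketch of the pass-through schema Maximilian describes above, handing the Kafka bytes downstream unchanged (assuming the Flink 1.1.x DeserializationSchema interface; the class name RawSchema is only illustrative):

import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class RawSchema implements DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] message) {
        // no decoding: pass the raw Kafka bytes through as-is
        return message;
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        return false; // never end the stream based on element content
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}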


Re: Daily/hourly TumblingEventTimeWindows

2016-09-22 Thread Maximilian Bode
I have just noticed that this is exactly what it currently does. Reading the 
docs I assumed all windows would be of the same size.
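
For reference, a minimal sketch of the daily case (assuming Flink 1.1.x APIs; the elements and timestamps are made up). Tumbling event-time windows are aligned to the epoch, which is why a Time.days(1) window already tumbles at 00:00 UTC, as observed above:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class DailyWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(Tuple2.of("key", 0L), Tuple2.of("key", 3600000L)) // epoch millis
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    return element.f1; // event time is carried in the record
                }
            })
            .keyBy(0)
            // aligned to the epoch, so each window covers exactly one calendar day (UTC)
            .window(TumblingEventTimeWindows.of(Time.days(1)))
            .sum(1)
            .print();

        env.execute("daily tumbling windows");
    }
}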

> On 22.09.2016 at 13:35, Maximilian Bode wrote:
> 
> Hi everyone,
> 
> is there an easy way to implement a tumbling event time window that tumbles 
> at a certain time? Examples could be daily or hourly (tumbling at exactly 
> 00:00, 01:00, 02:00 etc.) windows.
> 
> So in particular, for a daily window, the first window would be shorter than 
> the rest, tumble at midnight and after that it would basically be the same as 
> a regular 24h TumblingEventTimeWindow.
> 
> Cheers,
> Max



Daily/hourly TumblingEventTimeWindows

2016-09-22 Thread Maximilian Bode
Hi everyone,

is there an easy way to implement a tumbling event time window that tumbles at 
a certain time? Examples could be daily or hourly (tumbling at exactly 00:00, 
01:00, 02:00 etc.) windows.

So in particular, for a daily window, the first window would be shorter than 
the rest, tumble at midnight and after that it would basically be the same as a 
regular 24h TumblingEventTimeWindow.

Cheers,
Max

Re: Custom(application) Metrics - Piggyback on Flink's metrics infra or not?

2016-09-22 Thread Chesnay Schepler
Actually I was wrong on the UDF point. By variables I meant the 
information that is encoded in the scope, like the subtask index, task 
name, taskmanager ID etc. However, all of these can be accessed from the 
MetricGroup that is returned by RuntimeContext#getMetricGroup(), which 
you can of course use in your UDF.
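
As a minimal sketch of what that looks like from a UDF (assuming the Flink 1.1.x metrics API; the function and metric names are illustrative):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        // the group already carries the scope (task name, subtask index, ...)
        processed = getRuntimeContext().getMetricGroup().counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processed.inc();
        return value;
    }
}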


On 22.09.2016 05:47, Eswar Reddy wrote:
Thank you Chesnay. Good to know there are a few wrappers available to 
get the best of both worlds. I may mostly go without piggybacking though, 
to have more control and learning for now, but I will keep an eye out for 
new benefits I may get in future via piggybacking.
The UDF point looks like a deal breaker; I will spend some more time 
understanding it (we can get Flink's runtime using this 
inside the UDF, so by 'variables' you must have meant the metrics 
object that gets passed around).


@Sumit, adding backend writers (reporters) is just as simple in 
DropWizard as well. Thanks for bringing it up though.


Thanks,
Eswar.

On Tue, Sep 20, 2016 at 11:33 PM, Chawla, Sumit wrote:

In addition, it supports enabling multiple Reporters. You can
have the same data pushed to multiple systems. Plus it's very easy to
write a new reporter for doing any customization.


Regards
Sumit Chawla


On Tue, Sep 20, 2016 at 2:10 AM, Chesnay Schepler wrote:

Hello Eswar,

as far as I'm aware the general structure of the Flink's
metric system is rather similar to DropWizard. You can use
DropWizard metrics by creating a simple wrapper, we even ship
one for Histograms. Furthermore, you can also use DropWizard
reporters, you only have to extend the DropWizardReporter
class, essentially providing a factory method for your reporter.

Using Flink's infrastructure provides the following benefits:
* better resource usage, as only a single reporter instance
per taskmanager exists
* access to system metrics
* namespace stuff; you cannot access all variables yourselves
from a UDF without modifying the source of Flink; whether this
is an advantage is of course dependent on what you are
interested in

Regards,
Chesnay
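
For the Histogram wrapper mentioned above, the wiring would look roughly like this (a sketch: I am assuming the flink-metrics-dropwizard wrapper class here, and the mapper and metric names are illustrative):

import com.codahale.metrics.SlidingWindowReservoir;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Histogram;

public class LatencyTrackingMapper extends RichMapFunction<Long, Long> {

    private transient Histogram latencies;

    @Override
    public void open(Configuration parameters) {
        com.codahale.metrics.Histogram dropwizardHistogram =
                new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
        // register the DropWizard histogram with Flink's metric group via the wrapper
        latencies = getRuntimeContext().getMetricGroup()
                .histogram("latencies", new DropwizardHistogramWrapper(dropwizardHistogram));
    }

    @Override
    public Long map(Long value) {
        latencies.update(value); // record the value in the DropWizard-backed histogram
        return value;
    }
}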


On 20.09.2016 08:29, Eswar Reddy wrote:

Hi,

I see Flink supports built-in metrics to monitor various
components of Flink. In addition, one can register
application-specific (custom) metrics with Flink's built-in
metrics infra. The problem with this is that the user has to develop
his custom metrics using Flink's metrics framework/API rather
than a generic framework such as dropwizard. Alternatively,
the user can follow this
approach where his dropwizard metrics push code is
co-located with the actual app code within each Task and metrics
are directly pushed to a backend writer (say, Graphite) from
each Task.

In this alternative, I am aware of having to handle mapping the
spatial granularity of Flink's run-time to the metrics
namespace, but doing it myself should not be a big effort.
Fault-tolerance comes automatically since app code and
metrics push code are co-located in the Task. Is there
anything else Flink's metrics infra handles automatically?
Based on this I'd weigh using good old dropwizard vs the
Flink-specific metrics framework.

Finally, I guess the feasibility of an automatic
dropwizard-to-flink-metrics translation utility can be checked
out, but I would like to first understand the additional benefits
of using Flink's infra for custom metrics.

Thanks,
Eswar.








Re: Flink JDBCOutputFormat logs wrong WARN message

2016-09-22 Thread Swapnil Chougule
Hi Fabian / Chesnay,
Can anybody give me permission to assign the JIRA issue (created for the same)?

Thanks,
Swapnil

On Tue, Sep 20, 2016 at 6:18 PM, Swapnil Chougule 
wrote:

> Thanks Chesnay & Fabian for update.
> I will create JIRA issue & open a pull request to fix it.
>
> Thanks,
> Swapnil
>
> On Tue, Sep 20, 2016 at 2:54 PM, Fabian Hueske  wrote:
>
>> Yes, the condition needs to be fixed.
>>
>> @Swapnil, would you like to create a JIRA issue and open a pull request
>> to fix it?
>>
>> Thanks, Fabian
>>
>> 2016-09-20 11:22 GMT+02:00 Chesnay Schepler :
>>
>>> I would agree that the condition should be changed.
>>>
>>>
>>> On 20.09.2016 10:52, Swapnil Chougule wrote:
>>>
 I checked the following code in Flink's JDBCOutputFormat while I was using it in
 my project work. I found the following snippet:

 @Override
 public void writeRecord(Row row) throws IOException {

 if (typesArray != null && typesArray.length > 0 &&
 typesArray.length == row.productArity()) {
 LOG.warn("Column SQL types array doesn't match arity of passed Row! Check the passed array...");
 }
 ...
 }

 I find this "if" condition wrong. It should be:

 if (typesArray != null && typesArray.length > 0 && typesArray.length !=
 row.productArity())

 As a result, it is logging a wrong warning in my logs, which is incorrect
 (even if typesArray matches the arity of the passed Row).
 May I have inputs on this?

 Thanks,
 Swapnil

>>>
>>>
>>
>


Re: Simple batch job hangs if run twice

2016-09-22 Thread Robert Metzger
Can you try running with DEBUG logging level?
Then you should see if input splits are assigned.
Also, you could try to use a debugger to see what's going on.

On Mon, Sep 19, 2016 at 2:04 PM, Yassine MARZOUGUI <
y.marzou...@mindlytix.com> wrote:

> Hi Chensey,
>
> I am running Flink 1.1.2, and using NetBeans 8.1.
> I made a screencast reproducing the problem here: http://recordit.co/P53OnFokN4 .
>
> Best,
> Yassine
>
>
> 2016-09-19 10:04 GMT+02:00 Chesnay Schepler :
>
>> No, I can't recall that i had this happen to me.
>>
>> I would enable logging and try again, as well as checking whether the
>> second job is actually running through the WebInterface.
>>
>> If you tell me your NetBeans version i can try to reproduce it.
>>
>> Also, which version of Flink are you using?
>>
>>
>> On 19.09.2016 07:45, Aljoscha Krettek wrote:
>>
>> Hmm, this sounds like it could be IDE/Windows specific; unfortunately I
>> don't have access to a Windows machine. I'll loop in Chesnay who is using
>> Windows.
>>
>> Chesnay, do you maybe have an idea what could be the problem? Have you
>> ever encountered this?
>>
>> On Sat, 17 Sep 2016 at 15:30 Yassine MARZOUGUI 
>> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> Thanks for your response. By the first time I mean I hit run from the
>>> IDE (I am using NetBeans on Windows) the first time after building the
>>> program. If I then stop it and run it again (without rebuilding), it is
>>> stuck in the state RUNNING. Sometimes I have to rebuild it, or close the
>>> IDE, to be able to get an output. The behaviour is random; maybe it's
>>> related to the IDE or the OS and not necessarily Flink itself.
>>>
>>> On Sep 17, 2016 15:16, "Aljoscha Krettek"  wrote:
>>>
 Hi,
 when is the "first time". It seems you have tried this repeatedly so
 what differentiates a "first time" from the other times? Are you closing
 your IDE in-between or do you mean running the job a second time within the
 same program?

 Cheers,
 Aljoscha

 On Fri, 9 Sep 2016 at 16:40 Yassine MARZOUGUI <
 y.marzou...@mindlytix.com> wrote:

> Hi all,
>
> When I run the following batch job inside the IDE for the first time,
> it outputs results and switches to FINISHED, but when I run it again it is
> stuck in the state RUNNING. The csv file size is 160 MB. What could be the
> reason for this behaviour?
>
> public class BatchJob {
>
> public static void main(String[] args) throws Exception {
> final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> env.readCsvFile("dump.csv")
> .ignoreFirstLine()
> .fieldDelimiter(";")
> .includeFields("111000")
> .types(String.class, String.class, String.class)
> .first(100)
> .print();
>
> }
> }
>
> Best,
> Yassine
>

>>
>


Re: how to unit test streaming window jobs?

2016-09-22 Thread Robert Metzger
Hi Luis,

using Event Time windows, you should be able to generate some test data and
get predictable results.
Flink is internally using similar tests to ensure correctness of the
windowing implementation (for example
the EventTimeWindowCheckpointingITCase).

Regards,
Robert
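
A minimal sketch of the approach described above: drive an event-time window with a few timestamped test records so the window results are deterministic and can be asserted (assuming Flink 1.1.x APIs; names and values are illustrative):

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WindowJobTestSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        env.fromElements(
                Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("a", 6000L))
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                    return element.f1; // event time is carried in the test records
                }
            })
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .apply(new WindowFunction<Tuple2<String, Long>, Long, Tuple, TimeWindow>() {
                @Override
                public void apply(Tuple key, TimeWindow window,
                                  Iterable<Tuple2<String, Long>> input, Collector<Long> out) {
                    long count = 0;
                    for (Tuple2<String, Long> ignored : input) {
                        count++;
                    }
                    out.collect(count); // per-window counts become assertable results
                }
            })
            .print(); // in a real test, collect into a list and assert on it

        env.execute("window test sketch");
    }
}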

On Mon, Sep 12, 2016 at 6:27 PM, Luis Mariano Guerra <
mari...@event-fabric.com> wrote:

> hi,
>
>  is there a way to test window jobs?
>
> I would like to build the job, give some inputs, "fast forward" to the
> next window, collect the results and assert them.
>


emit watermarks

2016-09-22 Thread Radu Tudoran

Hi,

Is there some way to emit a watermark in the trigger?

I see that in the evictor there is the option to check whether the StreamRecord 
is a watermark... so I would hope that there is some option also to create 
them.



Re: Flink job fails to restore RocksDB state after upgrading to 1.2-SNAPSHOT

2016-09-22 Thread Stefan Richter
Hi,

to me, this looks like you are running into the problem described under 
[FLINK-4603] : KeyedStateBackend cannot restore user code classes. I have 
opened a pull request (PR 2533) this morning that should fix this behavior as 
soon as it is merged into master.

Best,
Stefan

> On 21.09.2016 at 23:49, Josh wrote:
> 
> Hi Stephan,
> 
> Thanks for the reply. I should have been a bit clearer but actually I was not 
> trying to restore a 1.0 savepoint with 1.2. I ran my job on 1.2 from scratch 
> (starting with no state), then took a savepoint and tried to restart it from 
> the savepoint - and that's when I get this exception. If I do this with the 
> same job using an older version of Flink (1.1-SNAPSHOT taken in June), the 
> savepoint and restore works fine.
> 
> I wanted to use 1.2-SNAPSHOT because it has some changes I'd like to use 
> (improvements to Kinesis connector + the bucketing sink). Anyway for now I 
> have things working with an older version of Flink - but it would be good to 
> know what's changed recently that's causing the restore to break and if my 
> job is not going to be compatible with future releases of Flink.
> 
> Best,
> Josh
> 
> On Wed, Sep 21, 2016 at 8:21 PM, Stephan Ewen wrote:
> Hi Josh!
> 
> The state code on 1.2-SNAPSHOT is undergoing pretty heavy changes right now, 
> in order to add the elasticity feature (change the parallelism of running jobs 
> while still maintaining exactly-once guarantees).
> At this point, no 1.0 savepoints can be restored in 1.2-SNAPSHOT. We will try 
> and add compatibility towards 1.1 savepoints before the release of version 
> 1.2.
> 
> I think the exception is probably caused by the fact that old savepoint 
> stored some serialized user code (the new one is not expected to) which 
> cannot be loaded.
> 
> Adding Aljoscha and Stefan to this, to see if they can add anything.
> In any case, this should have a much better error message.
> 
> I hope you understand that the 1.2-SNAPSHOT version is in some parts WIP, so 
> not really recommended for general use.
> 
> Does version 1.1 not work for you?
> 
> Greetings,
> Stephan
> 
> 
> On Wed, Sep 21, 2016 at 7:44 PM, Josh wrote:
> Hi,
> 
> I have a Flink job which uses the RocksDBStateBackend, which has been running 
> on a Flink 1.0 cluster. 
> 
> The job is written in Scala, and I previously made some changes to the job to 
> ensure that state could be restored. For example, whenever I call `map` or 
> `flatMap` on a DataStream, I pass a named class: `.flatMap(new 
> MyCustomFlatMapper())` instead of an anonymous function.
> 
> I've been testing the job on Flink 1.2-SNAPSHOT, and I am no longer able to 
> restore state. I'm seeing exceptions which look like this when trying to 
> restore from a savepoint:
> 
> java.lang.RuntimeException: Could not initialize keyed state backend. 
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.open(AbstractStreamOperator.java:148)
> Caused by: java.lang.ClassNotFoundException: 
> com.joshfg.flink.job.MyJob$MyCustomFlatMapper$$anon$4$$anon$2
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBRestoreOperation.restoreKVStateMetaData(RocksDBKeyedStateBackend.java:653)
> 
> I'm not passing any anonymous functions to `map` or `flatMap` on Flink 
> DataStreams, so it looks like this exception is caused just from using Scala 
> functions like `filter`, `map`, `flatMap` on standard Scala collections, 
> within my class `MyCustomFlatMapper`.
> 
> Are there any changes to the way Flink state is restored or to 
> RocksDBStateBackend, in the last 2-3 months, which could cause this to happen?
> If so, any advice on fixing it? 
> 
> I'm hoping there's a better solution to this than rewriting the Flink job in 
> Java.
> 
> Thanks,
> 
> Josh
> 
> 
>