Re: How to get kafka partition ID?

2018-01-16 Thread Yuta Morisawa

Hi Gordon

Thanks a lot!
So far I used AbstractDeserializationSchema.
I will try the class you mentioned.

Regards
On 2018/01/17 2:48, Gordon Weakliem wrote:
If you pass a KeyedDeserializationSchema to 
new FlinkKafkaConsumer08(topic, keyedDeserializationSchema, properties), 
you'll implement a method like this:


     public T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset) throws IOException {

     }

Then just make T a type that contains the partition as a field so 
operators down the pipeline can access that field.



On Tue, Jan 16, 2018 at 12:11 AM, Yuta Morisawa 
> wrote:


Hi

I want to apply a window function simply to the data from each
kafka-partition (I don't need any grouping, I just want to
process the data in parallel).
I think the best way to do so is to extract the kafka partition id and use
it in the keyBy function.
For example,

val data = stream.keyBy(kafka-partitionID)
                  .window(...)
                  .fold(...)

But, I could not find the way.
How can I get the kafka-partition ID in Flink code?


--
Regards,
Yuta








State backend questions

2018-01-16 Thread Christophe Jolif
Hi all,

At first my state should not be "that" big and fit in memory, so
FsStateBackend could be a solution for me. However moving forward I
envision more features and more users and the state growing. With that in
mind RocksDBStateBackend might be the solution.

Is there an easy "upgrade" path from one to another? In other words is
there an easy path to "move" the state from one backend to another one and
restart the job from there if the need arise or should I definitely plan
long ahead and use RocksDB right away if I don't want to get into trouble?

If yes, how much configuration is available at Flink level to configure
memory used by RocksDB to cache data without having to go back to disk so
that I don't penalize too much the current use-cases?

Thanks,
-- 
Christophe
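
For context, a minimal sketch of how the two backends discussed above are typically configured programmatically (assuming Flink 1.4 APIs; the checkpoint URI and the chosen PredefinedOptions value are placeholders, not a recommendation):

import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSetup {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Option 1: state lives on the JVM heap, checkpoints go to a file system.
        // env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

        // Option 2: state lives in RocksDB (can spill to disk), with incremental checkpoints enabled.
        RocksDBStateBackend rocksDb = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
        rocksDb.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
        env.setStateBackend(rocksDb);
    }
}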


Re: History Server

2018-01-16 Thread Eron Wright
As a follow-up question, how well does the history server work for
observing a running job?   I'm trying to understand whether, in the
cluster-per-job model, a user would be expected to hop from the Web UI to
the History Server once the job completed.

Thanks

On Wed, Oct 4, 2017 at 3:49 AM, Stephan Ewen  wrote:

> To add to this:
>
> The History Server is mainly useful in cases where one runs a
> Flink-cluster-per-job. Once the job is finished, the processes disappear. The
> History Server should be longer lived to make past executions' stats
> available.
>
> On Mon, Sep 25, 2017 at 3:44 PM, Nico Kruber 
> wrote:
>
>> Hi Elias,
>> in theory, it could be integrated into a single web interface, but this
>> was
>> not done so far.
>> I guess the main reason for keeping it separate was probably to have a
>> better
>> separation of concerns as the history server is actually independent of
>> the
>> current JobManager execution and merely displays previous job results
>> which
>> may also come from different or previously existing JobManager instances
>> which
>> stored history data in its storage directory.
>>
>> Chesnay (cc'd) may elaborate a bit more in case you'd like to change that
>> and
>> integrate the history server (interface) into the JobManager.
>>
>>
>> Nico
>>
>> On Sunday, 24 September 2017 02:48:40 CEST Elias Levy wrote:
>> > I am curious, why is the History Server a separate process and Web UI
>> > instead of being part of the Web Dashboard within the Job Manager?
>>
>>
>>
>


Re: Timestamps and watermarks in CoProcessFunction function

2018-01-16 Thread Eron Wright
Consider the watermarks that are generated by your chosen watermark
generator as an +assertion+ about the progression of time, based on domain
knowledge, observation of elements, and connector specifics.  The generator
is asserting that any elements observed after a given watermark will come
later in event time, e.g. "we've reached 12:00 PM; subsequent events will
have a timestamp greater than 12:00 PM".

Your specific output seems fine to me.  It reads like, "event @ 11:59,
watermark @ 12:00, event @ 12:02, watermark @ 12:01, event @ 12:03".  The
watermark assertion wasn't violated in this situation.

Some operators provide special "late event" handling logic for the
situation that the assertion is violated.   The process function is quite
flexible, providing timers to observe the progression of time (due to
watermarks), and making it possible to handle late events as you see fit.
Often a process function will buffer events until a certain time is reached.
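
As an illustration of that buffering pattern, a minimal sketch (assuming a keyed stream with timestamps and watermarks assigned, and Flink 1.4-era APIs; the element type, state name and slack value are placeholders):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Buffers elements and emits them only once the watermark has passed
// their timestamp plus some slack.
public class BufferUntilTimerFires extends ProcessFunction<String, String> {

    private static final long SLACK_MS = 60_000;          // placeholder delay
    private transient ListState<String> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-elements", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        buffer.add(value);
        // The timer fires when the watermark reaches this element's timestamp + slack.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + SLACK_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        for (String value : buffer.get()) {
            out.collect(value);
        }
        buffer.clear();
    }
}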

Hope this helps!
Eron

On Tue, Jan 16, 2018 at 8:51 AM, William Saar  wrote:

>
> Hi,
> I have added the code below to the start of processElement2 in
> CoProcessFunction. It prints timestamps and watermarks for the first 3
> elements for each new watermark. Shouldn't the timestamp always be lower
> than the next watermark? The 3 timestamps before the last watermark are all
> larger than the watermark time
>
> The output I get is
> wm -9223372036854775808
> ts 1478815851242
> ts 1478816075096
> ts 1478816114186
> wm 1478822353934
> ts 1478835814359
> ts 1478835083219
> ts 1478836126621
> wm 1478827220420
> ts 1478836408336
> ts 1478836469247
> ts 1478836759959
>
> if (getRuntimeContext.getIndexOfThisSubtask == 0) {
>   if (context.timerService().currentWatermark() != printedWatermark) {
>     printedWatermark = context.timerService().currentWatermark()
>     println("wm " + printedWatermark)
>     n = 0
>   } else {
>     n += 1
>   }
>   if (n < 3) {
>     println("ts " + context.timestamp())
>   }
> }
>
>
>
>
>


flowable <-> flink integration

2018-01-16 Thread Martin Grofčík
Hi,

I want to implement a flowable (BPMN platform - www.flowable.org) <-> flink
integration module. The motivation is to execute process simulations with
flink (simple simulation experiment example:
https://gromar01.wordpress.com/2017/11/07/will-we-meet-our-kpis/). I was
able to create


Flink provides a REST API through which I can easily create a job and monitor
its execution.
[attachment: wordCountProcess.PNG]



At the end I can encapsulate the whole process into one task (e.g. "Execute
flink job") which will do the same in java code.
In fact I have no experience with flink, which is why I can only imagine two
process steps (see the sketch below):
1. create a flink job
2. monitor its state
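
For illustration, a minimal sketch of step 2 against the job manager's monitoring REST API (the /jobs/<jobid> endpoint path follows the Flink 1.4 REST API documentation and should be checked against your version; the class and method names are made up):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Polls the job manager's monitoring REST API for a job's details.
public class JobStatePoller {

    public static String fetchJobDetails(String jobManagerUrl, String jobId) throws Exception {
        // e.g. jobManagerUrl = "http://localhost:8081"
        URL url = new URL(jobManagerUrl + "/jobs/" + jobId);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader reader =
                     new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
            StringBuilder body = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                body.append(line);
            }
            // The returned JSON contains, among other things, a "state" field (RUNNING, FINISHED, ...).
            return body.toString();
        } finally {
            conn.disconnect();
        }
    }
}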

Question 1:
Can you propose other useful process steps (e.g. downloading results,
uploading datasets, ...)?
(Please provide a link describing how I can proceed with their implementation.)

Question 2:
The problem with this process is that it keeps polling the job state. I
would prefer to add a hook at the end of the flink job execution that calls
the flowable REST API to notify the process instance about job-finished
(or job-failed) events.
The way I have found is to implement the REST endpoint
org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler so that it
calls the flowable REST API at the end of the flink job execution.
What I would prefer is something like a wrapper around the main class
that executes the flowable REST call at the end.
Can you give me a hint how to implement this wrapper, please?
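
A minimal sketch of such a wrapper (all names are illustrative; it assumes the job is submitted in attached mode, so that the wrapped main() only returns once env.execute() has finished):

// Hypothetical wrapper: runs the real job's main(), then notifies Flowable.
public class NotifyingJobWrapper {

    public static void main(String[] args) {
        boolean succeeded = true;
        try {
            // Invoke the actual job's entry point (class name given as the first argument).
            // env.execute() inside that main() blocks until the job has finished,
            // as long as the job is submitted in attached mode.
            Class.forName(args[0])
                    .getMethod("main", String[].class)
                    .invoke(null, (Object) new String[0]);
        } catch (Exception e) {
            succeeded = false;
        }
        notifyFlowable(succeeded);
    }

    private static void notifyFlowable(boolean succeeded) {
        // Placeholder: POST to the Flowable REST endpoint of the waiting process instance,
        // e.g. with an HTTP call along the lines of the fetchJobDetails() sketch above.
    }
}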

Thank you in advance for the answer.

Regards
Martin


Canary deploys and Flink?

2018-01-16 Thread Ron Crocker
A question came up from my colleagues about canary deploys and Flink. We had a 
hard time understanding how we could do a canary deploy without constructing a 
new cluster and deploying the job there. If you have a canary deploy model, how 
do you do this? 

Thanks for your help!

Ron

Problem while debugging a python job

2018-01-16 Thread Mathias Peters
Hi all,

I'm trying to debug a python script with a flink job using IntelliJ.
I'm using the current snapshot (1.5 cloned today). In former versions, I
could simply run org.apache.flink.python.api.PythonPlanBinder from
within the IDE. At the moment, I'm getting NoClassDefFoundErrors for classes of
the core module (e.g. FSDataOutputStream). I imported the fresh project
as described on your website and I ran mvn clean package several times.

Is this a known issue or am I missing something?

Thanks in advance and cheers

Mathias



Re: SideOutput doesn't receive anything if filter is applied after the process function

2018-01-16 Thread Chen Qin
Thanks Chesnay,

So I think that to support a multi-input, multi-output model like the one the
Dataflow paper describes, Flink needs credit-based scheduling as well as side
input support, plus a new set of DataStream APIs that isn't constrained
by backwards-compatibility issues. Only then can we pass side outputs through
to the next operator and let the consumer decide what to do with them.

Yes, it might be far off, but that seems like one of the directions the
community could consider.

Chen
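
For readers skimming the thread, the workaround discussed below boils down to retrieving the side output from the result of process() before any further transformation. A minimal sketch (assuming the Flink 1.4 Java DataStream API; the tag name and types are placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputWiring {

    // Anonymous subclass so the tag keeps its type information.
    static final OutputTag<String> SIDE = new OutputTag<String>("side-output") {};

    static void wire(DataStream<String> stream) {
        SingleOutputStreamOperator<String> processed = stream.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out) {
                        out.collect(value);        // main output
                        ctx.output(SIDE, value);   // side output
                    }
                });

        // Get the side output from `processed` itself, not from a downstream filter().
        DataStream<String> sideOutput = processed.getSideOutput(SIDE);
        DataStream<String> filteredMain = processed.filter(v -> !v.isEmpty());
    }
}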


> On Jan 16, 2018, at 5:18 AM, Chesnay Schepler  wrote:
> 
> I've opened https://issues.apache.org/jira/browse/FLINK-8437
> 
> Unfortunately i doubt we can fix this properly. The proposed solution will 
> not work if we ever allow arbitrary functions to use side-outputs.
> 
>> On 16.01.2018 08:59, Juho Autio wrote:
>> Could someone with knowledge of the right terms create this in JIRA, please? 
>> I guess I could also create it if needed..
>> 
>>> On Mon, Jan 15, 2018 at 3:15 PM, Chesnay Schepler  
>>> wrote:
>>> yes, i meant that process() returns the special operator. This would 
>>> definitely deserve a JIRA issue.
>>> 
>>> 
>>> On 15.01.2018 14:09, Juho Autio wrote:
 Thanks for the explanation. Did you meant that process() would return a 
 SingleOutputWithSideOutputOperator?
 
 Any way, that should be enough to avoid the problem that I hit (and it 
 also seems like the best & only way).
 
 Maybe the name should be something more 
 generic though, like ProcessedSingleOutputOperator or something, I 
 wouldn't know..
 
 Would this deserve an improvement ticket in JIRA?
 
 On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler  
 wrote:
> It would mean that getSideOutput() would return a 
> SingleOutputWithSideOutputOperator which extends SingleOutputOperator 
> offering getSideOutput(). Other transformations would still return a 
> SingleOutputOperator.
> 
> With this the following code wouldn't compile.
> 
> stream
> .process(...)
> .filter(...)
> .getSideOutput(...) // compile error
> 
> You would have to explicitly define the code as below, which makes the 
> behavior unambiguous:
> 
> processed = stream
> .process(...)
> 
> filtered = processed
> .filter(...)
> 
> filteredSideOutput = processed
> .getSideOutput(...)
> .filter(...)
> 
> 
> On 15.01.2018 09:55, Juho Autio wrote:
>> > sideoutput might deserve a seperate class which inherit form 
>> > singleoutput. It might prevent lot of confusions
>> 
>> Thanks, but how could that be done? Do you mean that if one calls 
>> .process(), then the stream would change to another class
>>which would only allow calls like 
>> .getMainOutput() or .getSideOutput("name")? Of course compile time error 
>> would be even better than a runtime error, but I don't see yet how it 
>> could be done in practice.
>> 
>>> On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin  wrote:
>>> Hi Juho,
>>> 
>>> I think sideoutput might deserve a seperate class which inherit form 
>>> singleoutput. It might prevent lot of confusions. A more generic 
>>> question is whether datastream api can be mulitple ins and mulitple 
>>> outs natively. It's more like scheduling problem when you come from 
>>> single process system to multiple process system, which one should get 
>>> resource and how much sharing same hardware resources, I guess it will 
>>> open gate to lots of edge cases -> strategies-> more edge cases :)
>>> 
>>> Chen
>>> 
 On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio  
 wrote:
 Maybe I could express it in a slightly different way: if adding the 
 .filter() after .process() causes the side output to be somehow 
 totally "lost", then I believe the 
   .getSideOutput() could be aware that there is 
 not such side output to be listened to from upstream, and throw an 
 exception. I mean, this should be possible when building the DAG, it 
 shouldn't require starting the stream to detect? Thanks..
 
 On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai 
  wrote:
> Hi Juho,
> 
>> Now that I think of it this seems like a bug to me: why does the 
>> call to getSideOutput succeed if it doesn't provide _any_ input?
> 
> 
> With the way side outputs work, I don’t think this is possible (or 
> would make sense). An operator does not know whether or not it would 
> ever emit some element with a given tag.
> As far as 

Re: How to get kafka partition ID?

2018-01-16 Thread Gordon Weakliem
If you pass a KeyedDeserializationSchema to
new FlinkKafkaConsumer08(topic, keyedDeserializationSchema, properties),
you'll implement a method like this:

public T deserialize(byte[] messageKey, byte[] message, String topic,
int partition, long offset) throws IOException {
}

Then just make T a type that contains the partition as a field so operators
down the pipeline can access that field.
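
For reference, a minimal sketch of that approach (assuming the Flink 1.4-era Kafka connector API; the PartitionedEvent wrapper type and its field names are made up for illustration):

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class PartitionAwareSchema
        implements KeyedDeserializationSchema<PartitionAwareSchema.PartitionedEvent> {

    // Hypothetical wrapper type that carries the Kafka partition next to the raw payload.
    public static class PartitionedEvent {
        public int partition;
        public byte[] payload;

        public PartitionedEvent() {}                      // needed for Flink's POJO handling

        public PartitionedEvent(int partition, byte[] payload) {
            this.partition = partition;
            this.payload = payload;
        }
    }

    @Override
    public PartitionedEvent deserialize(byte[] messageKey, byte[] message,
                                        String topic, int partition, long offset) throws IOException {
        // Keep the partition as a field of the emitted record.
        return new PartitionedEvent(partition, message);
    }

    @Override
    public boolean isEndOfStream(PartitionedEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<PartitionedEvent> getProducedType() {
        return TypeExtractor.getForClass(PartitionedEvent.class);
    }
}

Downstream, the stream can then be keyed on that field, roughly stream.keyBy("partition").window(...).fold(...), which is what the original question was after.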


On Tue, Jan 16, 2018 at 12:11 AM, Yuta Morisawa <
yu-moris...@kddi-research.jp> wrote:

> Hi
>
> I want to apply a window function simply to the data from each
> kafka-partition (I don't need any grouping, I just want to process
> the data in parallel).
> I think the best way to do so is to extract the kafka partition id and use it in
> the keyBy function.
> For example,
>
> val data = stream.keyBy(kafka-partitionID)
>  .window(...)
>  .fold(...)
>
> But, I could not find the way.
> How can I get the kafka-partition ID in Flink code?
>
>
> --
> Regards,
> Yuta
>
>




Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

2018-01-16 Thread jelmer
I think I found the issue. I'd just like to verify that my reasoning is
correct

We had the following keys in our flink-conf.yaml

jobmanager.web.address: localhost
jobmanager.web.port: 8081

This worked on flink 1.3.2

But on flink 1.4.0 this check

https://github.com/apache/flink/blob/32770103253e01cd61c8634378cfa1b26707e19a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRedirectUtils.java#L62

will make it so that both the master and the standby think that they don't
need to perform a redirect, which means that the standby node will serve
web traffic.

I am assuming this is never intended to happen (because it would
call into remote actor systems), so this class not being serializable is not a bug.





On 16 January 2018 at 14:51, Till Rohrmann  wrote:

> Hi,
>
> this indeed indicates that a REST handler is requesting the ExecutionGraph
> from a JobManager which does not run in the same ActorSystem. Could you
> please tell us the exact HA setup. Are your running Flink on Yarn with HA
> or do you use standalone HA with standby JobManagers?
>
> It would be really helpful if you could also share the logs with us.
>
> Cheers,
> Till
>
> On Tue, Jan 16, 2018 at 10:20 AM, Nico Kruber 
> wrote:
>
>> IMHO, this looks like a bug and it makes sense that you only see this
>> with an HA setup:
>>
>> The JobFound message contains the ExecutionGraph which, however, does
>> not implement the Serializable interface. Without HA, when browsing the
>> web interface, this message is (probably) not serialized since it is
>> only served to you via HTML. For HA, this may come from another
>> JobManager than the Web interface you are browsing.
>> I'm including Till (cc'd) as he might know more.
>>
>>
>> Nico
>>
>> On 16/01/18 09:22, jelmer wrote:
>> > HI,
>> >
>> > We recently upgraded our test environment to from flink 1.3.2 to flink
>> > 1.4.0.
>> >
>> > We are using a high availability setup on the job manager. And now often
>> > when I go to the job details in the web ui the call will timeout and the
>> > following error will pop up in the job manager log
>> >
>> >
>> > akka.remote.MessageSerializer$SerializationException: Failed to
>> > serialize remote message [class
>> > org.apache.flink.runtime.messages.JobManagerMessages$JobFound] using
>> > serializer [class akka.serialization.JavaSerializer].
>> > at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply
>> (Endpoint.scala:889)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply
>> (Endpoint.scala:889)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:888)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.remote.EndpointWriter.writeSend(Endpoint.scala:780)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.
>> scala:755)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
>> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(
>> ForkJoinPool.java:1339)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPoo
>> l.java:1979)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > at
>> > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinW
>> orkerThread.java:107)
>> > [flink-dist_2.11-1.4.0.jar:1.4.0]
>> > Caused by: java.io.NotSerializableException:
>> > org.apache.flink.runtime.executiongraph.ExecutionGraph
>> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.
>> java:1184)
>> > ~[na:1.8.0_131]
>> > at
>> > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputSt
>> ream.java:1548)
>> > ~[na:1.8.0_131]
>> > at
>> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStrea
>> m.java:1509)
>> > ~[na:1.8.0_131]
>> > at
>> > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputS
>> tream.java:1432)
>> 

Timestamps and watermarks in CoProcessFunction function

2018-01-16 Thread William Saar

Hi,
I have added the code below to the start of processElement2 in
CoProcessFunction. It prints timestamps and watermarks for the first 3
elements for each new watermark. Shouldn't the timestamp always be
lower than the next watermark? The 3 timestamps before the last
watermark are all larger than the watermark time

The output I get is
wm -9223372036854775808
ts 1478815851242
ts 1478816075096
ts 1478816114186
wm 1478822353934
ts 1478835814359
ts 1478835083219
ts 1478836126621
wm 1478827220420
ts 1478836408336
ts 1478836469247
ts 1478836759959

if (getRuntimeContext.getIndexOfThisSubtask == 0) {
  if (context.timerService().currentWatermark() != printedWatermark) {
    printedWatermark = context.timerService().currentWatermark()
    println("wm " + printedWatermark)
    n = 0
  } else {
    n += 1
  }
  if (n < 3) {
    println("ts " + context.timestamp())
  }
}




Re: Unrecoverable job failure after Json parse error?

2018-01-16 Thread Adrian Vasiliu
What I had in mind was a generic handling of the JsonParseException case. But you are right, the picture becomes fuzzier if we also consider messages that are parseable but invalid due to missing or invalid fields. We could imagine a deeper message validation feature, but I think subclassing to implement custom handling is okay...
Thanks
Adrian
 
- Original message -
From: Nico Kruber 
To: Adrian Vasiliu 
Cc: user@flink.apache.org
Subject: Re: Unrecoverable job failure after Json parse error?
Date: Tue, Jan 16, 2018 3:18 PM

Nice, I didn't even read that far myself :P
-> turns out the API was prepared for that after all

I'm not sure about a default option for handling/skipping corrupted
messages since the handling of those is probably highly use-case
specific. If you nonetheless feel that this should be in there, feel
free to open an improvement request in our issue tracker at
https://issues.apache.org/jira/browse/FLINK

Nico

On 16/01/18 13:35, Adrian Vasiliu wrote:
> Hi Nico,
> Thanks a lot. I did consider that, but I've missed the clarification of
> the contract brought by the piece a doc you
> pointed: "returning |null| to allow the Flink Kafka consumer to silently
> skip the corrupted message".
> I suppose it could be an improvement
> for JSONKeyValueDeserializationSchema to provide this behaviour as an
> out-of-the-box option. But anyway, I do have a solution in hands.
> Thanks again.
> Adrian

Re: Failing to recover once checkpoint fails

2018-01-16 Thread Vishal Santoshi
Folks, sorry for being late on this. Can somebody with knowledge of
this code base create a jira issue for the above? We have seen this more
than once in production.

On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek 
wrote:

> Hi Vishal,
>
> Some relevant Jira issues for you are:
>
>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
> failed checkpoints
>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic fallback
> to earlier checkpoint when checkpoint restore fails
>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always remove
> checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>
> Best,
> Aljoscha
>
>
> On 9. Oct 2017, at 09:06, Fabian Hueske  wrote:
>
> Hi Vishal,
>
> it would be great if you could create a JIRA ticket with Blocker priority.
> Please add all relevant information of your detailed analysis, add a link
> to this email thread (see [1] for the web archive of the mailing list), and
> post the id of the JIRA issue here.
>
> Thanks for looking into this!
>
> Best regards,
> Fabian
>
> [1] https://lists.apache.org/list.html?user@flink.apache.org
>
> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi :
>
>> Thank you for confirming.
>>
>>
>>  I think this is a critical bug. In essence any checkpoint store (
>> hdfs/S3/File)  will loose state if it is unavailable at resume. This
>> becomes all the more painful with your confirming that  "failed
>> checkpoints killing the job"  b'coz essentially it mean that if remote
>> store in unavailable  during checkpoint than you have lost state ( till of
>> course you have a retry of none or an unbounded retry delay, a delay that
>> you *hope* the store revives in ) .. Remember  the first retry failure
>>  will cause new state according the code as written iff the remote store is
>> down. We would rather have a configurable property that establishes  our
>> desire to abort something like a "abort_retry_on_chkretrevalfailure"
>>
>>
>> In our case it is very important that we do not undercount a window, one
>> reason we use flink and it's awesome failure guarantees, as various alarms
>> sound ( we do anomaly detection on the time series ).
>>
>> Please create a jira ticket for us to follow or we could do it.
>>
>>
>> PS Not aborting on checkpointing, till a configurable limit is very
>> important too.
>>
>>
>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi Vishal,
>>>
>>> I think you're right! And thanks for looking into this so deeply.
>>>
>>> With your last mail your basically saying, that the checkpoint could not
>>> be restored because your HDFS was temporarily down. If Flink had not
>>> deleted that checkpoint it might have been possible to restore it at a
>>> later point, right?
>>>
>>> Regarding failed checkpoints killing the job: yes, this is currently the
>>> expected behaviour but there are plans to change this.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 5. Oct 2017, at 17:40, Vishal Santoshi 
>>> wrote:
>>>
>>> I think this is the offending piece. There is a catch all Exception,
>>> which IMHO should understand a recoverable exception from an unrecoverable
>>> on.
>>>
>>>
>>> try {
>>>     completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
>>>     if (completedCheckpoint != null) {
>>>         completedCheckpoints.add(completedCheckpoint);
>>>     }
>>> } catch (Exception e) {
>>>     LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
>>>         "checkpoint store.", e);
>>>     // remove the checkpoint with broken state handle
>>>     removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
>>> }
>>>
>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
 So this is the issue and tell us that it is wrong. ZK had some state (
 backed by hdfs ) that referred to a checkpoint ( the same exact last
 successful checkpoint that was successful before NN screwed us ). When the
 JM tried to recreate the state and b'coz NN was down failed to retrieve the
 CHK handle from hdfs and conveniently ( and I think very wrongly ) removed
 the CHK from being considered and cleaned the pointer ( though failed as
 was NN was down and is obvious from the dangling file in recovery ) . The
 metadata itself was on hdfs and failure in retrieving should have been a
 stop all, not going to trying doing magic exception rather than starting
 from a blank state.

 org.apache.flink.util.FlinkException: Could not retrieve checkpoint
 44286 from state handle under /0044286. This indicates that the
 retrieved state handle is broken. Try cleaning the state handle store.






 On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Also note that  the zookeeper recovery did  ( sadly 

Re: Unrecoverable job failure after Json parse error?

2018-01-16 Thread Nico Kruber
Nice, I didn't even read that far myself :P
-> turns out the API was prepared for that after all

I'm not sure about a default option for handling/skipping corrupted
messages since the handling of those is probably highly use-case
specific. If you nonetheless feel that this should be in there, feel
free to open an improvement request in our issue tracker at
https://issues.apache.org/jira/browse/FLINK


Nico

On 16/01/18 13:35, Adrian Vasiliu wrote:
> Hi Nico,
> Thanks a lot. I did consider that, but I've missed the clarification of
> the contract brought by the piece a doc you
> pointed: "returning |null| to allow the Flink Kafka consumer to silently
> skip the corrupted message".
> I suppose it could be an improvement
> for JSONKeyValueDeserializationSchema to provide this behaviour as an
> out-of-the-box option. But anyway, I do have a solution in hands.
> Thanks again.
> Adrian
>  
> 
> - Original message -
> From: Nico Kruber 
> To: Adrian Vasiliu , user@flink.apache.org
> Cc:
> Subject: Re: Unrecoverable job failure after Json parse error?
> Date: Tue, Jan 16, 2018 11:34 AM
>  
> Hi Adrian,
> couldn't you solve this by providing your own DeserializationSchema [1],
> possibly extending from JSONKeyValueDeserializationSchema and catching
> the error there?
> 
> 
> Nico
> 
> [1]
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema
> 
> On 12/01/18 18:26, Adrian Vasiliu wrote:
> > Hello,
> >
> > When using FlinkKafkaConsumer011
> with JSONKeyValueDeserializationSchema,
> > if an invalid, non-parsable message is sent to the Kafka topic, the
> > consumer expectedly fails with JsonParseException. So far so good, but
> > this leads to the following loop: the job switches to FAILED
> > then attempts to restart and fails again, and so on. That is, the
> > parsing error leads to the Kafka message not being committed, hence it
> > keeps being received. 
> > Since the JsonParseException can't be catched in application code,
> what
> > would be the recommended way to handle the case of possibly
> > non-parseable Kafka messages?
> >  
> > Is there is a way to configure the Flink Kafka consumer to treat the
> > case of non-parseable messages by logging the parsing error then
> > committing the message such that the processing can continue? Is there
> > isn't, would such an enhancement make sense?
> >
> > Unless there is a better solution, it looks as a requirement to
> > guarantee that FlinkKafkaConsumer011 only receives valid messages,
> which
> > can be annoying in practice.
> >
> > For reference, here's the stack of the JsonParseException in the log:
> >
> > Source: Custom Source(1/1) switched to FAILED
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> > Unexpected character (':' (code 58)): Expected space separating
> > root-level values
> > at [Source: UNKNOWN; line: 1, column: 3]
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
> > at
> >
> 
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
> > at
> >
> 
> 

Re: Stream job failed after increasing number retained checkpoints

2018-01-16 Thread Jose Miguel Tejedor Fernandez
Thanks Piotr and Stefan,

The problem was the overhead in the heap memory usage of the JobManager
when increasing the number of retained checkpoints. It was solved once I reverted
that value to one.

BR

For the record, this is the actual error in the JobManager log at the time of the OOM:

2018-01-08 22:27:09,293 WARN
org.jboss.netty.channel.socket.nio.AbstractNioSelector
  - Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
2018-01-08 22:27:15,796 ERROR akka.actor.ActorSystemImpl
- Uncaught error from thread
[flink-akka.actor.default-dispatcher-22840]
shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
growArray(ForkJoinPool.java:1090)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
ForkJoinPool.java:1978)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,288 ERROR akka.actor.ActorSystemImpl
- Uncaught error from thread
[flink-akka.remote.default-remote-dispatcher-22839]
shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.OutOfMemoryError: Java heap space
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
growArray(ForkJoinPool.java:1090)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
ForkJoinPool.java:1978)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
ForkJoinWorkerThread.java:107)
2018-01-08 22:27:16,882 INFO
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
   - Removing web dashboard root cache directory
/tmp/flink-web-f75e187d-3d08-4864-ba08-1740c8586be1
2018-01-08 22:27:17,394 INFO
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor
   - Removing web dashboard jar upload directory
/tmp/flink-web-2c8657f2-9b87-4964-bde4-9997ef31966d
2018-01-08 22:27:19,863 INFO  org.apache.flink.runtime.blob.BlobServer
- Stopped BLOB server at 0.0.0.0:44378



here’s a test with Abba after it has accumulated state for 21 hours:
it seems that creating a nightly savepoint won’t necessarily be scalable,
so being able to use incremental checkpoints would still seem appealing, if
possible
- or why not make a savepoint somehow by copying the required files of a
checkpoint instead? But I doubt that flink would support that.



*José Miguel Tejedor Fernández*
Server developer
jose.fernan...@rovio.com

Rovio Entertainment Ltd.
Keilaranta 7, FIN - 02150 Espoo, Finland
www.rovio.com



On Wed, Jan 10, 2018 at 3:08 PM, Piotr Nowojski 
wrote:

> Hi,
>
> This Task Manager log is suggesting that problems lays on the Job Manager
> side (no visible gap in the logs, GC Time reported is accumulated and 31
> seconds accumulated over 963 gc collections is low value). Could you show
> the Job Manager log itself? Probably it’s the own that’s causing the
> TaskManager to timeout.
>
> On the other hand, I see that Task Manager max heap size is ~5GB and I
> assume this is the same setting for the Job manager. A Stefan pointed out,
> there is some memory overhead on the Job Manager for retaining the
> checkpoint and it is around couple of hundred bytes (maybe even 1KB) per
> operator instance. By doing quick math:
>
> 2880 checkpoints * 10 task managers * 10 operators in the job * 8
> parallelism per task manager * 500 bytes = ~1GB
>
> The answer might be that you just need to increase the Job Manager max
> heap to retain 2880 checkpoints.
>
> Piotrek
>
> On 10 Jan 2018, at 12:00, Jose Miguel Tejedor Fernandez <
> jose.fernan...@rovio.com> wrote:
>
> Hi,
>
> I wonder what reason you might have that you ever want such a huge number
>> of retained checkpoints?
>
>
> The Flink jobs running on EMR cluster require a checkpoint at midnight.
> (In our use case we need to synch a loaded delta to our a third party
> partner with the streamed data). The delta load the whole day data and
> that's why we wanted to have available the midnight's checkpoint to start
> from there.
> We could also make a savepoint at midnight, but it’s not as handy (we
> would need to build our own tooling to do it), and it can’t benefit from
> the smaller latency of an incremental checkpoint. Another thing is that
> implementing our own savepoint tool would be a bit hard to monitor. Besides,
> retaining several checkpoints created every minute would
> also allow us to load a delta at any time. Please, if there are better ways
> of achieving this, let me know.
>
> From where does the log trace come from?
>
>
> It comes from the TaskManager.
>
> Please search on the opposite side of the time outing connection for
>> possible root cause of the timeout including:
>> - possible error/exceptions/warnings
>> - long GC pauses or other blocking operations (possibly long unnatural
>> gaps in the logs)
>> - machine health (CPU usage, disks usage, network connections)
>
>
> It seems that TaskManager disconnect from JobManager and then 

Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

2018-01-16 Thread Till Rohrmann
Hi,

this indeed indicates that a REST handler is requesting the ExecutionGraph
from a JobManager which does not run in the same ActorSystem. Could you
please tell us the exact HA setup. Are your running Flink on Yarn with HA
or do you use standalone HA with standby JobManagers?

It would be really helpful if you could also share the logs with us.

Cheers,
Till

On Tue, Jan 16, 2018 at 10:20 AM, Nico Kruber 
wrote:

> IMHO, this looks like a bug and it makes sense that you only see this
> with an HA setup:
>
> The JobFound message contains the ExecutionGraph which, however, does
> not implement the Serializable interface. Without HA, when browsing the
> web interface, this message is (probably) not serialized since it is
> only served to you via HTML. For HA, this may come from another
> JobManager than the Web interface you are browsing.
> I'm including Till (cc'd) as he might know more.
>
>
> Nico
>
> On 16/01/18 09:22, jelmer wrote:
> > HI,
> >
> > We recently upgraded our test environment to from flink 1.3.2 to flink
> > 1.4.0.
> >
> > We are using a high availability setup on the job manager. And now often
> > when I go to the job details in the web ui the call will timeout and the
> > following error will pop up in the job manager log
> >
> >
> > akka.remote.MessageSerializer$SerializationException: Failed to
> > serialize remote message [class
> > org.apache.flink.runtime.messages.JobManagerMessages$JobFound] using
> > serializer [class akka.serialization.JavaSerializer].
> > at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at
> > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.
> apply(Endpoint.scala:889)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at
> > akka.remote.EndpointWriter$$anonfun$serializeMessage$1.
> apply(Endpoint.scala:889)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:888)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.remote.EndpointWriter.writeSend(Endpoint.scala:780)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:755)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > at
> > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> runTask(ForkJoinPool.java:1339)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > at
> > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > at
> > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> ForkJoinWorkerThread.java:107)
> > [flink-dist_2.11-1.4.0.jar:1.4.0]
> > Caused by: java.io.NotSerializableException:
> > org.apache.flink.runtime.executiongraph.ExecutionGraph
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> > ~[na:1.8.0_131]
> > at
> > java.io.ObjectOutputStream.defaultWriteFields(
> ObjectOutputStream.java:1548)
> > ~[na:1.8.0_131]
> > at
> > java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> > ~[na:1.8.0_131]
> > at
> > java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
> > ~[na:1.8.0_131]
> > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> > ~[na:1.8.0_131]
> > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> > ~[na:1.8.0_131]
> > at
> > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(
> Serializer.scala:321)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at
> > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.
> scala:321)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at
> > akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.
> scala:321)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.serialization.JavaSerializer.toBinary(Serializer.scala:321)
> > ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> > at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:47)
> > 

Re: SideOutput doesn't receive anything if filter is applied after the process function

2018-01-16 Thread Chesnay Schepler

I've opened https://issues.apache.org/jira/browse/FLINK-8437

Unfortunately i doubt we can fix this properly. The proposed solution 
will not work if we ever allow arbitrary functions to use side-outputs.


On 16.01.2018 08:59, Juho Autio wrote:
Could someone with knowledge of the right terms create this in JIRA, 
please? I guess I could also create it if needed..


On Mon, Jan 15, 2018 at 3:15 PM, Chesnay Schepler > wrote:


yes, i meant that process() returns the special operator. This
would definitely deserve a JIRA issue.


On 15.01.2018 14:09, Juho Autio wrote:

Thanks for the explanation. Did you meant that process() would
return a SingleOutputWithSideOutputOperator?

Any way, that should be enough to avoid the problem that I hit
(and it also seems like the best & only way).

Maybe the name should be something more generic though, like
ProcessedSingleOutputOperator or something, I wouldn't know..

Would this deserve an improvement ticket in JIRA?

On Mon, Jan 15, 2018 at 12:43 PM, Chesnay Schepler
> wrote:

It would mean that getSideOutput() would return a
SingleOutputWithSideOutputOperator which extends
SingleOutputOperator offering getSideOutput(). Other
transformations would still return a SingleOutputOperator.

With this the following code wouldn't compile.

stream
.process(...)
.filter(...)
.getSideOutput(...) // compile error

You would have to explicitly define the code as below, which
makes the behavior unambiguous:

processed = stream
.process(...)

filtered = processed
.filter(...)

filteredSideOutput = processed
.getSideOutput(...)
.filter(...)


On 15.01.2018 09:55, Juho Autio wrote:

> sideoutput might deserve a seperate class which inherit
form singleoutput. It might prevent lot of confusions

Thanks, but how could that be done? Do you mean that if one
calls .process(), then the stream would change to another
class which would only allow calls like .getMainOutput() or
.getSideOutput("name")? Of course compile time error would
be even better than a runtime error, but I don't see yet how
it could be done in practice.

On Sun, Jan 14, 2018 at 4:55 AM, Chen Qin
> wrote:

Hi Juho,

I think sideoutput might deserve a seperate class which
inherit form singleoutput. It might prevent lot of
confusions. A more generic question is whether
datastream api can be mulitple ins and mulitple outs
natively. It's more like scheduling problem when you
come from single process system to multiple process
system, which one should get resource and how much
sharing same hardware resources, I guess it will open
gate to lots of edge cases -> strategies-> more edge
cases :)

Chen

On Fri, Jan 12, 2018 at 6:34 AM, Juho Autio
> wrote:

Maybe I could express it in a slightly different
way: if adding the .filter() after .process() causes
the side output to be somehow totally "lost", then I
believe the .getSideOutput() could be aware that
there is not such side output to be listened to from
upstream, and throw an exception. I mean, this
should be possible when building the DAG, it
shouldn't require starting the stream to detect?
Thanks..

On Fri, Jan 12, 2018 at 2:48 PM, Tzu-Li (Gordon) Tai
>
wrote:

Hi Juho,


Now that I think of it this seems like a bug to
me: why does the call to getSideOutput succeed
if it doesn't provide _any_ input?


With the way side outputs work, I don’t think
this is possible (or would make sense). An
operator does not know whether or not it would
ever emit some element with a given tag.
As far as I understand it, calling
`getSideOutput` essentially adds a virtual node
to the result stream graph that listens to the
specified tag from the upstream input.

While I’m not aware whether or not your
observation is expected behavior, from an API
perspective, I can see why it is a bit confusing
  

Re: Unrecoverable job failure after Json parse error?

2018-01-16 Thread Adrian Vasiliu
Hi Nico,
Thanks a lot. I did consider that, but I had missed the clarification of the contract in the piece of doc you pointed to: "returning null to allow the Flink Kafka consumer to silently skip the corrupted message".
I suppose it could be an improvement for JSONKeyValueDeserializationSchema to provide this behaviour as an out-of-the-box option. But anyway, I do have a solution in hand.
Thanks again.
Adrian
 
- Original message -
From: Nico Kruber 
To: Adrian Vasiliu , user@flink.apache.org
Cc:
Subject: Re: Unrecoverable job failure after Json parse error?
Date: Tue, Jan 16, 2018 11:34 AM

Hi Adrian,
couldn't you solve this by providing your own DeserializationSchema [1],
possibly extending from JSONKeyValueDeserializationSchema and catching
the error there?


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema
Re: Low throughput when trying to send data with Sockets

2018-01-16 Thread Nico Kruber
Hi George,
I suspect issuing a read operation for every 68 bytes incurs too much
overhead to perform as you would like it to. Instead, create a bigger
buffer (64k?) and extract single events from sub-regions of this buffer.
Please note, however, that then the first buffer will only be processed
when this method returns (the details depend on the underlying channel
[1]). This is a trade-off between latency and throughput at some point.
If you set non-blocking mode for your channels, you will always get what
the channel has available and continue immediately. You can set this up
via this, for example:

==
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("jenkov.com", 80));

while(! socketChannel.finishConnect() ){
//wait, or do something else...
}
==


Nico

[1]
https://docs.oracle.com/javase/7/docs/api/java/nio/channels/SocketChannel.html#read(java.nio.ByteBuffer)
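
For illustration, a minimal sketch of that chunked-read pattern, independent of the SourceFunction quoted below (the 68-byte event size and the deserialization hook are placeholders):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class ChunkedReader {

    private static final int EVENT_SIZE = 68;   // placeholder: fixed size of one serialized event

    // Reads large chunks from the channel and slices individual events out of them.
    static void readLoop(SocketChannel channel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(64 * 1024);
        byte[] event = new byte[EVENT_SIZE];

        while (channel.read(buffer) >= 0) {             // -1 means the peer closed the connection
            buffer.flip();                               // switch the buffer to "read" mode
            while (buffer.remaining() >= EVENT_SIZE) {
                buffer.get(event);                       // one complete event
                // deserialize `event` and emit it, e.g. ctx.collect(des.deserialize(event))
            }
            buffer.compact();                            // keep a trailing partial event for the next read
        }
    }
}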

On 15/01/18 13:19, George Theodorakis wrote:
> Hello,
> 
> I am trying to separate the logic of my application by generating and
> processing data in different physical engines. 
> 
> I have created my custom socket source class:
> 
> class SocketSourceFunction extends SourceFunction[Event2] {
>   @volatile private var isRunning: Boolean = true;
>   @transient private var serverSocket: ServerSocketChannel = null;
>
>   override def run(ctx: SourceContext[Event2]) = {
>     val hostname = "localhost"
>     val port = 6667
>     println("listening:" + port)
>     val server = ServerSocketChannel.open();
>     server.bind(new InetSocketAddress(hostname, port));
>     var buffer = ByteBuffer.allocate(68);
>     val des = new EventDeSerializer2()
>
>     while (isRunning) {
>       println("waiting...")
>       var socketChannel = server.accept();
>
>       if (socketChannel != null) {
>         println("accept:" + socketChannel)
>         while (true) {
>           var bytes = 0;
>           bytes = socketChannel.read(buffer)
>           if (bytes > 0) {
>             if (!buffer.hasRemaining()) {
>               buffer.rewind()
>               var event: Event2 = des.deserialize(buffer.array())
>               ctx.collect(event)
>               buffer.clear()
>             }
>           }
>         }
>       }
>     }
>   }
>
>   override def cancel() = {
>     isRunning = false;
>     val socket = this.serverSocket;
>     if (socket != null) {
>       try {
>         socket.close();
>       } catch { case e: Throwable => {
>         System.err.println(String.format("error: %s", e.getMessage()));
>         e.printStackTrace();
>         System.exit(1);
>       }
>       }
>     }
>   }
> }
> 
> I am sending data with either raw sockets using ByteBuffers or with a
> Flink generator (serializing my Events and using writeToSocket()
> method). However, in both cases, I am experiencing less than 10x
> throughput in comparison to in-memory generation, even when using
> a 10gbit connection (the throughput is much lower).
> 
> Is there any obvious defect in my implementation?
> 
> Thank you in advance,
> George



signature.asc
Description: OpenPGP digital signature


Re: Unrecoverable job failure after Json parse error?

2018-01-16 Thread Nico Kruber
Hi Adrian,
couldn't you solve this by providing your own DeserializationSchema [1],
possibly extending from JSONKeyValueDeserializationSchema and catching
the error there?


Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#the-deserializationschema

On 12/01/18 18:26, Adrian Vasiliu wrote:
> Hello,
> 
> When using FlinkKafkaConsumer011 with JSONKeyValueDeserializationSchema,
> if an invalid, non-parsable message is sent to the Kafka topic, the
> consumer expectedly fails with JsonParseException. So far so good, but
> this leads to the following loop: the job switches to FAILED
> then attempts to restart and fails again, and so on. That is, the
> parsing error leads to the Kafka message not being committed, hence it
> keeps being received. 
> Since the JsonParseException can't be caught in application code, what
> would be the recommended way to handle the case of possibly
> non-parseable Kafka messages?
>  
> Is there a way to configure the Flink Kafka consumer to treat the
> case of non-parseable messages by logging the parsing error and then
> committing the message so that the processing can continue? If there
> isn't, would such an enhancement make sense?
> 
> Unless there is a better solution, it looks as if one has to
> guarantee that FlinkKafkaConsumer011 only receives valid messages, which
> can be annoying in practice.
> 
> For reference, here's the stack of the JsonParseException in the log:
> 
> Source: Custom Source(1/1) switched to FAILED
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> Unexpected character (':' (code 58)): Expected space separating
> root-level values
> at [Source: UNKNOWN; line: 1, column: 3]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:450)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportMissingRootWS(ParserMinimalBase.java:466)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._verifyRootSpace(UTF8StreamJsonParser.java:1657)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parsePosNumber(UTF8StreamJsonParser.java:1394)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:852)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
> at
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:55)
> at
> org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:40)
> at
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:641)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:745)
> 
> My env: Flink 1.4.0 and kafka_2.11-1.0.0 running locally on Mac.
> 
> Thanks,
> Adrian
> Sauf indication contraire ci-dessus:/ Unless stated otherwise above:
> Compagnie IBM France
> Siège Social : 17 avenue de l'Europe, 92275 Bois-Colombes Cedex
> RCS Nanterre 552 118 465
> Forme Sociale : S.A.S.
> Capital Social : 657.364.587 €
> SIREN/SIRET : 552 118 465 03644 - Code NAF 6202A



signature.asc
Description: OpenPGP digital signature


Re: Parallel stream consumption

2018-01-16 Thread Nico Kruber
Just found a nice (but old) blog post that explains Flink's integration
with Kafka:
https://data-artisans.com/blog/kafka-flink-a-practical-how-to

I guess the basics are still valid.


Nico

On 16/01/18 11:17, Nico Kruber wrote:
> Hi Jason,
> I'd suggest to start with [1] and [2] for getting the basics of a Flink
> program.
> The DataStream API basically wires operators together with streams so
> that whatever stream gets out of one operator is the input of the next.
> By connecting both functions to the same Kafka stream source, your
> program results in this:
> 
> Kafka --> Function1
>   |
>   --> Function2
> 
> where both functions receive all elements the previous stream offers
> (elements are broadcasted). If you want the two functions to work on
> different elements, you could add a filter before each function:
> 
> DataStream inputStream = ... got a kafka stream
> inputStream.filter(...).process( Function1 );
> inputStream.filter(...).process( Function2 );
> 
> or split the stream (see [3] for available operators).
> 
> I'm no expert on Kafka though, so I can't give you advice on the
> performance - I'd suggest creating some small benchmarks for your setup
> since this probably depends on the cluster architecture and the
> parallelism of the operators and the number of Kafka partitions.
> Maybe Gordon (cc'd) can give some more insights.
> 
> 
> Regards
> Nico
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/programming-model.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html
> 
> On 12/01/18 21:57, Jason Kania wrote:
>> Hi,
>>
>> I have a question that I have not resolved via the documentation,
>> looking in the "Parallel Execution", "Streaming"  and the "Connectors"
>> sections. If I retrieve a kafka stream and then call the process
>> function against it in parallel, as follows, does it consume in some
>> round robin fashion between the two process calls or is each element
>> coming out of the kafka connector consumed by both processors in parallel?
>>
>> DataStream inputStream = ... got a kafka stream
>> inputStream.process( Function1 );
>> inputStream.process( Function2 );
>>
>> If it is possible to consume in parallel by pointing at the single stream,
>> is it typically slower or faster than having two kafka streams with
>> different group ids?
>>
>> If not documented elsewhere, this would be good to cover since it is
>> fundamental.
>>
>> Thanks,
>>
>> Jason
> 



signature.asc
Description: OpenPGP digital signature


Re: Parallel stream consumption

2018-01-16 Thread Nico Kruber
Hi Jason,
I'd suggest to start with [1] and [2] for getting the basics of a Flink
program.
The DataStream API basically wires operators together with streams so
that whatever stream gets out of one operator is the input of the next.
By connecting both functions to the same Kafka stream source, your
program results in this:

Kafka --> Function1
  |
  --> Function2

where both functions receive all elements the previous stream offers
(elements are broadcasted). If you want the two functions to work on
different elements, you could add a filter before each function:

DataStream inputStream = ... got a kafka stream
inputStream.filter(...).process( Function1 );
inputStream.filter(...).process( Function2 );

or split the stream (see [3] for available operators).

I'm no expert on Kafka though, so I can't give you advice on the
performance - I'd suggest creating some small benchmarks for your setup
since this probably depends on the cluster architecture and the
parallelism of the operators and the number of Kafka partitions.
Maybe Gordon (cc'd) can give some more insights.


Regards
Nico

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/programming-model.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/api_concepts.html
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html

On 12/01/18 21:57, Jason Kania wrote:
> Hi,
> 
> I have a question that I have not resolved via the documentation,
> looking in the "Parallel Execution", "Streaming"  and the "Connectors"
> sections. If I retrieve a kafka stream and then call the process
> function against it in parallel, as follows, does it consume in some
> round robin fashion between the two process calls or is each element
> coming out of the kafka connector consumed by both processors in parallel?
> 
> DataStream inputStream = ... got a kafka stream
> inputStream.process( Function1 );
> inputStream.process( Function2 );
> 
> If it is possible to consume in parallel by pointing at the single stream,
> is it typically slower or faster than having two kafka streams with
> different group ids?
> 
> If not documented elsewhere, this would be good to cover since it is
> fundamental.
> 
> Thanks,
> 
> Jason



signature.asc
Description: OpenPGP digital signature


Re: logging question

2018-01-16 Thread Nico Kruber
Just a guess, but probably our logging initialisation changes the global
log level (see conf/log4j.properties). DataStream.collect() executes the
program along with creating a local Flink "cluster" (if you are testing
locally / in an IDE) and initializing logging, among other things.

Please comment the first line out and uncomment the following one to
read like this:
==
# This affects logging for both user code and Flink
#log4j.rootLogger=INFO, file

# Uncomment this if you want to _only_ change Flink's logging
log4j.logger.org.apache.flink=INFO
==


Nico

On 13/01/18 13:52, j...@vooght.de wrote:
> Hello,
> I am learning Flink and using the docker image along with the AMIDST
> library for this.
> Below is a sample task from AMIDST which provides INFO output up until I
> reach updateModel(). I pasted the short method as well and wonder what
> prevents the Logger from producing output past that point.
> 
>     //Set-up Flink session
>     env = ExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().disableSysoutLogging();
>     Logger LOG = LoggerFactory.getLogger("ParallelMLExample");
> 
>     //generate a random dataset
>     DataFlink dataFlink = new
> DataSetGenerator().generate(env, 1234, 1000, 5, 0);
> 
>     //Creates a DAG with the NaiveBayes structure for the random
> dataset
>     DAG dag =
> DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
> "DiscreteVar4");
>     LOG.info(dag.toString());
> 
>     //Create the Learner object
>     ParameterLearningAlgorithm learningAlgorithmFlink = new
> ParallelMaximumLikelihood();
> 
>     //Learning parameters
>     learningAlgorithmFlink.setBatchSize(10);
>     learningAlgorithmFlink.setDAG(dag);
> 
>     //Initialize the learning process
>     learningAlgorithmFlink.initLearning();
> 
>     //Learn from the flink data
>     LOG.info("BEFORE UPDATEMODEL");
>     learningAlgorithmFlink.updateModel(dataFlink);
>     LOG.info("AFTER UPDATEMODEL");
> 
>     //Print the learnt Bayes Net
>     BayesianNetwork bn =
> learningAlgorithmFlink.getLearntBayesianNetwork();
>     LOG.info(bn.toString());
> 
> 
> Below is the updateModel method.
> 
>     public double updateModel(DataFlink dataUpdate) {
>     try {
>     Configuration config = new Configuration();
>     config.setString(BN_NAME, this.dag.getName());
>     config.setBytes(EFBN_NAME,
> Serialization.serializeObject(efBayesianNetwork));
> 
>     DataSet dataset = dataUpdate.getDataSet();
>     this.sumSS = dataset.map(new SufficientSatisticsMAP())
>     .withParameters(config)
>     .reduce(new SufficientSatisticsReduce())
>     .collect().get(0);
> 
>     //Add the prior
>     sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
> 
>     JobExecutionResult result =
> dataset.getExecutionEnvironment().getLastJobExecutionResult();
> 
>     numInstances =
> result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
> 
>     numInstances++;//Initial counts
> 
>     }catch(Exception ex){
>     throw new UndeclaredThrowableException(ex);
>     }
> 
>     return this.getLogMarginalProbability();
>     }
> 
> 
> Not sure why LOG.info past that method are not output to the console.
> TIA
> JP



signature.asc
Description: OpenPGP digital signature


Re: How to get automatic fail over working in Flink

2018-01-16 Thread Nico Kruber
Hi James,
In this scenario, with the restart strategy set, the job should restart
(without YARN/Mesos) as long as you have enough slots available.

Can you check with the web interface on http://<jobmanager>:8081/ that
enough slots are available after killing one TaskManager?

Can you provide JobManager and TaskManager logs and some more details on
the job you are running?


Nico

On 16/01/18 07:04, Data Engineer wrote:
> This question has been asked on StackOverflow:
> https://stackoverflow.com/questions/48262080/how-to-get-automatic-fail-over-working-in-flink
> 
> I am using Apache Flink 1.4 on a cluster of 3 machines, out of which one
> is the JobManager and the other 2 host TaskManagers.
> 
> I start flink in cluster mode and submit a flink job. I have configured
> 24 task slots in the flink config, and for the job I use 6 task slots.
> 
> When I submit the job, I see 3 tasks are assigned to Worker machine 1
> and 3 are assigned to Worker machine 2. Now, when I kill the TaskManager
> on WorkerMachine 2, I see that the entire job fails.
> 
> Is this the expected behaviour, or does it have automatic failover as in
> Spark.
> 
> Do we need to use YARN/Mesos to achieve automatic failover?
> 
> We tried the Restart Strategy, but when it restarts we get an exception
> saying that no task slots are available and then the job fails. We think
> that 24 slots is enough to take over. What could we be doing wrong here?
> 
> Regards,
> James



signature.asc
Description: OpenPGP digital signature


Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor

2018-01-16 Thread Fabian Hueske
This depends on the requirements of your application.
With the usual watermark generation strategies, which are purely
data-driven, a stream that does not produce data does not advance its
watermarks.
Not advancing the watermarks means that the program cannot make progress.

This might be fine if your program consumes a single stream, because if
that stream does not produce data, your program also doesn't have anything
to compute (although there might still be data left, such as a window that
has not been evaluated yet).
The situation becomes more tricky if your program has multiple sources
that become inactive at some point, or a source where a partition can
become inactive.

AFAIK, there is a mechanism to mark partitions (and maybe complete sources)
as inactive.
@Gordon (in CC) knows more about this feature.

Best, Fabian

2018-01-15 14:51 GMT+01:00 Jayant Ameta :

> Hi Fabian,
> I want to extract timestamps from my event. However, the events stream can
> be sparse at times (e.g. 2 days without any events).
> What's the best strategy to create watermarks if I want real-time
> processing of the events which enter the stream?
>
> Jayant Ameta
>
> On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske  wrote:
>
>> Another thing to point out is that watermarks are usually data-driven,
>> i.e., they depend on the timestamps of the events and not on the clock of
>> the machine.
>> Otherwise, you might observe a lot of late data, i.e., events with
>> timestamps smaller than the last watermark.
>>
>> If you assign timestamps and watermarks based on the clock of the
>> machine, you might also use ingestion time instead of event time.
>>
>> 2018-01-11 11:49 GMT+01:00 Jayant Ameta :
>>
>>> Thanks Gary,
>>> I was only trying with a fixed set of events, so the Watermark was not
>>> advancing, like you said.
>>>
>>>
>>> Jayant Ameta
>>>
>>> On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao 
>>> wrote:
>>>
 Hi Jayant,

 The difference is that the Watermarks from
 BoundedOutOfOrdernessTimestampExtractor are based on the greatest
 timestamp of
 all previous events. That is, if you do not receive new events, the
 Watermark
 will not advance. In contrast, your custom implementation of
 AssignerWithPeriodicWatermarks always advances the Watermark based on
 the wall
 clock.

 Maybe this will already help you to debug your application. If not, it
 would be
 great to see a minimal working example.

 Best,
 Gary

 On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta 
 wrote:

> Hi,
> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is
> not firing. However, the trigger fires when using custom timestamp
> extractor with similar watermark.
>
> Sample code below:
> 1.Assigner as anonymous class which works fine
>
> AssignerWithPeriodicWatermarks<Tuple2<String, T>> assigner =
>     new AssignerWithPeriodicWatermarks<Tuple2<String, T>>() {
>
> @Override
> public long extractTimestamp(Tuple2<String, T> element, long previousElementTimestamp) {
> return System.currentTimeMillis();
> }
>
> @Override
> public final Watermark getCurrentWatermark() {
> // this guarantees that the watermark never goes backwards.
> return new Watermark(System.currentTimeMillis()-100);
> }
> };
>
>
> 2.BoundedOutOfOrdernessTimestampExtractor assigner which doesn't work
>
> AssignerWithPeriodicWatermarks<Tuple2<String, T>> assigner =
>     new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, T>>(Time.milliseconds(100)) {
>
> @Override
> public long extractTimestamp(Tuple2<String, T> element) {
> return System.currentTimeMillis();
> }
> };
>
>
> Do you see any difference in the approaches?
>
> - Jayant
>


>>>
>>
>


Re: Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

2018-01-16 Thread Nico Kruber
IMHO, this looks like a bug and it makes sense that you only see this
with an HA setup:

The JobFound message contains the ExecutionGraph which, however, does
not implement the Serializable interface. Without HA, this message is
(probably) never serialized when you browse the web interface, since it is
only served to you as HTML. With HA, the message may come from a
JobManager other than the one whose web interface you are browsing.
I'm including Till (cc'd) as he might know more.


Nico

On 16/01/18 09:22, jelmer wrote:
> Hi,
> 
> We recently upgraded our test environment from Flink 1.3.2 to Flink
> 1.4.0.
> 
> We are using a high availability setup on the job manager. And now often
> when I go to the job details in the web ui the call will timeout and the
> following error will pop up in the job manager log
> 
> 
> akka.remote.MessageSerializer$SerializationException: Failed to
> serialize remote message [class
> org.apache.flink.runtime.messages.JobManagerMessages$JobFound] using
> serializer [class akka.serialization.JavaSerializer].
> at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:888)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:780)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:755)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [flink-dist_2.11-1.4.0.jar:1.4.0]
> Caused by: java.io.NotSerializableException:
> org.apache.flink.runtime.executiongraph.ExecutionGraph
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> ~[na:1.8.0_131]
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> ~[na:1.8.0_131]
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> ~[na:1.8.0_131]
> at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> ~[na:1.8.0_131]
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> ~[na:1.8.0_131]
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> ~[na:1.8.0_131]
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:321)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at
> akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:321)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:47)
> ~[flink-dist_2.11-1.4.0.jar:1.4.0]
> ... 17 common frames omitted
> 
> 
> 
> I isolated it further, and it seems to be triggered by this call
> 
> https://hostname/jobs/28076fffbcf7eab3f17900a54cc7c41d
> 
> I cannot reproduce it on my local laptop without an HA setup.
> Before I dig any deeper, has anyone already come across this?



signature.asc
Description: OpenPGP digital signature


Re: Hadoop compatibility and HBase bulk loading

2018-01-16 Thread Fabian Hueske
Looking at my previous mail which mentions changes to API, optimizer, and
runtime code of the DataSet API this would be a major and non-trivial
effort and also require that a committer spends a good amount of time for
this.


2018-01-16 10:07 GMT+01:00 Flavio Pompermaier :

> Do you think it is that complex to support? I think we could try to
> implement it if someone could give us some guidance (at least the big
> picture)
>
> On Tue, Jan 16, 2018 at 10:02 AM, Fabian Hueske  wrote:
>
>> No, I'm not aware of anybody working on extending the Hadoop
>> compatibility support.
>> I'll also have no time to work on this any time soon :-(
>>
>> 2018-01-13 1:34 GMT+01:00 Flavio Pompermaier :
>>
>>> Any progress on this Fabian? HBase bulk loading is a common task for us
>>> and it's very annoying and uncomfortable to run a separate YARN job to
>>> accomplish it...
>>>
>>> On 10 Apr 2015 12:26, "Flavio Pompermaier"  wrote:
>>>
>>> Great! That will be awesome.
>>> Thank you Fabian
>>>
>>> On Fri, Apr 10, 2015 at 12:14 PM, Fabian Hueske 
>>> wrote:
>>>
 Hmm, that's a tricky question ;-) I would need to have a closer look.
 But getting custom comparators for sorting and grouping into the Combiner
 is not that trivial because it touches API, Optimizer, and Runtime code.
 However, I did that before for the Reducer and with the recent addition of
 groupCombine the Reducer changes might be just applied to combine.

 I'll be gone next week, but if you want to, we can have a closer look
 at the problem after that.

 2015-04-10 12:07 GMT+02:00 Flavio Pompermaier :

> I think I could also take care of it if somebody can help me and guide
> me a little bit..
> How long do you think it will require to complete such a task?
>
> On Fri, Apr 10, 2015 at 12:02 PM, Fabian Hueske 
> wrote:
>
>> We had an effort to execute any HadoopMR program by simply specifying
>> the JobConf and execute it (even embedded in regular Flink programs).
>> We got quite far but did not complete (counters and custom grouping /
>> sorting functions for Combiners are missing if I remember correctly).
>> I don't think that anybody is working on that right now, but it would
>> definitely be a cool feature.
>>
>> 2015-04-10 11:55 GMT+02:00 Flavio Pompermaier :
>>
>>> Hi guys,
>>>
>>> I have a nice question about Hadoop compatibility.
>>> In https://flink.apache.org/news/2014/11/18/hadoop-compatibilit
>>> y.html you say that you can reuse existing mapreduce programs.
>>> Could it be possible to manage also complex mapreduce programs like
>>> HBase BulkImport that use for example a custom partioner
>>> (org.apache.hadoop.mapreduce.Partitioner)?
>>>
>>> In the bulk-import examples the call 
>>> HFileOutputFormat2.configureIncrementalLoadMap
>>> that sets a series of job parameters (like partitioner, mapper, 
>>> reducers,
>>> etc) -> http://pastebin.com/8VXjYAEf.
>>> The full code of it can be seen at https://github.com/apache/h
>>> base/blob/master/hbase-server/src/main/java/org/apache/hadoo
>>> p/hbase/mapreduce/HFileOutputFormat2.java.
>>>
>>> Do you think there's any chance to make it run in Flink?
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>

>>>
>>>
>>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809 <+39%200461%20041809>
>


Re: MapState Initialization

2018-01-16 Thread Fabian Hueske
Alternatively, you can also create a keyed MapState as
context.getKeyedStateStore().getMapState() in
CheckpointedFunction.initializeState().

2018-01-16 9:58 GMT+01:00 Fabian Hueske :

> Sorry for the late response.
>
MapState is currently only supported as keyed state, not as operator
state.
If you want to create a keyed MapState, the object should be created using
a MapStateDescriptor in the open() method via the RuntimeContext.
>
>
>
> 2018-01-16 1:54 GMT+01:00 Boris Lublinsky :
>
>> Any response to this?
>>
>> List State is created using
>>
>> checkpointedState = context.getOperatorStateStore.getListState 
>> (checkPointDescriptor)
>>
>> But there is no similar APIs for MapState
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>> Begin forwarded message:
>>
>> *From: *Boris Lublinsky 
>> *Subject: **Re: Questions about managed operator state*
>> *Date: *January 14, 2018 at 7:39:00 PM CST
>> *To: *Fabian Hueske 
>> *Cc: *user 
>>
>> Thanks Fabian,
>> After I switched to var it compiles, but its not initialized.
>>
>> @transient private var currentModels : MapState[String, Model] = _
>>
>> Assignes null to MapState.
>> Do I create an empty hashMap there?
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>> On Jan 14, 2018, at 11:09 AM, Fabian Hueske  wrote:
>>
>> Hi Boris,
>>
>> the CheckpointedRestoring interface was removed in Flink 1.4.0 (and
>> deprecated in an earlier version). Unfortunately, the docs have not been
>> updated accordingly. I'll open a JIRA to fix this.
>> The replacements for CheckpointedRestoring are the CheckpointedFunction
>> or ListCheckpointed interfaces (see [1]).
>>
>> I think the compile error is caused because you define newModels as val
>> and not as var.
>>
>> Best, Fabian
>>
>> [1] https://github.com/apache/flink/blob/release-1.3/flink-strea
>> ming-java/src/main/java/org/apache/flink/streaming/api/
>> checkpoint/CheckpointedRestoring.java
>>
>> 2018-01-14 2:39 GMT+01:00 Boris Lublinsky 
>> :
>>
>>> Documentation https://ci.apache.org/projects/flink/flink-doc
>>> s-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>>> Refers to CheckpointedRestoring interface.
>>> *Which jar defines this interface - can’t find it*
>>>
>>> *Also documentation refers to *MapState. Do you have any
>>> example of creation of Map State.
>>> For value State in Scala, I can do just do
>>>
>>> private var sum: ValueState[(Long, Long)] = _
>>>
>>> But it does not work for MapState -
>>> Error:(44, 53) unbound placeholder parameter
>>>   private val newModels : MapState[String, Model] = _
>>>
>>>
>>> Boris Lublinsky
>>> FDP Architect
>>> boris.lublin...@lightbend.com
>>> https://www.lightbend.com/
>>>
>>>
>>
>> Begin forwarded message:
>>
>> *From: *Boris Lublinsky 
>> *Subject: **Questions about managed operator state*
>> *Date: *January 13, 2018 at 7:39:09 PM CST
>> *To: *user 
>>
>> Documentation https://ci.apache.org/projects/flink/flink-
>> docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> Refers to CheckpointedRestoring interface.
>> *Which jar defines this interface - can’t find it*
>>
>> *Also documentation refers to *MapState. Do you have any example
>> of creation of Map State.
>> For value State in Scala, I can do just do
>>
>> private var sum: ValueState[(Long, Long)] = _
>>
>> But it does not work for MapState -
>> Error:(44, 53) unbound placeholder parameter
>>   private val newModels : MapState[String, Model] = _
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>
>>
>


Re: Hadoop compatibility and HBase bulk loading

2018-01-16 Thread Flavio Pompermaier
Do you think it is that complex to support? I think we could try to implement
it if someone could give us some guidance (at least the big picture).

On Tue, Jan 16, 2018 at 10:02 AM, Fabian Hueske  wrote:

> No, I'm not aware of anybody working on extending the Hadoop compatibility
> support.
> I'll also have no time to work on this any time soon :-(
>
> 2018-01-13 1:34 GMT+01:00 Flavio Pompermaier :
>
>> Any progress on this Fabian? HBase bulk loading is a common task for us
>> and it's very annoying and uncomfortable to run a separate YARN job to
>> accomplish it...
>>
>> On 10 Apr 2015 12:26, "Flavio Pompermaier"  wrote:
>>
>> Great! That will be awesome.
>> Thank you Fabian
>>
>> On Fri, Apr 10, 2015 at 12:14 PM, Fabian Hueske 
>> wrote:
>>
>>> Hmm, that's a tricky question ;-) I would need to have a closer look.
>>> But getting custom comparators for sorting and grouping into the Combiner
>>> is not that trivial because it touches API, Optimizer, and Runtime code.
>>> However, I did that before for the Reducer and with the recent addition of
>>> groupCombine the Reducer changes might be just applied to combine.
>>>
>>> I'll be gone next week, but if you want to, we can have a closer look at
>>> the problem after that.
>>>
>>> 2015-04-10 12:07 GMT+02:00 Flavio Pompermaier :
>>>
 I think I could also take care of it if somebody can help me and guide
 me a little bit..
 How long do you think it will require to complete such a task?

 On Fri, Apr 10, 2015 at 12:02 PM, Fabian Hueske 
 wrote:

> We had an effort to execute any HadoopMR program by simply specifying
> the JobConf and execute it (even embedded in regular Flink programs).
> We got quite far but did not complete (counters and custom grouping /
> sorting functions for Combiners are missing if I remember correctly).
> I don't think that anybody is working on that right now, but it would
> definitely be a cool feature.
>
> 2015-04-10 11:55 GMT+02:00 Flavio Pompermaier :
>
>> Hi guys,
>>
>> I have a nice question about Hadoop compatibility.
>> In https://flink.apache.org/news/2014/11/18/hadoop-compatibility.html
>> you say that you can reuse existing mapreduce programs.
>> Could it be possible to manage also complex mapreduce programs like
>> HBase BulkImport that use for example a custom partioner
>> (org.apache.hadoop.mapreduce.Partitioner)?
>>
>> In the bulk-import examples the call 
>> HFileOutputFormat2.configureIncrementalLoadMap
>> that sets a series of job parameters (like partitioner, mapper, reducers,
>> etc) -> http://pastebin.com/8VXjYAEf.
>> The full code of it can be seen at https://github.com/apache/h
>> base/blob/master/hbase-server/src/main/java/org/apache/hadoo
>> p/hbase/mapreduce/HFileOutputFormat2.java.
>>
>> Do you think there's any chance to make it run in Flink?
>>
>> Best,
>> Flavio
>>
>
>

>>>
>>
>>
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809 <+39%200461%20041809>


Re: Hadoop compatibility and HBase bulk loading

2018-01-16 Thread Fabian Hueske
No, I'm not aware of anybody working on extending the Hadoop compatibility
support.
I'll also have no time to work on this any time soon :-(

2018-01-13 1:34 GMT+01:00 Flavio Pompermaier :

> Any progress on this Fabian? HBase bulk loading is a common task for us
> and it's very annoying and uncomfortable to run a separate YARN job to
> accomplish it...
>
> On 10 Apr 2015 12:26, "Flavio Pompermaier"  wrote:
>
> Great! That will be awesome.
> Thank you Fabian
>
> On Fri, Apr 10, 2015 at 12:14 PM, Fabian Hueske  wrote:
>
>> Hmm, that's a tricky question ;-) I would need to have a closer look. But
>> getting custom comparators for sorting and grouping into the Combiner is
>> not that trivial because it touches API, Optimizer, and Runtime code.
>> However, I did that before for the Reducer and with the recent addition of
>> groupCombine the Reducer changes might be just applied to combine.
>>
>> I'll be gone next week, but if you want to, we can have a closer look at
>> the problem after that.
>>
>> 2015-04-10 12:07 GMT+02:00 Flavio Pompermaier :
>>
>>> I think I could also take care of it if somebody can help me and guide
>>> me a little bit..
>>> How long do you think it will require to complete such a task?
>>>
>>> On Fri, Apr 10, 2015 at 12:02 PM, Fabian Hueske 
>>> wrote:
>>>
 We had an effort to execute any HadoopMR program by simply specifying
 the JobConf and execute it (even embedded in regular Flink programs).
 We got quite far but did not complete (counters and custom grouping /
 sorting functions for Combiners are missing if I remember correctly).
 I don't think that anybody is working on that right now, but it would
 definitely be a cool feature.

 2015-04-10 11:55 GMT+02:00 Flavio Pompermaier :

> Hi guys,
>
> I have a nice question about Hadoop compatibility.
> In https://flink.apache.org/news/2014/11/18/hadoop-compatibility.html
> you say that you can reuse existing mapreduce programs.
> Could it be possible to manage also complex mapreduce programs like
> HBase BulkImport that use for example a custom partioner
> (org.apache.hadoop.mapreduce.Partitioner)?
>
> In the bulk-import examples the call 
> HFileOutputFormat2.configureIncrementalLoadMap
> that sets a series of job parameters (like partitioner, mapper, reducers,
> etc) -> http://pastebin.com/8VXjYAEf.
> The full code of it can be seen at https://github.com/apache/h
> base/blob/master/hbase-server/src/main/java/org/apache/
> hadoop/hbase/mapreduce/HFileOutputFormat2.java.
>
> Do you think there's any chance to make it run in Flink?
>
> Best,
> Flavio
>


>>>
>>
>
>


Re: MapState Initialization

2018-01-16 Thread Fabian Hueske
Sorry for the late response.

MapState is currently only supported as keyed state, not as operator state.
If you want to create a keyed MapState, the object should be created using a
MapStateDescriptor in the open() method via the RuntimeContext.



2018-01-16 1:54 GMT+01:00 Boris Lublinsky :

> Any response to this?
>
> List State is created using
>
> checkpointedState = context.getOperatorStateStore.getListState 
> (checkPointDescriptor)
>
> But there is no similar APIs for MapState
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> Begin forwarded message:
>
> *From: *Boris Lublinsky 
> *Subject: **Re: Questions about managed operator state*
> *Date: *January 14, 2018 at 7:39:00 PM CST
> *To: *Fabian Hueske 
> *Cc: *user 
>
> Thanks Fabian,
> After I switched to var it compiles, but its not initialized.
>
> @transient private var currentModels : MapState[String, Model] = _
>
> Assignes null to MapState.
> Do I create an empty hashMap there?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Jan 14, 2018, at 11:09 AM, Fabian Hueske  wrote:
>
> Hi Boris,
>
> the CheckpointedRestoring interface was removed in Flink 1.4.0 (and
> deprecated in an earlier version). Unfortunately, the docs have not been
> updated accordingly. I'll open a JIRA to fix this.
> The replacements for CheckpointedRestoring are the CheckpointedFunction
> or ListCheckpointed interfaces (see [1]).
>
> I think the compile error is caused because you define newModels as val
> and not as var.
>
> Best, Fabian
>
> [1] https://github.com/apache/flink/blob/release-1.3/flink-
> streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/
> CheckpointedRestoring.java
>
> 2018-01-14 2:39 GMT+01:00 Boris Lublinsky :
>
>> Documentation https://ci.apache.org/projects/flink/flink-
>> docs-release-1.4/dev/stream/state/state.html#using-managed-operator-state
>> Refers to CheckpointedRestoring interface.
>> *Which jar defines this interface - can’t find it*
>>
>> *Also documentation refers to *MapState. Do you have any example
>> of creation of Map State.
>> For value State in Scala, I can do just do
>>
>> private var sum: ValueState[(Long, Long)] = _
>>
>> But it does not work for MapState -
>> Error:(44, 53) unbound placeholder parameter
>>   private val newModels : MapState[String, Model] = _
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>
>
> Begin forwarded message:
>
> *From: *Boris Lublinsky 
> *Subject: **Questions about managed operator state*
> *Date: *January 13, 2018 at 7:39:09 PM CST
> *To: *user 
>
> Documentation https://ci.apache.org/projects/flink/
> flink-docs-release-1.4/dev/stream/state/state.html#using-
> managed-operator-state
> Refers to CheckpointedRestoring interface.
> *Which jar defines this interface - can’t find it*
>
> *Also documentation refers to *MapState. Do you have any example
> of creation of Map State.
> For value State in Scala, I can do just do
>
> private var sum: ValueState[(Long, Long)] = _
>
> But it does not work for MapState -
> Error:(44, 53) unbound placeholder parameter
>   private val newModels : MapState[String, Model] = _
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
>
>


Failed to serialize remote message [class org.apache.flink.runtime.messages.JobManagerMessages$JobFound

2018-01-16 Thread jelmer
Hi,

We recently upgraded our test environment from Flink 1.3.2 to Flink
1.4.0.

We are using a high-availability setup for the JobManager. Now, often
when I go to the job details in the web UI, the call times out and the
following error pops up in the JobManager log:


akka.remote.MessageSerializer$SerializationException: Failed to serialize
remote message [class
org.apache.flink.runtime.messages.JobManagerMessages$JobFound] using
serializer [class akka.serialization.JavaSerializer].
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:61)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at
akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:889)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:888)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter.writeSend(Endpoint.scala:780)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:755)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:446)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.4.0.jar:1.4.0]
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.4.0.jar:1.4.0]
Caused by: java.io.NotSerializableException:
org.apache.flink.runtime.executiongraph.ExecutionGraph
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
~[na:1.8.0_131]
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
~[na:1.8.0_131]
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
~[na:1.8.0_131]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
~[na:1.8.0_131]
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply$mcV$sp(Serializer.scala:321)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at
akka.serialization.JavaSerializer$$anonfun$toBinary$1.apply(Serializer.scala:321)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.serialization.JavaSerializer.toBinary(Serializer.scala:321)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
at akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:47)
~[flink-dist_2.11-1.4.0.jar:1.4.0]
... 17 common frames omitted



I isolated it further, and it seems to be triggered by this call

https://hostname/jobs/28076fffbcf7eab3f17900a54cc7c41d

I cannot reproduce it on my local laptop without an HA setup.
Before I dig any deeper, has anyone already come across this?