Flink memory usage

2017-11-03 Thread AndreaKinn
Hi,
I would like to share some considerations about Flink memory consumption.
I have a cluster composed of three nodes: one used both as JM and TM, and
two more as TMs.

I ran two identical applications on it (at different times). The only
difference is that in the second one I doubled every operator, essentially
to check how resource usage changes.

Looking at the outcomes on the CPU side, the effort is indeed doubled.
Doing the same for memory, I got these results:

[memory usage charts for the two runs were attached here but are not
included in the archive]

which to me seems completely counterintuitive, since the results are
essentially equal.
I can imagine that in the second case the memory was effectively almost
full, but why does Flink grab so much memory even in the first case?
How is this behaviour explained?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Thank you very much for your speedy response, Kostas!

Cheers,
Federico

2017-11-03 16:26 GMT+01:00 Kostas Kloudas :

> Hi Federico,
>
> Thanks for trying it out!
> Great to hear that your problem was fixed!
>
> The feature freeze for the release is going to be next week, and I would
> expect 1 or 2 more weeks of testing.
> So I would say in 2.5 weeks. But this is of course subject to potential
> issues we may find during testing.
>
> Cheers,
> Kostas
>
> On Nov 3, 2017, at 4:22 PM, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> Hi Kostas,
>
> I just tried running the same job with 1.4-SNAPSHOT for 10 minutes and it
> didn't crash, so it was the same underlying issue as in the JIRA you linked.
>
> Do you happen to know when the 1.4 stable release is expected?
>
> Thank you very much,
> Federico
>
> 2017-11-03 15:25 GMT+01:00 Kostas Kloudas :
>
>> Perfect! thanks a lot!
>>
>> Kostas
>>
>> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio <
>> federico.dambro...@smartlab.ws> wrote:
>>
>> Hi Kostas,
>>
>> yes, I'm using 1.3.2. I'll try the current master and I'll get back to
>> you.
>>
>> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas :
>>
>>> Hi Federico,
>>>
>>> I assume that you are using Flink 1.3, right?
>>>
>>> In this case, in 1.4 we have fixed a bug that seems similar to your case:
>>> https://issues.apache.org/jira/browse/FLINK-7756
>>>
>>> Could you try the current master to see if it fixes your problem?
>>>
>>> Thanks,
>>> Kostas
>>>
>>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
>>> federico.dambro...@smartlab.ws> wrote:
>>>
>>>  Could not find id for entry:
>>>
>>>
>>>
>>>
>>
>>
>> --
>> Federico D'Ambrosio
>>
>>
>>
>
>
> --
> Federico D'Ambrosio
>
>
>


-- 
Federico D'Ambrosio


Re: Making external calls from a FlinkKafkaPartitioner

2017-11-03 Thread Ron Crocker
Thanks Nico -

Thanks for the feedback, and nice catch on the missing volatile. 

Ron
—
Ron Crocker
Principal Engineer & Architect
( ( •)) New Relic
rcroc...@newrelic.com
M: +1 630 363 8835
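
A minimal sketch of the safe-publication pattern behind that catch (an
illustration in Scala; the class and method names are invented, not code
from this thread):

import java.util.concurrent.atomic.AtomicReference

// Publish each refreshed map through an AtomicReference (a volatile
// field works too) so the thread calling partition() is guaranteed to
// see the map the background updater thread swapped in.
class RefreshablePartitionMap {
  private val current =
    new AtomicReference[Map[Long, List[Int]]](Map.empty)

  // called by the background updater thread
  def refresh(newMap: Map[Long, List[Int]]): Unit =
    current.set(newMap)

  // called from partition(); always sees the latest published map
  def specialPartitionsFor(key: Long): Option[List[Int]] =
    current.get.get(key)
}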

> On Nov 3, 2017, at 7:48 AM, Nico Kruber  wrote:
> 
> Hi Ron,
> imho your code should be fine (except for a potential visibility problem on
> the changes of the non-volatile partitionMap member, depending on your needs).
> 
> The #open() method should be called (once) for each sink initialization 
> (according to the javadoc) and then you should be fine with the asynchronous 
> updater thread.
> I'm including Gordon (cc'd) just to be sure as he may know more.
> 
> 
> Nico
> 
> On Friday, 3 November 2017 04:06:02 CET Ron Crocker wrote:
>> We have a system where the Kafka partition a message should go into is a
>> function of a value in the message. Often, it’s value % # partitions, but
>> for some values it’s not - it’s a specified list of partitions that changes
>> over time. Our “simple Java library” that produces messages for this system
>> also has a background thread that periodically polls a HTTP endpoint (at a
>> rate of 1/minute as its default) to refresh that list of special cases.
>> 
>> It’s easy to create a FlinkKafkaPartitioner that does the mod operation;
>> what I’m not so sure about is how to get this polling operation into the
>> partitioner. I’m about to try it the obvious way (create a background
>> thread that polls the URL and updates the partition map), but I wonder if
>> that’s actually going to cause a bunch of problems for the Flink runtime.
>> 
>> Here’s the code that I have right now:
>> public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {
>>     private final String partitionerURL;
>>     private final long updateIntervalInMillis;
>>     // replaced wholesale by the background thread (see the discussion of
>>     // volatile/visibility in this thread)
>>     private Map<Long, List<Integer>> partitionMap;
>>     private ScheduledExecutorService executor;
>>
>>     public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
>>         this.partitionerURL = partitionerURL;
>>         this.updateIntervalInMillis = updateIntervalInMillis;
>>         this.partitionMap = new HashMap<>();
>>     }
>>
>>     @Override
>>     public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
>>         executor = Executors.newScheduledThreadPool(1);
>>         executor.scheduleAtFixedRate(
>>             () -> updatePartitionMapRunnable(),
>>             updateIntervalInMillis,
>>             updateIntervalInMillis,
>>             TimeUnit.MILLISECONDS);
>>     }
>>
>>     private void updatePartitionMapRunnable() {
>>         // Make a synchronous request to partitionerURL;
>>         // this is a simple JSON that matches our data
>>         String response = "{1:[1,2,3],2:[2]}";
>>         // Replace the current partitionMap with a new HashMap built from the
>>         // response; replacing the reference doesn't require synchronization
>>         this.partitionMap = convertResponseToMap(response);
>>     }
>>
>>     private Map<Long, List<Integer>> convertResponseToMap(String response) {
>>         Map<Long, List<Integer>> hashMap = new HashMap<>();
>>         // Convert response to a JSON structure and just use that?
>>         // or iterate and add to the local hashMap
>>         return hashMap;
>>     }
>>
>>     @Override
>>     public int partition(Tuple2<Long, String> next, byte[] serializedKey,
>>             byte[] serializedValue, int numPartitions) {
>>         long myKey = next.f0;
>>
>>         if (partitionMap.containsKey(myKey)) {
>>             List<Integer> partitions = partitionMap.get(myKey);
>>             myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
>>         }
>>
>>         return (int) (myKey % numPartitions);
>>     }
>> }
>> Ron
>> —
>> Ron Crocker
>> Principal Engineer & Architect
>> ( ( •)) New Relic
>> rcroc...@newrelic.com
>> M: +1 630 363 8835



Re: Using Flink Ml with DataStream

2017-11-03 Thread Adarsh Jain
Hi Chesnay,

Thanks for the reply, do you know how to serve using the trained model?

Where is the model saved?

Regards,
Adarsh




On Wed, Nov 1, 2017 at 4:46 PM, Chesnay Schepler  wrote:

> I don't believe this to be possible. The ML library works exclusively with
> the Batch API.
>
>
> On 30.10.2017 12:52, Adarsh Jain wrote:
>
>
> Hi,
>
> Is there a way to use Stochastic Outlier Selection (SOS) and/or SVM using
> CoCoA with streaming data.
>
> Please suggest and give pointers.
>
> Regards,
> Adarsh
>
>
>
>
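
A hedged sketch of what that amounts to (assuming FlinkML's SVM trained
with CoCoA, as above; the training data and output path are invented):
the trained model lives in the algorithm's weightsOption as a DataSet,
so "saving" it means writing that DataSet out yourself.

import org.apache.flink.api.scala._
import org.apache.flink.ml.classification.SVM
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector

object TrainAndPersistSVM {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // toy training set; SVM expects labels of +1.0 / -1.0
    val training: DataSet[LabeledVector] = env.fromElements(
      LabeledVector(1.0, DenseVector(1.0, 2.0)),
      LabeledVector(-1.0, DenseVector(-1.0, -2.0)))

    val svm = SVM().setIterations(10)
    svm.fit(training)

    // after fit(), the weight vector is held in weightsOption; writing
    // it out is the closest thing to "saving the model"
    svm.weightsOption.foreach(_.writeAsText("hdfs:///models/svm-weights"))

    env.execute("train and persist svm")
  }
}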


Re: Incremental checkpointing documentation

2017-11-03 Thread Nico Kruber
Hi Elias,
let me answer the questions to the best of my knowledge, but in general I 
think this is as expected.
(Let me give a link to the docs explaining the activation [1] for other 
readers first.)

On Friday, 3 November 2017 01:11:52 CET Elias Levy wrote:
> What is the interaction of incremental checkpointing and external
> checkpoints?

Externalized checkpoints may be incremental [2] (I'll fix the formatting error 
that is not rendering the arguments as a list, making them less visible)
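
For reference, activating both features looks roughly like this minimal
sketch (per [1] and [2]; the checkpoint path is invented and the job
definition itself is omitted):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object IncrementalCheckpointSetup {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(60000) // checkpoint every minute

    // the second constructor argument switches on incremental checkpoints
    env.setStateBackend(
      new RocksDBStateBackend("hdfs:///flink/checkpoints", true))

    // externalized checkpoints are configured independently, so they
    // may be incremental as well
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  }
}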

> Any interaction with the state.checkpoints.num-retained config?

Yes, this remains the number of available checkpoints. There may, however, be 
more folders containing RocksDB state that was originally put into checkpoint 
X but is also still required in checkpoint X+10 or so. These files will be 
cleaned up once they are not needed anymore.

> Does incremental checkpointing require any maintenance?

No, state is cleaned up once it is not used/referenced anymore.

> Any interaction with savepoints?

No, a savepoint uses Flink's own data format and is not incremental [3].

> Does it perform better against certain "file systems"?  E.g. is S3 not
> recommended for it?  How about EFS?

I can't think of a reason this should be any different to non-incremental 
checkpoints. Maybe Stefan (cc'd) has some more info on this.

For more details on the whole topic, I can recommend Stefan's talk at the last 
Flink Forward [4] though.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/large_state_tuning.html#tuning-rocksdb
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#difference-to-savepoints
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/savepoints.html
[4] https://www.youtube.com/watch?v=dWQ24wERItM&t=36&list=PLDX4T_cnKjD0JeULl1X6iTn7VIkDeYX_X



Re: Making external calls from a FlinkKafkaPartitioner

2017-11-03 Thread Nico Kruber
Hi Ron,
imho your code should be fine (except for a potential visibility problem on the 
changes of the non-volatile partitionMap member, depending on your needs).

The #open() method should be called (once) for each sink initialization 
(according to the javadoc) and then you should be fine with the asynchronous 
updater thread.
I'm including Gordon (cc'd) just to be sure as he may know more.


Nico

On Friday, 3 November 2017 04:06:02 CET Ron Crocker wrote:
> We have a system where the Kafka partition a message should go into is a
> function of a value in the message. Often, it’s value % # partitions, but
> for some values it’s not - it’s a specified list of partitions that changes
> over time. Our “simple Java library” that produces messages for this system
> also has a background thread that periodically polls a HTTP endpoint (at a
> rate of 1/minute as its default) to refresh that list of special cases.
> 
> It’s easy to create a FlinkKafkaPartitioner that does the mod operation;
> what I’m not so sure about is how to get this polling operation into the
> partitioner. I’m about to try it the obvious way (create a background
> thread that polls the URL and updates the partition map), but I wonder if
> that’s actually going to cause a bunch of problems for the Flink runtime.
> 
> Here’s the code that I have right now:
> public class EventInsertPartitioner extends KafkaPartitioner<Tuple2<Long, String>> {
>     private final String partitionerURL;
>     private final long updateIntervalInMillis;
>     // replaced wholesale by the background thread (see the discussion of
>     // volatile/visibility in this thread)
>     private Map<Long, List<Integer>> partitionMap;
>     private ScheduledExecutorService executor;
> 
>     public EventInsertPartitioner(String partitionerURL, long updateIntervalInMillis) {
>         this.partitionerURL = partitionerURL;
>         this.updateIntervalInMillis = updateIntervalInMillis;
>         this.partitionMap = new HashMap<>();
>     }
> 
>     @Override
>     public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
>         executor = Executors.newScheduledThreadPool(1);
>         executor.scheduleAtFixedRate(
>             () -> updatePartitionMapRunnable(),
>             updateIntervalInMillis,
>             updateIntervalInMillis,
>             TimeUnit.MILLISECONDS);
>     }
> 
>     private void updatePartitionMapRunnable() {
>         // Make a synchronous request to partitionerURL;
>         // this is a simple JSON that matches our data
>         String response = "{1:[1,2,3],2:[2]}";
>         // Replace the current partitionMap with a new HashMap built from the
>         // response; replacing the reference doesn't require synchronization
>         this.partitionMap = convertResponseToMap(response);
>     }
> 
>     private Map<Long, List<Integer>> convertResponseToMap(String response) {
>         Map<Long, List<Integer>> hashMap = new HashMap<>();
>         // Convert response to a JSON structure and just use that?
>         // or iterate and add to the local hashMap
>         return hashMap;
>     }
> 
>     @Override
>     public int partition(Tuple2<Long, String> next, byte[] serializedKey,
>             byte[] serializedValue, int numPartitions) {
>         long myKey = next.f0;
> 
>         if (partitionMap.containsKey(myKey)) {
>             List<Integer> partitions = partitionMap.get(myKey);
>             myKey = partitions.get(ThreadLocalRandom.current().nextInt(partitions.size()));
>         }
> 
>         return (int) (myKey % numPartitions);
>     }
> }
> Ron
> —
> Ron Crocker
> Principal Engineer & Architect
> ( ( •)) New Relic
> rcroc...@newrelic.com
> M: +1 630 363 8835



Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Kostas Kloudas
Perfect! thanks a lot!

Kostas

> On Nov 3, 2017, at 3:23 PM, Federico D'Ambrosio wrote:
> 
> Hi Kostas, 
> 
> yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.
> 
> 2017-11-03 15:21 GMT+01:00 Kostas Kloudas:
> Hi Federico,
> 
> I assume that you are using Flink 1.3, right?
> 
> In this case, in 1.4 we have fixed a bug that seems similar to your case:
> https://issues.apache.org/jira/browse/FLINK-7756 
> 
> 
> Could you try the current master to see if it fixes your problem?
> 
> Thanks,
> Kostas
> 
>> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio wrote:
>> 
>>  Could not find id for entry:
>> 
> 
> 
> 
> 
> -- 
> Federico D'Ambrosio



Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Hi Kostas,

yes, I'm using 1.3.2. I'll try the current master and I'll get back to you.

2017-11-03 15:21 GMT+01:00 Kostas Kloudas :

> Hi Federico,
>
> I assume that you are using Flink 1.3, right?
>
> In this case, in 1.4 we have fixed a bug that seems similar to your case:
> https://issues.apache.org/jira/browse/FLINK-7756
>
> Could you try the current master to see if it fixes your problem?
>
> Thanks,
> Kostas
>
> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
>  Could not find id for entry:
>
>
>
>


-- 
Federico D'Ambrosio


Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Kostas Kloudas
Hi Federico,

I assume that you are using Flink 1.3, right?

In this case, in 1.4 we have fixed a bug that seems similar to your case:
https://issues.apache.org/jira/browse/FLINK-7756 


Could you try the current master to see if it fixes your problem?

Thanks,
Kostas

> On Nov 3, 2017, at 3:12 PM, Federico D'Ambrosio wrote:
> 
>  Could not find id for entry: 
>



Re: FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
I'm sorry, I realized that the stack trace was poorly formatted, here it is
with better formatting:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:971)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:838)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:928)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:852)
        at org.apache.flink.runtime.state.heap.NestedMapsStateTable$NestedMapsStateTableSnapshot.writeMappingsInKeyGroup(NestedMapsStateTable.java:355)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:347)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
        at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.snapshot(HeapKeyedStateBackend.java:372)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:397)

FlinkCEP, circular references and checkpointing failures

2017-11-03 Thread Federico D'Ambrosio
Hello everyone,

I'm experimenting a bit with FlinkCEP and I'm noticing weird failures when
a within() clause window closes at the same time a checkpoint occurs
(synchronous, with both the Fs and RocksDB backends, stored in HDFS).

The following is the relevant code:

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // checkpoints every minute
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints-dir"))

// Pattern: one or more events at >= 37000 ft, not followed by another
// such event within one minute
val pattern =
  Pattern
    .begin[EventWithId]("flying").oneOrMore
    .where(_.event.instantValues.altitude >= 37000)
    .notNext("disappearing")
    .where(_.event.instantValues.altitude >= 37000)
    .within(Time.minutes(1))

// Associate the KeyedStream with the pattern to be detected
val patternStream = CEP.pattern(streamById, pattern)

which causes failure on the second checkpoint with the following exception
stack trace:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> alert-select -> Sink: notification-sink-1 (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
    ... 5 more
    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
        ... 5 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
        at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
        ... 7 more
    Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({"origin":"YUL","destination":"YWG","flight":"AC8593","aircraft":"CRJ7","registration":"C-GOJZ","callsign":"JZA593","speed":370,"altitude":38000,"course":287,"time":"2017-11-03 13:44:45.000","lat":47.9129,"lon":-87.2922,"_id":"AC8593-1"}, 1509716685000, 0), [SharedBufferEdge(null, 5), SharedBufferEdge(null, 6)], 1)
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at

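For orientation, the operator name in the trace ("alert-select -> Sink:
notification-sink-1") suggests the pattern stream is consumed roughly as in
this sketch (the alert string and print sink are guesses; EventWithId,
streamById and patternStream come from the code above, with the usual
org.apache.flink.streaming.api.scala._ and org.apache.flink.cep.scala.CEP
imports assumed):

// In the 1.3.x Scala API the select function receives a
// Map[String, Iterable[EventWithId]] from pattern names to matched events.
val alerts: DataStream[String] = patternStream
  .select(matched => s"high-altitude track went silent: ${matched("flying").last}")
  .name("alert-select")

alerts.print().name("notification-sink-1")
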
Re: Negative values using latency marker

2017-11-03 Thread Nico Kruber
Hi Tovi,
if I see this correctly, the LatencyMarker gets its initial timestamp during
creation at the source, and the latency is reported as a metric at a sink by
comparing that initial timestamp with the current time.
If the clocks of the two machines involved diverge, e.g. the sink's clock
falling behind, the difference may be negative.


Nico
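
To make the arithmetic concrete, a toy sketch (the timestamps are invented;
this is an illustration, not Flink code):

// The reported latency is essentially "sink wall clock minus the
// timestamp the LatencyMarker got at the source"; with unsynchronized
// clocks this can go below zero.
object NegativeLatencyToy {
  def main(args: Array[String]): Unit = {
    val markerCreatedAtSource = 1509720000005L // source clock (ms)
    val wallClockAtSink       = 1509720000000L // sink clock, 5 ms behind
    val reported = wallClockAtSink - markerCreatedAtSource
    println(reported) // -5, matching the p50=-5.0 values quoted below
  }
}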

On Thursday, 2 November 2017 17:58:51 CET Sofer, Tovi wrote:
> Hi group,
> 
> Can someone maybe elaborate on how the latency gauge shown by the latency
> marker can be negative?
> 
> 2017-11-02 18:54:56,842 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Sink: FinalSink.0.latency: {LatencySourceDescriptor{vertexID=1,
> subtaskIndex=0}={p99=-5.0, p50=-5.0, min=-5.0, max=-5.0, p95=-5.0,
> mean=-5.0}, LatencySourceDescriptor{vertexID=1, subtaskIndex=1}={p99=-5.0,
> p50=-5.0, min=-5.0, max=-5.0, p95=-5.0, mean=-5.0},
> LatencySourceDescriptor{vertexID=1, subtaskIndex=2}={p99=-6.0, p50=-6.0,
> min=-6.0, max=-6.0, p95=-6.0, mean=-6.0},
> LatencySourceDescriptor{vertexID=1, subtaskIndex=3}={p99=-6.0, p50=-6.0,
> min=-6.0, max=-6.0, p95=-6.0, mean=-6.0}} 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.60SecWarmUpRecordsCounter: 2858446
> 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.3.numRecordsOut: 1954784 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.ActualRecordsCounter: 4962675 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.AverageLatencyMs: 0.0753785 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.HighLatencyMsgPercentage: 0.5918576
> 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.0.numRecordsOutPerSecond:
> 12943.1167 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.numRecordsInPerSecond: 51751.4 2017-11-02
> 18:54:56,843 INFO  com.citi.artemis.flink.reporters.ArtemisReporter -
> [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.3.numRecordsOutPerSecond: 12935.05
> 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.2.numRecordsOutPerSecond:
> 12946.9166 2017-11-02 18:54:56,843 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.Source: fixTopicConsumerSource.1.numRecordsOutPerSecond:
> 12926.3168 2017-11-02 18:54:56,844 INFO 
> com.citi.artemis.flink.reporters.ArtemisReporter - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.f40ca57104c8642218e5b8979a380ee4.Flink Streaming
> Job.AverageE2ELatencyChecker.0.LatencyHistogram: count:1 min:24753
> max:19199891 mean:77637.6484 stddev:341333.9414842662 p50:40752.0
> p75:49809.0 p95:190480.95 p98:539110.819994 p99:749224.889995
> p999:3817927.9259998496
> 
> Regards,
> Tovi





Fwd: Initialise side input state

2017-11-03 Thread Maxim Parkachov
Hi Xingcan,

On Fri, Nov 3, 2017 at 3:38 AM, Xingcan Cui  wrote:

> Hi Maxim,
>
> if I understand correctly, you actually need to JOIN the fast stream with
> the slow stream. Could you please share more details about your problem?
>

Sure, I can explain more with some pseudo-code examples. I have an external
DB with a price list with the following structure:

case class PriceList(productId, price)

My events are purchase events with the following structure:

case class Purchase(productId, amount)

I would like to get a final stream with totalAmount = amount * price, in a
structure like this:

case class PurchaseTotal(productId, totalAmount)

I have 2 corresponding input streams:

val prices = env.addSource(new PriceListSource).keyBy(_.productId)
val purchases = env.addSource(new PurchaseSource).keyBy(_.productId)

PriceListSource delivers all CHANGES to the external DB table.

The calculate function looks similar to this:

class CalculateFunction extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {

  private var price: ValueState[Int] = _ // initialised in open(), omitted here

  override def processElement1(purchase: Purchase, ctx: CoProcessFunction[Purchase,
      PriceList, PurchaseTotal]#Context, out: Collector[PurchaseTotal]): Unit = {
    // enrich the purchase with the price currently held in state
    out.collect(PurchaseTotal(purchase.productId, purchase.amount * price.value))
  }

  override def processElement2(priceList: PriceList, ctx: CoProcessFunction[Purchase,
      PriceList, PurchaseTotal]#Context, out: Collector[PurchaseTotal]): Unit = {
    price.update(priceList.price)
  }
}

And finally pipeline:

purchases.connect(prices).process(new CalculateFunction).print

The issue is that when I start the program, the price ValueState is empty,
and it will never be populated for products whose prices are not updated in
the DB afterwards.
BTW, I cannot use AsyncIO to query the DB, because of several technical
restrictions.

1. When you mentioned "they have the same key", did you mean all the data
> get the same key or the logic should be applied with fast.key = slow.key?
>

I meant that the productId of a purchase event definitely exists in the
external price list DB (so it is a kind of inner join).


> 2. What should be done to initialize the state?
>

I need to read the external DB table and populate the price ValueState
before processing the first purchase event.

Hope this minimal example helps to understand.
Maxim.
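
One possible workaround until side inputs land (an editor's sketch, not
from the thread; it assumes PriceListSource can also emit an initial
snapshot of the table at startup, and that amount and price are Ints):
buffer purchases per key until the first price for that key arrives, then
flush them.

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

class BufferingCalculateFunction
    extends CoProcessFunction[Purchase, PriceList, PurchaseTotal] {

  // boxed Integer so a key without a price yet reads back as null
  private var price: ValueState[java.lang.Integer] = _
  private var pending: ListState[Purchase] = _

  override def open(parameters: Configuration): Unit = {
    price = getRuntimeContext.getState(
      new ValueStateDescriptor("price", classOf[java.lang.Integer]))
    pending = getRuntimeContext.getListState(
      new ListStateDescriptor("pending", classOf[Purchase]))
  }

  override def processElement1(
      purchase: Purchase,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit =
    if (price.value == null) {
      pending.add(purchase) // no price yet: park the purchase in state
    } else {
      out.collect(PurchaseTotal(purchase.productId, purchase.amount * price.value))
    }

  override def processElement2(
      priceList: PriceList,
      ctx: CoProcessFunction[Purchase, PriceList, PurchaseTotal]#Context,
      out: Collector[PurchaseTotal]): Unit = {
    price.update(priceList.price)
    // flush purchases that arrived before the first price for this key
    pending.get.asScala.foreach { p =>
      out.collect(PurchaseTotal(p.productId, p.amount * priceList.price))
    }
    pending.clear()
  }
}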


>
> Best,
> Xingcan
>
>
> On Fri, Nov 3, 2017 at 5:54 AM, Maxim Parkachov 
> wrote:
>
>> Hi Flink users,
>>
>> I'm struggling with a basic concept and would appreciate some help. I
>> have 2 Input streams, one is fast event stream and one is slow changing
>> dimension. They have the same key and I use CoProcessFunction to store
>> slow data in state and enrich fast data from this state. Everything
>> works as expected.
>>
>> Before I start processing the fast stream on the first run, I would like
>> to completely initialise the state. I thought it could be done in open(),
>> but I don't understand how it would be re-distributed across parallel
>> operators.
>>
>> Another alternative would be to create a custom source and push all slow
>> dimension data downstream, but I could not find out how to hold off
>> processing the fast data until the state is initialised.
>>
>> I realise that FLIP-17 (Side Inputs) is what I need, but is there some
>> other way to implement it now?
>>
>> Thanks,
>> Maxim.
>>
>>
>