Re: Regarding caching the evicted elements and re-emitting them to the next window

2017-01-13 Thread Aljoscha Krettek
Hi,
I'm afraid there is no functionality for this in Flink. What you can do,
however, is to not evict these elements from the window buffer but instead
ignore them when processing your elements in the WindowFunction. This way
they will be preserved for the next firing. You have to make sure to
eventually evict some elements, however. Otherwise you would have a memory
leak.
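
For illustration, a rough sketch of that approach (Java; the element type, the key type and the isUnwanted() check are placeholders, not from this thread):

import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

// Keep every element in the window buffer and simply skip the unwanted ones
// while evaluating; because they were not evicted, they are still present
// the next time the trigger fires.
public class SkipInsteadOfEvict
        implements WindowFunction<Long, Long, String, GlobalWindow> {

    @Override
    public void apply(String key, GlobalWindow window,
                      Iterable<Long> elements, Collector<Long> out) {
        for (Long value : elements) {
            if (isUnwanted(value)) {
                continue; // not evaluated now, preserved for the next firing
            }
            out.collect(value);
        }
    }

    // placeholder for the application-specific condition
    private boolean isUnwanted(Long value) {
        return value < 0;
    }
}

The skipped elements would still have to be dropped at some point, for example by an evictor or by only keeping the last n elements, to avoid the memory leak mentioned above.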

Aljoscha

On Sun, Jan 8, 2017, 23:47 Abdul Salam Shaikh 
wrote:

> Hi,
>
> I am using 1.2-Snapshot version of Apache Flink which provides the new
> enhanced Evictor functionality and using customized triggers for Global
> Window. I have a use case where I am evicting the unwanted event(element)
> for the current window before it is evaluated. However, I am looking for
> options to cache this evicted element and re-use it in the next window. Is
> there a way to achieve this in the context of Flink,
> or with a more generic programming approach?
>
> Thanks in anticipation!
>


Re: Strategies for Complex Event Processing with guaranteed data consistency

2017-01-13 Thread Fabian Hueske
One thing to add: the Flink KafkaProducer provides only at-least-once if
flush-on-checkpoint is enabled [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-

2017-01-13 22:02 GMT+01:00 Fabian Hueske :

> Hi Kat,
>
> I did not understand the difference between a case and a trace.
> If I got it right, the goal of your first job is to assemble the
> individual events into cases. Is a case here the last event for a case-id
> or all events of a case-id?
> If a case is the collection of all events (which I assume) what is the
> difference to a trace which is also the list of events (if I got it right)?
>
> In any case, I think your first job can also be solved without a session
> window (which is quite complex internally).
> There are two options:
> 1) use a global window [1] with a custom trigger that triggers for each
> arriving record. A global window never ends, which would be OK since
> your cases do not end either.
> 2) use a MapFunction with key-partitioned operator state [2]. The map
> function would simply update the state for every new event and emit a new
> result.
>
> Regarding your concern about losing data when writing to Kafka: Flink's
> KafkaProducer provides at-least-once guarantees, which means that data
> might be written more than once in case of a failure but won't be lost. If
> the Kafka topic is partitioned by case-id and you only need the last record
> per case-id, Kafka's log compaction should give you upsert semantics.
>
> Regarding your question "Is using state in this way a somewhat standard
> practice, or is state intended more for recovery?":
> Many streaming applications require state for their semantics (just like
> yours), i.e., they need to buffer data and wait for more data to arrive. In
> order to guarantee consistent result semantics of an application, the state
> must not be lost and be recovered in case of a failure. So state is not
> intended for recovery, but recovery is needed to guarantee application
> semantics.
>
> As I said before, I did not get the difference between cases and trace, so
> I cannot really comment on the job to analyze traces.
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface
>
> 2017-01-13 11:04 GMT+01:00 Kathleen Sharp :
>
>> I have been playing around with Flink for a few weeks to try to
>> ascertain whether or not it meets our use cases, and also what best
>> practices we should be following. I have a few questions I would
>> appreciate answers to.
>>
>>
>> Our scenario is that we want to process a lot of event data into
>> cases. A case is an inorder sequence of events; this event data could
>> be quite old. We never know when a case is complete, so we just want
>> to have the most up to date picture of what a case looks like.
>>
>>
>> The inorder sequence of events of a case is called the trace. Many
>> cases could have an identical trace. We would like to construct these
>> traces, and do some aggregations on those (case count, average/min/max
>> life-cycle time).
>>
>>
>> We then have further downstream processing we will do on a case, some
>> of which would require additional inputs, either from side-inputs or
>> somehow joining data sources.
>>
>>
>> We don’t really care about event time at the moment, because we just
>> want to build cases and traces with all the data we have received.
>>
>>
>> The end results should be available for our web front end via rest api.
>>
>>
>> Based on the above I have the following idea for a first implementation:
>>
>>
>> Kafka source -> key by case id -> session window with rocks db state
>> backend holding case for that key -> postgres sink
>>
>>
>> The reason for a session window is that, as I mentioned above, we just
>> want to build a group with all the data we have received into kafka up
>> until that point in time. We would experiment with what this gap time
>> should be, and in future it might be specific to the type of group,
>> but for the start a naive approach is acceptable. I think this could
>> be better than just doing it, say, every 10 minutes because we really
>> don’t know yet the frequency of the data received. Also, some inputs
>> to kafka come directly from a CSV upload, so we will get “firehose”
>> periods, and periods of nothing.
>>
>> In short: I think what we have closely matches session behaviour.
>>
>>
>> We also have to implement a postgres sink that is capable of doing
>> upserts. The reason for postgres is to service the rest front end.
>>
>>
>> We then have to build our traces and can see two options for it:
>>
>>
>> 1) The most obvious solution would be to use a kafka sink for the

Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-13 Thread Tzu-Li (Gordon) Tai
Hi Andrew,

Your observations are correct. Like you mentioned, the current problem revolves 
around how we deal with the pending buffered requests with respect to 
Flink’s checkpointing.
I’ve filed a JIRA for this, as well as some thoughts for the solution in the 
description: https://issues.apache.org/jira/browse/FLINK-5487. What do you 
think?

Thank you for bringing this up! We should probably fix this soon.
There’s already some on-going effort in fixing some other aspects of proper 
at-least-once support in the Elasticsearch sinks, so I believe this will be 
brought to attention very soon too.

Cheers,
Gordon




On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com) wrote:

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in 
terms of message delivery. According to (1), the ES sink offers at-least-once 
guarantees. This page doesn’t differentiate between flink-elasticsearch and 
flink-elasticsearch2, so I have to assume for the moment that they both offer 
that guarantee. However, a look at the code (2) shows that the invoke() method 
puts the record into a buffer, and then that buffer is flushed to elasticsearch 
some time later.



Re: Strategies for Complex Event Processing with guaranteed data consistency

2017-01-13 Thread Fabian Hueske
Hi Kat,

I did not understand the difference between a case and a trace.
If I got it right, the goal of your first job is to assemble the individual
events into cases. Is a case here the last event for a case-id or all
events of a case-id?
If a case is the collection of all events (which I assume) what is the
difference to a trace which is also the list of events (if I got it right)?

In any case, I think your first job can also be solved without a session
window (which is quite complex internally).
There are two options:
1) use a global window [1] with a custom trigger that triggers for each
arriving record. A global window never ends, which would be OK since
your cases do not end either.
2) use a MapFunction with key-partitioned operator state [2]. The map
function would simply update the state for every new event and emit a new
result.
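
A rough sketch of option 2, using the key/value state interface from [2] (Java; note that a RichMapFunction is needed to access keyed state, and the String event type and state layout are only placeholders):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;

// Applied after keyBy(caseId): keeps the current case per case-id in keyed
// state and emits the updated case for every incoming event.
public class CaseAssembler extends RichMapFunction<String, List<String>> {

    private transient ValueState<List<String>> caseState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<List<String>> descriptor = new ValueStateDescriptor<>(
                "case",
                TypeInformation.of(new TypeHint<List<String>>() {}),
                null);
        caseState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public List<String> map(String event) throws Exception {
        List<String> currentCase = caseState.value();
        if (currentCase == null) {
            currentCase = new ArrayList<>();
        }
        currentCase.add(event);
        caseState.update(currentCase);
        return currentCase; // the new version of the case after this event
    }
}

Applied as events.keyBy(caseId).map(new CaseAssembler()), every update then flows to the downstream sink, which would overwrite the previous version of the case.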

Regarding your concern about losing data when writing to Kafka: Flink's
KafkaProducer provides at-least-once guarantees, which means that data
might be written more than once in case of a failure but won't be lost. If
the Kafka topic is partitioned by case-id and you only need the last record
per case-id, Kafka's log compaction should give you upsert semantics.

Regarding your question "Is using state in this way a somewhat standard
practice, or is state intended more for recovery?":
Many streaming applications require state for their semantics (just like
yours), i.e., they need to buffer data and wait for more data to arrive. In
order to guarantee consistent result semantics of an application, the state
must not be lost and be recovered in case of a failure. So state is not
intended for recovery, but recovery is needed to guarantee application
semantics.

As I said before, I did not get the difference between cases and trace, so
I cannot really comment on the job to analyze traces.

Hope this helps,
Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface

2017-01-13 11:04 GMT+01:00 Kathleen Sharp :

> I have been playing around with Flink for a few weeks to try to
> ascertain whether or not it meets our use cases, and also what best
> practices we should be following. I have a few questions I would
> appreciate answers to.
>
>
> Our scenario is that we want to process a lot of event data into
> cases. A case is an inorder sequence of events; this event data could
> be quite old. We never know when a case is complete, so we just want
> to have the most up to date picture of what a case looks like.
>
>
> The inorder sequence of events of a case is called the trace. Many
> cases could have an identical trace. We would like to construct these
> traces, and do some aggregations on those (case count, average/min/max
> life-cycle time).
>
>
> We then have further downstream processing we will do on a case, some
> of which would require additional inputs, either from side-inputs or
> somehow joining data sources.
>
>
> We don’t really care about event time at the moment, because we just
> want to build cases and traces with all the data we have received.
>
>
> The end results should be available for our web front end via rest api.
>
>
> Based on the above I have the following idea for a first implementation:
>
>
> Kafka source -> key by case id -> session window with rocks db state
> backend holding case for that key -> postgres sink
>
>
> The reason for a session window is that, as I mentioned above, we just
> want to build a group with all the data we have received into kafka up
> until that point in time. We would experiment with what this gap time
> should be, and in future it might be specific to the type of group,
> but for the start a naive approach is acceptable. I think this could
> be better than just doing it, say, every 10 minutes because we really
> don’t know yet the frequency of the data received. Also, some inputs
> to kafka come directly from a CSV upload, so we will get “firehose”
> periods, and periods of nothing.
>
> In short: I think what we have closely matches session behaviour.
>
>
> We also have to implement a postgres sink that is capable of doing
> upserts. The reason for postgres is to service the rest front end.
>
>
> We then have to build our traces and can see two options for it:
>
>
> 1) The most obvious solution would be to use a kafka sink for the
> keyed case stream, and to do the trace aggregations in a downstream
> flink job with this kafka topic as a source. However, I have some
> concerns over losing any data (i.e. how do we know whether or not an
> event has been successfully pushed into the kafka stream).
>
>
> 2) Another approach might be to use some other type of sink (perhaps
> postgres), and to use this as a source for the traces job. This would
> help us guarantee data consistency.
>
>
> 3) Or, to somehow re-merge the 

Can serialization be disabled between chains?

2017-01-13 Thread Dmitry Golubets
Hi,

Let's say we have multiple subtask chains and all of them are executing in
the same task manager slot (i.e. in the same JVM).
What's the point in serializing data between them?
Can it be disabled?

The reason I want to keep different chains is that some subtasks should be
executed in parallel with each other.
Let's say I have tasks: A -> B
After task A has pushed some data to task B, I want task A to continue
processing without waiting for task B to finish.
What I'm talking about is the behavior of Akka Streams with fusion disabled.
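
For reference, a minimal sketch of how the two chains can be kept apart in the DataStream API (Java; the two map functions stand in for tasks A and B):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SeparateChains {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.generateSequence(0, 1000)
                // task A
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value + 1;
                    }
                })
                // task B: startNewChain() puts this operator into its own chain,
                // so A hands records to B through a buffer and keeps processing
                // while B runs as a separate task (typically in the same slot
                // via slot sharing)
                .map(new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long value) {
                        return value * 2;
                    }
                })
                .startNewChain()
                .print();

        env.execute("separate chains");
    }
}

Whether the records crossing that chain boundary are serialized is a separate question; this only controls where the chain boundaries are.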

Best regards,
Dmitry


Re: Objects accessible from all Flink nodes

2017-01-13 Thread Fabian Hueske
Hi Matt,

it is not possible to share an object across different tasks of the same
operator or even across different operators.
This would be globally mutable state, which is in general hard to implement
efficiently in distributed systems.

Something that might work is to use a CoFlatMapFunction with one input
being the training data and the other the actual input.
Then you can train the model and query the model in the same operator. You
would have multiple models, one in each parallel task.
If you can partition the training (and input) data in a meaningful way, you
would have a partition or key specific model. You can also use random
partitioning and have models which are based on random samples of the
training data. Or if you want each model to be based on the same input
data, you can broadcast the training data.

This would look as follows:

val input = ???
val training = ???

// partitioned variant
val predictions = input.keyBy(xxx).connect(training.keyBy(xxx)).flatMap(YourFlatMap)
// random variant
val predictions = input.connect(training.shuffle).flatMap(YourFlatMap)
// broadcasted variant
val predictions = input.connect(training.broadcast).flatMap(YourFlatMap)
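
A rough sketch of what such a CoFlatMapFunction could look like (in Java; MyModel and its train/predict methods are only stand-ins for the actual classification library):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

// Each parallel instance of this function holds its own model: flatMap2
// updates it with training records, flatMap1 queries it for input records.
public class TrainAndPredict implements CoFlatMapFunction<String, String, String> {

    private final MyModel model = new MyModel();

    @Override
    public void flatMap1(String value, Collector<String> out) {
        // query path: only emit predictions once the model has seen data
        if (model.isReady()) {
            out.collect(model.predict(value));
        }
    }

    @Override
    public void flatMap2(String trainingRecord, Collector<String> out) {
        // training path: update the model, emit nothing
        model.train(trainingRecord);
    }

    // trivial stand-in so the sketch is complete; a real job would hold the
    // framework's classifier here instead
    static class MyModel implements java.io.Serializable {
        private long seen = 0;

        void train(String record) { seen++; }

        boolean isReady() { return seen > 0; }

        String predict(String value) {
            return value + " (model trained on " + seen + " records)";
        }
    }
}

The two streams would then be connected as in the partitioned, random, or broadcasted variants above.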

An example of a CoFlatMap which trains and queries a prediction model can
be found in the Flink Training [1] (code [2]).

Hope this helps,
Fabian

[1]
http://dataartisans.github.io/flink-training/exercises/timePrediction.html
[2]
https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/state/TravelTimePrediction.java


2017-01-13 15:38 GMT+01:00 Matt :

> Errata: How can an *object (such as the classifier, line 1)* be accessed
> by any Flink node [...]
>
> Just in case, the classifier itself can't be serialized I believe, it's
> part of a framework which I can't modify. In any case, even if it's
> serialized, I guess the cost of moving it to one node and then another
> makes the whole data flow impractical. It's better to move all created
> instances to one single node where only one instance of the classifier
> is maintained.
>
> I'm not sure if this is possible or how to do this.
>
> On Thu, Jan 12, 2017 at 11:11 PM, Matt  wrote:
>
>> Hello,
>>
>> I have a stream of objects which I use to update the model of a
>> classification algorithm and another stream with the objects I need to
>> classify in real time.
>>
>> The problem is that the instances for training and evaluation are
>> processed on potentially different Flink nodes, but the classifier should
>> be applied to all instances no matter in what node it was generated (ie,
>> the classifier should be accessible from any Flink node).
>>
>> Just to make it clearer, here is what would NOT work since these sink
>> functions are not serializable: https://gist.github.com/b979bf
>> 742b0d2f3da8cc8e5e91207151
>>
>> Two questions here:
>>
>> *1. How can an instance be accessed by any Flink node like this (line 11
>> and 19)? Maybe there's a better approach to this problem.*
>>
>> *2. In the example the second stream (line 15) is started right away but
>> at startup the classifier is not ready to use until it has been trained
>> with enough instances. Is it possible to do this? If I'm not wrong
>> env.execute (line 24) can be used only once.*
>>
>> Regards,
>> Matt
>>
>
>


Re: Terminology: Split, Group and Partition

2017-01-13 Thread Fabian Hueske
I think so far getExecutionPlan() was only used for debugging purposes and
not in programs that would also be executed.
You can open a JIRA issue if you think that this would be a valuable feature.

Thanks, Fabian

2017-01-13 16:34 GMT+01:00 Robert Schmidtke :

> Just a side note, I'm guessing there's a bug here: https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68
>
> It should say createProgramPlan("unnamed job", false);
>
> Otherwise I'm getting an exception complaining that no new sinks have been
> added after the last execution. So currently it is not possible for me to
> first get the execution plan and then run execute the program.
>
> Robert
>
> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke 
> wrote:
>
>> Hi Fabian,
>>
>> thanks for the quick and comprehensive reply. I'll have a look at the
>> ExecutionPlan using your suggestion to check what actually gets computed,
>> and I'll use the properties as well. If I stumble across something else
>> I'll let you know.
>>
>> Many thanks again!
>> Robert
>>
>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske  wrote:
>>
>>> Hi Robert,
>>>
>>> let me first describe what splits, groups, and partitions are.
>>>
>>> * Partition: This is basically all data that goes through the same task
>>> instance. If you have an operator with a parallelism of 80, you have 80
>>> partitions. When you call sortPartition() you'll have 80 sorted streams, if
>>> you call mapPartition you iterate over all records in one partition.
>>> * Split: Splits are a concept of InputFormats. An InputFormat can
>>> process several splits. All splits that are processed by the same data
>>> source task make up the partition of that task. So a split is a subset of a
>>> partition. In your case where each task reads exactly one split, the split
>>> is equivalent to the partition.
>>> * Group: A group is based on the groupBy attribute and hence data-driven
>>> and does not depend on the parallelism. A groupReduce requires a
>>> partitioning such that all records with the same grouping attribute are
>>> sent to the same operator, i.e., all are part of the same partition.
>>> Depending on the number of distinct grouping keys (and the hash-function) a
>>> partition can have zero, one, or more groups.
>>>
>>> Now coming to your use case. You have 80 sources running on 5 machines.
>>> All sources on the same machine produce records with the same grouping key
>>> (hostname). You can actually give a hint to Flink, that the data returned
>>> by a split is partitioned, grouped, or sorted in a specific way. This works
>>> as follows:
>>>
>>> // String is hostname, Integer is parallel id of the source task
>>> DataSet<Tuple2<String, Integer>> text = env.createInput(YourFormat);
>>> SplitDataProperties<Tuple2<String, Integer>> splitProps =
>>>     ((DataSource<Tuple2<String, Integer>>) text).getSplitDataProperties();
>>> splitProps.splitsGroupedBy(0, 1);
>>> splitProps.splitsPartitionedBy(0, 1);
>>>
>>> With this info, Flink knows that the data returned by our source is
>>> partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to
>>> run a local groupReduce operation on each of the 80 tasks (hostname and
>>> parallel index result in 80 keys) and locally reduce the data.
>>> Next step would be another .groupBy(0).groupReduce() which gives 16
>>> groups which are distributed across your tasks.
>>>
>>> However, you have to be careful with the SplitDataProperties. If you get
>>> them wrong, the optimizer makes false assumptions and the resulting plan
>>> might not compute what you are looking for.
>>> I'd recommend to read the JavaDocs and play a bit with this feature to
>>> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to
>>> figure out what is happening.
>>>
>>> Best,
>>> Fabian
>>>
>>>
>>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke :
>>>
 Hi all,

 I'm having some trouble grasping what the meaning of/difference between
 the following concepts is:

 - Split
 - Group
 - Partition

 Let me elaborate a bit on the problem I'm trying to solve here. In my
 tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
 standalone mode. Each node has 64G of memory and 32 cores. I'm starting the
 JobManager on one node, and a TaskManager on each node. I'm assigning 16
 slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
 Slots).

 The data I want to process resides in a local folder on each worker
 with the same path (say /tmp/input). There can be arbitrarily many input
 files in each worker's folder. I have written a custom input format that
 round-robin assigns the files to each of the 16 local input splits (
 https://github.com/robert-schmidtke/hdfs-statistics-adapter
 

Re: 1.1.4 on YARN - vcores change?

2017-01-13 Thread Shannon Carey
Ufuk & Robert,

There's a good chance you're right! On the EMR master node, where 
yarn-session.sh is run, /etc/hadoop/conf/yarn-site.xml says that 
"yarn.nodemanager.resource.cpu-vcores" is 4.


Meanwhile, on the core nodes, the value in that file is 8.





Shall I submit a JIRA? This might be pretty easy to fix given that 
"yarn-session.sh -q" already knows how to get the vcore count on the nodes. I 
can try to make a PR for it too. I'm still not sure why the containers are 
showing up as only using one vcore though... or if that is expected.

Meanwhile, it seems like overriding yarn.containers.vcores would be a 
successful workaround. Let me know if you disagree.
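
For reference, that workaround is a single entry in flink-conf.yaml; the value here is only an example matching the core nodes in this setup:

# Request a fixed number of vcores per TaskManager container instead of
# relying on the vcore count read from the local yarn-site.xml.
yarn.containers.vcores: 8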

The other slightly annoying thing that I have to deal with is leaving enough 
memory for the JobManager. Since all task managers are the same size, I either 
need to reduce the size of every task manager (wasting resources), or I have to 
double the task managers (and halve the memory) & subtract one (basically 
doubling the number of separate JVMs & halving the slot density within the 
JVMs) in order to leave room for the JobManager. What do you guys think of the 
following change in approach?

User specifies:
number of taskmanagers
memory per slot (not per taskmanager)
total number of slots (not slots per taskmanager)

Then, Flink would decide how to organize the task managers & slots in order to 
also leave room for the JobManager. This should be straightforward compared to 
bin packing because all slots are the same size. Maybe I'm oversimplifying... 
might be a little tougher if the nodes are different sizes and we don't know on 
what node the ApplicationMaster/JobManager will run.

-Shannon

On 1/13/17, 2:59 AM, "Ufuk Celebi"  wrote:

>On Fri, Jan 13, 2017 at 9:57 AM, Robert Metzger  wrote:
>> Flink is reading the number of available vcores from the local YARN
>> configuration. Is it possible that the YARN / Hadoop config on the machine
>> where you are submitting your job from sets the number of vcores as 4 ?
>
>Shouldn't we retrieve this number from the cluster instead?
>


Re: Queryable State

2017-01-13 Thread Nico Kruber
Hi Dawid,
I'll try to reproduce the error in the next couple of days. Can you also share 
the value deserializer you use? Also, have you tried even smaller examples in 
the meantime? Did they work?

As a side-note in general regarding the queryable state "sink" using ListState 
(".asQueryableState(name, ListStateDescriptor)"): everything that enters 
this operator will be stored forever and never cleaned up. Eventually, it will 
take up too much memory and is thus of limited use. Maybe it should even be 
removed from the API.


Nico

On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote:
> Hey Ufuk.
> Did you maybe had a while to have a look at that problem?
> 
> 2017-01-09 10:47 GMT+01:00 Ufuk Celebi :
> > Hey Dawid! Thanks for reporting this. I will try to have a look over
> > the course of the day. From a first impression, this seems like a bug
> > to me.
> > 
> > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz
> > 
> >  wrote:
> > > Hi I was experimenting with the Query State feature and I have some
> > 
> > problems
> > 
> > > querying the state.
> > > 
> > > The code which I use to produce the queryable state is:
> > >
> > > env.addSource(kafkaConsumer)
> > >   .map(e => e match {
> > >     case LoginClickEvent(_, t) => ("login", 1, t)
> > >     case LogoutClickEvent(_, t) => ("logout", 1, t)
> > >     case ButtonClickEvent(_, _, t) => ("button", 1, t)
> > >   })
> > >   .keyBy(0)
> > >   .timeWindow(Time.seconds(1))
> > >   .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, e2._3)))
> > >   .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2))
> > >   .keyBy("key")
> > >   .asQueryableState(
> > >     "type-time-series-count",
> > >     new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]](
> > >       "type-time-series-count",
> > >       classOf[KeyedDataPoint[java.lang.Integer]]))
> > > 
> > > As you see it is a rather simple job, in which I try to count events of
> > > different types in windows and then query by event type.
> > > 
> > > In client code I do:
> > >
> > > // Query Flink state
> > > val future = client.getKvState(jobId, "type-time-series-count",
> > >   key.hashCode, seralizedKey)
> > >
> > > // Await async result
> > > val serializedResult: Array[Byte] = Await.result(
> > >   future, new FiniteDuration(10, duration.SECONDS))
> > >
> > > // Deserialize response
> > > val results = deserializeResponse(serializedResult)
> > >
> > > results
> > > }
> > >
> > > private def deserializeResponse(serializedResult: Array[Byte]):
> > >     util.List[KeyedDataPoint[lang.Integer]] = {
> > >   KvStateRequestSerializer.deserializeList(serializedResult,
> > >     getValueSerializer())
> > > }
> > > 
> > > As I was trying to debug the issue I see the first element in the list gets
> > > deserialized correctly, but it fails on the second one. It seems like the
> > > serialized result is broken. Do you have any idea if I am doing something
> > > wrong or there is some bug?
> > >
> > > The exception I get is:
> > > java.io.EOFException: null
> > >   at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > >   at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > >   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > >   at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > >   at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > >
> > > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> > >
> > > I would be grateful for any advice.
> > > 
> > > Regards
> > > Dawid Wysakowicz




WindowFunction to push data from Kafka to S3

2017-01-13 Thread Samra Kasim
Hi,

I am reading messages off a Kafka Topic and want to process the messages
through Flink and save them into S3. It was pointed out to me that stream
processing of the Kafka data won't be saved to S3 because S3 doesn't allow
data to be appended to a file, so I want to convert the Kafka stream into
batches and save them to S3. Based on other user questions/answers, it
looks like this is possible using windowing by breaking the stream into
batches and creating files. I have written the following code, but it
doesn't work and I am not getting any errors either. I have a sys.out that
shows the tuple is being processed, but it might not be emitted in the
out.collect. Can someone help me figure out what may be the issue? Thanks!

public class S3Sink {

    public static void main(String[] args) throws Exception {

        Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/configs.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<>(parameterTool.get("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        String uuid = UUID.randomUUID().toString();

        DataStreamSink<Tuple2<String, String>> tuple2DataStream = messageStream
                .flatMap(new Tupler())
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new MyWindowFunction())
                .writeAsText("s3://flink-test/flink-output-stream/" + uuid + "testdoc.txt");

        env.execute();
    }

    private static class Tupler implements FlatMapFunction<String, Tuple2<String, String>> {

        @Override
        public void flatMap(String record, Collector<Tuple2<String, String>> out) throws Exception {
            out.collect(new Tuple2<>("record", record));
        }
    }

    private static class MyWindowFunction implements
            WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow> {

        @Override
        public void apply(Tuple key, TimeWindow timeWindow,
                          Iterable<Tuple2<String, String>> input,
                          Collector<Tuple2<String, String>> out) throws Exception {
            for (Tuple2<String, String> in : input) {
                System.out.println(in);
                out.collect(in);
            }
        }
    }
}

-- 

Thanks,

Sam


Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-13 Thread Miguel Coimbra
Hello,

If I missed the answer to this or some essential step of the documentation,
please do tell.
I am having the following problem while trying out the
org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly
API (Java).

Specs: JDK 1.8.0_102 x64
Apache Flink: 1.1.4

Suppose I have a very small (I tried with an example with 38 vertices as
well) dataset stored in a tab-separated file 3-vertex.tsv:

#id1	id2	score
0	1	0
0	2	0
0	3	0

This is just a central vertex with 3 neighbors (disconnected between
themselves).
I am loading the dataset and executing the algorithm with the following
code:


---
// Load the data from the .tsv file.
final DataSet<Tuple3<Long, Long, Double>> edgeTuples = env.readCsvFile(inputPath)
    .fieldDelimiter("\t") // fields are separated by tabs
    .ignoreComments("#")  // comments start with "#"
    .types(Long.class, Long.class, Double.class);

// Generate a graph and add reverse edges (undirected).
final Graph<Long, Long, Double> graph = Graph.fromTupleDataSet(edgeTuples,
    new MapFunction<Long, Long>() {
        private static final long serialVersionUID = 8713516577419451509L;
        public Long map(Long value) {
            return value;
        }
    },
    env).getUndirected();

// CommunityDetection parameters.
final double hopAttenuationDelta = 0.5d;
final int iterationCount = 10;

// Prepare and trigger the execution.
DataSet<Vertex<Long, Long>> vs = graph.run(
    new org.apache.flink.graph.library.CommunityDetection<Long>(
        iterationCount, hopAttenuationDelta)).getVertices();

vs.print();
---

​Running this code throws the following exception​ (check the bold line):

​org.apache.flink.runtime.client.JobExecutionException: Job execution
failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

*Caused by: java.lang.NullPointerException
    at org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)*
    at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
    at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
    at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)


​After a further look, I set a breakpoint (Eclipse IDE debugging) at the
line in bold:

org.apache.flink.graph.library.CommunityDetection.java (source code
accessed automatically by Maven)
// find the highest score of maxScoreLabel
*double highestScore = labelsWithHighestScore.get(maxScoreLabel);*

- maxScoreLabel has the value 3.

- labelsWithHighestScore was initialized as: Map<Long, Double>
labelsWithHighestScore = new TreeMap<>();

- labelsWithHighestScore is a TreeMap and has the values:

{0=0.0}
null
null
[0=0.0]
null
1

It seems that the value 3 should have been added to labelsWithHighestScore
at some point during execution, but because it wasn't, an exception is thrown.

If anyone is able to shed light on the issue it would be great - what might
be causing it, am I doing something clearly wrong, or has this been fixed
in a another version?

Thank you very much,

Best regards,



Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com 
Skype: miguel.e.coimbra


Re: Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
Just a side note, I'm guessing there's a bug here:
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68

It should say createProgramPlan("unnamed job", false);

Otherwise I'm getting an exception complaining that no new sinks have been
added after the last execution. So currently it is not possible for me to
first get the execution plan and then run execute the program.

Robert

On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke 
wrote:

> Hi Fabian,
>
> thanks for the quick and comprehensive reply. I'll have a look at the
> ExecutionPlan using your suggestion to check what actually gets computed,
> and I'll use the properties as well. If I stumble across something else
> I'll let you know.
>
> Many thanks again!
> Robert
>
> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske  wrote:
>
>> Hi Robert,
>>
>> let me first describe what splits, groups, and partitions are.
>>
>> * Partition: This is basically all data that goes through the same task
>> instance. If you have an operator with a parallelism of 80, you have 80
>> partitions. When you call sortPartition() you'll have 80 sorted streams, if
>> you call mapPartition you iterate over all records in one partition.
>> * Split: Splits are a concept of InputFormats. An InputFormat can process
>> several splits. All splits that are processed by the same data source task
>> make up the partition of that task. So a split is a subset of a partition.
>> In your case where each task reads exactly one split, the split is
>> equivalent to the partition.
>> * Group: A group is based on the groupBy attribute and hence data-driven
>> and does not depend on the parallelism. A groupReduce requires a
>> partitioning such that all records with the same grouping attribute are
>> sent to the same operator, i.e., all are part of the same partition.
>> Depending on the number of distinct grouping keys (and the hash-function) a
>> partition can have zero, one, or more groups.
>>
>> Now coming to your use case. You have 80 sources running on 5 machines.
>> All sources on the same machine produce records with the same grouping key
>> (hostname). You can actually give a hint to Flink, that the data returned
>> by a split is partitioned, grouped, or sorted in a specific way. This works
>> as follows:
>>
>> // String is hostname, Integer is parallel id of the source task
>> DataSet<Tuple2<String, Integer>> text = env.createInput(YourFormat);
>> SplitDataProperties<Tuple2<String, Integer>> splitProps =
>>     ((DataSource<Tuple2<String, Integer>>) text).getSplitDataProperties();
>> splitProps.splitsGroupedBy(0, 1);
>> splitProps.splitsPartitionedBy(0, 1);
>>
>> With this info, Flink knows that the data returned by our source is
>> partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to
>> run a local groupReduce operation on each of the 80 tasks (hostname and
>> parallel index result in 80 keys) and locally reduce the data.
>> Next step would be another .groupBy(0).groupReduce() which gives 16
>> groups which are distributed across your tasks.
>>
>> However, you have to be careful with the SplitDataProperties. If you get
>> them wrong, the optimizer makes false assumptions and the resulting plan
>> might not compute what you are looking for.
>> I'd recommend to read the JavaDocs and play a bit with this feature to
>> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to
>> figure out what is happening.
>>
>> Best,
>> Fabian
>>
>>
>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke :
>>
>>> Hi all,
>>>
>>> I'm having some trouble grasping what the meaning of/difference between
>>> the following concepts is:
>>>
>>> - Split
>>> - Group
>>> - Partition
>>>
>>> Let me elaborate a bit on the problem I'm trying to solve here. In my
>>> tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
>>> standalone mode. Each node has 64G of memory and 32 cores. I'm starting the
>>> JobManager on one node, and a TaskManager on each node. I'm assigning 16
>>> slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
>>> Slots).
>>>
>>> The data I want to process resides in a local folder on each worker with
>>> the same path (say /tmp/input). There can be arbitrarily many input files
>>> in each worker's folder. I have written a custom input format that
>>> round-robin assigns the files to each of the 16 local input splits (
>>> https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java) to obtain a total of 80 input splits that need
>>> processing. Each split reads zero or more files, parsing the contents into
>>> records that are emitted correctly. This works as expected.
>>>
>>> Now we're getting to the questions. How do these 80 input splits relate
>>> to groups and partitions? My understanding of a partition is a subset of my
>>> 

Re: Objects accessible from all Flink nodes

2017-01-13 Thread Matt
Errata: How can an *object (such as the classifier, line 1)* be accessed by
any Flink node [...]

Just in case, the classifier itself can't be serialized I believe, it's
part of a framework which I can't modify. In any case, even if it's
serialized, I guess the cost of moving it to one node and then another
makes the whole data flow impractical. It's better to move all created
instances to one single node where only one instance of the classifier
is maintained.

I'm not sure if this is possible or how to do this.

On Thu, Jan 12, 2017 at 11:11 PM, Matt  wrote:

> Hello,
>
> I have a stream of objects which I use to update the model of a
> classification algorithm and another stream with the objects I need to
> classify in real time.
>
> The problem is that the instances for training and evaluation are
> processed on potentially different Flink nodes, but the classifier should
> be applied to all instances no matter in what node it was generated (ie,
> the classifier should be accessible from any Flink node).
>
> Just to make it clearer, here is what would NOT work since these sink
> functions are not serializable: https://gist.github.com/
> b979bf742b0d2f3da8cc8e5e91207151
>
> Two questions here:
>
> *1. How can an instance be accessed by any Flink node like this (line 11
> and 19)? Maybe there's a better approach to this problem.*
>
> *2. In the example the second stream (line 15) is started right away but
> at startup the classifier is not ready to use until it has been trained
> with enough instances. Is it possible to do this? If I'm not wrong
> env.execute (line 24) can be used only once.*
>
> Regards,
> Matt
>


Re: Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
Hi Fabian,

thanks for the quick and comprehensive reply. I'll have a look at the
ExecutionPlan using your suggestion to check what actually gets computed,
and I'll use the properties as well. If I stumble across something else
I'll let you know.

Many thanks again!
Robert

On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske  wrote:

> Hi Robert,
>
> let me first describe what splits, groups, and partitions are.
>
> * Partition: This is basically all data that goes through the same task
> instance. If you have an operator with a parallelism of 80, you have 80
> partitions. When you call sortPartition() you'll have 80 sorted streams, if
> you call mapPartition you iterate over all records in one partition.
> * Split: Splits are a concept of InputFormats. An InputFormat can process
> several splits. All splits that are processed by the same data source task
> make up the partition of that task. So a split is a subset of a partition.
> In your case where each task reads exactly one split, the split is
> equivalent to the partition.
> * Group: A group is based on the groupBy attribute and hence data-driven
> and does not depend on the parallelism. A groupReduce requires a
> partitioning such that all records with the same grouping attribute are
> sent to the same operator, i.e., all are part of the same partition.
> Depending on the number of distinct grouping keys (and the hash-function) a
> partition can have zero, one, or more groups.
>
> Now coming to your use case. You have 80 sources running on 5 machines.
> All sources on the same machine produce records with the same grouping key
> (hostname). You can actually give a hint to Flink, that the data returned
> by a split is partitioned, grouped, or sorted in a specific way. This works
> as follows:
>
> // String is hostname, Integer is parallel id of the source task
> DataSet<Tuple2<String, Integer>> text = env.createInput(YourFormat);
> SplitDataProperties<Tuple2<String, Integer>> splitProps =
>     ((DataSource<Tuple2<String, Integer>>) text).getSplitDataProperties();
> splitProps.splitsGroupedBy(0, 1);
> splitProps.splitsPartitionedBy(0, 1);
>
> With this info, Flink knows that the data returned by our source is
> partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to
> run a local groupReduce operation on each of the 80 tasks (hostname and
> parallel index result in 80 keys) and locally reduce the data.
> Next step would be another .groupBy(0).groupReduce() which gives 16 groups
> which are distributed across your tasks.
>
> However, you have to be careful with the SplitDataProperties. If you get
> them wrong, the optimizer makes false assumptions and the resulting plan
> might not compute what you are looking for.
> I'd recommend to read the JavaDocs and play a bit with this feature to see
> how it behaves. ExecutionEnvironment.getExecutionPlan() can help to
> figure out what is happening.
>
> Best,
> Fabian
>
>
> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke :
>
>> Hi all,
>>
>> I'm having some trouble grasping what the meaning of/difference between
>> the following concepts is:
>>
>> - Split
>> - Group
>> - Partition
>>
>> Let me elaborate a bit on the problem I'm trying to solve here. In my
>> tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
>> standalone mode. Each node has 64G of memory and 32 cores. I'm starting the
>> JobManager on one node, and a TaskManager on each node. I'm assigning 16
>> slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
>> Slots).
>>
>> The data I want to process resides in a local folder on each worker with
>> the same path (say /tmp/input). There can be arbitrarily many input files
>> in each worker's folder. I have written a custom input format that
>> round-robin assigns the files to each of the 16 local input splits (
>> https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java) to obtain a total of 80 input splits that need
>> processing. Each split reads zero or more files, parsing the contents into
>> records that are emitted correctly. This works as expected.
>>
>> Now we're getting to the questions. How do these 80 input splits relate
>> to groups and partitions? My understanding of a partition is a subset of my
>> DataSet that is local to each node. I.e. if I were to repartition the
>> data according to some scheme, a shuffling over workers would occur. After
>> reading all the data, I have 80 partitions, correct?
>>
>> What is less clear to me is the concept of a group, i.e. the result of a
>> groupBy operation. The input files I have are produced on each worker by
>> some other process. I first want to do pre-aggregation (I hope that's the
>> term) on each node before sending data over the network. The records I'm
>> processing contain a 'hostname' attribute, which is set to the worker's
>> hostname that processes the data, because the DataSources 

Re: Kafka topic partition skewness causes watermark not being emitted

2017-01-13 Thread Tzu-Li (Gordon) Tai
Hi,

This is expected behaviour due to how the per-partition watermarks are designed 
in the Kafka consumer, but I think it’s probably a good idea to handle idle 
partitions also when the Kafka consumer itself emits watermarks. I’ve filed a 
JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-5479.

For the time being, I don’t think there will be an easy way to avoid this with 
the existing APIs, unfortunately. Is the skewed partition data intentional, or 
only for experimental purposes?

Best,
Gordon

On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote:

Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition 0 
and no data to partition 1. I created a Flink job with parallelism to 1 that 
consumes that topic and count the events with session event window (5 seconds 
gap). It turned out that the session event window was never closed even when I sent 
a message with a 10-minute gap. After digging into the source code, 
AbstractFetcher[1], which is responsible for sending watermarks downstream, 
calculates the min watermark of all partitions. Due to the fact that we don't 
have data in partition 1, the watermark returned from partition 1 is always 
Long.MIN_VALUE, therefore AbstractFetcher never fires the watermark 
downstream. 

I want to know if this is expected behavior or a bug. If this is expected 
behavior how do I avoid the delay of watermark firing when data is not evenly 
distributed to all partitions?

This is the timestamp extractor I used

public class ExactTimestampExtractor implements AssignerWithPeriodicWatermarks<SessionEvent> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE : currentMaxTimestamp - 1);
    }

    @Override
    public long extractTimestamp(SessionEvent element, long previousElementTimestamp) {
        long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
        if (eventStartTime > currentMaxTimestamp) {
            currentMaxTimestamp = eventStartTime;
        }
        return eventStartTime;
    }
}

and this is the Flink topo

// get input data
FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>("topic4",
        new MyOwnSchema(), ...);
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

input
        .keyBy("id")
        .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
        .reduce(new Reducer(), new WindowFunction())
        .print();

// execute program
env.execute("a job");

I used the latest code in github

[1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539


Re: Terminology: Split, Group and Partition

2017-01-13 Thread Fabian Hueske
Hi Robert,

let me first describe what splits, groups, and partitions are.

* Partition: This is basically all data that goes through the same task
instance. If you have an operator with a parallelism of 80, you have 80
partitions. When you call sortPartition() you'll have 80 sorted streams, if
you call mapPartition you iterate over all records in one partition.
* Split: Splits are a concept of InputFormats. An InputFormat can process
several splits. All splits that are processed by the same data source task
make up the partition of that task. So a split is a subset of a partition.
In your case where each task reads exactly one split, the split is
equivalent to the partition.
* Group: A group is based on the groupBy attribute and hence data-driven
and does not depend on the parallelism. A groupReduce requires a
partitioning such that all records with the same grouping attribute are
sent to the same operator, i.e., all are part of the same partition.
Depending on the number of distinct grouping keys (and the hash-function) a
partition can have zero, one, or more groups.

Now coming to your use case. You have 80 sources running on 5 machines. All
sources on the same machine produce records with the same grouping key
(hostname). You can actually give a hint to Flink, that the data returned
by a split is partitioned, grouped, or sorted in a specific way. This works
as follows:

// String is hostname, Integer is parallel id of the source task
DataSet<Tuple2<String, Integer>> text = env.createInput(YourFormat);
SplitDataProperties<Tuple2<String, Integer>> splitProps =
    ((DataSource<Tuple2<String, Integer>>) text).getSplitDataProperties();
splitProps.splitsGroupedBy(0, 1);
splitProps.splitsPartitionedBy(0, 1);

With this info, Flink knows that the data returned by our source is
partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to
run a local groupReduce operation on each of the 80 tasks (hostname and
parallel index result in 80 keys) and locally reduce the data.
Next step would be another .groupBy(0).groupReduce() which gives 16 groups
which are distributed across your tasks.
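
Put together, and continuing from the text DataSet defined above, the two steps could look roughly as follows (the counting logic only stands in for the actual aggregation):

// local pre-aggregation per (hostname, parallel id); no shuffle is needed
// because the split properties declare the data as grouped on these fields
DataSet<Tuple2<String, Long>> localCounts = text
    .groupBy(0, 1)
    .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Long>>() {
        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> records,
                           Collector<Tuple2<String, Long>> out) {
            String hostname = null;
            long count = 0;
            for (Tuple2<String, Integer> r : records) {
                hostname = r.f0;
                count++;
            }
            out.collect(new Tuple2<>(hostname, count));
        }
    });

// global aggregation per hostname; this step shuffles the pre-aggregated data
DataSet<Tuple2<String, Long>> hostCounts = localCounts
    .groupBy(0)
    .reduceGroup(new GroupReduceFunction<Tuple2<String, Long>, Tuple2<String, Long>>() {
        @Override
        public void reduce(Iterable<Tuple2<String, Long>> partials,
                           Collector<Tuple2<String, Long>> out) {
            String hostname = null;
            long count = 0;
            for (Tuple2<String, Long> p : partials) {
                hostname = p.f0;
                count += p.f1;
            }
            out.collect(new Tuple2<>(hostname, count));
        }
    });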

However, you have to be careful with the SplitDataProperties. If you get
them wrong, the optimizer makes false assumptions and the resulting plan
might not compute what you are looking for.
I'd recommend to read the JavaDocs and play a bit with this feature to see
how it behaves. ExecutionEnvironment.getExecutionPlan() can help to figure
out what is happening.

Best,
Fabian


2017-01-13 12:14 GMT+01:00 Robert Schmidtke :

> Hi all,
>
> I'm having some trouble grasping what the meaning of/difference between
> the following concepts is:
>
> - Split
> - Group
> - Partition
>
> Let me elaborate a bit on the problem I'm trying to solve here. In my
> tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
> standalone mode. Each node has 64G of memory and 32 cores. I'm starting the
> JobManager on one node, and a TaskManager on each node. I'm assigning 16
> slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
> Slots).
>
> The data I want to process resides in a local folder on each worker with
> the same path (say /tmp/input). There can be arbitrarily many input files
> in each worker's folder. I have written a custom input format that
> round-robin assigns the files to each of the 16 local input splits (
> https://github.com/robert-schmidtke/hdfs-statistics-
> adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/
> SfsInputFormat.java) to obtain a total of 80 input splits that need
> processing. Each split reads zero or more files, parsing the contents into
> records that are emitted correctly. This works as expected.
>
> Now we're getting to the questions. How do these 80 input splits relate to
> groups and partitions? My understanding of a partition is a subset of my
> DataSet that is local to each node. I.e. if I were to repartition the
> data according to some scheme, a shuffling over workers would occur. After
> reading all the data, I have 80 partitions, correct?
>
> What is less clear to me is the concept of a group, i.e. the result of a
> groupBy operation. The input files I have are produced on each worker by
> some other process. I first want to do pre-aggregation (I hope that's the
> term) on each node before sending data over the network. The records I'm
> processing contain a 'hostname' attribute, which is set to the worker's
> hostname that processes the data, because the DataSources are local. That
> means the records produced by the worker on host1 always contain the
> attribute hostname=host1. Similar for the other 4 workers.
>
> Now what happens if I do a groupBy("hostname")? How do the workers realize
> that no network transfer is necessary? Is a group a logical abstraction, or
> a physical one (in my understanding a partition is physical because it's
> local to exactly one worker).
>
> What I'd like to do next is a reduceGroup to merge multiple records 

Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
Hi all,

I'm having some trouble grasping what the meaning of/difference between the
following concepts is:

- Split
- Group
- Partition

Let me elaborate a bit on the problem I'm trying to solve here. In my tests
I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in standalone
mode. Each node has 64G of memory and 32 cores. I'm starting the JobManager
on one node, and a TaskManager on each node. I'm assigning 16 slots to each
TaskManager, so the overall parallelism is 80 (= 5 TMs x 16 Slots).

The data I want to process resides in a local folder on each worker with
the same path (say /tmp/input). There can be arbitrarily many input files
in each worker's folder. I have written a custom input format that
round-robin assigns the files to each of the 16 local input splits (
https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java)
to obtain a total of 80 input splits that need processing. Each split reads
zero or more files, parsing the contents into records that are emitted
correctly. This works as expected.

Now we're getting to the questions. How do these 80 input splits relate to
groups and partitions? My understanding of a partition is a subset of my
DataSet that is local to each node. I.e. if I were to repartition the
data according to some scheme, a shuffling over workers would occur. After
reading all the data, I have 80 partitions, correct?

What is less clear to me is the concept of a group, i.e. the result of a
groupBy operation. The input files I have are produced on each worker by
some other process. I first want to do pre-aggregation (I hope that's the
term) on each node before sending data over the network. The records I'm
processing contain a 'hostname' attribute, which is set to the worker's
hostname that processes the data, because the DataSources are local. That
means the records produced by the worker on host1 always contain the
attribute hostname=host1. Similar for the other 4 workers.

Now what happens if I do a groupBy("hostname")? How do the workers realize
that no network transfer is necessary? Is a group a logical abstraction, or
a physical one (in my understanding a partition is physical because it's
local to exactly one worker).

What I'd like to do next is a reduceGroup to merge multiple records into
one (some custom, yet straightforward, aggregation) and emit another record
for every couple of input records. Am I correct in assuming that the
Iterable values passed to the reduce function all have the same hostname
value? That is, will the operation have a parallelism of 80, where 5x16
operations will have the same hostname value? Because I have 16 splits per
host, the 16 reduces on host1 should all receive values with
hostname=host1, correct? And after the operation has finished, will the
reduced groups (now actual DataSets again) still be local to the workers?
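
A minimal sketch of that groupBy/reduceGroup step (the Record POJO, the inline
sample data and the summed 'value' field are made-up placeholders for the real
SfsInputFormat records):

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class PreAggregationSketch {

    // hypothetical record type; the real one has more fields
    public static class Record {
        public String hostname;
        public long value;
        public Record() {}
        public Record(String hostname, long value) {
            this.hostname = hostname;
            this.value = value;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // stand-in for the custom SfsInputFormat source
        DataSet<Record> records = env.fromElements(
                new Record("host1", 1), new Record("host1", 2), new Record("host2", 3));

        DataSet<Record> merged = records
                .groupBy("hostname")   // group by the POJO field
                .reduceGroup(new GroupReduceFunction<Record, Record>() {
                    @Override
                    public void reduce(Iterable<Record> values, Collector<Record> out) {
                        // all records passed to one call share the same hostname
                        Record result = null;
                        for (Record r : values) {
                            if (result == null) {
                                result = new Record(r.hostname, r.value);
                            } else {
                                result.value += r.value;   // placeholder aggregation
                            }
                        }
                        out.collect(result);
                    }
                });

        merged.print();
    }
}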

This is quite a lot to work on, I have to admit. I'm happy for any hints,
advice and feedback on this. If there's need for clarification I'd be happy
to provide more information.

Thanks a lot in advance!

Robert

-- 
My GPG Key ID: 336E2680


Re: Reply: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-13 Thread Fabian Hueske
I tested the Table API / SQL a bit.

I implemented a windowed aggregation with the streaming Table API and it
produced the same results as a DataStream API implementation.
Joining a stream with a TableFunction also seemed to work well.
Moreover, I checked the results of a bunch of TPC-H queries (batch SQL) and
all produced correct results.



2017-01-12 17:45 GMT+01:00 Till Rohrmann :

> I'm wondering whether we should decouple the webserver encryption from the
> global encryption activation and instead activate it per default.
>
> On Thu, Jan 12, 2017 at 4:54 PM, Chesnay Schepler 
> wrote:
>
> > FLINK-5470 is a duplicate of FLINK-5298 for which there is also an open
> PR.
> >
> > FLINK-5472 is imo invalid since the webserver does support https, you
> just
> > have to enable it as per the security documentation.
> >
> >
> > On 12.01.2017 16:20, Till Rohrmann wrote:
> >
> > I also found an issue:
> >
> > https://issues.apache.org/jira/browse/FLINK-5470
> >
> > I also noticed that Flink's webserver does not support https requests. It
> > might be worthwhile to add it, though.
> >
> > https://issues.apache.org/jira/browse/FLINK-5472
> >
> > On Thu, Jan 12, 2017 at 11:24 AM, Robert Metzger 
> > wrote:
> >
> >> I also found a bunch of issues
> >>
> >> https://issues.apache.org/jira/browse/FLINK-5465
> >> https://issues.apache.org/jira/browse/FLINK-5462
> >> https://issues.apache.org/jira/browse/FLINK-5464
> >> https://issues.apache.org/jira/browse/FLINK-5463
> >>
> >>
> >> On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske < 
> >> fhue...@gmail.com> wrote:
> >>
> >> > I have another bugfix for 1.2:
> >> >
> >> > https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)
> >> >
> >> > 2017-01-10 15:16 GMT+01:00 Robert Metzger < 
> >> rmetz...@apache.org>:
> >> >
> >> > > Hi,
> >> > >
> >> > > this depends a lot on the number of issues we find during the
> testing.
> >> > >
> >> > >
> >> > > These are the issues I found so far:
> >> > >
> >> > > https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
> >> > > https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
> >> > > https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
> >> > > https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
> >> > > https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
> >> > >
> >> > >
> >> > >
> >> > >
> >> > > On Tue, Jan 10, 2017 at 11:58 AM, shijinkui 
> >> > wrote:
> >> > >
> >> > > > Do we have a probable date for the 1.2 release? This month or next month?
> >> > > >
> >> > > > -----Original Message-----
> >> > > > From: Robert Metzger [mailto: rmetz...@apache.org]
> >> > > > Sent: January 3, 2017 20:44
> >> > > > To: d...@flink.apache.org
> >> > > > Cc: user@flink.apache.org
> >> > > > Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)
> >> > > >
> >> > > > Hi,
> >> > > >
> >> > > > First of all, I wish everybody a happy new year 2017.
> >> > > >
> >> > > > I've set user@flink in CC so that users who are interested in
> >> helping
> >> > > > with the testing get notified. Please respond only to the dev@
> >> list to
> >> > > > keep the discussion there!
> >> > > >
> >> > > > According to the 1.2 release discussion thread, I've created a
> first
> >> > > > release candidate for Flink 1.2.
> >> > > > The release candidate will not be the final release, because I'm
> >> > certain
> >> > > > that we'll find at least one blocking issue in the candidate :)
> >> > > >
> >> > > > Therefore, the RC is meant as a testing only release candidate.
> >> > > > Please report every issue we need to fix before the next RC in
> this
> >> > > thread
> >> > > > so that we have a good overview.
> >> > > >
> >> > > > The release artifacts are located here:
> >> > > > http://people.apache.org/~rmetzger/flink-1.2.0-rc0/
> >> > > >
> >> > > > The maven staging repository is located here:
> >> > > > https://repository.apache.org/content/repositories/orgapacheflink-
> >> > > >
> >> > > > The release commit (in branch "release-1.2.0-rc0"):
> >> > > > http://git-wip-us.apache.org/repos/asf/flink/commit/f3c59ced
> >> > > >
> >> > > >
> >> > > > Happy testing!
> >> > > >
> >> > >
> >> >
> >>
> >
> >
> >
>


Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Fabian Hueske
I think the sorting is done for consistency reasons, i.e., so that all
PojoTypeInfos for the same class behave the same.
Since this code is used in many parts of Flink and many jobs (DataSet,
DataStream, etc.), I would be very careful about changing the default
behavior here.

Maybe we can add a constructor that does not sort the fields.
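
For illustration (a minimal sketch; the field types are guesses), with a POJO
declared as

public class OrderA {
    public long user;
    public String product;
    public int amount;
}

the alphabetical sort yields the attribute order (amount, product, user) in the
resulting PojoTypeInfo, which is the order that 'select *' then returns.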

2017-01-13 10:46 GMT+01:00 Hongyuhong :

> Hi Fabian,
>
>
>
> Yes, OrderA is a table of POJO.
>
> But what confuses me is that in the PojoTypeInfo constructor,
>
> the order of the input parameter (fields) is correct; it changes after the sort
> operation, and I wonder if the sort operation can be removed?
>
>
>
> public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
>     super(typeClass);
>     checkArgument(Modifier.isPublic(typeClass.getModifiers()),
>             "POJO %s is not public", typeClass);
>     this.fields = fields.toArray(new PojoField[fields.size()]);
>     Arrays.sort(this.fields, new Comparator<PojoField>() {
>         @Override
>         public int compare(PojoField o1, PojoField o2) {
>             return o1.getField().getName().compareTo(o2.getField().getName());
>         }
>     });
>
>
>
>
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* January 13, 2017 17:23
> *To:* user@flink.apache.org
> *Subject:* Re: the attribute order in sql 'select * from...'
>
>
>
> Hi Yuhong,
>
> I assume that OrderA is a table of POJO objects and you are expecting the
> order of the attributes to match the order in which the fields of the POJO
> are defined in the source code.
>
> Flink accepts fields which are either public members or accessible via a
> getter and setter.
>
> This makes it difficult to automatically define an order, especially if some
> fields use getters and setters while others are public fields. Would the order
> depend on the fields (which might not exist in case of getters/setters) or on
> the getter/setter methods (which might also not exist)?
>
> I'm also not sure if it is possible to extract the line number of a method
> or field via reflection.
>
> Best, Fabian
>
>
>
>
>
> 2017-01-13 9:54 GMT+01:00 Hongyuhong :
>
> Hi,
>
> I'm now using streaming SQL, and I have a query like
>
> select * FROM OrderA where user > 2
>
> OrderA has 3 attributes (user, product, amount),
>
> and I expect the result columns to be in the same order as the input, but they
> have been sorted by attribute name.
>
> I found that the order has already been sorted when addSource is called.
>
> What is the purpose of doing so? It does not quite meet our
> requirements.
>
>
>
> Thanks very much.
>
>
>
>
>
> public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
>     super(typeClass);
>     checkArgument(Modifier.isPublic(typeClass.getModifiers()),
>             "POJO %s is not public", typeClass);
>     this.fields = fields.toArray(new PojoField[fields.size()]);
>     Arrays.sort(this.fields, new Comparator<PojoField>() {
>         @Override
>         public int compare(PojoField o1, PojoField o2) {
>             return o1.getField().getName().compareTo(o2.getField().getName());
>         }
>     });
>
>
>
>
>
> Best,
>
> Yuhong
>
>
>


Re: some questions about submit flink job on flink-yarn

2017-01-13 Thread Till Rohrmann
Hi Huang,

this seems to be very strange, because the JobManager’s actor system has
bound to the address 9-96-101-177 instead of 9.96.101.177. It seems as if
the '.' characters have been replaced by '-'.

Could you maybe tell me which version of Flink you’re running and also
share the complete JobManager log with us?

I tested it with the latest 1.2 SNAPSHOT version and there it seemed to
work.

Cheers,
Till

On Fri, Jan 13, 2017 at 9:02 AM, huangwei (G) 
wrote:

> Dear All,
>
> I get the following error in jobmanager.log when I submit a Flink job
> (batch/WordCount.jar) using the command: "./bin/flink run -m
> 9.96.101.177:39180 ./examples/batch/WordCount.jar".
>
> And the flink is on yarn cluster.
>
> Error in jobmanager.log:
> 2017-01-13 15:28:27,402 ERROR akka.remote.EndpointWriter
>   - dropping message [class akka.actor.ActorSelectionMessage]
> for non-local recipient [Actor[akka.tcp://flink@9.96.101.177:39180/]]
> arriving at [akka.tcp://flink@9.96.101.177:39180] inbound addresses are
> [akka.tcp://flink@9-96-101-177:39180]
>
> However, it succeeds when I use the Flink web UI to submit the job.
>
> How to solve this problem?
>
> Also, when I started Flink on YARN, the jobmanager.rpc.port
> and the web port were changed to 39180 and 57724 respectively.
> The corresponding configuration in flink-conf.yaml is still the default:
>
> jobmanager.rpc.port: 6123
>
> and
>
> jobmanager.web.port: 8081
>
> I started Flink on YARN using the command: "./bin/yarn-session.sh -n 4".
>
> Why were the ports changed to 39180 and 57724?
>
> Many thanks for any help!
>
> HuangWHWHW
> 2017.1.13
>


Strategies for Complex Event Processing with guaranteed data consistency

2017-01-13 Thread Kathleen Sharp
I have been playing around with Flink for a few weeks to try to
ascertain whether or not it meets our use cases, and also what best
practices we should be following. I have a few questions I would
appreciate answers to.


Our scenario is that we want to process a lot of event data into
cases. A case is an in-order sequence of events; this event data could
be quite old. We never know when a case is complete, so we just want
to have the most up-to-date picture of what a case looks like.


The in-order sequence of events of a case is called the trace. Many
cases could have an identical trace. We would like to construct these
traces, and do some aggregations on those (case count, average/min/max
life-cycle time).


We then have further downstream processing we will do on a case, some
of which would require additional inputs, either from side-inputs of
somehow joining data sources.


We don’t really care about event time at the moment, because we just
want to build cases and traces with all the data we have received.


The end results should be available for our web front end via a REST API.


Based on the above I have the following idea for a first implementation:


Kafka source -> key by case id -> session window with RocksDB state
backend holding the case for that key -> Postgres sink
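
A rough sketch of that topology with the DataStream API (all concrete names are
assumptions: a minimal Event POJO, a comma-separated wire format, the Kafka 0.9
consumer, a 10-minute session gap, a hypothetical checkpoint URI, and print()
standing in for the Postgres upsert sink that is sketched further below):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class CaseAssemblySketch {

    // hypothetical minimal event; the real one carries more attributes
    public static class Event {
        public String caseId;
        public String payload;
        public Event() {}
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // keep the (potentially large) window state in RocksDB;
        // requires the flink-statebackend-rocksdb dependency and a reachable checkpoint URI
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "case-assembly");

        DataStream<String> raw = env.addSource(
                new FlinkKafkaConsumer09<>("events", new SimpleStringSchema(), props));

        raw.map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String line) {
                        // hypothetical wire format: "caseId,payload"
                        Event e = new Event();
                        String[] parts = line.split(",", 2);
                        e.caseId = parts[0];
                        e.payload = parts.length > 1 ? parts[1] : "";
                        return e;
                    }
                })
                .keyBy("caseId")
                .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
                .apply(new WindowFunction<Event, Tuple2<String, String>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<Event> events, Collector<Tuple2<String, String>> out) {
                        // rebuild the current picture of the case from all buffered events
                        StringBuilder trace = new StringBuilder();
                        for (Event e : events) {
                            trace.append(e.payload).append('|');
                        }
                        out.collect(Tuple2.of((String) key.getField(0), trace.toString()));
                    }
                })
                .print();   // stand-in for a Postgres upsert sink (sketched further below)

        env.execute("case assembly sketch");
    }
}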


The reason for a session window is that, as I mentioned above, we just
want to build a group with all the data we have received into kafka up
until that point in time. We would experiment with what this gap time
should be, and in future it might be specific to the type of group,
but for the start a naive approach is acceptable. I think this could
be better than just doing it, say, every 10 minutes because we really
don’t know yet the frequency of the data received. Also, some inputs
to kafka come directly from a CSV upload, so we will get “firehose”
periods, and periods of nothing.

In short: I think what we have closely matches session behaviour.


We also have to implement a Postgres sink that is capable of doing
upserts. The reason for Postgres is to serve the REST front end.
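
A minimal sketch of such an upsert sink for (caseId, trace) pairs (assumptions:
Postgres 9.5+ for ON CONFLICT, a made-up cases(case_id, trace) table, hard-coded
connection details, and the Postgres JDBC driver on the classpath; connection
pooling and batching are omitted):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Sketch of a Postgres upsert sink for (caseId, trace) pairs.
// Assumes a table: CREATE TABLE cases (case_id TEXT PRIMARY KEY, trace TEXT);
public class PostgresUpsertSink extends RichSinkFunction<Tuple2<String, String>> {

    private transient Connection connection;
    private transient PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // hypothetical connection details
        connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/cases_db", "flink", "secret");
        upsert = connection.prepareStatement(
                "INSERT INTO cases (case_id, trace) VALUES (?, ?) " +
                "ON CONFLICT (case_id) DO UPDATE SET trace = EXCLUDED.trace");
    }

    @Override
    public void invoke(Tuple2<String, String> value) throws Exception {
        upsert.setString(1, value.f0);
        upsert.setString(2, value.f1);
        upsert.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (upsert != null) { upsert.close(); }
        if (connection != null) { connection.close(); }
    }
}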


We then have to build our traces and can see two options for it:


1) The most obvious solution would be to use a kafka sink for the
keyed case stream, and to do the trace aggregations in a downstream
flink job with this kafka topic as a source. However, I have some
concerns over losing any data (i.e. how do we know whether or not an
event has been successfully pushed into the kafka stream).


2) Another approach might be to use some other type of sink (perhaps
postgres), and to use this as a source for the traces job. This would
help us guarantee data consistency.


3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?), so:

Keyed cases stream -> broadcast -> key by tracehash with RocksDB
state backend holding the trace for that tracehash -> perform
aggregations -> Postgres sink

Is broadcast an option here? How costly is it?


Which of these approaches (or any other), would you recommend?


-

Another question regarding the state:

As we never know when a case is complete, the RocksDB
backend could grow indefinitely (!). Obviously we would need to get a
bit smarter here.


Is using state in this way a somewhat standard practice, or is state
intended more for recovery?

Managing growing state: I found some discussion regarding how to clear
state here 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402
which references https://issues.apache.org/jira/browse/FLINK-3946

Thanks,

Kat


Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Timo Walther

Hi Yuhong,

as a solution you can specify the order of your Pojo fields when 
converting from DataStream to Table.


Table table = tableEnv
    .fromDataSet(env.fromCollection(data),
        "department AS a, " +
        "age AS b, " +
        "salary AS c, " +
        "name AS d")
    .select("a, b, c, d");


Timo


On 13/01/17 at 10:22, Fabian Hueske wrote:

Hi Yuhong,

I assume that OrderA is a table of POJO objects and you are expecting
the order of the attributes to match the order in which the fields of
the POJO are defined in the source code.


Flink accepts fields which are either public members or accessible via
a getter and setter.
This makes it difficult to automatically define an order, especially if some
fields use getters and setters while others are public fields. Would the order
depend on the fields (which might not exist in case of getters/setters) or on
the getter/setter methods (which might also not exist)?
I'm also not sure if it is possible to extract the line number of a 
method or field via reflection.


Best, Fabian



2017-01-13 9:54 GMT+01:00 Hongyuhong:


Hi,

I'm now using streaming SQL, and I have a query like

select * FROM OrderA where user > 2

OrderA has 3 attributes (user, product, amount),

and I expect the result columns to be in the same order as the input, but they
have been sorted by attribute name.

I found that the order has already been sorted when addSource is called.

What is the purpose of doing so? It does not quite meet our
requirements.

Thanks very much.

public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
    super(typeClass);
    checkArgument(Modifier.isPublic(typeClass.getModifiers()),
            "POJO %s is not public", typeClass);
    this.fields = fields.toArray(new PojoField[fields.size()]);
    Arrays.sort(this.fields, new Comparator<PojoField>() {
        @Override
        public int compare(PojoField o1, PojoField o2) {
            return o1.getField().getName().compareTo(o2.getField().getName());
        }
    });

Best,

Yuhong






Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Hongyuhong
Hi Fabian,

Yes, OrderA is a table of POJO.
But what confuses me is that in the PojoTypeInfo constructor,
the order of the input parameter (fields) is correct; it changes after the sort
operation, and I wonder if the sort operation can be removed?

public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
    super(typeClass);
    checkArgument(Modifier.isPublic(typeClass.getModifiers()),
            "POJO %s is not public", typeClass);
    this.fields = fields.toArray(new PojoField[fields.size()]);
    Arrays.sort(this.fields, new Comparator<PojoField>() {
        @Override
        public int compare(PojoField o1, PojoField o2) {
            return o1.getField().getName().compareTo(o2.getField().getName());
        }
    });


From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: January 13, 2017 17:23
To: user@flink.apache.org
Subject: Re: the attribute order in sql 'select * from...'

Hi Yuhong,
I assume that OrderA is a table of POJO objects and you are expecting the order
of the attributes to match the order in which the fields of the POJO are defined
in the source code.

Flink accepts fields which are either public members or accessible via a getter
and setter.
This makes it difficult to automatically define an order, especially if some fields
use getters and setters while others are public fields. Would the order depend on
the fields (which might not exist in case of getters/setters) or on the getter/setter
methods (which might also not exist)?
I'm also not sure if it is possible to extract the line number of a method or 
field via reflection.
Best, Fabian


2017-01-13 9:54 GMT+01:00 Hongyuhong:
Hi,
I'm now using streaming SQL, and I have a query like
select * FROM OrderA where user > 2
OrderA has 3 attributes (user, product, amount),
and I expect the result columns to be in the same order as the input, but they
have been sorted by attribute name.
I found that the order has already been sorted when addSource is called.
What is the purpose of doing so? It does not quite meet our requirements.

Thanks very much.


public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
    super(typeClass);
    checkArgument(Modifier.isPublic(typeClass.getModifiers()),
            "POJO %s is not public", typeClass);
    this.fields = fields.toArray(new PojoField[fields.size()]);
    Arrays.sort(this.fields, new Comparator<PojoField>() {
        @Override
        public int compare(PojoField o1, PojoField o2) {
            return o1.getField().getName().compareTo(o2.getField().getName());
        }
    });


Best,
Yuhong



the attribute order in sql 'select * from...'

2017-01-13 Thread Hongyuhong
Hi,
I'm now using streaming SQL, and I have a query like
select * FROM OrderA where user > 2
OrderA has 3 attributes (user, product, amount),
and I expect the result columns to be in the same order as the input, but they
have been sorted by attribute name.
I found that the order has already been sorted when addSource is called.
What is the purpose of doing so? It does not quite meet our requirements.

Thanks very much.


public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
    super(typeClass);
    checkArgument(Modifier.isPublic(typeClass.getModifiers()),
            "POJO %s is not public", typeClass);
    this.fields = fields.toArray(new PojoField[fields.size()]);
    Arrays.sort(this.fields, new Comparator<PojoField>() {
        @Override
        public int compare(PojoField o1, PojoField o2) {
            return o1.getField().getName().compareTo(o2.getField().getName());
        }
    });


Best,
Yuhong


Re: 1.1.4 on YARN - vcores change?

2017-01-13 Thread Robert Metzger
Hi Shannon,

Flink is reading the number of available vcores from the local YARN
configuration. Is it possible that the YARN / Hadoop config on the machine
from which you are submitting your job sets the number of vcores to 4?
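
For reference, two settings are presumably involved here (an assumption about this
particular setup, not a diagnosis): the vcores advertised by the client-side Hadoop
config (e.g. yarn.nodemanager.resource.cpu-vcores in yarn-site.xml), and the number
of vcores Flink requests per container, which can be capped explicitly in
flink-conf.yaml instead of defaulting to the number of task slots, as the exception
message suggests:

yarn.containers.vcores: 4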


On Fri, Jan 13, 2017 at 12:51 AM, Shannon Carey  wrote:

> Did anything change in 1.1.4 with regard to YARN & vcores?
>
> I'm getting this error when deploying 1.1.4 to my test cluster. Only the
> Flink version changed.
>
> java.lang.RuntimeException: Couldn't deploy Yarn cluster
>   at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:384)
>   at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:591)
>   at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:465)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of
> virtual cores per node were configured with 8 but Yarn only has 4 virtual cores
> available. Please note that the number of virtual cores is set to the number of task
> slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
>   at org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:273)
>   at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:393)
>   at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:381)
>   ... 2 more
>
>
> When I run: ./bin/yarn-session.sh -q
> It shows 8 vCores on each machine:
>
> NodeManagers in the ClusterClient: 3
>
> |Property |Value
>
> +---+
>
> |NodeID   |ip-10-2-…:8041
>
> |Memory   |12288 MB
>
> |vCores   |8
>
> |HealthReport |
>
> |Containers   |0
>
> +---+
>
> |NodeID   |ip-10-2-…:8041
>
> |Memory   |12288 MB
>
> |vCores   |8
>
> |HealthReport |
>
> |Containers   |0
>
> +---+
>
> |NodeID   |ip-10-2-…:8041
>
> |Memory   |12288 MB
>
> |vCores   |8
>
> |HealthReport |
>
> |Containers   |0
>
> +---+
>
> Summary: totalMemory 36864 totalCores 24
>
> Queue: default, Current Capacity: 0.0 Max Capacity: 1.0 Applications: 0
>
> I'm running:
> ./bin/yarn-session.sh -n 3 --jobManagerMemory 1504 --taskManagerMemory
> 10764 --slots 8 --detached
>
> I have not specified any value for "yarn.containers.vcores" in my config.
>
> I switched to -n 5 and --slots 4, and halved the taskManagerMemory, which
> allowed the cluster to start.
>
> However, in the YARN "Nodes" UI I see "VCores Used: 2" and "VCores Avail:
> 6" on all three nodes. And if I look at one of the Containers, it says,
> "Resource: 5408 Memory, 1 VCores". I don't understand what's happening here.
>
> Thanks…
>