Re: Regarding caching the evicted elements and re-emitting them to the next window

2017-01-13 Thread Aljoscha Krettek
Hi, I'm afraid there is no functionality for this in Flink. What you can do, however, is to not evict these elements from the window buffer but instead ignore them when processing your elements in the WindowFunction. This way they will be preserved for the next firing. You have to make sure to even

Re: Strategies for Complex Event Processing with guaranteed data consistency

2017-01-13 Thread Fabian Hueske
One thing to add: the Flink KafkaProducer provides at-least-once guarantees only if flush-on-checkpoint is enabled [1]. [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean- 2017-01-13 22:

Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?

2017-01-13 Thread Tzu-Li (Gordon) Tai
Hi Andrew, Your observations are correct. Like you mentioned, the current problem circles around how we deal with the pending buffered requests in accordance with Flink’s checkpointing. I’ve filed a JIRA for this, as well as some thoughts for the solution in the description: https://issues.apac

Re: Strategies for Complex Event Processing with guaranteed data consistency

2017-01-13 Thread Fabian Hueske
Hi Kat, I did not understand the difference between a case and a trace. If I got it right, the goal of your first job is to assemble the individual events into cases. Is a case here the last event for a case-id or all events of a case-id? If a case is the collection of all events (which I assume)

Can serialization be disabled between chains?

2017-01-13 Thread Dmitry Golubets
Hi, Let's say we have multiple subtask chains and all of them are executing in the same task manager slot (i.e. in the same JVM). What's the point in serializing data between them? Can it be disabled? The reason I want to keep different chains is that some subtasks should be executed in parallel to

Re: Objects accessible from all Flink nodes

2017-01-13 Thread Fabian Hueske
Hi Matt, it is not possible to share an object across different tasks of the same operator or even different operators. This would be globally mutable state, which is in general hard to make efficient in distributed systems. Something that might work is to use a CoFlatMapOperator with one input bein

Re: Terminology: Split, Group and Partition

2017-01-13 Thread Fabian Hueske
I think so far getExecutionPlan() was only used for debugging purposes and not in programs that would also be executed. You can open a JIRA issue if you think that this would be a valuable feature. Thanks, Fabian 2017-01-13 16:34 GMT+01:00 Robert Schmidtke: > Just a side note, I'm guessing there's

Re: 1.1.4 on YARN - vcores change?

2017-01-13 Thread Shannon Carey
Ufuk & Robert, There's a good chance you're right! On the EMR master node, where yarn-session.sh is run, /etc/hadoop/conf/yarn-site.xml says that "yarn.nodemanager.resource.cpu-vcores" is 4. Meanwhile, on the core nodes, the value in that file is 8. Shall I submit a JIRA? This might be pr

Re: Queryable State

2017-01-13 Thread Nico Kruber
Hi Dawid, I'll try to reproduce the error in the next couple of days. Can you also share the value deserializer you use? Also, have you tried even smaller examples in the meantime? Did they work? As a side-note in general regarding the queryable state "sink" using ListState (".asQueryableState(

WindowFunction to push data from Kafka to S3

2017-01-13 Thread Samra Kasim
Hi, I am reading messages off a Kafka Topic and want to process the messages through Flink and save them into S3. It was pointed out to me that stream processing of the Kafka data won't be saved to S3 because S3 doesn't allow data to be appended to a file, so I want to convert the Kafka stream int

Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException

2017-01-13 Thread Miguel Coimbra
Hello, If I missed the answer to this or some essential step of the documentation, please do tell. I am having the following problem while trying out the org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API (Java). Specs: JDK 1.8.0_102 x64 Apache Flink: 1.1.4 Suppose I ha

Re: Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
Just a side note, I'm guessing there's a bug here: https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68 It should say createProgramPlan("unnamed job", false); Otherwise I'm getting an exception complaining that no new

Re: Objects accessible from all Flink nodes

2017-01-13 Thread Matt
Errata: How can an *object (such as the classifier, line 1)* be accessed by any Flink node [...] Just in case, the classifier itself can't be serialized I believe, it's part of a framework which I can't modify. In any case, even if it's serialized, I guess the cost of moving it to one node and the

Re: Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
Hi Fabian, thanks for the quick and comprehensive reply. I'll have a look at the ExecutionPlan using your suggestion to check what actually gets computed, and I'll use the properties as well. If I stumble across something else I'll let you know. Many thanks again! Robert On Fri, Jan 13, 2017 at

Re: Kafka topic partition skewness causes watermark not being emitted

2017-01-13 Thread Tzu-Li (Gordon) Tai
Hi, This is expected behaviour due to how the per-partition watermarks are designed in the Kafka consumer, but I think it’s probably a good idea to handle idle partitions also when the Kafka consumer itself emits watermarks. I’ve filed a JIRA issue for this: https://issues.apache.org/jira/brows

Re: Terminology: Split, Group and Partition

2017-01-13 Thread Fabian Hueske
Hi Robert, let me first describe what splits, groups, and partitions are. * Partition: This is basically all data that goes through the same task instance. If you have an operator with a parallelism of 80, you have 80 partitions. When you call sortPartition() you'll have 80 sorted streams, if you

Terminology: Split, Group and Partition

2017-01-13 Thread Robert Schmidtke
Hi all, I'm having some trouble grasping what the meaning of/difference between the following concepts is: - Split - Group - Partition Let me elaborate a bit on the problem I'm trying to solve here. In my tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in standalone mode. Each

Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Timo Walther
Especially, as it might also change the serialized binary format. On 13/01/17 at 11:24, Fabian Hueske wrote: I think the sorting is done for consistency reasons, i.e., that all PojoTypeInfos for the same class behave the same. Since this code is used in many parts of Flink and many jobs (DataS

Re: Re: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)

2017-01-13 Thread Fabian Hueske
I tested the Table API / SQL a bit. I implemented a windowed aggregation with the streaming Table API and it produced the same results as a DataStream API implementation. Joining a stream with a TableFunction also seemed to work well. Moreover, I checked the results of a bunch of TPC-H queries (ba

Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Fabian Hueske
I think the sorting is done for consistency reasons, i.e., that all PojoTypeInfos for the same class behave the same. Since this code is used in many parts of Flink and many jobs (DataSet, DataStream, etc.) I would be very careful about changing the default behavior here. Maybe we can add a constructor

Re: some questions about submit flink job on flink-yarn

2017-01-13 Thread Till Rohrmann
Hi Huang, this seems to be very strange, because the JobManager’s actor system has bound to the address 9-96-101-177 instead of 9.96.101.177. It seems as if the dots (.) have been replaced by dashes (-). Could you maybe tell me which version of Flink you’re running and also share the complete JobManager log with

Strategies for Complex Event Processing with guaranteed data consistency

2017-01-13 Thread Kathleen Sharp
I have been playing around with Flink for a few weeks to try to ascertain whether or not it meets our use cases, and also what best practices we should be following. I have a few questions I would appreciate answers to. Our scenario is that we want to process a lot of event data into cases. A cas

Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Timo Walther
Hi Yuhong, as a solution you can specify the order of your Pojo fields when converting from DataStream to Table. Table table = tableEnv .fromDataSet(env.fromCollection(data), "department AS a, " + "age AS b, " + "salary AS c, " + "name AS d") .select("a, b, c, d"); T

Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Hongyuhong
Hi Fabian, Yes, OrderA is a table of POJOs. But what I observe is that in the PojoTypeInfo constructor the order of the input parameter (fields) is right; it changes after the sort operation, and I wonder if the sort operation can be removed? public PojoTypeInfo(Class typeClass, List fields) { super(ty

Re: the attribute order in sql 'select * from...'

2017-01-13 Thread Fabian Hueske
Hi Yuhong, I assume that OrderA is a table of POJO objects and you are expecting the order of the attributes to match the order in which the fields of the POJO are defined in the source code. Flink accepts fields which are either public members or accessible via a getter and setter. This makes it

the attribute order in sql 'select * from...'

2017-01-13 Thread Hongyuhong
Hi, I'm now using streaming SQL, and I have a query like select * FROM OrderA where user > 2. OrderA has 3 attributes (user, product, amount), and I expect the result to keep the input order, but it has been sorted by attribute name. I found the order has already been sorted when calling addSource, Wh

Re: 1.1.4 on YARN - vcores change?

2017-01-13 Thread Ufuk Celebi
On Fri, Jan 13, 2017 at 9:57 AM, Robert Metzger wrote: > Flink is reading the number of available vcores from the local YARN > configuration. Is it possible that the YARN / Hadoop config on the machine > where you are submitting your job from sets the number of vcores as 4? Shouldn't we retrieve

Re: 1.1.4 on YARN - vcores change?

2017-01-13 Thread Robert Metzger
Hi Shannon, Flink is reading the number of available vcores from the local YARN configuration. Is it possible that the YARN / Hadoop config on the machine where you are submitting your job from sets the number of vcores as 4? On Fri, Jan 13, 2017 at 12:51 AM, Shannon Carey wrote: > Did anythi