Re: Regarding caching the evicted elements and re-emitting them to the next window
Hi,

I'm afraid there is no functionality for this in Flink. What you can do, however, is to not evict these elements from the window buffer and instead ignore them when processing your elements in the WindowFunction. This way they will be preserved for the next firing. You have to make sure to eventually evict some elements, however; otherwise you would have a memory leak.

Aljoscha

On Sun, Jan 8, 2017, 23:47 Abdul Salam Shaikh wrote:
> Hi,
>
> I am using the 1.2-SNAPSHOT version of Apache Flink, which provides the new
> enhanced Evictor functionality, and I am using customized triggers for a Global
> Window. I have a use case where I am evicting the unwanted event (element)
> for the current window before it is evaluated. However, I am looking for
> options to cache this evicted element and re-use it in the next window. Is
> there a possibility which can help me achieve this in the context of Flink
> or in a more generic programming approach?
>
> Thanks in anticipation!
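The approach Aljoscha describes (leave unwanted elements in the buffer, skip them while processing, and still bound the buffer) can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink code: the class `RetainingBuffer`, its methods, and the `maxSize` bound are hypothetical names chosen for illustration, and a real job would put the equivalent logic into its own Evictor and WindowFunction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java stand-in for a window buffer; no Flink classes are used.
class RetainingBuffer<T> {
    private final Deque<T> buffer = new ArrayDeque<>();
    private final int maxSize; // hypothetical bound that guards against the memory leak

    RetainingBuffer(int maxSize) {
        this.maxSize = maxSize;
    }

    // Add an element; once the bound is exceeded, the oldest elements are dropped.
    void add(T element) {
        buffer.addLast(element);
        while (buffer.size() > maxSize) {
            buffer.removeFirst();
        }
    }

    // One "firing": wanted elements are processed and removed,
    // ignored elements stay buffered for the next firing.
    List<T> fire(Predicate<T> wanted) {
        List<T> processed = new ArrayList<>();
        Iterator<T> it = buffer.iterator();
        while (it.hasNext()) {
            T element = it.next();
            if (wanted.test(element)) {
                processed.add(element);
                it.remove();
            }
        }
        return processed;
    }

    int size() {
        return buffer.size();
    }

    public static void main(String[] args) {
        RetainingBuffer<Integer> buffer = new RetainingBuffer<>(3);
        buffer.add(1);
        buffer.add(2);
        buffer.add(3);
        System.out.println(buffer.fire(x -> x % 2 == 1)); // odd elements are processed
        System.out.println(buffer.size());                // the even element is still buffered
    }
}
```

The size bound is only one possible eviction policy; an age-based or count-per-key policy would serve the same purpose of eventually dropping elements that are never processed.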
Re: Strategies for Complex Event Processing with guaranteed data consistency
One thing to add: the Flink KafkaProducer provides at-least-once only if flush-on-checkpoint is enabled [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/api/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBase.html#setFlushOnCheckpoint-boolean-

2017-01-13 22:02 GMT+01:00 Fabian Hueske:
> Regarding your concerns of losing data when writing to Kafka. Flink's
> KafkaProducer provides at-least-once guarantees, which means that data
> might be written more than once in case of a failure but won't be lost. If
> the Kafka topic is partitioned by case-id and you only need the last record
> per case-id, Kafka's log compaction should give you upsert semantics.
Re: Fault tolerance guarantees of Elasticsearch sink in flink-elasticsearch2?
Hi Andrew,

Your observations are correct. As you mentioned, the current problem revolves around how we deal with the pending buffered requests in accordance with Flink’s checkpointing. I’ve filed a JIRA for this, with some thoughts on a solution in the description: https://issues.apache.org/jira/browse/FLINK-5487. What do you think?

Thank you for bringing this up! We should probably fix this soon. There’s already some ongoing effort to fix other aspects of proper at-least-once support in the Elasticsearch sinks, so I believe this will get attention very soon too.

Cheers,
Gordon

On January 11, 2017 at 3:49:06 PM, Andrew Roberts (arobe...@fuze.com) wrote:

I’m trying to understand the guarantees made by Flink’s Elasticsearch sink in terms of message delivery. According to (1), the ES sink offers at-least-once guarantees. This page doesn’t differentiate between flink-elasticsearch and flink-elasticsearch2, so I have to assume for the moment that they both offer that guarantee. However, a look at the code (2) shows that the invoke() method puts the record into a buffer, and then that buffer is flushed to Elasticsearch some time later.
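The gap between buffering in invoke() and checkpointing, and one way a sink could close it, can be sketched in plain Java. This is a minimal sketch and not the connector's code: `BufferingSinkSketch`, `snapshot`, and the `delivered` list that stands in for Elasticsearch are hypothetical names, and flushing on snapshot is assumed here as one possible fix, which may differ from what FLINK-5487 ends up doing.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for a buffering sink; no Flink or Elasticsearch classes are used.
class BufferingSinkSketch {
    private final List<String> pending = new ArrayList<>();   // buffered, not yet sent
    private final List<String> delivered = new ArrayList<>(); // stands in for the external store

    // Called per record: the record is only buffered, so it is not durable yet.
    void invoke(String record) {
        pending.add(record);
    }

    // Sends everything that is currently buffered.
    void flush() {
        delivered.addAll(pending);
        pending.clear();
    }

    // Assumed fix: flush before the checkpoint is acknowledged, so that a completed
    // checkpoint never covers records that exist only in the in-memory buffer.
    void snapshot() {
        flush();
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        BufferingSinkSketch sink = new BufferingSinkSketch();
        sink.invoke("doc-1");
        sink.invoke("doc-2");
        System.out.println(sink.pendingCount()); // records are still only in the buffer
        sink.snapshot();
        System.out.println(sink.delivered());    // records reached the store before the checkpoint completed
    }
}
```

Without the flush in `snapshot`, a failure after a completed checkpoint would lose whatever was still pending, because the source would not replay those records.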
Re: Strategies for Complex Event Processing with guaranteed data consistency
Hi Kat,

I did not understand the difference between a case and a trace. If I got it right, the goal of your first job is to assemble the individual events into cases. Is a case here the last event for a case-id or all events of a case-id? If a case is the collection of all events (which I assume), what is the difference to a trace, which is also the list of events (if I got it right)?

In any case, I think your first job can also be solved without a session window (which is quite complex internally). There are two options:

1) Use a global window [1] with a custom trigger that fires for each arriving record. A global window never ends, which would be OK since your cases do not end either.
2) Use a MapFunction with key-partitioned operator state [2]. The map function would simply update the state for every new event and emit a new result.

Regarding your concerns about losing data when writing to Kafka: Flink's KafkaProducer provides at-least-once guarantees, which means that data might be written more than once in case of a failure but won't be lost. If the Kafka topic is partitioned by case-id and you only need the last record per case-id, Kafka's log compaction should give you upsert semantics.

Regarding your question "Is using state in this way a somewhat standard practice, or is state intended more for recovery?": Many streaming applications require state for their semantics (just like yours), i.e., they need to buffer data and wait for more data to arrive. In order to guarantee consistent result semantics of an application, the state must not be lost and must be recovered in case of a failure. So state is not intended for recovery, but recovery is needed to guarantee application semantics.

As I said before, I did not get the difference between cases and traces, so I cannot really comment on the job to analyze traces.
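Why at-least-once writes combined with log compaction behave like upserts can be sketched in plain Java. This is a minimal sketch under stated assumptions: `CompactionSketch` and `compact` are hypothetical names, a list of key/value pairs stands in for one topic partition, and the sketch assumes that a replay after a failure re-sends records in their original order. It does not use the Kafka client API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for a compacted, keyed log; no Kafka classes are used.
class CompactionSketch {

    // Keeps only the latest value per key, which is what compaction eventually retains.
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : log) {
            latest.put(record[0], record[1]); // a later record overwrites an earlier one
        }
        return latest;
    }

    public static void main(String[] args) {
        List<String[]> log = new ArrayList<>();
        log.add(new String[] {"case-1", "A"});
        log.add(new String[] {"case-2", "X"});
        log.add(new String[] {"case-1", "A,B"});
        log.add(new String[] {"case-1", "A,B"}); // duplicate written again after a failure
        System.out.println(compact(log));        // one entry per case-id, duplicates collapsed
    }
}
```

The duplicate write changes nothing in the compacted view, which is why the consumer of such a topic only needs to treat each record as "the current state of this case-id".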
Hope this helps, Fabian [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html#global-windows [2] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/state.html#using-the-keyvalue-state-interface 2017-01-13 11:04 GMT+01:00 Kathleen Sharp: > I have been playing around with Flink for a few weeks to try to > ascertain whether or not it meets our use cases, and also what best > practices we should be following. I have a few questions I would > appreciate answers to. > > > Our scenario is that we want to process a lot of event data into > cases. A case is an inorder sequence of events; this event data could > be quite old. We never know when a case is complete, so we just want > to have the most up to date picture of what a case looks like. > > > The inorder sequence of events of a case is called the trace. Many > cases could have an identical trace. We would like to construct these > traces, and do some aggregations on those (case count, average/min/max > life-cycle time). > > > We then have further downstream processing we will do on a case, some > of which would require additional inputs, either from side-inputs of > somehow joining data sources. > > > We don’t really care about event time at the moment, because we just > want to build cases and traces with all the data we have received. > > > The end results should be available for our web front end via rest api. > > > Based on the above I have the following idea for a first implementation: > > > Kafka source -> key by case id -> session window with rocks db state > backend holding case for that key -> postgres sink > > > The reason for a session window is that, as I mentioned above, we just > want to build a group with all the data we have received into kafka up > until that point in time. We would experiment with what this gap time > should be, and in future it might be specific to the type of group, > but for the start a naive approach is acceptable. 
I think this could > be better than just doing it, say, every 10 minutes because we really > don’t know yet the frequency of the data received. Also, some inputs > to kafka come directly from a CSV upload, so we will get “firehose” > periods, and periods of nothing. > > In short: I think what we have closely matches session behaviour. > > > We also have to implement a postgres sink that is capable of doing > upserts. The reason for postgres is to service the rest front end. > > > We then have to build our traces and can see two options for it: > > > 1) The most obvious solution would be to use a kafka sink for the > keyed case stream, and to do the trace aggregations in a downstream > flink job with this kafka topic as a source. However, I have some > concerns over losing any data (i.e. how do we know whether or not an > event has been successfully pushed into the kafka stream). > > > 2) Another approach might be to use some other type of sink (perhaps > postgres), and to use this as a source for the traces job. This would > help us guarantee data consistency. > > > 3) Or, to somehow re-merge the
Can serialization be disabled between chains?
Hi,

Let's say we have multiple subtask chains and all of them are executing in the same task manager slot (i.e. in the same JVM). What's the point in serializing data between them? Can it be disabled?

The reason I want to keep different chains is that some subtasks should be executed in parallel to each other. Let's say I have tasks: A -> B. After task A has pushed some data to task B, I want task A to continue processing without waiting for task B to finish. What I'm talking about is the behavior of Akka Streams with fusion disabled.

Best regards,
Dmitry
Re: Objects accessible from all Flink nodes
Hi Matt,

it is not possible to share an object across different tasks of the same operator or even different operators. This would be globally mutable state, which is in general hard to make efficient in distributed systems.

Something that might work is to use a CoFlatMapFunction with one input being the training data and the other the actual input. Then you can train the model and query the model in the same operator. You would have multiple models, one in each parallel task. If you can partition the training (and input) data in a meaningful way, you would have a partition- or key-specific model. You can also use random partitioning and have models which are based on random samples of the training data. Or, if you want each model to be based on the same input data, you can broadcast the training data.

This would look as follows (pick one of the three variants):

    val input = ???
    val training = ???

    // partitioned variant
    val predictions = input.keyBy(xxx).connect(training.keyBy(xxx)).flatMap(YourFlatMap)
    // random variant
    val predictions = input.connect(training.shuffle).flatMap(YourFlatMap)
    // broadcasted variant
    val predictions = input.connect(training.broadcast).flatMap(YourFlatMap)

An example of a CoFlatMap which trains and queries a prediction model can be found in the Flink Training [1] (code [2]).

Hope this helps,
Fabian

[1] http://dataartisans.github.io/flink-training/exercises/timePrediction.html
[2] https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/state/TravelTimePrediction.java

2017-01-13 15:38 GMT+01:00 Matt:
> Errata: How can an *object (such as the classifier, line 1)* be accessed
> by any Flink node [...]
>
> Just in case, the classifier itself can't be serialized I believe, it's
> part of a framework which I can't modify. In any case, even if it's
> serialized, I guess the cost of moving it to one node and then another
> makes the whole data flow unpractical.
It's better to move all created > instances to one single node where only one instance of the classifier > is maintained. > > I'm not sure if this is possible or how to do this. > > On Thu, Jan 12, 2017 at 11:11 PM, Matt wrote: > >> Hello, >> >> I have a stream of objects which I use to update the model of a >> classification algorithm and another stream with the objects I need to >> classify in real time. >> >> The problem is that the instances for training and evaluation are >> processed on potentially different Flink nodes, but the classifier should >> be applied to all instances no matter in what node it was generated (ie, >> the classifier should be accessible from any Flink node). >> >> Just to make it clearer, here is what would NOT work since these sink >> functions are not serializable: https://gist.github.com/b979bf >> 742b0d2f3da8cc8e5e91207151 >> >> Two questions here: >> >> *1. How can an instance be accessed by any Flink node like this (line 11 >> and 19)? Maybe there's a better approach to this problem.* >> >> *2. In the example the second stream (line 15) is started right away but >> at startup the classifier is not ready to use until it has been trained >> with enough instances. Is it possible to do this? If I'm not wrong >> env.execute (line 24) can be used only once.* >> >> Regards, >> Matt >> > >
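The "train and query in the same operator" idea from Fabian's reply can be sketched in plain Java. This is a minimal sketch under stated assumptions, not Flink code: `TrainAndPredict`, `onQuery`, and `onTraining` are hypothetical names that stand in for the two inputs of a co-flat-map style function, and the running mean is a toy model chosen only to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java stand-in for a two-input operator that owns a local model; no Flink classes are used.
class TrainAndPredict {
    private double sum = 0.0; // hypothetical model state: running mean of the training values
    private long count = 0;

    // Input 1: an instance to classify. Nothing is emitted until the model has been trained.
    void onQuery(double value, List<String> out) {
        if (count == 0) {
            return; // model not ready yet
        }
        double mean = sum / count;
        out.add(value >= mean ? "above" : "below");
    }

    // Input 2: a training instance updates the local model.
    void onTraining(double value) {
        sum += value;
        count++;
    }

    public static void main(String[] args) {
        TrainAndPredict operator = new TrainAndPredict();
        List<String> out = new ArrayList<>();
        operator.onQuery(5.0, out); // dropped, no training data seen so far
        operator.onTraining(2.0);
        operator.onTraining(4.0);
        operator.onQuery(5.0, out);
        operator.onQuery(1.0, out);
        System.out.println(out);
    }
}
```

Because the model lives inside the operator instance, it never has to be serialized or shipped between nodes; each parallel instance simply holds its own copy. Dropping queries that arrive before training is one choice; buffering them until the model is ready would be another.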
Re: Terminology: Split, Group and Partition
I think so far getExecutionPlan() was only used for debugging purposes and not in programs that would also be executed. You can open a JIRA issue if you think that this would be a valuable feature.

Thanks,
Fabian

2017-01-13 16:34 GMT+01:00 Robert Schmidtke:
> Just a side note, I'm guessing there's a bug here:
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68
>
> It should say createProgramPlan("unnamed job", false);
>
> Otherwise I'm getting an exception complaining that no new sinks have been
> added after the last execution. So currently it is not possible for me to
> first get the execution plan and then execute the program.
>
> Robert
>
> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke wrote:
>
>> Hi Fabian,
>>
>> thanks for the quick and comprehensive reply. I'll have a look at the
>> ExecutionPlan using your suggestion to check what actually gets computed,
>> and I'll use the properties as well. If I stumble across something else
>> I'll let you know.
>>
>> Many thanks again!
>> Robert
>>
>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske wrote:
>>
>>> Hi Robert,
>>>
>>> let me first describe what splits, groups, and partitions are.
>>>
>>> * Partition: This is basically all data that goes through the same task
>>> instance. If you have an operator with a parallelism of 80, you have 80
>>> partitions. When you call sortPartition() you'll have 80 sorted streams, if
>>> you call mapPartition you iterate over all records in one partition.
>>> * Split: Splits are a concept of InputFormats. An InputFormat can
>>> process several splits. All splits that are processed by the same data
>>> source task make up the partition of that task. So a split is a subset of a
>>> partition. In your case where each task reads exactly one split, the split
>>> is equivalent to the partition.
>>> * Group: A group is based on the groupBy attribute and hence data-driven
>>> and does not depend on the parallelism. A groupReduce requires a
>>> partitioning such that all records with the same grouping attribute are
>>> sent to the same operator, i.e., all are part of the same partition.
>>> Depending on the number of distinct grouping keys (and the hash-function) a
>>> partition can have zero, one, or more groups.
>>>
>>> Now coming to your use case. You have 80 sources running on 5 machines.
>>> All sources on the same machine produce records with the same grouping key
>>> (hostname). You can actually give a hint to Flink that the data returned
>>> by a split is partitioned, grouped, or sorted in a specific way. This works
>>> as follows:
>>>
>>> // String is hostname, Integer is parallel id of the source task
>>> DataSet<Tuple2<String, Integer>> text = env.createInput(YourFormat);
>>> SplitDataProperties<Tuple2<String, Integer>> splitProps =
>>>     ((DataSource)text).getSplitDataProperties();
>>> splitProps.splitsGroupedBy(0,1);
>>> splitProps.splitsPartitionedBy(0,1);
>>>
>>> With this info, Flink knows that the data returned by our source is
>>> partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to
>>> run a local groupReduce operation on each of the 80 tasks (hostname and
>>> parallel index result in 80 keys) and locally reduce the data.
>>> Next step would be another .groupBy(0).groupReduce() which gives 16
>>> groups which are distributed across your tasks.
>>>
>>> However, you have to be careful with the SplitDataProperties. If you get
>>> them wrong, the optimizer makes false assumptions and the resulting plan
>>> might not compute what you are looking for.
>>> I'd recommend reading the JavaDocs and playing a bit with this feature to
>>> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to
>>> figure out what is happening.
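The relationship between groups and partitions described above (a partition can hold zero, one, or more groups) can be sketched in plain Java. This is a minimal sketch under stated assumptions: `GroupPlacement` and its methods are hypothetical names, the keys are assumed to be distinct, and `String.hashCode()` modulo the parallelism stands in for Flink's hash partitioner, which uses its own hash function.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of how groups are spread over partitions; no Flink classes are used.
class GroupPlacement {

    // Assumed partitioner: hash of the grouping key modulo the parallelism.
    static int partitionOf(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts how many groups (distinct keys) end up in each partition.
    static int[] groupsPerPartition(List<String> distinctKeys, int parallelism) {
        int[] counts = new int[parallelism];
        for (String key : distinctKeys) {
            counts[partitionOf(key, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> hostnames = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            hostnames.add("host-" + i); // one grouping key per machine
        }
        int[] counts = groupsPerPartition(hostnames, 80);
        int empty = 0;
        for (int c : counts) {
            if (c == 0) {
                empty++;
            }
        }
        // With 5 groups and 80 partitions, most partitions receive no group at all.
        System.out.println("partitions without any group: " + empty);
    }
}
```

This is why a global groupBy on the hostname alone uses only a few of the 80 tasks, while the local groupBy(0,1) step keeps all tasks busy.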
>>> >>> Best, >>> Fabian >>> >>> >>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke : >>> Hi all, I'm having some trouble grasping what the meaning of/difference between the following concepts is: - Split - Group - Partition Let me elaborate a bit on the problem I'm trying to solve here. In my tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in standalone mode. Each node has 64G of memory and 32 cores. I'm starting the JobManager on one node, and a TaskManager on each node. I'm assigning 16 slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16 Slots). The data I want to process resides in a local folder on each worker with the same path (say /tmp/input). There can be arbitrarily many input files in each worker's folder. I have written a custom input format that round-robin assigns the files to each of the 16 local input splits ( https://github.com/robert-schmidtke/hdfs-statistics-adapter
Re: 1.1.4 on YARN - vcores change?
Ufuk & Robert,

There's a good chance you're right! On the EMR master node, where yarn-session.sh is run, /etc/hadoop/conf/yarn-site.xml says that "yarn.nodemanager.resource.cpu-vcores" is 4. Meanwhile, on the core nodes, the value in that file is 8.

Shall I submit a JIRA? This might be pretty easy to fix given that "yarn-session.sh -q" already knows how to get the vcore count on the nodes. I can try to make a PR for it too.

I'm still not sure why the containers are showing up as only using one vcore though... or if that is expected.

Meanwhile, it seems like overriding yarn.containers.vcores would be a successful workaround. Let me know if you disagree.

The other slightly annoying thing that I have to deal with is leaving enough memory for the JobManager. Since all task managers are the same size, I either need to reduce the size of every task manager (wasting resources), or I have to double the task managers (and halve the memory) & subtract one (basically doubling the number of separate JVMs & halving the slot density within the JVMs) in order to leave room for the JobManager.

What do you guys think of the following change in approach? User specifies:

* number of taskmanagers
* memory per slot (not per taskmanager)
* total number of slots (not slots per taskmanager)

Then, Flink would decide how to organize the task managers & slots in order to also leave room for the JobManager. This should be straightforward compared to bin packing because all slots are the same size. Maybe I'm oversimplifying... might be a little tougher if the nodes are different sizes and we don't know on what node the ApplicationMaster/JobManager will run.

-Shannon

On 1/13/17, 2:59 AM, "Ufuk Celebi" wrote:
>On Fri, Jan 13, 2017 at 9:57 AM, Robert Metzger wrote:
>> Flink is reading the number of available vcores from the local YARN
>> configuration. Is it possible that the YARN / Hadoop config on the machine
>> where you are submitting your job from sets the number of vcores as 4 ?
> >Shouldn't we retrieve this number from the cluster instead? >
Re: Queryable State
Hi Dawid, I'll try to reproduce the error in the next couple of days. Can you also share the value deserializer you use? Also, have you tried even smaller examples in the meantime? Did they work? As a side-note in general regarding the queryable state "sink" using ListState (".asQueryableState(, ListStateDescriptor)"): everything that enters this operator will be stored forever and never cleaned. Eventually, it will pile up too much memory and is thus of limited use. Maybe it should even be removed from the API. Nico On Tuesday, 10 January 2017 19:43:40 CET Dawid Wysakowicz wrote: > Hey Ufuk. > Did you maybe had a while to have a look at that problem? > > 2017-01-09 10:47 GMT+01:00 Ufuk Celebi: > > Hey Dawid! Thanks for reporting this. I will try to have a look over > > the course of the day. From a first impression, this seems like a bug > > to me. > > > > On Sun, Jan 8, 2017 at 4:43 PM, Dawid Wysakowicz > > > > wrote: > > > Hi I was experimenting with the Query State feature and I have some > > > > problems > > > > > querying the state. > > > > > > The code which I use to produce the queryable state is: > > > env.addSource(kafkaConsumer).map( > > > > > > e => e match { > > > > > > case LoginClickEvent(_, t) => ("login", 1, t) > > > case LogoutClickEvent(_, t) => ("logout", 1, t) > > > case ButtonClickEvent(_, _, t) => ("button", 1, t) > > > > > > }).keyBy(0).timeWindow(Time.seconds(1)) > > > .reduce((e1, e2) => (e1._1, e1._2 + e2._2, Math.max(e1._3, > > > e2._3))) > > > .map(e => new KeyedDataPoint[java.lang.Integer](e._1, e._3, e._2)) > > > .keyBy("key") > > > .asQueryableState( > > > > > > "type-time-series-count", > > > new ListStateDescriptor[KeyedDataPoint[java.lang.Integer]]( > > > > > > "type-time-series-count", > > > classOf[KeyedDataPoint[java.lang.Integer]])) > > > > > > As you see it is a rather simple job, in which I try to count events of > > > different types in windows and then query by event type. 
> > >
> > > In client code I do:
> > >
> > >     // Query Flink state
> > >     val future = client.getKvState(jobId, "type-time-series-count", key.hashCode, seralizedKey)
> > >
> > >     // Await async result
> > >     val serializedResult: Array[Byte] = Await.result(future, new FiniteDuration(10, duration.SECONDS))
> > >
> > >     // Deserialize response
> > >     val results = deserializeResponse(serializedResult)
> > >
> > >     results
> > >   }
> > >
> > >   private def deserializeResponse(serializedResult: Array[Byte]): util.List[KeyedDataPoint[lang.Integer]] = {
> > >     KvStateRequestSerializer.deserializeList(serializedResult, getValueSerializer())
> > >   }
> > >
> > > As I was trying to debug the issue I see the first element in list gets
> > > deserialized correctly, but it fails on the second one. It seems like the
> > > serialized result is broken. Do you have any idea if I am doing something
> > > wrong or there is some bug?
> > >
> > > The exception I get is:
> > > java.io.EOFException: null
> > >   at org.apache.flink.runtime.util.DataInputDeserializer.readFully(DataInputDeserializer.java:157)
> > >   at org.apache.flink.runtime.util.DataInputDeserializer.readUTF(DataInputDeserializer.java:240)
> > >   at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:386)
> > >   at org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer.deserializeList(KvStateRequestSerializer.java:487)
> > >   at com.dataartisans.stateserver.queryclient.QueryClient.deserializeResponse(QueryClient.scala:44)
> > >
> > > You can browse the exact code at: https://github.com/dawidwys/flink-intro
> > >
> > > I would be grateful for any advice.
> > >
> > > Regards
> > > Dawid Wysakowicz
WindowFunction to push data from Kafka to S3
Hi,

I am reading messages off a Kafka topic and want to process the messages through Flink and save them into S3. It was pointed out to me that stream processing of the Kafka data won't be saved to S3 because S3 doesn't allow data to be appended to a file, so I want to convert the Kafka stream into batches and save them to S3. Based on other user questions/answers, it looks like this is possible using windowing, by breaking the stream into batches and creating files.

I have written the following code, but it doesn't work and I am not getting any errors either. I have a System.out.println call that shows the tuple is being processed, but it might not be emitted by out.collect. Can someone help me figure out what may be the issue? Thanks!

    public class S3Sink {

        public static void main(String[] args) throws Exception {
            Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/configs.yaml");
            final ParameterTool parameterTool = ParameterTool.fromMap(configs);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().disableSysoutLogging();
            env.getConfig().setGlobalJobParameters(parameterTool);

            DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(parameterTool.get("kafka.topic"),
                    new SimpleStringSchema(), parameterTool.getProperties()));

            String uuid = UUID.randomUUID().toString();

            DataStreamSink<Tuple2<String, String>> tuple2DataStream = messageStream
                .flatMap(new Tupler())
                .keyBy(0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .apply(new MyWindowFunction())
                .writeAsText("s3://flink-test/flink-output-stream/" + uuid + "testdoc.txt");

            env.execute();
        }

        private static class Tupler implements FlatMapFunction<String, Tuple2<String, String>> {
            @Override
            public void flatMap(String record, Collector<Tuple2<String, String>> out) throws Exception {
                out.collect(new Tuple2<String, String>("record", record));
            }
        }

        private static class MyWindowFunction
                implements WindowFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple, TimeWindow> {
            @Override
            public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, String>> input,
                    Collector<Tuple2<String, String>> out) throws Exception {
                for (Tuple2<String, String> in : input) {
                    System.out.println(in);
                    out.collect(in);
                }
            }
        }
    }

--
Thanks,
Sam
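The idea of turning a stream into batches with tumbling windows comes down to assigning each record to a fixed time bucket and writing one output per bucket. The plain-Java sketch below illustrates only that assignment; it is not a diagnosis of why the job above produces no output. `TumblingBatches`, `windowStart`, and `batch` are hypothetical names, timestamps are assumed to be non-negative milliseconds, and no window offset is applied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of tumbling-window batching; no Flink classes are used.
class TumblingBatches {

    // Start of the tumbling window that contains the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Groups messages into one batch per window start, ordered by time.
    static Map<Long, List<String>> batch(long[] timestamps, String[] messages, long sizeMillis) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            batches.computeIfAbsent(start, k -> new ArrayList<>()).add(messages[i]);
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000L, 4_999L, 5_000L, 12_345L};
        String[] messages = {"a", "b", "c", "d"};
        // In this sketch, each map entry would become one output file.
        System.out.println(batch(timestamps, messages, 5_000L));
    }
}
```

Writing each batch to its own object, for example with the window start in the file name, avoids the need to append to an existing S3 object.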
Apache Flink 1.1.4 - Java 8 - CommunityDetection.java:158 - java.lang.NullPointerException
Hello, If I missed the answer to this or some essential step of the documentation, please do tell. I am having the following problem while trying out the org.apache.flink.graph.library.CommunityDetection algorithm of the Gelly API (Java). Specs: JDK 1.8.0_102 x64 Apache Flink: 1.1.4 Suppose I have a very small (I tried with an example with 38 vertices as well) dataset stored in a tab-separated file 3-vertex.tsv: #id1 id2 score 010 020 030 This is just a central vertex with 3 neighbors (disconnected between themselves). I am loading the dataset and executing the algorithm with the following code: --- // Load the data from the .tsv file. final DataSet> edgeTuples = env.readCsvFile(inputPath) .fieldDelimiter("\t") // node IDs are separated by spaces .ignoreComments("#") // comments start with "%" .types(Long.class, Long.class, Double.class); // Generate a graph and add reverse edges (undirected). final Graph graph = Graph.fromTupleDataSet(edgeTuples, new MapFunction () { private static final long serialVersionUID = 8713516577419451509L; public Long map(Long value) { return value; } }, env).getUndirected(); // CommunityDetection parameters. final double hopAttenuationDelta = 0.5d; final int iterationCount = 10; // Prepare and trigger the execution. DataSet > vs = graph.run(new org.apache.flink.graph.library.CommunityDetection(iterationCount, hopAttenuationDelta)).getVertices(); vs.print(); --- Running this code throws the following exception (check the bold line): org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply$mcV$sp(JobManager.scala:805)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$8.apply(JobManager.scala:751)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
*Caused by: java.lang.NullPointerException
    at org.apache.flink.graph.library.CommunityDetection$VertexLabelUpdater.updateVertex(CommunityDetection.java:158)*
    at org.apache.flink.graph.spargel.ScatterGatherIteration$GatherUdfSimpleVV.coGroup(ScatterGatherIteration.java:389)
    at org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:218)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
    at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
    at org.apache.flink.runtime.iterative.task.IterationTailTask.run(IterationTailTask.java:107)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:642)
    at java.lang.Thread.run(Thread.java:745)

After a further look, I set a breakpoint (Eclipse IDE debugging) at the line in bold:

org.apache.flink.graph.library.CommunityDetection.java (source code accessed automatically by Maven)

// find the highest score of maxScoreLabel
*double highestScore = labelsWithHighestScore.get(maxScoreLabel);*

- maxScoreLabel has the value 3.
- labelsWithHighestScore was initialized as: Map<Long, Double> labelsWithHighestScore = new TreeMap<>();
- labelsWithHighestScore is a TreeMap and has the values: {0=0.0} null null [0=0.0] null 1

It seems that the value 3 should have been added to labelsWithHighestScore at some point during execution, but because it wasn't, an exception is thrown.

If anyone is able to shed light on the issue it would be great: what might be causing it, am I doing something clearly wrong, or has this been fixed in another version?

Thank you very much,

Best regards,
Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com
Skype: miguel.e.coimbra
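The NullPointerException on the line reported above is consistent with Java auto-unboxing: Map.get returns null for a key that is not present, and assigning that null to a primitive double throws. The following minimal sketch reproduces only that mechanism outside Flink. The class and method names are made up for illustration, it is not the Gelly code, and whether a fallback value would be the right fix for CommunityDetection is an open question.

```java
import java.util.Map;
import java.util.TreeMap;

class UnboxingNpeDemo {

    // Mirrors the shape of the failing statement: the Double returned by get()
    // is unboxed into a primitive double, which throws if the key is absent.
    static double unsafeLookup(Map<Long, Double> scores, long label) {
        double score = scores.get(label); // NullPointerException when get() returns null
        return score;
    }

    // Defensive variant (an assumption, not the library's behaviour):
    // treat a missing label as "no score yet" and return a fallback.
    static double safeLookup(Map<Long, Double> scores, long label, double fallback) {
        Double score = scores.get(label);
        return score != null ? score : fallback;
    }

    public static void main(String[] args) {
        Map<Long, Double> labelsWithHighestScore = new TreeMap<>();
        labelsWithHighestScore.put(0L, 0.0); // same content as seen in the debugger: {0=0.0}

        System.out.println(safeLookup(labelsWithHighestScore, 3L, 0.0));
        try {
            unsafeLookup(labelsWithHighestScore, 3L);
        } catch (NullPointerException e) {
            System.out.println("NPE for missing label 3");
        }
    }
}
```

With the map content from the debugger session ({0=0.0}) and the label 3, the unsafe variant fails in the same way as the reported stack trace.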
Re: Terminology: Split, Group and Partition
Just a side note, I'm guessing there's a bug here: https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68

It should say createProgramPlan("unnamed job", false);

Otherwise I'm getting an exception complaining that no new sinks have been added after the last execution. So currently it is not possible for me to first get the execution plan and then execute the program.

Robert

On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke wrote:
> Hi Fabian,
>
> thanks for the quick and comprehensive reply. I'll have a look at the
> ExecutionPlan using your suggestion to check what actually gets computed,
> and I'll use the properties as well. If I stumble across something else
> I'll let you know.
>
> Many thanks again!
> Robert
Re: Objects accessible from all Flink nodes
Errata: How can an *object (such as the classifier, line 1)* be accessed by any Flink node [...]

Just in case: I believe the classifier itself can't be serialized; it's part of a framework which I can't modify. In any case, even if it could be serialized, I guess the cost of moving it from one node to another would make the whole data flow impractical. It would be better to move all created instances to one single node where only one instance of the classifier is maintained. I'm not sure if this is possible or how to do this.

On Thu, Jan 12, 2017 at 11:11 PM, Matt wrote:
> Hello,
>
> I have a stream of objects which I use to update the model of a
> classification algorithm and another stream with the objects I need to
> classify in real time.
>
> The problem is that the instances for training and evaluation are
> processed on potentially different Flink nodes, but the classifier should
> be applied to all instances no matter on which node they were generated
> (i.e., the classifier should be accessible from any Flink node).
>
> Just to make it clearer, here is what would NOT work since these sink
> functions are not serializable:
> https://gist.github.com/b979bf742b0d2f3da8cc8e5e91207151
>
> Two questions here:
>
> *1. How can an instance be accessed by any Flink node like this (line 11
> and 19)? Maybe there's a better approach to this problem.*
>
> *2. In the example the second stream (line 15) is started right away but
> at startup the classifier is not ready to use until it has been trained
> with enough instances. Is it possible to do this? If I'm not wrong
> env.execute (line 24) can be used only once.*
>
> Regards,
> Matt
Re: Terminology: Split, Group and Partition
Hi Fabian,

thanks for the quick and comprehensive reply. I'll have a look at the ExecutionPlan using your suggestion to check what actually gets computed, and I'll use the properties as well. If I stumble across something else I'll let you know.

Many thanks again!
Robert

On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske wrote:
> [...]
Re: Kafka topic partition skewness causes watermark not being emitted
Hi,

This is expected behaviour due to how the per-partition watermarks are designed in the Kafka consumer, but I think it’s probably a good idea to handle idle partitions also when the Kafka consumer itself emits watermarks. I’ve filed a JIRA issue for this: https://issues.apache.org/jira/browse/FLINK-5479.

For the time being, I don’t think there will be an easy way to avoid this with the existing APIs, unfortunately. Is the skewed partition data intentional, or only for experimental purposes?

Best,
Gordon

On January 12, 2017 at 5:28:40 PM, tao xiao (xiaotao...@gmail.com) wrote:

Hi team,

I have a topic with 2 partitions in Kafka. I produced all data to partition 0 and no data to partition 1. I created a Flink job with parallelism 1 that consumes that topic and counts the events with a session event window (5 seconds gap). It turned out that the session event window was never closed, even when I sent a message after a 10-minute gap. After digging into the source code, I found that AbstractFetcher [1], which is responsible for sending watermarks downstream, calculates the minimum watermark of all partitions. Because we don't have data in partition 1, the watermark returned from partition 1 is always Long.MIN_VALUE, and therefore AbstractFetcher never emits a watermark downstream.

I want to know if this is expected behavior or a bug. If this is expected behavior, how do I avoid the delay of watermark firing when data is not evenly distributed to all partitions?

This is the timestamp extractor I used:

public class ExactTimestampExtractor implements AssignerWithPeriodicWatermarks<SessionEvent> {

    private long currentMaxTimestamp = Long.MIN_VALUE;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentMaxTimestamp - 1);
    }

    @Override
    public long extractTimestamp(SessionEvent element, long previousElementTimestamp) {
        long eventStartTime = (long) element.get(SessionEvent.SESSION_START_DT);
        if (eventStartTime > currentMaxTimestamp) {
            currentMaxTimestamp = eventStartTime;
        }
        return eventStartTime;
    }
}

and this is the Flink topology:

// get input data
FlinkKafkaConsumer010<SessionEvent> consumer = new FlinkKafkaConsumer010<>("topic4", new MyOwnSchema(), ...);
consumer.assignTimestampsAndWatermarks(new ExactTimestampExtractor());
DataStream<SessionEvent> input = env.addSource(consumer);

input.keyBy("id")
    .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
    .reduce(new Reducer(), new WindowFunction())
    .print();

// execute program
env.execute("a job");

I used the latest code on GitHub.

[1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java#L539
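The effect described in this thread can be sketched in a few lines of plain Java. This is a simplified model of "take the minimum over all partition watermarks", not the actual AbstractFetcher code; the class name, method name and the example timestamp are assumptions made for illustration.

```java
import java.util.Arrays;

class MinWatermarkDemo {

    // Simplified model: the watermark forwarded downstream is the minimum of
    // the per-partition watermarks, so one idle partition holds everything back.
    static long combinedWatermark(long[] partitionWatermarks) {
        return Arrays.stream(partitionWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long partition0 = 1_484_200_000_000L; // partition 0 has seen events (example timestamp)
        long partition1 = Long.MIN_VALUE;     // partition 1 never received a record

        long combined = combinedWatermark(new long[] {partition0, partition1});
        System.out.println(combined == Long.MIN_VALUE); // true: event time never advances
    }
}
```

As long as one partition stays at Long.MIN_VALUE, the combined value cannot move forward, which matches the observation that the session window never closes.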
Re: Terminology: Split, Group and Partition
Hi Robert,

let me first describe what splits, groups, and partitions are.

* Partition: This is basically all data that goes through the same task instance. If you have an operator with a parallelism of 80, you have 80 partitions. When you call sortPartition() you'll have 80 sorted streams; if you call mapPartition you iterate over all records in one partition.
* Split: Splits are a concept of InputFormats. An InputFormat can process several splits. All splits that are processed by the same data source task make up the partition of that task. So a split is a subset of a partition. In your case, where each task reads exactly one split, the split is equivalent to the partition.
* Group: A group is based on the groupBy attribute and hence data-driven and does not depend on the parallelism. A groupReduce requires a partitioning such that all records with the same grouping attribute are sent to the same operator, i.e., all are part of the same partition. Depending on the number of distinct grouping keys (and the hash function) a partition can have zero, one, or more groups.

Now coming to your use case. You have 80 sources running on 5 machines. All sources on the same machine produce records with the same grouping key (hostname). You can actually give a hint to Flink that the data returned by a split is partitioned, grouped, or sorted in a specific way. This works as follows:

// String is hostname, Integer is parallel id of the source task
DataSet<Tuple2<String, Integer>> text = env.createInput(YourFormat);
SplitDataProperties<Tuple2<String, Integer>> splitProps = ((DataSource) text).getSplitDataProperties();
splitProps.splitsGroupedBy(0, 1);
splitProps.splitsPartitionedBy(0, 1);

With this info, Flink knows that the data returned by our source is partitioned and grouped. Now you can do groupBy(0,1).reduceGroup(XXX) to run a local group reduce operation on each of the 80 tasks (hostname and parallel index result in 80 keys) and locally reduce the data.
The next step would be another .groupBy(0).reduceGroup() which gives 5 groups (one per hostname) which are distributed across your tasks.

However, you have to be careful with the SplitDataProperties. If you get them wrong, the optimizer makes false assumptions and the resulting plan might not compute what you are looking for.
I'd recommend reading the JavaDocs and playing a bit with this feature to see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to figure out what is happening.

Best,
Fabian

2017-01-13 12:14 GMT+01:00 Robert Schmidtke:
> [...]
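To make the distinction in this reply concrete: groups are determined by the data, not by the parallelism. The plain-Java sketch below models the cluster from the thread (5 hosts with 16 source tasks each) without using Flink at all. The record type, method names and the number of records per task are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class GroupingDemo {

    // One record per (hostname, source task index); the payload is a plain count.
    static final class Rec {
        final String hostname;
        final int taskIndex;
        final long count;

        Rec(String hostname, int taskIndex, long count) {
            this.hostname = hostname;
            this.taskIndex = taskIndex;
            this.count = count;
        }
    }

    // Builds the records of `hosts` machines with `slots` source tasks each.
    static List<Rec> sampleRecords(int hosts, int slots, int recordsPerTask) {
        List<Rec> records = new ArrayList<>();
        for (int h = 1; h <= hosts; h++) {
            for (int s = 0; s < slots; s++) {
                for (int r = 0; r < recordsPerTask; r++) {
                    records.add(new Rec("host" + h, s, 1L));
                }
            }
        }
        return records;
    }

    // Analogue of grouping on (hostname, task index): one group per pair.
    static Map<String, Long> groupByHostAndTask(List<Rec> records) {
        Map<String, Long> groups = new TreeMap<>();
        for (Rec r : records) {
            groups.merge(r.hostname + "#" + r.taskIndex, r.count, Long::sum);
        }
        return groups;
    }

    // Analogue of grouping on hostname only: independent of the parallelism.
    static Map<String, Long> groupByHost(List<Rec> records) {
        Map<String, Long> groups = new TreeMap<>();
        for (Rec r : records) {
            groups.merge(r.hostname, r.count, Long::sum);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Rec> records = sampleRecords(5, 16, 3);
        System.out.println(groupByHostAndTask(records).size()); // 80
        System.out.println(groupByHost(records).size());        // 5
    }
}
```

Grouping on (hostname, task index) yields 80 groups, matching the 80 keys mentioned in the reply, while grouping on hostname alone yields one group per machine regardless of how many tasks produced the records.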
Terminology: Split, Group and Partition
Hi all,

I'm having some trouble grasping what the meaning of/difference between the following concepts is:

- Split
- Group
- Partition

Let me elaborate a bit on the problem I'm trying to solve here. In my tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in standalone mode. Each node has 64G of memory and 32 cores. I'm starting the JobManager on one node, and a TaskManager on each node. I'm assigning 16 slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16 Slots).

The data I want to process resides in a local folder on each worker with the same path (say /tmp/input). There can be arbitrarily many input files in each worker's folder. I have written a custom input format that round-robin assigns the files to each of the 16 local input splits (https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java) to obtain a total of 80 input splits that need processing. Each split reads zero or more files, parsing the contents into records that are emitted correctly. This works as expected.

Now we're getting to the questions. How do these 80 input splits relate to groups and partitions? My understanding of a partition is a subset of my DataSet that is local to each node. I.e. if I were to repartition the data according to some scheme, a shuffling over workers would occur. After reading all the data, I have 80 partitions, correct?

What is less clear to me is the concept of a group, i.e. the result of a groupBy operation. The input files I have are produced on each worker by some other process. I first want to do pre-aggregation (I hope that's the term) on each node before sending data over the network. The records I'm processing contain a 'hostname' attribute, which is set to the worker's hostname that processes the data, because the DataSources are local. That means the records produced by the worker on host1 always contain the attribute hostname=host1. Similar for the other 4 workers.

Now what happens if I do a groupBy("hostname")? How do the workers realize that no network transfer is necessary? Is a group a logical abstraction, or a physical one (in my understanding a partition is physical because it's local to exactly one worker)?

What I'd like to do next is a reduceGroup to merge multiple records into one (some custom, yet straightforward, aggregation) and emit another record for every couple of input records. Am I correct in assuming that the Iterable values passed to the reduce function all have the same hostname value? That is, will the operation have a parallelism of 80, where 5x16 operations will have the same hostname value? Because I have 16 splits per host, the 16 reduces on host1 should all receive values with hostname=host1, correct? And after the operation has finished, will the reduced groups (now actual DataSets again) still be local to the workers?

This is quite a lot to work on, I have to admit. I'm happy for any hints, advice and feedback on this. If there's need for clarification I'd be happy to provide more information.

Thanks a lot in advance!

Robert

--
My GPG Key ID: 336E2680
Re: Re: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)
I tested the Table API / SQL a bit. I implemented a windowed aggregation with the streaming Table API and it produced the same results as a DataStream API implementation. Joining a stream with a TableFunction also seemed to work well. Moreover, I checked the results of a bunch of TPC-H queries (batch SQL) and all produced correct results.

2017-01-12 17:45 GMT+01:00 Till Rohrmann:
> I'm wondering whether we should stop making the webserver encryption
> depend on the global encryption activation and instead activate it by
> default.
>
> On Thu, Jan 12, 2017 at 4:54 PM, Chesnay Schepler wrote:
>> FLINK-5470 is a duplicate of FLINK-5298 for which there is also an open PR.
>>
>> FLINK-5472 is imo invalid since the webserver does support https, you just
>> have to enable it as per the security documentation.
>>
>> On 12.01.2017 16:20, Till Rohrmann wrote:
>>> I also found an issue:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-5470
>>>
>>> I also noticed that Flink's webserver does not support https requests. It
>>> might be worthwhile to add it, though.
>>>
>>> https://issues.apache.org/jira/browse/FLINK-5472
>>>
>>> On Thu, Jan 12, 2017 at 11:24 AM, Robert Metzger wrote:
>>>> I also found a bunch of issues
>>>>
>>>> https://issues.apache.org/jira/browse/FLINK-5465
>>>> https://issues.apache.org/jira/browse/FLINK-5462
>>>> https://issues.apache.org/jira/browse/FLINK-5464
>>>> https://issues.apache.org/jira/browse/FLINK-5463
>>>>
>>>> On Thu, Jan 12, 2017 at 9:56 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>> I have another bugfix for 1.2:
>>>>>
>>>>> https://issues.apache.org/jira/browse/FLINK-2662 (pending PR)
>>>>>
>>>>> 2017-01-10 15:16 GMT+01:00 Robert Metzger <rmetz...@apache.org>:
>>>>>> Hi,
>>>>>>
>>>>>> this depends a lot on the number of issues we find during the testing.
>>>>>>
>>>>>> These are the issues I found so far:
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/FLINK-5379 (unresolved)
>>>>>> https://issues.apache.org/jira/browse/FLINK-5383 (resolved)
>>>>>> https://issues.apache.org/jira/browse/FLINK-5382 (resolved)
>>>>>> https://issues.apache.org/jira/browse/FLINK-5381 (resolved)
>>>>>> https://issues.apache.org/jira/browse/FLINK-5380 (pending PR)
>>>>>>
>>>>>> On Tue, Jan 10, 2017 at 11:58 AM, shijinkui wrote:
>>>>>>> Do we have a probable time for the 1.2 release? This month or next month?
>>>>>>>
>>>>>>> -----Original Message-----
>>>>>>> From: Robert Metzger [mailto:rmetz...@apache.org]
>>>>>>> Sent: January 3, 2017 20:44
>>>>>>> To: d...@flink.apache.org
>>>>>>> Cc: user@flink.apache.org
>>>>>>> Subject: [DISCUSS] Apache Flink 1.2.0 RC0 (Non-voting testing release candidate)
>>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> First of all, I wish everybody a happy new year 2017.
>>>>>>>
>>>>>>> I've set user@flink in CC so that users who are interested in helping
>>>>>>> with the testing get notified. Please respond only to the dev@ list to
>>>>>>> keep the discussion there!
>>>>>>>
>>>>>>> According to the 1.2 release discussion thread, I've created a first
>>>>>>> release candidate for Flink 1.2.
>>>>>>> The release candidate will not be the final release, because I'm certain
>>>>>>> that we'll find at least one blocking issue in the candidate :)
>>>>>>>
>>>>>>> Therefore, the RC is meant as a testing only release candidate.
>>>>>>> Please report every issue we need to fix before the next RC in this thread
>>>>>>> so that we have a good overview.
>>>>>>>
>>>>>>> The release artifacts are located here:
>>>>>>> http://people.apache.org/~rmetzger/flink-1.2.0-rc0/
>>>>>>>
>>>>>>> The maven staging repository is located here:
>>>>>>> https://repository.apache.org/content/repositories/orgapacheflink-
>>>>>>>
>>>>>>> The release commit (in branch "release-1.2.0-rc0"):
>>>>>>> http://git-wip-us.apache.org/repos/asf/flink/commit/f3c59ced
>>>>>>>
>>>>>>> Happy testing!
Re: the attribute order in sql 'select * from...'
I think the sorting is done for consistency reasons, i.e., so that all PojoTypeInfos for the same class behave the same. Since this code is used in many parts of Flink and many jobs (DataSet, DataStream, etc.) I would be very careful about changing the default behavior here. Maybe we can add a constructor that does not sort the fields.

2017-01-13 10:46 GMT+01:00 Hongyuhong:
> Hi Fabian,
>
> Yes, OrderA is a table of POJOs.
>
> But what confuses me is that in the PojoTypeInfo constructor the order of
> the input parameter (fields) is correct; it only changes after the sort
> operation. I'm wondering whether the sort operation can be removed?
>
> public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
>     super(typeClass);
>     checkArgument(Modifier.isPublic(typeClass.getModifiers()), "POJO %s is not public", typeClass);
>     this.fields = fields.toArray(new PojoField[fields.size()]);
>     *Arrays.sort(this.fields, new Comparator<PojoField>() {*
>     *    @Override*
>     *    public int compare(PojoField o1, PojoField o2) {*
>     *        return o1.getField().getName().compareTo(o2.getField().getName());*
>     *    }*
>     });
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com]
> *Sent:* January 13, 2017 17:23
> *To:* user@flink.apache.org
> *Subject:* Re: the attribute order in sql 'select * from...'
>
> Hi Yuhong,
>
> I assume that OrderA is a table of POJO objects and you are expecting the
> order of the attributes to be the order in which the fields of the POJO
> are defined in the source code.
>
> Flink accepts fields which are either public members or accessible via a
> getter and setter.
>
> This makes it difficult to automatically define an order, esp. if some
> fields use getters and setters and others are public fields. Would the
> order depend on the field (which might not exist in case of getter/setter)
> or on the setter or getter methods (which might also not exist)?
>
> I'm also not sure if it is possible to extract the line number of a method
> or field via reflection.
>
> Best, Fabian
>
> 2017-01-13 9:54 GMT+01:00 Hongyuhong:
>
> Hi,
>
> I'm now using streaming SQL, and I have a query like
>
> select * FROM OrderA where user > 2
>
> OrderA has 3 attributes (user, product, amount), and I expect the result
> to keep the order of the input, but it has been sorted by attribute name.
> I found that the order has already been sorted when addSource is called.
>
> What is the purpose of doing so? It doesn't quite meet our requirements.
>
> Thanks very much.
>
> public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
>     super(typeClass);
>
>     checkArgument(Modifier.isPublic(typeClass.getModifiers()),
>             "POJO %s is not public", typeClass);
>
>     this.fields = fields.toArray(new PojoField[fields.size()]);
>
>     Arrays.sort(this.fields, new Comparator<PojoField>() {
>         @Override
>         public int compare(PojoField o1, PojoField o2) {
>             return o1.getField().getName().compareTo(o2.getField().getName());
>         }
>     });
>
> Best,
> Yuhong
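The effect of the name-based sort discussed in this thread can be reproduced with plain reflection. The sketch below is only an illustration: the OrderA class and its field types are stand-ins assumed for the example, and the helper is not part of Flink.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PojoFieldOrderDemo {

    // Stand-in for the OrderA POJO from the question; the field types are assumed.
    public static class OrderA {
        public long user;
        public String product;
        public int amount;
    }

    // Collects the declared field names and sorts them by name, which is the
    // effect of the Arrays.sort call in the quoted constructor.
    static List<String> sortedFieldNames(Class<?> clazz) {
        List<String> names = new ArrayList<>();
        for (Field f : clazz.getDeclaredFields()) {
            if (!f.isSynthetic()) { // skip compiler- or tool-generated fields
                names.add(f.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(sortedFieldNames(OrderA.class)); // [amount, product, user]
    }
}
```

The fields are declared as user, product, amount, but after sorting by name they come out as amount, product, user, which is the column order reported for select *.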
Re: some questions about submit flink job on flink-yarn
Hi Huang,

this seems to be very strange, because the JobManager’s actor system has bound to the address 9-96-101-177 instead of 9.96.101.177. It seems as if the dots have been replaced by dashes.

Could you maybe tell me which version of Flink you’re running and also share the complete JobManager log with us?

I tested it with the latest 1.2 SNAPSHOT version and there it seemed to work.

Cheers,
Till

On Fri, Jan 13, 2017 at 9:02 AM, huangwei (G) wrote:
> Dear All,
>
> I get the following error in jobmanager.log when I submit a Flink job
> (batch/WordCount.jar) using the command:
> "./bin/flink run -m 9.96.101.177:39180 ./examples/batch/WordCount.jar".
>
> Flink is running on a YARN cluster.
>
> Error in jobmanager.log:
> 2017-01-13 15:28:27,402 ERROR akka.remote.EndpointWriter - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@9.96.101.177:39180/]] arriving at [akka.tcp://flink@9.96.101.177:39180] inbound addresses are [akka.tcp://flink@9-96-101-177:39180]
>
> However, it succeeds when I use the Flink web UI to submit the job.
>
> How can I solve this problem?
>
> In addition, when I started Flink on YARN, the jobmanager.rpc.port
> and the web port were changed to 39180 and 57724.
> The configuration in flink-conf.yaml is just the default:
>
> jobmanager.rpc.port: 6123
>
> and
>
> jobmanager.web.port: 8081
>
> I started Flink on YARN using the command: "./bin/yarn-session.sh -n 4".
>
> Why were the ports changed to 39180 and 57724?
>
> Many thanks for any help!
>
> HuangWHWHW
> 2017.1.13
Strategies for Complex Event Processing with guaranteed data consistency
I have been playing around with Flink for a few weeks to ascertain whether it meets our use cases, and which best practices we should be following. I have a few questions I would appreciate answers to.

Our scenario is that we want to process a lot of event data into cases. A case is an in-order sequence of events; this event data could be quite old. We never know when a case is complete, so we just want to have the most up-to-date picture of what a case looks like.

The in-order sequence of events of a case is called the trace. Many cases could have an identical trace. We would like to construct these traces and do some aggregations on them (case count, average/min/max life-cycle time).

We then have further downstream processing to do on a case, some of which would require additional inputs, either from side inputs or by somehow joining data sources.

We don't really care about event time at the moment, because we just want to build cases and traces with all the data we have received.

The end results should be available to our web front end via a REST API.

Based on the above, I have the following idea for a first implementation:

Kafka source -> key by case id -> session window with RocksDB state backend holding case for that key -> Postgres sink

The reason for a session window is that, as mentioned above, we just want to build a group with all the data we have received into Kafka up until that point in time. We would experiment with what this gap time should be, and in future it might be specific to the type of group, but for a start a naive approach is acceptable. I think this could be better than just doing it, say, every 10 minutes, because we really don't know yet how frequently data arrives. Also, some inputs to Kafka come directly from a CSV upload, so we will get "firehose" periods and periods of nothing. In short: I think what we have closely matches session behaviour.

We also have to implement a Postgres sink that is capable of doing upserts. The reason for Postgres is to serve the REST front end.

We then have to build our traces, and can see the following options for it:

1) The most obvious solution would be to use a Kafka sink for the keyed case stream, and to do the trace aggregations in a downstream Flink job with this Kafka topic as a source. However, I have some concerns about losing data (i.e. how do we know whether or not an event has been successfully pushed into the Kafka stream?).

2) Another approach might be to use some other type of sink (perhaps Postgres), and to use this as a source for the traces job. This would help us guarantee data consistency.

3) Or, to somehow re-merge the keyed cases stream (is this a broadcast?), so:

Keyed cases stream -> broadcast -> key by tracehash with RocksDB state backend holding trace for that tracehash -> perform aggregations -> Postgres sink

Is broadcast an option here? How costly is it?

Which of these approaches (or any other) would you recommend?

Another question regarding the state: as we never know when a case is complete, the RocksDB backend could grow infinitely (!). Obviously we would need to get a bit smarter here.

Is using state in this way a somewhat standard practice, or is state intended more for recovery?

Managing growing state: I found some discussion on how to clear state here
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Time-To-Live-Setting-for-State-StateDescriptor-td10391.html#a10402
which references https://issues.apache.org/jira/browse/FLINK-3946

Thanks,
Kat
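To make the case/trace terminology above concrete, here is a minimal sketch in plain Java (deliberately not Flink code) of the aggregation Kat describes: group events by case id, order each case, derive the trace, and collect the case count and min/max/average life-cycle time per trace. The `Event` class, its field names, the `>` separator used to build the trace key, and the definition of life-cycle time as last timestamp minus first timestamp are all assumptions made for illustration; none of them come from the thread.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class TraceAggregation {

    /** Hypothetical event shape: a case id, an activity name, and a timestamp in millis. */
    static final class Event {
        final String caseId;
        final String activity;
        final long timestamp;

        Event(String caseId, String activity, long timestamp) {
            this.caseId = caseId;
            this.activity = activity;
            this.timestamp = timestamp;
        }
    }

    /** Groups events by case id and orders the events of each case by timestamp. */
    static Map<String, List<Event>> buildCases(List<Event> events) {
        Map<String, List<Event>> cases = new TreeMap<>();
        for (Event e : events) {
            cases.computeIfAbsent(e.caseId, k -> new ArrayList<>()).add(e);
        }
        for (List<Event> c : cases.values()) {
            c.sort(Comparator.comparingLong((Event ev) -> ev.timestamp));
        }
        return cases;
    }

    /** The trace of a case: its in-order activity names joined into one key (assumed format). */
    static String traceOf(List<Event> orderedCase) {
        return orderedCase.stream().map(ev -> ev.activity).collect(Collectors.joining(">"));
    }

    /** Per trace: case count plus min/max/average life-cycle time (assumed: last minus first). */
    static Map<String, LongSummaryStatistics> aggregate(Map<String, List<Event>> cases) {
        Map<String, LongSummaryStatistics> stats = new TreeMap<>();
        for (List<Event> c : cases.values()) {
            long lifecycle = c.get(c.size() - 1).timestamp - c.get(0).timestamp;
            stats.computeIfAbsent(traceOf(c), k -> new LongSummaryStatistics()).accept(lifecycle);
        }
        return stats;
    }

    public static void main(String[] args) {
        // Events arrive out of order on purpose; buildCases restores the order per case.
        List<Event> events = List.of(
            new Event("c1", "close", 100L),
            new Event("c1", "open", 0L),
            new Event("c2", "open", 10L),
            new Event("c2", "close", 310L),
            new Event("c3", "open", 20L));
        aggregate(buildCases(events)).forEach((trace, s) ->
            System.out.println(trace + " count=" + s.getCount() + " min=" + s.getMin()
                + " max=" + s.getMax() + " avg=" + s.getAverage()));
    }
}
```

In a streaming job, the two maps would presumably be replaced by state keyed by case id and by trace hash respectively, which is where the concern about unbounded state growth comes from: every case and every distinct trace keeps an entry until something removes it.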
Re: the attribute order in sql 'select * from...'
Hi Yuhong,

as a solution, you can specify the order of your POJO fields when converting from DataStream to Table:

    Table table = tableEnv
        .fromDataSet(env.fromCollection(data),
            "department AS a, " +
            "age AS b, " +
            "salary AS c, " +
            "name AS d")
        .select("a, b, c, d");

Timo

On 13/01/17 at 10:22, Fabian Hueske wrote:
> Hi Yuhong,
>
> I assume that OrderA is a table of POJO objects and you are expecting the order of the attributes to be the order in which the fields of the POJO are defined in the source code.
>
> Flink accepts fields which are either public members or accessible via a getter and setter. This makes it difficult to automatically define an order, especially if some fields use getters and setters and others are public. Would the order depend on the field (which might not exist in the getter/setter case) or on the setter or getter methods (which might also not exist)? I'm also not sure if it is possible to extract the line number of a method or field via reflection.
>
> Best, Fabian
>
> 2017-01-13 9:54 GMT+01:00 Hongyuhong:
> > Hi,
> > I'm using streaming SQL with a query like
> > select * FROM OrderA where user > 2
> > OrderA has 3 attributes (user, product, amount). I expected the result columns to be in the same order as the input, but they are sorted by attribute name, and I found that the order is already sorted when addSource is called.
> > What is the purpose of doing so? It doesn't quite meet our requirements.
> >
> > Thanks very much.
> >
> >     public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
> >         super(typeClass);
> >
> >         checkArgument(Modifier.isPublic(typeClass.getModifiers()),
> >             "POJO %s is not public", typeClass);
> >
> >         this.fields = fields.toArray(new PojoField[fields.size()]);
> >
> >         Arrays.sort(this.fields, new Comparator<PojoField>() {
> >             @Override
> >             public int compare(PojoField o1, PojoField o2) {
> >                 return o1.getField().getName().compareTo(o2.getField().getName());
> >             }
> >         });
> >
> > Best,
> > Yuhong
Re: the attribute order in sql 'select * from...'
Hi Fabian,

Yes, OrderA is a table of POJOs. What puzzles me is that in the PojoTypeInfo constructor the input parameter (fields) arrives in the right order; it only changes after the sort operation. I wonder whether the sort operation can be removed?

    public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
        super(typeClass);

        checkArgument(Modifier.isPublic(typeClass.getModifiers()),
            "POJO %s is not public", typeClass);

        this.fields = fields.toArray(new PojoField[fields.size()]);

        Arrays.sort(this.fields, new Comparator<PojoField>() {
            @Override
            public int compare(PojoField o1, PojoField o2) {
                return o1.getField().getName().compareTo(o2.getField().getName());
            }
        });

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: January 13, 2017 17:23
To: user@flink.apache.org
Subject: Re: the attribute order in sql 'select * from...'

> Hi Yuhong,
>
> I assume that OrderA is a table of POJO objects and you are expecting the order of the attributes to be the order in which the fields of the POJO are defined in the source code.
>
> Flink accepts fields which are either public members or accessible via a getter and setter. This makes it difficult to automatically define an order, especially if some fields use getters and setters and others are public. Would the order depend on the field (which might not exist in the getter/setter case) or on the setter or getter methods (which might also not exist)? I'm also not sure if it is possible to extract the line number of a method or field via reflection.
>
> Best, Fabian
>
> 2017-01-13 9:54 GMT+01:00 Hongyuhong:
> > Hi,
> > I'm using streaming SQL with a query like
> > select * FROM OrderA where user > 2
> > OrderA has 3 attributes (user, product, amount). I expected the result columns to be in the same order as the input, but they are sorted by attribute name, and I found that the order is already sorted when addSource is called.
> > What is the purpose of doing so? It doesn't quite meet our requirements.
> >
> > Thanks very much.
> >
> >     public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
> >         super(typeClass);
> >
> >         checkArgument(Modifier.isPublic(typeClass.getModifiers()),
> >             "POJO %s is not public", typeClass);
> >
> >         this.fields = fields.toArray(new PojoField[fields.size()]);
> >
> >         Arrays.sort(this.fields, new Comparator<PojoField>() {
> >             @Override
> >             public int compare(PojoField o1, PojoField o2) {
> >                 return o1.getField().getName().compareTo(o2.getField().getName());
> >             }
> >         });
> >
> > Best,
> > Yuhong
the attribute order in sql 'select * from...'
Hi,

I'm using streaming SQL with a query like

    select * FROM OrderA where user > 2

OrderA has 3 attributes (user, product, amount). I expected the result columns to be in the same order as the input, but they are sorted by attribute name, and I found that the order is already sorted when addSource is called.

What is the purpose of doing so? It doesn't quite meet our requirements.

Thanks very much.

    public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
        super(typeClass);

        checkArgument(Modifier.isPublic(typeClass.getModifiers()),
            "POJO %s is not public", typeClass);

        this.fields = fields.toArray(new PojoField[fields.size()]);

        Arrays.sort(this.fields, new Comparator<PojoField>() {
            @Override
            public int compare(PojoField o1, PojoField o2) {
                return o1.getField().getName().compareTo(o2.getField().getName());
            }
        });

Best,
Yuhong
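The effect described in this thread can be reproduced without Flink. The sketch below is a standalone illustration, not Flink code: `OrderA` is a hypothetical POJO with the three attributes named in the question, and `sortedFieldNames` is a made-up helper that applies the same by-name comparison as the quoted constructor. It also bears on Fabian's point about declaration order: the Javadoc for `Class.getFields` and `Class.getDeclaredFields` states that the returned array is not sorted and not in any particular order, so reflection alone gives no dependable source order to fall back on.

```java
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Comparator;

public class PojoFieldOrderDemo {

    /** Hypothetical POJO mirroring the OrderA attributes from the thread. */
    public static class OrderA {
        public long user;
        public String product;
        public int amount;
    }

    /** Returns the public field names of a class sorted by name, like the quoted comparator. */
    static String[] sortedFieldNames(Class<?> type) {
        Field[] fields = type.getFields();
        Arrays.sort(fields, Comparator.comparing(Field::getName));
        return Arrays.stream(fields).map(Field::getName).toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Declared as user, product, amount; printed as: amount, product, user
        System.out.println(String.join(", ", sortedFieldNames(OrderA.class)));
    }
}
```

Sorting by name gives a deterministic order regardless of what reflection returns, which is one plausible reason for the sort, though the thread itself does not state the motivation.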
Re: 1.1.4 on YARN - vcores change?
Hi Shannon,

Flink reads the number of available vcores from the local YARN configuration. Is it possible that the YARN / Hadoop config on the machine from which you submit your job sets the number of vcores to 4?

On Fri, Jan 13, 2017 at 12:51 AM, Shannon Carey wrote:
> Did anything change in 1.1.4 with regard to YARN & vcores?
>
> I'm getting this error when deploying 1.1.4 to my test cluster. Only the Flink version changed.
>
> java.lang.RuntimeException: Couldn't deploy Yarn cluster
>     at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:384)
>     at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:591)
>     at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:465)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of virtual cores per node were configured with 8 but Yarn only has 4 virtual cores available. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
>     at org.apache.flink.yarn.AbstractYarnClusterDescriptor.isReadyForDeployment(AbstractYarnClusterDescriptor.java:273)
>     at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:393)
>     at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploy(AbstractYarnClusterDescriptor.java:381)
>     ... 2 more
>
> When I run: ./bin/yarn-session.sh -q
> It shows 8 vCores on each machine:
>
> NodeManagers in the ClusterClient 3
> |Property     |Value
> +---+
> |NodeID       |ip-10-2-…:8041
> |Memory       |12288 MB
> |vCores       |8
> |HealthReport |
> |Containers   |0
> +---+
> |NodeID       |ip-10-2-…:8041
> |Memory       |12288 MB
> |vCores       |8
> |HealthReport |
> |Containers   |0
> +---+
> |NodeID       |ip-10-2-…:8041
> |Memory       |12288 MB
> |vCores       |8
> |HealthReport |
> |Containers   |0
> +---+
> Summary: totalMemory 36864 totalCores 24
> Queue: default, Current Capacity: 0.0 Max Capacity: 1.0 Applications: 0
>
> I'm running:
> ./bin/yarn-session.sh -n 3 --jobManagerMemory 1504 --taskManagerMemory 10764 --slots 8 --detached
>
> I have not specified any value for "yarn.containers.vcores" in my config.
>
> I switched to -n 5 and --slots 4, and halved the taskManagerMemory, which allowed the cluster to start.
>
> However, in the YARN "Nodes" UI I see "VCores Used: 2" and "VCores Avail: 6" on all three nodes. And if I look at one of the Containers, it says, "Resource: 5408 Memory, 1 VCores". I don't understand what's happening here.
>
> Thanks…
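Taken together, the error text and the reply above suggest a client-side check along the following lines. This is only a sketch of that reading, not Flink's actual implementation: `requestedVcores` and `checkReady` are hypothetical names, and the value 4 stands for whatever the submitting machine's local YARN configuration reports, which may differ from the 8 vcores the node managers advertise.

```java
public class VcoreCheckSketch {

    /** Hypothetical: vcores requested per container; falls back to the slot count when unset. */
    static int requestedVcores(Integer configuredVcores, int slots) {
        return configuredVcores != null ? configuredVcores : slots;
    }

    /** Hypothetical: rejects a request that exceeds what the client-side config reports. */
    static void checkReady(int requested, int vcoresSeenByClient) {
        if (requested > vcoresSeenByClient) {
            throw new IllegalStateException("configured with " + requested
                + " vcores but only " + vcoresSeenByClient + " available");
        }
    }

    public static void main(String[] args) {
        int vcoresSeenByClient = 4; // assumed value read from the local YARN config

        // --slots 8 with yarn.containers.vcores unset: the request defaults to 8 and is rejected.
        try {
            checkReady(requestedVcores(null, 8), vcoresSeenByClient);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // --slots 4: the request defaults to 4 and passes.
        checkReady(requestedVcores(null, 4), vcoresSeenByClient);
        System.out.println("accepted with 4 slots");
    }
}
```

Under this reading there are two ways out, both taken from the error message and the reply: set 'yarn.containers.vcores' explicitly in the Flink config, or correct the vcore count in the YARN configuration on the submitting machine so that it matches the cluster.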