Re: [DISCUSS] FLIP-6 - Flink Deployment and Process Model - Standalone, Yarn, Mesos, Kubernetes, etc.
The design looks great - it solves for very diverse deployment modes, allows for heterogeneous TMs, and promotes job isolation. Some feedback:

*Dispatcher*

The dispatcher concept here expands nicely on what was introduced in the Mesos design doc (MESOS-1984). The most significant difference is the job-centric orientation of the dispatcher API. FLIP-6 seems to eliminate the concept of a session (or defines it simply as the lifecycle of a JM); is that correct? Do you agree I should revise the Mesos dispatcher design to be job-centric?

I'll be taking the first crack at implementing the dispatcher (for Mesos only) in MESOS-1984 (T2). I'll keep FLIP-6 in mind as I go. The dispatcher's backend behavior will vary significantly for Mesos vs. standalone vs. others; presumably a base class with concrete implementations per cluster manager will be introduced.

To echo the FLIP-6 design as I understand it:

1) Standalone
   a) The dispatcher process starts an RM, a dispatcher frontend, and a "local" dispatcher backend at startup.
   b) Upon job submission, the local dispatcher backend creates an in-process JM actor for the job.
   c) The JM allocates slots as normal. The RM draws from its pool of registered TMs, which grows and shrinks (only) due to external events.

2) Mesos
   a) The dispatcher process starts a dispatcher frontend and a "Mesos" dispatcher backend at startup.
   b) Upon job submission, the Mesos dispatcher backend creates a Mesos task (dubbed an "AppMaster") which contains a JM/RM for the job.
   c) The system otherwise functions as described in the Mesos design doc.

*Client*

I'm concerned about the two code paths the client uses to launch a job (with-dispatcher vs. without-dispatcher). Maybe they could be unified by saying that the client always calls the dispatcher, and that the dispatcher is hostable either in the client or in a separate process. The only variance would be the client-to-dispatcher transport (local vs. HTTP).
*RM*

On the issue of RM statefulness, we can say that the RM does not persist slot allocations (the ground truth is in the TMs), but may persist other information related to cluster-manager interaction. For example, the Mesos RM persists the assigned framework identifier and per-task planning information (as is highly recommended by the Mesos development guide).

On RM fencing: I was already wondering whether to add it to the Mesos RM, so it is nice to see it being introduced more generally. My rationale is that the dispatcher cannot guarantee that only a single RM is running, because orphaned tasks are possible in certain Mesos failure situations. Similarly, I'm unsure whether YARN provides a strong guarantee about the AM.

*User Code*

Having job code on the system classpath seems possible in only a subset of cases. The variability may be complex. How important is this optimization?

*Security Implications*

It should be noted that the standalone embodiment doesn't offer isolation between jobs; the whole system runs in a single security context (as it does now). Meanwhile, the 'high-trust' nature of the dispatcher in other scenarios is rightly emphasized. The fact that user code shouldn't run in the dispatcher process (except in standalone) must be kept in mind. The design doc of FLINK-3929 (section C2) has more detail on that.

-Eron

> On Jul 28, 2016, at 2:22 AM, Maximilian Michels wrote:
>
> Hi Stephan,
>
> Thanks for the nice wrap-up of ideas and discussions we had over the
> last months (not all on the mailing list though, because we were just
> getting started with the FLIP process). The document is very
> comprehensive and explains the changes in great detail, even up to
> the message passing level.
>
> What I really like about the FLIP is that we delegate multi-tenancy
> away from the JobManager to the resource management framework and the
> dispatchers. This will help to make the JobManager component cleaner
> and simpler.
The prospect of having the user jars directly in the > system classpath of the workers, instead of dealing with custom class > loaders, is very nice. > > The model we have for acquiring and releasing resources wouldn't work > particularly well with all the new deployment options, so +1 on a new > task slot request/offer system and +1 for making the ResourceManager > responsible for TaskManager registration and slot management. This is > well aligned with the initial idea of the ResourceManager component. > > We definitely need good testing for these changes since the > possibility of bugs increases with the additional number of messages > introduced. > > The only thing that bugs me is whether we make the Standalone mode a > bit less nice to use. The initial bootstrapping of the nodes via the > local dispatchers and the subsequent registration of TaskManagers and > allocation of slots could cause some delay. It's not a major concern > though because it will take little time
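To make the "base class with concrete implementations" idea from the dispatcher discussion concrete, here is a minimal sketch. All class and method names are hypothetical; FLIP-6 does not prescribe this layout.

```java
public class DispatcherSketch {

    // Hypothetical base class: one concrete backend per cluster manager.
    abstract static class DispatcherBackend {
        // Called by the dispatcher frontend upon job submission.
        abstract String startJob(String jobName);
    }

    static class LocalDispatcherBackend extends DispatcherBackend {
        @Override
        String startJob(String jobName) {
            // Standalone: create an in-process JM actor for the job.
            return "local-jm:" + jobName;
        }
    }

    static class MesosDispatcherBackend extends DispatcherBackend {
        @Override
        String startJob(String jobName) {
            // Mesos: launch a per-job "AppMaster" task containing a JM/RM.
            return "mesos-appmaster:" + jobName;
        }
    }

    public static void main(String[] args) {
        DispatcherBackend backend = new MesosDispatcherBackend();
        System.out.println(backend.startJob("wordcount"));
    }
}
```

The frontend-to-backend boundary would also be the natural place to plug in the client-hosted vs. separate-process dispatcher variants discussed in the *Client* section.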
Re: Discard out-of-order events
@Gyula: This is documented in the JavaDoc of the `allowedLateness(..)` method and in the docs: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#dealing-with-late-data @Kevin: Thanks for the explanation, I'll get back to you soon (sort of in a rush). Cheers, Max On Fri, Jul 29, 2016 at 10:57 AM, Gyula Fóra wrote: > Hi Max, > > So if I understand correctly the window operators now, by default, discard > late elements? > Is this documented somewhere? > > Gyula > > Maximilian Michels wrote (on Fri, Jul 29, 2016, > 10:40): > >> Hi! >> >> I'm not sure whether I understand your question. The purpose of Event >> Time is to be able to process out-of-order events. Do you want to >> discard late elements? In the upcoming Flink 1.1.0 you can set the >> `allowedLateness` on a windowed stream. The default is 0, so late >> elements are discarded; late elements are elements which arrive after >> the Watermark has reached the operator. >> >> Cheers, >> Max >> >> On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs >> wrote: >> > Is it possible to discard events that are out-of-order (in terms of event >> > time)? >> > >>
Re: [VOTE] Release Apache Flink 1.1.0 (RC1)
When running "mvn clean verify" with Hadoop version 2.6.1 the Zookeeper/Leader Election tests fail with this:

java.lang.NoSuchMethodError: org.apache.curator.utils.PathUtils.validatePath(Ljava/lang/String;)Ljava/lang/String;
    at org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:37)
    at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:113)
    at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:124)
    at org.apache.flink.runtime.util.ZooKeeperUtils.startCuratorFramework(ZooKeeperUtils.java:101)
    at org.apache.flink.runtime.util.ZooKeeperUtils.createLeaderRetrievalService(ZooKeeperUtils.java:143)
    at org.apache.flink.runtime.util.LeaderRetrievalUtils.createLeaderRetrievalService(LeaderRetrievalUtils.java:70)
    at org.apache.flink.runtime.leaderelection.ZooKeeperLeaderRetrievalTest.testTimeoutOfFindConnectingAddress(ZooKeeperLeaderRetrievalTest.java:187)

I'll continue testing other parts and other Hadoop versions.

On Wed, 27 Jul 2016 at 11:51 Ufuk Celebi wrote: > Dear Flink community, > > Please vote on releasing the following candidate as Apache Flink version > 1.1.0. > > I've CC'd u...@flink.apache.org as users are encouraged to help > testing Flink 1.1.0 for their specific use cases. Please feel free to > report issues and successful tests on dev@flink.apache.org.
> > The commit to be voted on: > 3a18463 (http://git-wip-us.apache.org/repos/asf/flink/commit/3a18463) > > Branch: > release-1.1.0-rc1 > ( > https://git1-us-west.apache.org/repos/asf/flink/repo?p=flink.git;a=shortlog;h=refs/heads/release-1.1.0-rc1 > ) > > The release artifacts to be voted on can be found at: > http://people.apache.org/~uce/flink-1.1.0-rc1/ > > The release artifacts are signed with the key with fingerprint 9D403309: > http://www.apache.org/dist/flink/KEYS > > The staging repository for this release can be found at: > https://repository.apache.org/content/repositories/orgapacheflink-1098 > > There is also a Google doc to coordinate the testing efforts. This is > a copy of the release document found in our Wiki: > > https://docs.google.com/document/d/1cDZGtnGJKLU1fLw8AE_FzkoDLOR8amYT2oc3mD0_lw4/edit?usp=sharing > > - > > Thanks to everyone who contributed to this release candidate. > > The vote is open for the next 3 days (not counting the weekend) and > passes if a majority of at least three +1 PMC votes are cast. > > The vote ends on Monday August 1st, 2016. > > [ ] +1 Release this package as Apache Flink 1.1.0 > [ ] -1 Do not release this package, because ... >
Re: FlinkKafkaConsumer09
Thanks! On Fri, Jul 29, 2016 at 11:43 AM, Gordon Tai (戴資力) wrote: > Hi Max, > > Sure, I was planning to do so, but wanted to see if it was a reasonable > feature to add before opening a JIRA :) > Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280 > > Regards, > Gordon > > On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels wrote: > >> Hi Tai, >> >> Should definitely be possible. Would you mind opening a JIRA issue >> with the description you posted? >> >> Thanks, >> Max >> >> On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon wrote: >> > Hi Kevin, >> > >> > Just a re-clarification: for Kafka 0.9 it would be “earliest”, & >> “smallest” >> > for the older Kafka 0.8. >> > >> > I’m wondering whether or not it is reasonable to add a Flink-specific way >> > to set the consumer’s starting position to “earliest” and “latest”, >> without >> > respecting the external Kafka offset store. Perhaps we can change the >> > current behaviour (checking committed offsets in Kafka as starting point) >> > as a user option, and add new options to read from “earliest” and >> “latest” >> > regardless of the groupId and externally committed offsets. I think this >> > better matches how users usually interpret the functionality of setting >> > starting positions, while also keeping the “auto.offset.reset” behaviour >> > that frequent Kafka users are used to. Also, this would also more clearly >> > define that under the context of Flink, the external Kafka offset store >> is >> > used only to expose the consumers progress to the outside world, and not >> > used to manipulate how topics are read. >> > >> > Just an idea I have in mind, not sure if it would be a reasonable add. >> It’d >> > be great to hear what other think of this. >> > >> > Regards, >> > Gordon >> > >> > >> > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) >> wrote: >> > >> > Thank you Gordon and Max, >> > >> > Thank you Gordon, that explains the behaviour a bit better to me.
I am >> > now adding the timestamp to the group ID and that is a good workaround >> > for now. The "smallest" option is unfortunately not available in this >> > version of the FlinkKafkaConsumer class. >> > >> > Cheers, >> > Kevin >> > >> > >> > On 28.07.2016 10:39, Maximilian Michels wrote: >> >> Hi Kevin, >> >> >> >> You need to use properties.setProperty("auto.offset.reset", >> >> "smallest") for Kafka 9 to start from the smallest offset. Note, that >> >> in Kafka 8 you need to use properties.setProperty("auto.offset.reset", >> >> "earliest") to achieve the same behavior. >> >> >> >> Kafka keeps track of the offsets per group id. If you have already >> >> read from a topic with a certain group id and want to restart from the >> >> smallest offset available, you need to generate a unique group id. >> >> >> >> Cheers, >> >> Max >> >> >> >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs >> > wrote: >> >>> Hi, >> >>> >> >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 >> > class. I >> >>> am using Flink 1.0.3. >> >>> >> >>> These are my properties: >> >>> >> >>> val properties = new Properties() >> >>> properties.setProperty("bootstrap.servers", config.urlKafka) >> >>> properties.setProperty("group.id", COLLECTOR_NAME) >> >>> properties.setProperty("auto.offset.reset", *"earliest"*) >> >>> >> >>> According to the new consumer API of Kafka, this should result in the >> >>> following: >> >>> >> >>> /auto.offset.reset: * smallest : automatically reset the offset to the >> >>> smallest offset/ (source: >> >>> https://kafka.apache.org/documentation.html#newconsumerapi) >> >>> >> >>> However, it starts from the latest item in my topic. Is this a bug or >> am >> > I >> >>> doing something wrong? >> >>> >> >>> Regards, >> >>> Kevin >> >>> >> > > > > -- > Tzu-Li (Gordon) Tai
[jira] [Created] (FLINK-4284) DataSet/CEP link to non-existant "Linking with Flink" section
Chesnay Schepler created FLINK-4284: --- Summary: DataSet/CEP link to non-existant "Linking with Flink" section Key: FLINK-4284 URL: https://issues.apache.org/jira/browse/FLINK-4284 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.1.0 Reporter: Chesnay Schepler Relevant section for DataSet (apis/batch/index.md; L 57-60): {code} The following program is a complete, working example of WordCount. You can copy paste the code to run it locally. You only have to include the correct Flink's library into your project (see Section [Linking with Flink](#linking-with-flink)) and specify the imports. Then you are ready to go! {code} Relevant section for CEP (apis/streaming/libs/cep.md; L 45-48): {code} ## Getting Started If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/apis/batch/index.html#linking-with-flink). Next, you have to add the FlinkCEP dependency to the `pom.xml` of your project. {code} The CEP doc probably shouldn't refer to the DataSet documentation at all. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-4283) ExecutionGraphRestartTest fails
Chesnay Schepler created FLINK-4283: --- Summary: ExecutionGraphRestartTest fails Key: FLINK-4283 URL: https://issues.apache.org/jira/browse/FLINK-4283 Project: Flink Issue Type: Bug Affects Versions: 1.1.0 Environment: Ubuntu 14.04 W10 Reporter: Chesnay Schepler I encounter reliable failures for the following tests:

testRestartAutomatically(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) Time elapsed: 120.089 sec <<< FAILURE!
java.lang.AssertionError: expected: but was:
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:743)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:144)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testRestartAutomatically(ExecutionGraphRestartTest.java:155)

taskShouldNotFailWhenFailureRateLimitWasNotExceeded(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) Time elapsed: 2.055 sec <<< FAILURE!
java.lang.AssertionError: expected: but was:
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:743)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:144)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.restartAfterFailure(ExecutionGraphRestartTest.java:680)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.taskShouldNotFailWhenFailureRateLimitWasNotExceeded(ExecutionGraphRestartTest.java:180)

testFailingExecutionAfterRestart(org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest) Time elapsed: 120.079 sec <<< FAILURE!
java.lang.AssertionError: expected: but was:
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:743)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:144)
    at org.apache.flink.runtime.executiongraph.ExecutionGraphRestartTest.testFailingExecutionAfterRestart(ExecutionGraphRestartTest.java:397)
[jira] [Created] (FLINK-4282) Add Offset Parameter to WindowAssigners
Aljoscha Krettek created FLINK-4282: --- Summary: Add Offset Parameter to WindowAssigners Key: FLINK-4282 URL: https://issues.apache.org/jira/browse/FLINK-4282 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Aljoscha Krettek Currently, windows are always aligned to EPOCH, which basically means days are aligned with GMT. This is somewhat problematic for people living in different timezones. An offset parameter would allow adapting the window assigner to the timezone.
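A sketch of the arithmetic such an offset parameter implies (the formula is an assumption; the issue does not specify the API): the start of the window containing a timestamp is the largest boundary, aligned to epoch plus the offset, that is not larger than the timestamp.

```java
public class WindowOffset {

    // Start (in ms) of the window of size `windowSize` containing `timestamp`,
    // with window boundaries aligned to epoch + offset. Assumes
    // timestamp - offset + windowSize >= 0 so Java's % stays non-negative.
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long day = 24 * 60 * 60 * 1000L;
        long ts = 3 * day + 5000; // shortly after midnight GMT on day 3
        // offset 0: daily windows start at 00:00 GMT
        System.out.println(windowStart(ts, 0, day));
        // offset -8h: daily windows start at 00:00 in UTC+8 instead
        System.out.println(windowStart(ts, -8 * 60 * 60 * 1000L, day));
    }
}
```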
Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
About processing time and timestamps: the timestamp is either set in the source or in an in-between TimestampAssigner that can be used with DataStream.assignTimestampsAndWatermarks(). However, the timestamp in the element is normally not a "processing-time timestamp".

I think it might make sense to split the functionality for the evictors into two parts: one that implicitly sets a timestamp and one that uses these timestamps. It could look like this:

DataStream<T> input = ...

// this makes the current processing time explicit in the tuples:
DataStream<Tuple2<T, Long>> withTimestamps = input.map(new ReifyProcessingTime());

withTimestamps
    .keyBy(...)
    .window(..)
    .evictor(new ProcessingTimeEvictor<>())
    .apply(...)

where ProcessingTimeEvictor looks like this:

class ProcessingTimeEvictor<T> extends Evictor<Tuple2<T, Long>> {
    void evictBefore(Iterable<Tuple2<T, Long>>, ...);
    void evictAfter ...
}

This would make everything that is happening explicit in the type signatures and explicit for the user.

Cheers,
Aljoscha

On Thu, 28 Jul 2016 at 18:32 Aljoscha Krettek wrote: > Hi, > in fact, changing it to Iterable would simplify things because then we > would not have to duplicate code for the EvictingWindowOperator any more. > It could be a very thin subclass of WindowOperator. > > Cheers, > Aljoscha > > On Wed, 27 Jul 2016 at 03:56 Vishnu Viswanath < > vishnu.viswanat...@gmail.com> wrote: > >> Hi Aljoscha, >> >> Regarding your concern - to not expose the StreamRecord in the Evictor, >> were you able to find any alternative? >> >> I tried to make the methods take Iterable input similar to the >> WindowFunction, but that didn't work since we have to clear the state and >> add the elements back to the state (to fix the bug mentioned in the >> previous mail) >> >> If you think the interface that accepts Iterable >> elements is >> good enough, I have the changes ready.
>> >> Thanks, >> Vishnu >> >> On Mon, Jul 25, 2016 at 7:48 AM, Aljoscha Krettek >> wrote: >> >> > Hi, >> > the elements are currently not being removed from the buffers. That's a >> bug >> > that we could fix while adding the new Evictor interface. >> > >> > Cheers, >> > Aljoscha >> > >> > On Mon, 25 Jul 2016 at 13:00 Radu Tudoran >> wrote: >> > >> > > Hi Aljoscha, >> > > >> > > Can you point us to the way it is handled now. Is there anything else >> for >> > > the removing of elements other than the skip in >> EvictingWindowOperator. >> > Is >> > > there something as it was before version 1.x where you had an explicit >> > > remove from window buffers? >> > > >> > > Dr. Radu Tudoran >> > > Research Engineer - Big Data Expert >> > > IT R&D Division >> > > >> > > >> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH >> > > European Research Center >> > > Riesstrasse 25, 80992 München >> > > >> > > E-mail: radu.tudo...@huawei.com >> > > Mobile: +49 15209084330 >> > > Telephone: +49 891588344173 >> > > >> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH >> > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com >> > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063, >> > > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN >> > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063, >> > > Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN >> > > This e-mail and its attachments contain confidential information from >> > > HUAWEI, which is intended only for the person or entity whose address >> is >> > > listed above. Any use of the information contained herein in any way >> > > (including, but not limited to, total or partial disclosure, >> > reproduction, >> > > or dissemination) by persons other than the intended recipient(s) is >> > > prohibited. If you receive this e-mail in error, please notify the >> sender >> > > by phone or email immediately and delete it!
>> > >
>> > > -Original Message-
>> > > From: Aljoscha Krettek [mailto:aljos...@apache.org]
>> > > Sent: Monday, July 25, 2016 11:45 AM
>> > > To: dev@flink.apache.org
>> > > Subject: Re: [DISCUSS][FLIP-4] Enhance Window Evictor in Flink
>> > >
>> > > Hi,
>> > > I think there is not yet a clear specification for how the actual removal
>> > > of elements from the buffer will work. I think naively one can do:
>> > >
>> > > Iterable<E> currentElements = state.get()
>> > > evictor.evict(currentElements); // this will remove some stuff from there, or mark for removal
>> > >
>> > > state.clear()
>> > > // the Iterable does not loop over the removed/marked elements
>> > > for (E element : currentElements) {
>> > >     state.add(element)
>> > > }
>> > >
>> > > This is very costly but the only way I see of doing this right now with
>> > > every state backend.
>> > >
>> > > Cheers,
>> > > Aljoscha
>> > >
>> > > On Mon, 25 Jul 2016 at 09:46 Radu Tudoran wrote:
>> > > > Hi,
>> > > >
>> > > > Thanks for the clarification. Can
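The clear-and-re-add pattern from Aljoscha's sketch can be exercised without any state backend; in this sketch a plain List stands in for the window state and a Predicate stands in for the evictor's decision (names are illustrative only, not Flink API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class EvictAndRewrite {

    // Re-create the window state, keeping only the elements the evictor retains.
    // `state` plays the role of the state backend's window buffer.
    static <E> void applyEviction(List<E> state, Predicate<E> evict) {
        List<E> current = new ArrayList<>(state); // state.get()
        state.clear();                            // state.clear()
        for (E element : current) {
            if (!evict.test(element)) {
                state.add(element);               // re-add the survivors
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> state = new ArrayList<>(List.of(1, 2, 3, 4));
        applyEviction(state, e -> e < 3); // evict everything below 3
        System.out.println(state); // [3, 4]
    }
}
```

This copies and rewrites the whole buffer on every firing, which is exactly the cost concern raised in the thread.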
[jira] [Created] (FLINK-4281) Wrap all Calcite Exceptions in Flink Exceptions
Timo Walther created FLINK-4281: --- Summary: Wrap all Calcite Exceptions in Flink Exceptions Key: FLINK-4281 URL: https://issues.apache.org/jira/browse/FLINK-4281 Project: Flink Issue Type: Bug Components: Table API & SQL Reporter: Timo Walther Some exceptions are already wrapped in Flink exceptions, but there are still exceptions thrown by Calcite. I would propose that all exceptions thrown by the Table API be Flink exceptions, especially the FlinkPlannerImpl exceptions.
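The wrapping itself is a plain catch-and-rethrow that keeps the original exception as the cause; a minimal sketch with stand-in exception types (in the actual Table API these would be Calcite's and Flink's own exception classes, not the names used here):

```java
public class ExceptionWrapping {

    // Stand-in for an exception thrown by the Calcite planner.
    static class CalcitePlannerException extends RuntimeException {
        CalcitePlannerException(String msg) { super(msg); }
    }

    // Stand-in for a Flink-side exception exposed to Table API users.
    static class FlinkTableException extends RuntimeException {
        FlinkTableException(String msg, Throwable cause) { super(msg, cause); }
    }

    // All planner errors surface as the Flink-side type; the original
    // Calcite exception stays reachable via getCause() for debugging.
    static void validate(String query) {
        try {
            throw new CalcitePlannerException("cannot validate: " + query);
        } catch (CalcitePlannerException e) {
            throw new FlinkTableException("SQL validation failed for: " + query, e);
        }
    }

    public static void main(String[] args) {
        try {
            validate("SELECT *");
        } catch (FlinkTableException e) {
            System.out.println(e.getCause().getClass().getSimpleName());
        }
    }
}
```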
Re: FlinkKafkaConsumer09
Hi Max, Sure, I was planning to do so, but wanted to see if it was a reasonable feature to add before opening a JIRA :) Here's the new JIRA: https://issues.apache.org/jira/browse/FLINK-4280 Regards, Gordon On Fri, Jul 29, 2016 at 4:03 PM, Maximilian Michels wrote: > Hi Tai, > > Should definitely be possible. Would you mind opening a JIRA issue > with the description you posted? > > Thanks, > Max > > On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon wrote: > > Hi Kevin, > > > > Just a re-clarification: for Kafka 0.9 it would be “earliest”, & > “smallest” > > for the older Kafka 0.8. > > > > I’m wondering whether or not it is reasonable to add a Flink-specific way > > to set the consumer’s starting position to “earliest” and “latest”, > without > > respecting the external Kafka offset store. Perhaps we can change the > > current behaviour (checking committed offsets in Kafka as starting point) > > as a user option, and add new options to read from “earliest” and > “latest” > > regardless of the groupId and externally committed offsets. I think this > > better matches how users usually interpret the functionality of setting > > starting positions, while also keeping the “auto.offset.reset” behaviour > > that frequent Kafka users are used to. Also, this would also more clearly > > define that under the context of Flink, the external Kafka offset store > is > > used only to expose the consumers progress to the outside world, and not > > used to manipulate how topics are read. > > > > Just an idea I have in mind, not sure if it would be a reasonable add. > It’d > > be great to hear what other think of this. > > > > Regards, > > Gordon > > > > > > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) > wrote: > > > > Thank you Gordon and Max, > > > > Thank you Gordon, that explains the behaviour a bit better to me. I am > > now adding the timestamp to the group ID and that is a good workaround > > for now.
The "smallest" option is unfortunately not available in this > > version of the FlinkKafkaConsumer class. > > > > Cheers, > > Kevin > > > > > > On 28.07.2016 10:39, Maximilian Michels wrote: > >> Hi Kevin, > >> > >> You need to use properties.setProperty("auto.offset.reset", > >> "smallest") for Kafka 9 to start from the smallest offset. Note, that > >> in Kafka 8 you need to use properties.setProperty("auto.offset.reset", > >> "earliest") to achieve the same behavior. > >> > >> Kafka keeps track of the offsets per group id. If you have already > >> read from a topic with a certain group id and want to restart from the > >> smallest offset available, you need to generate a unique group id. > >> > >> Cheers, > >> Max > >> > >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs > > wrote: > >>> Hi, > >>> > >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 > > class. I > >>> am using Flink 1.0.3. > >>> > >>> These are my properties: > >>> > >>> val properties = new Properties() > >>> properties.setProperty("bootstrap.servers", config.urlKafka) > >>> properties.setProperty("group.id", COLLECTOR_NAME) > >>> properties.setProperty("auto.offset.reset", *"earliest"*) > >>> > >>> According to the new consumer API of Kafka, this should result in the > >>> following: > >>> > >>> /auto.offset.reset: * smallest : automatically reset the offset to the > >>> smallest offset/ (source: > >>> https://kafka.apache.org/documentation.html#newconsumerapi) > >>> > >>> However, it starts from the latest item in my topic. Is this a bug or > am > > I > >>> doing something wrong? > >>> > >>> Regards, > >>> Kevin > >>> > -- Tzu-Li (Gordon) Tai
[jira] [Created] (FLINK-4279) [py] Set flink dependencies to provided
Chesnay Schepler created FLINK-4279: --- Summary: [py] Set flink dependencies to provided Key: FLINK-4279 URL: https://issues.apache.org/jira/browse/FLINK-4279 Project: Flink Issue Type: Improvement Components: Python API Affects Versions: 1.1.0 Reporter: Chesnay Schepler Assignee: Chesnay Schepler Priority: Minor
Re: Discard out-of-order events
Good morning :-),

Thank you for your answer. Let me explain my problem more thoroughly (maybe other options are possible here, not necessarily with allowedLateness). The most compact description of my problem would be stream enrichment. More concretely, suppose I have two streams, where I want to enrich one stream with another stream:

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))

The goal here is to mix information from the infoStream into the mainStream, so the enriched stream would contain the following tuples:

("a", "Whoops, it is A")
("a", "Whoops, it is A")
("b", "It is B")
("a", "Whoops, it is A")
("a", "Whoops, it is A")
("b", "It is B")
("b", "It is B")
("a", "Whoops, it is A")
("c", "It is C")
("b", "It is B")
("a", "Whoops, it is A")
("c", "It is C")

It is not a requirement that the enriched stream has the same ordering of elements as the mainStream. However, it is important that new elements in the infoStream override older elements from the infoStream. You can see here that (4, "a", "Whoops, it is A") arrived later than (1, "a", "It is F") (if you look at the event time, which is the first element of every tuple). So, the infoStream (at t=4) should contain the following tuples:

(4, "a", "Whoops, it is A")
(2, "b", "It is B")
(3, "c", "It is C")

So, what I thought of is iterating over the infoStream to keep the relevant records. Then coGroup it with the mainStream to enrich it.
This approach works fine:

val mainStream = env.fromElements("a", "a", "b", "a", "a", "b", "b", "a", "c", "b", "a", "c")
val infoStream = env.fromElements((1, "a", "It is F"), (2, "b", "It is B"), (3, "c", "It is C"), (4, "a", "Whoops, it is A"))
  .keyBy(1)
  .iterate(iteration => {
    val filtered = iteration.keyBy(1).maxBy(0)
    (iteration, filtered)
  })

mainStream
  .coGroup(infoStream)
  .where[String]((x: String) => x)
  .equalTo(_._2)
  .window(TumblingProcessingTimeWindows.of(Time.milliseconds(50))) {
    (first: Iterator[String], second: Iterator[(Int, String, String)], out: Collector[(String, String)]) => {
      val records = scala.collection.mutable.MutableList[(Int, String, String)]()
      for ((record: (Int, String, String)) <- second) {
        records += record
      }
      for ((key: String) <- first) {
        var bestDescription = "?"
        for (record <- records) {
          if (record._2 == key) {
            bestDescription = record._3
          }
        }
        out.collect((key, bestDescription))
      }
    }
  }
  .print()

My questions for you:

- Can I make this more efficient?
- Is there a way of mixing DataSets and DataStreams? That would be really awesome (for at least this use case).
- Is there a way to ensure checkpoints, since I am using an iterative stream here?
- Can I get rid of the TumblingProcessingTimeWindows?

Because in fact, all of this can be done by Apache Spark. It would be great if Apache Flink could achieve a higher throughput rate than Apache Spark in this use case. I am curious about your answers!

Cheers,
Kevin

On 29.07.2016 10:40, Maximilian Michels wrote: Hi! I'm not sure whether I understand your question. The purpose of Event Time is to be able to process out-of-order events. Do you want to discard late elements? In the upcoming Flink 1.1.0 you can set the `allowedLateness` on a windowed stream. The default is 0, so late elements are discarded; late elements are elements which arrive after the Watermark has reached the operator.
Cheers, Max On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs wrote: Is it possible to discard events that are out-of-order (in terms of event time)?
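The per-key "keep only the newest info record" step that Kevin's iteration implements (maxBy(0) on the event-time field) can be modeled with two maps; a Flink-free sketch of just that logic (class and method names are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

public class EnrichState {

    private final Map<String, Integer> latestTime = new HashMap<>();
    private final Map<String, String> latestDesc = new HashMap<>();

    // Keep only the info record with the highest event time per key,
    // so newer info records override older ones.
    void update(int eventTime, String key, String desc) {
        Integer prev = latestTime.get(key);
        if (prev == null || eventTime > prev) {
            latestTime.put(key, eventTime);
            latestDesc.put(key, desc);
        }
    }

    // Enrich one element of the main stream with the newest description.
    String enrich(String key) {
        return latestDesc.getOrDefault(key, "?");
    }

    public static void main(String[] args) {
        EnrichState s = new EnrichState();
        s.update(1, "a", "It is F");
        s.update(2, "b", "It is B");
        s.update(4, "a", "Whoops, it is A");
        System.out.println(s.enrich("a")); // Whoops, it is A
    }
}
```

In Flink this lookup side would naturally live in keyed state updated by the info stream, which avoids both the iteration and the per-window linear scan of the coGroup variant.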
Re: Discard out-of-order events
Hi Max, So if I understand correctly the window operators now, by default, discard late elements? Is this documented somewhere? Gyula Maximilian Michels wrote (on Fri, Jul 29, 2016, 10:40): > Hi! > > I'm not sure whether I understand your question. The purpose of Event > Time is to be able to process out-of-order events. Do you want to > discard late elements? In the upcoming Flink 1.1.0 you can set the > `allowedLateness` on a windowed stream. The default is 0, so late > elements are discarded; late elements are elements which arrive after > the Watermark has reached the operator. > > Cheers, > Max > > On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs > wrote: > > Is it possible to discard events that are out-of-order (in terms of event > > time)? > > >
Re: Discard out-of-order events
Hi! I'm not sure whether I understand your question. The purpose of Event Time is to be able to process out-of-order events. Do you want to discard late elements? In the upcoming Flink 1.1.0 you can set the `allowedLateness` on a windowed stream. The default is 0, so late elements are discarded; late elements are elements which arrive after the Watermark has reached the operator. Cheers, Max On Thu, Jul 28, 2016 at 6:49 PM, Kevin Jacobs wrote: > Is it possible to discard events that are out-of-order (in terms of event > time)? >
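A simplified model of the rule Max describes (an element is late once the watermark has passed it; with allowedLateness = 0 such elements are dropped). This sketch ignores window boundaries, which the real operator also takes into account:

```java
public class LatenessCheck {

    // True if the element should be discarded: the watermark has already
    // passed its timestamp by more than the allowed lateness.
    static boolean isDropped(long elementTimestamp, long watermark, long allowedLateness) {
        return elementTimestamp + allowedLateness <= watermark;
    }

    public static void main(String[] args) {
        // watermark at t=100, no lateness allowed:
        System.out.println(isDropped(99, 100, 0));  // true: element is late
        System.out.println(isDropped(101, 100, 0)); // false: still ahead of the watermark
        // with allowedLateness = 5, the t=99 element survives:
        System.out.println(isDropped(99, 100, 5));  // false
    }
}
```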
Re: FlinkKafkaConsumer09
Hi Tai, Should definitely be possible. Would you mind opening a JIRA issue with the description you posted? Thanks, Max On Thu, Jul 28, 2016 at 11:16 AM, Tai Gordon wrote: > Hi Kevin, > > Just a re-clarification: for Kafka 0.9 it would be “earliest”, & “smallest” > for the older Kafka 0.8. > > I’m wondering whether or not it is reasonable to add a Flink-specific way > to set the consumer’s starting position to “earliest” and “latest”, without > respecting the external Kafka offset store. Perhaps we can change the > current behaviour (checking committed offsets in Kafka as starting point) > as a user option, and add new options to read from “earliest” and “latest” > regardless of the groupId and externally committed offsets. I think this > better matches how users usually interpret the functionality of setting > starting positions, while also keeping the “auto.offset.reset” behaviour > that frequent Kafka users are used to. Also, this would also more clearly > define that under the context of Flink, the external Kafka offset store is > used only to expose the consumers progress to the outside world, and not > used to manipulate how topics are read. > > Just an idea I have in mind, not sure if it would be a reasonable add. It’d > be great to hear what other think of this. > > Regards, > Gordon > > > On July 28, 2016 at 4:44:02 PM, Kevin Jacobs (kevin.jac...@cern.ch) wrote: > > Thank you Gordon and Max, > > Thank you Gordon, that explains the behaviour a bit better to me. I am > now adding the timestamp to the group ID and that is a good workaround > for now. The "smallest" option is unfortunately not available in this > version of the FlinkKafkaConsumer class. > > Cheers, > Kevin > > > On 28.07.2016 10:39, Maximilian Michels wrote: >> Hi Kevin, >> >> You need to use properties.setProperty("auto.offset.reset", >> "smallest") for Kafka 9 to start from the smallest offset.
Note, that >> in Kafka 8 you need to use properties.setProperty("auto.offset.reset", >> "earliest") to achieve the same behavior. >> >> Kafka keeps track of the offsets per group id. If you have already >> read from a topic with a certain group id and want to restart from the >> smallest offset available, you need to generate a unique group id. >> >> Cheers, >> Max >> >> On Thu, Jul 28, 2016 at 10:14 AM, Kevin Jacobs > wrote: >>> Hi, >>> >>> I am currently facing strange behaviour of the FlinkKafkaConsumer09 > class. I >>> am using Flink 1.0.3. >>> >>> These are my properties: >>> >>> val properties = new Properties() >>> properties.setProperty("bootstrap.servers", config.urlKafka) >>> properties.setProperty("group.id", COLLECTOR_NAME) >>> properties.setProperty("auto.offset.reset", *"earliest"*) >>> >>> According to the new consumer API of Kafka, this should result in the >>> following: >>> >>> /auto.offset.reset: * smallest : automatically reset the offset to the >>> smallest offset/ (source: >>> https://kafka.apache.org/documentation.html#newconsumerapi) >>> >>> However, it starts from the latest item in my topic. Is this a bug or am > I >>> doing something wrong? >>> >>> Regards, >>> Kevin >>>
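The version dependence discussed in this thread, as a small sketch (the property keys are standard Kafka consumer settings; the helper and its suffixing of the group id are illustrative assumptions, not Flink API):

```java
import java.util.Properties;

public class KafkaResetProps {

    // Consumer properties that start reading from the beginning of the
    // topic when no committed offset exists for the group id.
    static Properties earliestProps(String bootstrapServers, String groupId, boolean kafka09) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A fresh group id avoids a previously committed offset shadowing the reset policy.
        props.setProperty("group.id", groupId + "-" + System.currentTimeMillis());
        // Kafka 0.9 expects "earliest"; the older Kafka 0.8 expects "smallest".
        props.setProperty("auto.offset.reset", kafka09 ? "earliest" : "smallest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(earliestProps("localhost:9092", "collector", true)
                .getProperty("auto.offset.reset"));  // earliest
        System.out.println(earliestProps("localhost:9092", "collector", false)
                .getProperty("auto.offset.reset")); // smallest
    }
}
```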