Re: [DISCUSS] FLIP-27: Refactor Source Interface

2020-01-15 Thread Becket Qin
Hi Steven, Unfortunately we were behind schedule and did not get this into 1.10... So it will be in 1.11 instead. Thanks, Jiangjie (Becket) Qin On Thu, Jan 16, 2020 at 10:39 AM Steven Wu wrote: > Becket, is FLIP-27 still on track to be released in 1.10? > > On Tue, Jan 7, 2020 at 7:04 PM Beck

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2020-01-15 Thread Steven Wu
Becket, is FLIP-27 still on track to be released in 1.10? On Tue, Jan 7, 2020 at 7:04 PM Becket Qin wrote: > Hi folks, > > Happy new year! > > Stephan and I chatted offline yesterday. After reading the email thread > again, I found that I have misunderstood Dawid's original proposal > regarding

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2020-01-07 Thread Becket Qin
Hi folks, Happy new year! Stephan and I chatted offline yesterday. After reading the email thread again, I found that I had misunderstood Dawid's original proposal regarding the behavior of env.source(BoundedSource) and had an incorrect impression of the behavior of Java covariant return type

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-22 Thread Becket Qin
Hi Steven, I think the current proposal is what you mentioned - a Kafka source that can be constructed in either BOUNDED or UNBOUNDED mode. And Flink can get the boundedness by invoking getBoundedness(). So one can create a Kafka source by doing something like the following: new KafkaSource().st

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-19 Thread Jark Wu
Hi, First of all, I think it is not called "UNBOUNDED"; according to FLIP-27, it is called "CONTINUOUS_UNBOUNDED". And the description of Boundedness in FLIP-27 [1] states clearly what Becket and I think. public enum Boundedness { /** * A bounded source processes the d

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-19 Thread Steven Wu
Becket, Regarding "UNBOUNDED source that stops at some point", I found it difficult to grasp what UNBOUNDED really means. If we want to use a Kafka source with an end/stop time, I guess you would call it an UNBOUNDED Kafka source that stops (aka BOUNDED-streaming). The terminology is a little confusing to me

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-19 Thread Becket Qin
I had an offline chat with Jark, and here are some more thoughts: 1. From the SQL perspective, a BOUNDED source leads to the batch execution mode and an UNBOUNDED source leads to the streaming execution mode. 2. The semantics of an UNBOUNDED source is that it may or may not stop. The semantics of a BOUNDED source is that it will st

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-19 Thread Becket Qin
Hi Timo, > Bounded is just a special case of unbounded and every bounded source can > also be treated as an unbounded source. This would unify the API if > people don't need a bounded operation. With option 3 users can still get a unified API with something like below: DataStream boundedStream =

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-19 Thread Timo Walther
Hi Becket, regarding *Option 3* I think we can relax the constraints for env.source(): // MySource can be bounded or unbounded DataStream dataStream = env.source(mySource); // MySource must be bounded, otherwise throws exception. BoundedDataStream boundedDataStream = env.boundedSource(mySource)
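A minimal sketch of the relaxed constraint described in this message, assuming a Source that reports its own boundedness through getBoundedness(). Env, DataStream and BoundedDataStream below are hypothetical stand-ins written for illustration, not Flink classes, and the use of IllegalArgumentException is an assumption; the message only says that an exception is thrown.

```java
class BoundedSourceCheckSketch {
    /** The two values named in the thread. */
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /** Hypothetical minimal source: it only reports its boundedness. */
    interface Source {
        Boundedness getBoundedness();
    }

    /** Hypothetical stand-in for a stream that may or may not end. */
    static class DataStream {
        final Source source;
        DataStream(Source source) { this.source = source; }
    }

    /** Hypothetical stand-in for a stream that is known to end. */
    static final class BoundedDataStream extends DataStream {
        BoundedDataStream(Source source) { super(source); }
    }

    /** Hypothetical environment with the two entry points from the message. */
    static final class Env {
        /** Accepts any source, bounded or unbounded. */
        DataStream source(Source source) {
            return new DataStream(source);
        }

        /** Accepts only bounded sources; anything else is rejected. */
        BoundedDataStream boundedSource(Source source) {
            if (source.getBoundedness() != Boundedness.BOUNDED) {
                throw new IllegalArgumentException("boundedSource() requires a BOUNDED source");
            }
            return new BoundedDataStream(source);
        }
    }

    public static void main(String[] args) {
        Env env = new Env();
        Source bounded = () -> Boundedness.BOUNDED;
        Source unbounded = () -> Boundedness.CONTINUOUS_UNBOUNDED;

        env.source(bounded);        // fine
        env.source(unbounded);      // fine
        env.boundedSource(bounded); // fine
        try {
            env.boundedSource(unbounded);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With this shape, code that does not care about boundedness only ever calls source(), while code that needs bounded-only operations fails fast at job construction time.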

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-18 Thread Becket Qin
Hi Jark, Please see the reply below: > Regarding to option#3, my concern is that if we don't support streaming > mode for bounded source, > how could we create a testing source for streaming mode? Currently, all the > testing source for streaming > are bounded, so that the integration test will fin

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-17 Thread Jark Wu
Hi Becket, It's great that we have reached a consensus on Source#getBoundedness(). Regarding option #3, my concern is that if we don't support streaming mode for bounded sources, how could we create a testing source for streaming mode? Currently, all the testing sources for streaming are bounded, so

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-17 Thread Becket Qin
Hi folks, Thanks for the comments. I am convinced that the Source API should not take boundedness as a parameter after it is constructed. What Timo and Dawid suggested sounds like a reasonable solution to me. So the Source API would become: Source { Boundedness getBoundedness(); } Assuming the ab
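A minimal sketch of a source that decides its boundedness at construction time and exposes it through getBoundedness(), as the message proposes. OffsetRangeSource and its optional end offset are hypothetical names chosen for illustration (the thread mentions an end offset or end timestamp as a Kafka-style criterion); none of this is the actual FLIP-27 interface.

```java
import java.util.OptionalLong;

class BoundednessSketch {
    /** The two values named in the thread. */
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    /** Hypothetical minimal Source: only the method discussed in the message. */
    interface Source {
        Boundedness getBoundedness();
    }

    /** Hypothetical log-style source; configuring an end offset makes it bounded. */
    static final class OffsetRangeSource implements Source {
        private final OptionalLong endOffset;

        OffsetRangeSource(OptionalLong endOffset) {
            this.endOffset = endOffset;
        }

        @Override
        public Boundedness getBoundedness() {
            // Boundedness is fixed by how the source was constructed,
            // not passed in later by the framework.
            return endOffset.isPresent()
                    ? Boundedness.BOUNDED
                    : Boundedness.CONTINUOUS_UNBOUNDED;
        }
    }

    public static void main(String[] args) {
        Source stopsAtOffset = new OffsetRangeSource(OptionalLong.of(100L));
        Source runsForever = new OffsetRangeSource(OptionalLong.empty());
        System.out.println(stopsAtOffset.getBoundedness()); // prints BOUNDED
        System.out.println(runsForever.getBoundedness());   // prints CONTINUOUS_UNBOUNDED
    }
}
```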

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-16 Thread Timo Walther
Hi Becket, I completely agree with Dawid's suggestion. The information about the boundedness should come out of the source, because most streaming sources can be made bounded based on some connector-specific criterion. In Kafka, it would be an end offset or end timestamp, but in any case

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-15 Thread Becket Qin
Hi Dawid and Jark, I think the discussion ultimately boils down to the question of which of the following two final states we want. Once we make this decision, everything else can be naturally derived. *Final state 1*: Separate API for bounded / unbounded DataStream & Table. That means a

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Jingsong Li
Hi Becket, I also have some performance concerns. If I understand correctly, SourceOutput will emit data per record into the queue? I'm worried about the multithreading performance of this queue. > One example is some batched messaging systems which only have an offset for the entire batch i

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Jark Wu
Hi Becket, I think Dawid explained things clearly and his proposal makes a lot of sense. I'm also in favor of #2, because #1 doesn't work for our future unified environment. You can see the vision in this document [1]. In the future, we would like to drop the global streaming/batch mode in SQL (i.e. Envi

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Piotr Nowojski
Hi, Regarding the: Collection getNextRecords() I’m pretty sure such a design would unfortunately impact performance (accessing and potentially creating the collection on the hot path). Also the InputStatus emitNext(DataOutput output) throws Exception; or Status pollNext(SourceOutput sourceO

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Till Rohrmann
Hi Becket, quick clarification from my side because I think you misunderstood my question. I did not suggest to let the SourceReader return only a single record at a time when calling getNextRecords. As the return type indicates, the method can return an arbitrary number of records. Cheers, Till

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-11 Thread Dawid Wysakowicz
Hi Becket, Issue #1 - Design of Source interface I mentioned the lack of a method like Source#createEnumerator(Boundedness boundedness, SplitEnumeratorContext context), because without it the current proposal is not complete/does not work. If we say that boundedness is an intrinsic property of a so

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-10 Thread Becket Qin
Hi folks, Thanks for the discussion, great feedback. Also thanks Dawid for the explanation, it is much clearer now. One thing that is indeed missing from the FLIP is how the boundedness is passed to the Source implementation. So the API should be Source#createEnumerator(Boundedness boundedness, S

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-10 Thread Till Rohrmann
Hi everyone, thanks for drafting this FLIP. It reads very well. Concerning Dawid's proposal, I tend to agree. The boundedness could come from the source and tell the system how to treat the operator (scheduling wise). From a user's perspective it should be fine to get back a DataStream when calli

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-09 Thread Jingsong Li
Hi all, I think the current design is good. My understanding is: for execution mode, bounded mode and continuous mode are totally different. I don't think we have the ability to integrate the two models at present. It's about scheduling, memory, algorithms, state, etc.; we shouldn't confuse them.

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-09 Thread Jark Wu
I agree with Dawid's point that the boundedness information should come from the source itself (e.g. the end timestamp), not through env.boundedSource()/continuousSource(). I think if we want to support something like `env.source()` that derives the execution mode from the source, `supportsBoundedness(Bo

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-09 Thread Dawid Wysakowicz
One more thing. In the current proposal, with the supportsBoundedness(Boundedness) method and the boundedness coming from either continuousSource or boundedSource I could not find how this information is fed back to the SplitEnumerator. Best, Dawid On 09/12/2019 13:52, Becket Qin wrote: > Hi Daw

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-09 Thread Dawid Wysakowicz
Hi Becket, I am not sure if I understood the last paragraph correctly, but let me clarify my thoughts. I would not add any bounded/batch specific methods to the DataStream. Imo all the user facing bounded/batch specific methods should be exposed through the new BoundedDataStream interface. 1. U

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-09 Thread Becket Qin
Hi Dawid, Thanks for the comments. This actually brings up another relevant question: what does a "bounded source" imply? I actually had the same impression when I looked at the Source API. Here is what I understand after some discussion with Stephan. The bounded source has the following impacts.

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-04 Thread Becket Qin
ing, just want to speak up some ideas in my mind. >> >> >> +1 to the FLIP and detailed design! >> >> >> >> [1]. >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API >> >> >> Best, >> >>

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-04 Thread Becket Qin
> Best, > > Jiayi Liao > > Original Message > *Sender:* Stephan Ewen > *Recipient:* dev > *Date:* Wednesday, Dec 4, 2019 18:25 > *Subject:* Re: [DISCUSS] FLIP-27: Refactor Source Interface > > Thanks, Becket, for updating this. > > I agree with moving the aspects you

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-04 Thread bupt_ljy
: Stephan Ewen Recipient: dev Date: Wednesday, Dec 4, 2019 18:25 Subject: Re: [DISCUSS] FLIP-27: Refactor Source Interface Thanks, Becket, for updating this. I agree with moving the aspects you mentioned into separate FLIPs - this one was becoming unwieldy in size. +1 to the FLIP in its current

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-04 Thread Stephan Ewen
Thanks, Becket, for updating this. I agree with moving the aspects you mentioned into separate FLIPs - this one was becoming unwieldy in size. +1 to the FLIP in its current state. It's a very detailed write-up, nicely done! On Wed, Dec 4, 2019 at 7:38 AM Becket Qin wrote: > Hi all, > > Sorry fo

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-03 Thread Becket Qin
Hi all, Sorry for the long-belated update. I have updated the FLIP-27 wiki page with the latest proposals. Some notable changes include: 1. A new generic communication mechanism between SplitEnumerator and SourceReader. 2. Some detailed API method signature changes. We left a few things out of this

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-11-15 Thread Stephan Ewen
Hi Łukasz! Becket and I are working hard on figuring out the last details and implementing the first PoC. We will hopefully update the FLIP next week. There is a fair chance that a first version of this will be in 1.10, but I think it will take another release to battle test it and migrate the

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-11-15 Thread Łukasz Jędrzejewski
Hi, This proposal looks very promising for us. Do you know in which Flink release it is planned to be included? We are thinking of using the DataSet API for our future use cases, but on the other hand the DataSet API is going to be deprecated, so using the proposed bounded data streams solution c

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-10-01 Thread Thomas Weise
Thanks for putting together this proposal! I see that the "Per Split Event Time" and "Event Time Alignment" sections are still TBD. It would probably be good to flesh those out a bit before proceeding too far as the event time alignment will probably influence the interaction with the split reade

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-07-26 Thread Biao Liu
Hi Stephan, Thank you for the feedback! I will take a look at your branch before discussing publicly. On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen wrote: > Hi Biao! > > Thanks for reviving this. I would like to join this discussion, but am > quite occupied with the 1.9 release, so can we maybe pause

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-07-25 Thread Stephan Ewen
Hi Biao! Thanks for reviving this. I would like to join this discussion, but am quite occupied with the 1.9 release, so can we maybe pause this discussion for a week or so? In the meantime I can share some suggestions based on prior experiments: How to do watermarks / timestamp extractors in a si

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-07-25 Thread Biao Liu
Hi devs, Since 1.9 is nearly released, I think we could get back to FLIP-27. I believe it should be included in 1.10. There are so many things mentioned in the FLIP-27 document [1]. I think we'd better discuss them separately. However, the wiki is not a good place for discussion. I wrote a Google doc abo

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-03-28 Thread Biao Liu
Hi Steven, Thank you for the feedback. Please take a look at the FLIP-27 document, which was updated recently. A lot of details about the enumerator were added to it. I think it will help. Steven Wu 于2019年

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-03-27 Thread Steven Wu
This proposal mentions that SplitEnumerator might run on the JobManager or in a single task on a TaskManager. If the enumerator is a single task on a TaskManager, then the job DAG can never be embarrassingly parallel anymore. That will nullify the benefit of fine-grained recovery for embarrassingl

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-01-28 Thread Biao Liu
Hi Stephan & Piotrek, Thank you for the feedback. It seems that there are a lot of things to do in the community. I am just afraid that this discussion may be forgotten since there have been so many proposals recently. Anyway, I hope to see the split topics soon :) On Thu, Jan 24, 2019 at 8:21 PM Piotr Nowojski wrote: > Hi Biao

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-01-24 Thread Stephan Ewen
Before creating any JIRA issues, we need to converge a bit further on the design. There are too many unsolved questions in the above summary. I would try and come up with a next version of the interface proposal in the coming days and use that as the base to continue the discussion. Whether this

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-01-24 Thread Piotr Nowojski
Hi Biao! This discussion was stalled because of preparations for open sourcing & merging Blink. I think before creating the tickets we should split this discussion into the topics/areas outlined by Stephan and create FLIPs for them. I think there is no chance for this to be completed in a couple

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-01-20 Thread Biao Liu
Hi community, Stephan's summary makes a lot of sense to me. It is much clearer indeed after splitting the complex topic into small ones. I was wondering whether there is any detailed plan for the next step. If not, I would like to push this thing forward by creating some JIRA issues. Another question is that s

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-30 Thread Stephan Ewen
Thanks everyone for the lively discussion. Let me try to summarize where I see convergence in the discussion and open issues. I'll try to group this by design aspect of the source. Please let me know if I got things wrong or missed something crucial here. For issues 1-3, if the below reflects the

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-27 Thread Kostas Kloudas
Hi Biao, Thanks for the answer! So given the multi-threaded readers, now we have as open questions: 1) How do we let the checkpoints pass through our multi-threaded reader operator? 2) Do we have separate reader and source operators or not? In the strategy that has a separate source, the source

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-26 Thread Biao Liu
Hi Kostas again, Did I misunderstand you in my last response? If you mean checkpointing in the scenario where the source and the split reader are in different operators, like in Aljoscha's prototype, that's indeed a problem, so I think that would not be the final version. Aljoscha also said in the FLIP doc t

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-26 Thread Biao Liu
Hi Kostas, Regarding the checkpoint of "per thread for each split mode": IMO, there are several things the source operator needs to do. 1. The source operator needs to record all splits in the checkpoint. The unfinished splits must be recorded. I'm not sure whether we could skip recording the finished splits,

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-26 Thread Kostas Kloudas
Hi all, From the discussion, I understand that we are leaning towards a design where the user writes a single-threaded SplitReader, which Flink executes on another thread (not the main task thread). This way the task can have multiple readers running concurrently, each one reading a different spl

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-25 Thread Biao Liu
Hi community, Glad to see this topic is still so active. Thanks for replying @Piotrek and @Becket. Last time, I expressed some rough ideas about the thread model. However I found that it's hard to describe clearly on the mailing list. So I wrote it down with some graphs and gave examples of some kinds of models

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-23 Thread Guowei Ma
Hi Piotr, Sorry for the late response. First of all, I think the Flink runtime can assign a thread to a StreamTask, like in the 'Actor' model. The number of threads for a StreamTask should not be proportional to the number of operators or other things. This will give Flink the ability to scale horizontally

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-22 Thread Becket Qin
Hi Piotrek, Regarding the split assignment. My hunch is that Flink might not have enough information to assign the splits to the readers in the best way. Even if a SplitReader says it COULD take another split, it does not mean it is the best reader to take the split. For example, it is possible th

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-22 Thread Piotr Nowojski
Hi Becket, I think the problem is not with the split re-assignment, but with dynamic split discovery. We do not always know the number of splits beforehand (for example Kafka partition/topic discovery, but this can also happen in batch), while the source parallelism is fixed/known before h

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-21 Thread Becket Qin
Thanks Piotrek, > void SplitReader#addSplit(Split) > boolean SplitReader#doesWantMoreSplits() I have two questions about this API. 1. What if the SplitReader implementation cannot easily add a split to read on the fly? 2. Does Flink have to be involved in split assignment? I am wondering if it

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-21 Thread Piotr Nowojski
Hi again, > However I don't like the thread mode which starts a thread for each split. > Starting extra thread in operator is not an ideal way IMO. Especially > thread count is decided by split count. So I was wondering if there is a > more elegant way. Do we really want these threads in Flink cor

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-21 Thread Becket Qin
Hi Aljoscha, Good point on the potential optimization in the source. One thing to clarify, by "adding a minimumTimestamp()/maximumTimestamp() method pair to the split interface", did you mean "split reader interface"? If so, what should the readers do if they do not have such additional informatio

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-18 Thread Biao Liu
Hi community, Thank you guys for sharing ideas. What really concerns me is the thread mode. Actually, at Alibaba, we implemented our "split reader" based source two years ago. It is based on "SourceFunction"; it's just an extension, not a refactoring. It's almost the same as the versio

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-17 Thread Thomas Weise
@Aljoscha to address your question first: In the case of the Kinesis consumer (with current Kinesis consumer API), there would also be N+1 threads. I have implemented a prototype similar to what is shown in Jamie's document, where the thread ownership is similar to what you have done for Kafka. Th

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-16 Thread Piotr Nowojski
Hi Jamie, As was already covered in my discussion with Becket, there is an easy way to provide a blocking API on top of a non-blocking API. And yes, we both agreed that a blocking API is easier for users to implement. I also do not agree with respect to the usefulness of a non-blocking API. Actually Ka
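A minimal sketch of how a blocking call can be layered on top of a non-blocking reader, assuming the future-returning isBlocked() shape discussed elsewhere in this thread. NonBlockingReader, QueueReader and take() are hypothetical names for illustration only, and the convention that getNext() returns null when nothing is ready is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class BlockingAdapterSketch {
    /** Hypothetical non-blocking reader: a readiness future plus a poll-style getNext(). */
    interface NonBlockingReader<T> {
        /** Completes once getNext() can return a record without waiting. */
        CompletableFuture<?> isBlocked();

        /** Returns the next record, or null if none is ready right now. */
        T getNext();
    }

    /** Small in-memory reader used only to exercise the adapter. */
    static final class QueueReader<T> implements NonBlockingReader<T> {
        private final Queue<T> queue = new ArrayDeque<>();
        private CompletableFuture<Void> available = new CompletableFuture<>();

        synchronized void add(T element) {
            queue.add(element);
            available.complete(null); // wake up anyone waiting for data
        }

        @Override
        public synchronized CompletableFuture<?> isBlocked() {
            return available;
        }

        @Override
        public synchronized T getNext() {
            T next = queue.poll();
            // Only swap in a fresh future after the old one has fired,
            // so a waiter is never left holding a future that cannot complete.
            if (queue.isEmpty() && available.isDone()) {
                available = new CompletableFuture<>();
            }
            return next;
        }
    }

    /** The blocking view: wait for readiness, then poll, and repeat if we lost a race. */
    static <T> T take(NonBlockingReader<T> reader) {
        while (true) {
            reader.isBlocked().join();
            T next = reader.getNext();
            if (next != null) {
                return next;
            }
        }
    }

    public static void main(String[] args) {
        QueueReader<String> reader = new QueueReader<>();
        new Thread(() -> reader.add("record-1")).start();
        System.out.println(take(reader)); // waits until the producer thread has added the record
    }
}
```

The adapter lives outside the reader, so a connector author who prefers a blocking style can be given one without the engine-facing interface having to expose both variants.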

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-15 Thread Jamie Grier
Thanks Aljoscha for getting this effort going! There's been plenty of discussion here already and I'll add my big +1 to making this interface very simple to implement for a new Source/SplitReader. Writing a new production quality connector for Flink is very difficult today and requires a lot of d

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-15 Thread Piotr Nowojski
Hi, One more thing. I think the Kafka client would be a good example of a connector that could make use of this `isBlocked()`/callbacks single-threaded API from “Pattern 2”. If we have N threads per N splits, there would be no need for the (N+1)th thread. It could be implemented as a non blockin

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-15 Thread Piotr Nowojski
Hi Re: Becket > WRT the confusion between advance() / getCurrent(), do you think it would > help if we combine them and have something like: > > CompletableFuture getNext(); > long getWatermark(); > long getCurrentTimestamp(); I think that technically this would work the same as `CompletableFut

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-15 Thread Steven Wu
> And each split has its own (internal) thread for reading from Kafka and putting messages in an internal queue to pull from. This is similar to how the current Kafka source is implemented, which has a separate fetcher thread. Aljoscha, in kafka case, one split may contain multiple kafka partitio

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-15 Thread Aljoscha Krettek
Hi, I thought I had sent this mail a while ago but I must have forgotten to send it. There is another thing we should consider for splits: the range of timestamps that it can contain. For example, the splits of a file source would know what the minimum and maximum timestamp in the splits is, ro

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-14 Thread Becket Qin
Hi Piotrek, Thanks a lot for the detailed reply. All makes sense to me. WRT the confusion between advance() / getCurrent(), do you think it would help if we combine them and have something like: CompletableFuture getNext(); long getWatermark(); long getCurrentTimestamp(); Cheers, Jiangjie (Bec

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-13 Thread Piotr Nowojski
Hi, Thanks again for the detailed answer :) Sorry for responding with a delay. > Completely agree that in pattern 2, having a callback is necessary for that > single thread outside of the connectors. And the connectors MUST have > internal threads. Yes, this thread will have to exist somewhere.

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-09 Thread Becket Qin
Hi Piotrek, Thanks for the explanation. We are probably talking about the same thing but in different ways. To clarify a little bit, I think there are two patterns to read from a connector. Pattern 1: Thread-less connector with a blocking read API. Outside of the connector, there is one IO thread

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-09 Thread Piotr Nowojski
Hi, Good point about select/epoll; however, I do not see how they could be used with Flink if we would like a single task in Flink to be single-threaded (and I believe we should pursue this goal). If your connector blocks on `select`, then it cannot process/handle control messages from Flink, like ch

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-08 Thread Becket Qin
Hi Piotrek, > But I don’t see a reason why we should expose both blocking `take()` and non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or connector) would have to do the same busy > looping anyway and I think it would be better to have a simpler connector API (that would s

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-07 Thread Piotr Nowojski
Hi Becket, With my proposal, both of your examples would have to be solved by the connector and the solution to both problems would be the same: pretend that the connector is never blocked (`isBlocked() { return NOT_BLOCKED; }`) and implement `getNextElement()` in a blocking fashion (or semi blocking wit
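A minimal sketch of the "pretend the connector is never blocked" approach from this message, assuming isBlocked() returns a CompletableFuture as in the earlier proposal in this thread. The Reader interface and BlockingQueueReader are hypothetical and written only for illustration; NOT_BLOCKED is modelled here as an already-completed future, which is an assumption about what the constant would be.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class NeverBlockedReaderSketch {
    /** Assumed shape of the constant: a future that is already complete. */
    static final CompletableFuture<Void> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /** Hypothetical reader interface with the two methods named in the message. */
    interface Reader<T> {
        CompletableFuture<?> isBlocked();
        T getNextElement();
    }

    /** A purely blocking connector that opts out of the non-blocking protocol. */
    static final class BlockingQueueReader<T> implements Reader<T> {
        private final BlockingQueue<T> queue;

        BlockingQueueReader(BlockingQueue<T> queue) {
            this.queue = queue;
        }

        @Override
        public CompletableFuture<?> isBlocked() {
            // Always claim to be ready; the waiting happens in getNextElement().
            return NOT_BLOCKED;
        }

        @Override
        public T getNextElement() {
            try {
                return queue.take(); // blocks the calling thread until a record arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a record", e);
            }
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("record-1");
        Reader<String> reader = new BlockingQueueReader<>(queue);
        System.out.println(reader.isBlocked().isDone()); // prints true
        System.out.println(reader.getNextElement());     // prints record-1
    }
}
```

The trade-off is that the calling thread is occupied for as long as getNextElement() waits, so this shape fits a caller that dedicates a thread to the reader.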

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-07 Thread Becket Qin
Hi Piotr, I might have misunderstood your proposal. But let me try to explain my concern. I am thinking about the following case: 1. a reader has the following two interfaces, boolean isBlocked() T getNextElement() 2. the implementation of getNextElement() is non-blocking. 3. The reader is

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-07 Thread Piotr Nowojski
Hi, a) > BTW, regarding the isBlock() method, I have a few more questions. 21, Is a > method isReady() with boolean as a return value > equivalent? Personally I found it is a little bit confusing in what is > supposed to be returned when the future is completed. 22. if > the implementation of i

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-06 Thread Becket Qin
Hi Biao, Thanks for the explanation. The current API makes more sense to me now. It basically means: 1. Readers should all be non-blocking 2. The offset advancing and the record fetching are two steps. 3. After each advance() call, the currentRecord, currentTimestamp and watermark will all be upda

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-06 Thread Biao Liu
Regarding the naming style: the advantage of the `poll()` style is that the name `poll` basically means it should be a non-blocking operation, the same as with `Queue` in the Java API. It's easy to understand. We don't need to write too much in docs to imply the implementation should not do something heavy. Ho

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-05 Thread Becket Qin
Thanks for updating the wiki, Aljoscha. The isDone()/advance()/getCurrent() API looks more similar to hasNext()/isNextReady()/getNext(), but implying some different behaviors. If users call getCurrent() twice without calling advance() in between, will they get the same record back? From the API i

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-05 Thread Aljoscha Krettek
I updated the FLIP [1] with some Javadoc for the SplitReader to outline what I had in mind with the interface. Sorry for not doing that earlier, it's not quite clear how the methods should work from the name alone. The gist of it is that advance() should be non-blocking, so isDone/advance()/get
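A minimal sketch of the isDone()/advance()/getCurrent() calling pattern outlined in this message, assuming advance() is non-blocking and returns true only when a new current record is available. The SplitReader interface below is a simplified, hypothetical version for illustration (timestamps, watermarks and checked exceptions are left out), and returning the same record from repeated getCurrent() calls is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class AdvanceLoopSketch {
    /** Simplified, hypothetical reader with the three methods named in the message. */
    interface SplitReader<T> {
        /** True once the split is exhausted. */
        boolean isDone();

        /** Non-blocking: moves to the next record if one is ready and says whether it did. */
        boolean advance();

        /** The record selected by the last successful advance(). */
        T getCurrent();
    }

    /** In-memory reader over a fixed list, used only to exercise the loop. */
    static final class ListSplitReader<T> implements SplitReader<T> {
        private final Iterator<T> records;
        private T current;
        private boolean done;

        ListSplitReader(List<T> records) {
            this.records = records.iterator();
        }

        @Override
        public boolean isDone() {
            return done;
        }

        @Override
        public boolean advance() {
            if (records.hasNext()) {
                current = records.next();
                return true;
            }
            done = true;
            return false;
        }

        @Override
        public T getCurrent() {
            return current;
        }
    }

    /** The caller's loop: advance, and read the current record only after a successful advance. */
    static <T> List<T> drain(SplitReader<T> reader) {
        List<T> out = new ArrayList<>();
        while (!reader.isDone()) {
            if (reader.advance()) {
                out.add(reader.getCurrent());
            }
            // A real caller would yield or do other work here instead of spinning
            // when advance() returns false but the split is not done.
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(new ListSplitReader<>(Arrays.asList("a", "b", "c")))); // prints [a, b, c]
    }
}
```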

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-05 Thread Biao Liu
Thanks Aljoscha for bringing us this discussion! 1. I think one of the reasons for separating `advance()` and `getCurrent()` is that we have several different types returned by the source: not just the `record`, but also the timestamp of the record and the watermark. If we don't separate these into diffe

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-04 Thread Becket Qin
Hi Thomas, The iterator-like API was also the first thing that came to me. But it seems a little confusing that hasNext() does not mean "the stream has not ended", but means "the next record is ready", which is repurposing the well known meaning of hasNext(). If we follow the hasNext()/next() patt

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-04 Thread Thomas Weise
Couple more points regarding discovery: The proposal mentions that discovery could be outside the execution graph. Today, discovered partitions/shards are checkpointed. I believe that will also need to be the case in the future, even when discovery and reading are split between different tasks. F

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-04 Thread Thomas Weise
Thanks for getting the ball rolling on this! Can the number of splits decrease? Yes, splits can be closed and go away. An example would be a shard merge in Kinesis (2 existing shards will be closed and replaced with a new shard). Regarding advance/poll/take: IMO the least restrictive approach wou

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-04 Thread Guowei Ma
Hi, Thanks Aljoscha for this FLIP. 1. I agree with Piotr and Becket that the non-blocking source is very important. But in addition to `Future/poll`, there may be another way to achieve this. I think it may not be very memory-friendly if every advance call returns a Future. public interface Listen

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-03 Thread Becket Qin
Thanks for the explanation, Piotr, I agree that the completable future solution would work for single-threaded readers. From API perspective, returning a completable future means the reader must have an internal thread to complete that future. I was actually thinking of some sources that are "thre

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-02 Thread Piotr Nowojski
Hey Becket, Re 2. With: If source is purely single threaded and blocking, then it could be implemented in the following way: /* * Return a future, which when completed means that source has more data and getNext() will not block. * If you wish to use benefits of non blocking connectors, plea

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-01 Thread Becket Qin
Thanks for the FLIP, Aljoscha. The proposal makes sense to me. Separating the split discovery and consumption is very useful as it enables Flink to better manage the sources. Looking at the interface, I have a few questions: 1. *SplitEnumerator*.*discoverNewSplits()* seems to assume that the numbe

Re: [DISCUSS] FLIP-27: Refactor Source Interface

2018-11-01 Thread Piotr Nowojski
Hi, Thanks Aljoscha for starting this, it’s blocking quite a lot of other possible improvements. I have one proposal. Instead of having a method: boolean advance() throws IOException; I would replace it with /* * Return a future, which when completed means that source has more data and getN