[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17334334#comment-17334334 ]

Flink Jira Bot commented on FLINK-3133:
---

This issue was marked "stale-assigned" and has not received an update in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work.

> Introduce collect()/count()/print() methods in DataStream API
> -
>
> Key: FLINK-3133
> URL: https://issues.apache.org/jira/browse/FLINK-3133
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Affects Versions: 0.10.0, 0.10.1, 1.0.0
> Reporter: Maximilian Michels
> Assignee: Evgeny Kincharov
> Priority: Major
> Labels: stale-assigned
>
> The DataSet API's methods {{collect()}}, {{count()}}, and {{print()}} should
> be mirrored to the DataStream API.
> The semantics of the calls are different. We need to be able to sample parts
> of a stream, e.g. by supplying a time period in the arguments to the methods.
> Users should use the {{JobClient}} to retrieve the results.
> {code:java}
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream streamData = env.addSource(..).map(..);
> JobClient jobClient = env.executeWithControl();
> Iterable sampled = jobClient.sampleStream(streamData,
> Time.seconds(5));
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
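The blocking semantics of the proposed {{jobClient.sampleStream(streamData, Time.seconds(5))}} call can be sketched in plain Java. This is a minimal sketch, not Flink code: it assumes some transport (left open in the issue) hands every record emitted by the running job to {{offer()}} on the client, and {{ProcessingTimeSampler}} is a hypothetical name. {{sample()}} blocks until the window has elapsed in processing time and returns the records that were buffered before or delivered during it.

{code:java}
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical client-side sampler; not part of any Flink API. */
class ProcessingTimeSampler<T> {
    // Records handed over by the (unspecified) transport from the running job.
    private final BlockingQueue<T> incoming = new LinkedBlockingQueue<>();

    /** Called by the transport for every record the job emits. */
    void offer(T record) {
        incoming.add(record);
    }

    /**
     * Blocks until the window has elapsed in processing time and returns the
     * records that were buffered before or delivered during the window.
     */
    List<T> sample(Duration window) {
        List<T> result = new ArrayList<>();
        long deadline = System.nanoTime() + window.toNanos();
        try {
            long remaining;
            while ((remaining = deadline - System.nanoTime()) > 0) {
                // Wait for the next record, but never past the deadline.
                T record = incoming.poll(remaining, TimeUnit.NANOSECONDS);
                if (record != null) {
                    result.add(record);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // give up early, keep the partial sample
        }
        return result;
    }
}
{code}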
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17323733#comment-17323733 ]

Flink Jira Bot commented on FLINK-3133:
---

This issue is assigned but has not received an update in 7 days so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign so someone else may work on it. In 7 days the issue will be automatically unassigned.
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625564#comment-16625564 ]

Aljoscha Krettek commented on FLINK-3133:
-

I think we can close this issue, or update it to make it more modern. However, this is a complicated topic that requires a thorough design. For example, it should work in container environments such as Kubernetes, which I think will be quite tricky.
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16625491#comment-16625491 ]

Aleksei Izmalkin commented on FLINK-3133:
-

Hello [~mxm], I would like to help with this issue. I have read the conversation history carefully. The last comment was left on August 24, 2017, more than a year ago. Is this issue still relevant, or would it be better to close it and open a new one with an appropriate description?
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16140168#comment-16140168 ]

Aljoscha Krettek commented on FLINK-3133:
-

Quick note: I'm not against having async execution; in fact, I would really like to have it. However, I don't think any method that streams data back to the program is feasible, mostly because of failures. In a lot of tests we introduce artificial failures and ensure that the program still computes the expected result. How would that work when collecting the results of a sink back to the program? In my opinion, this would only work if the assertion is verified somewhere in the job, because that part would also get restarted in case of failure.
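The failure argument can be made concrete with a toy model in plain Java (not Flink code; the checkpoint offset and failure point are made-up parameters, and {{ReplaySimulation}} is a hypothetical name). After a failure the source rewinds to its last checkpoint, so a naive collector on the client side receives some records twice, while a counter kept inside the job is restored together with the source and ends up exact.

{code:java}
import java.util.ArrayList;
import java.util.List;

/** Toy model of replay after a failure; all names are hypothetical. */
class ReplaySimulation {
    /**
     * Emits records 0..total-1 to a client-side list. The first attempt fails
     * after failAfter records; the restart resumes from the checkpointed offset.
     */
    static List<Integer> runWithFailure(int total, int checkpointOffset, int failAfter) {
        List<Integer> clientSideCollector = new ArrayList<>();
        // First attempt: records reach the client until the artificial failure.
        for (int i = 0; i < failAfter; i++) {
            clientSideCollector.add(i);
        }
        // Restart: the source rewinds to the checkpoint and re-emits records,
        // but the client-side list cannot be rolled back.
        for (int i = checkpointOffset; i < total; i++) {
            clientSideCollector.add(i);
        }
        return clientSideCollector;
    }

    /** Same scenario, but the count lives in (checkpointed) operator state. */
    static int inJobCountWithFailure(int total, int checkpointOffset, int failAfter) {
        int count = 0;
        for (int i = 0; i < failAfter; i++) {
            count++; // progress made before the failure...
        }
        // ...is discarded: on restart, operator state is restored to its
        // checkpointed value, which matches the rewound source position.
        count = checkpointOffset;
        for (int i = checkpointOffset; i < total; i++) {
            count++;
        }
        return count;
    }
}
{code}

With {{runWithFailure(5, 2, 4)}} the client sees seven records ({{[0, 1, 2, 3, 2, 3, 4]}}) for a five-record stream, whereas the in-job count comes out as 5.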
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16136684#comment-16136684 ]

Gábor Hermann commented on FLINK-3133:
--

[~aljoscha], [~kenmy] thanks for the feedback!

[~aljoscha] I see the problems with communication between JMs/TMs. I guess it's worth waiting for QS to mature before tackling collect() etc., because we could then reuse that code. I'm also in favor of keeping the DataStream API as it is and putting the code in a separate place (like DataStreamUtils in contrib).

That said, would it make sense to create a sort of collectToList() method for the local mini cluster only? For testing, that should suffice. We could put it in DataStreamUtils. Collecting a single DataStream is already supported by returning an iterator. I think we could modify the code to allow collecting multiple DataStreams into Lists. So, modifying my previous example a bit:

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSink printSink = env.addSource(..).print();
DataStreamSink otherSink = env.addSource(..).map(..).filter(..).print();
Future<List<?>> printSinkResults = DataStreamUtils.collectToList(printSink);
Future<List<?>> otherSinkResults = DataStreamUtils.collectToList(otherSink);
env.execute();
{code}

Or, if we don't want to expose Future to users, we could have something like a {{DataStreamCollector}}:

{code:java}
...
DataStreamCollector printSinkResults = DataStreamUtils.collectToList(printSink);
DataStreamCollector otherSinkResults = DataStreamUtils.collectToList(otherSink);
...
{code}

and after execution we could do:

{code:java}
List printSinkList = printSinkResults.getList();
{code}

An API like {{PAssert}} could be built on top of that, or users could use this collectToList() API directly for testing.

[~kenmy] If we go with this, I wouldn't touch your code, and I could open a separate issue.

As you said, this issue is blocked by async execution. However, I believe we can work around async execution by creating a reference to the result before env.execute(), a sort of "incomplete" reference like a Future or a DataStreamCollector. This should not be hard, but I might be missing something. What do you think?
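The "incomplete reference" idea (obtain a handle before {{env.execute()}}, read it afterwards) can be sketched in plain Java. Only the name {{DataStreamCollector}} and {{getList()}} come from the comment; the {{append()}}/{{finish()}} hooks and the use of {{CompletableFuture}} are assumptions, and the code does not talk to a real Flink job.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical handle created before execution and filled while the job runs. */
class DataStreamCollector<T> {
    private final List<T> buffer = Collections.synchronizedList(new ArrayList<>());
    private final CompletableFuture<List<T>> result = new CompletableFuture<>();

    /** Called by the (hypothetical) collecting sink for every record. */
    void append(T record) {
        buffer.add(record);
    }

    /** Called once the job has finished; publishes an unmodifiable snapshot. */
    void finish() {
        synchronized (buffer) {
            result.complete(Collections.unmodifiableList(new ArrayList<>(buffer)));
        }
    }

    /** Future-style access, as in the first variant of the proposal. */
    CompletableFuture<List<T>> asFuture() {
        return result;
    }

    /** Blocking access that hides the Future, as in the second variant. */
    List<T> getList() {
        return result.join();
    }
}
{code}

In a real implementation {{finish()}} would be triggered when the job terminates; {{getList()}} then simply hides the Future behind a blocking call.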
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16130020#comment-16130020 ]

Evgeny Kincharov commented on FLINK-3133:
-

[~gaborhermann], [~aljoscha] thanks for your interest in this.
This issue is blocked by FLINK-2313 and FLINK-4272. The main point is that this issue is useless without the possibility of nonblocking execution. I have implemented a sketch of nonblocking execution (just for testing this issue) [here|https://github.com/kenmy/flink/tree/FLINK-3133_temp], but it is outside the scope of this issue. As far as I remember, the "print" method worked in my test cases. That is what has been done so far.
As for what should be done, I agree with [~aljoscha] that nonblocking execution may make the streaming API too complex. From this point of view, the approach with "Future" is a lot more useful here.
[~gaborhermann], if you want, you may assign this issue to yourself and continue working on it; I don't mind. I can rebase my changes onto the current master if you want to use them.
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123522#comment-16123522 ]

Aljoscha Krettek commented on FLINK-3133:
-

I would be in favour of closing this issue. The DataStream API is already quite big, and adding more API surface increases the maintenance burden as well as the number of things that can go wrong. Also, communication between TaskManagers is still a problem, and it is made even harder by recent changes to how people want to deploy Flink, e.g. deployments in Kubernetes, where you have overlay networks. We would therefore have to solve problems similar to those that Queryable State and other efforts are currently solving. (There is work to make QS use REST to communicate with the JobManager/TaskManager and to make it work well with overlay networks.)

That being said, I would like to have a good testing framework, similar to flink-spector, as part of Flink. I think a generic framework that does something like {{SavepointMigrationTestBase}} would be very good. In there, I'm using Accumulators to listen for certain conditions being met and then cancel the job. This avoids the dreaded {{SuccessException}} that we have in so many tests. We could also use queryable state to do a similar thing. However, this is still blocked on having asynchronous job submission, which I think will be easier to do once a bit more FLIP-6-related work lands, among other things a new REST-based JobClient (see FLINK-7071, FLINK-7072, and FLINK-7073).

For reference, {{PAssert}} from Beam is a very nice API for testing.
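The accumulator-based pattern (watch a counter until the expected condition holds, then cancel the job instead of throwing a {{SuccessException}}) might look roughly like this. It is a plain-Java sketch, not the {{SavepointMigrationTestBase}} code: the {{AtomicLong}} stands in for an accumulator, the {{Runnable}} for job cancellation, and {{ConditionWatcher}} is a hypothetical name.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical test helper: stops waiting once an expected record count is reached. */
class ConditionWatcher {
    private final AtomicLong counter = new AtomicLong(); // stands in for an accumulator
    private final CountDownLatch conditionMet = new CountDownLatch(1);
    private final long expected;

    ConditionWatcher(long expected) {
        this.expected = expected;
    }

    /** Called for every record the job under test produces. */
    void recordProcessed() {
        if (counter.incrementAndGet() >= expected) {
            conditionMet.countDown();
        }
    }

    /**
     * Waits until the condition holds or the timeout expires, then runs the
     * cancel action in either case. Returns whether the condition was met.
     */
    boolean awaitAndCancel(long timeoutMillis, Runnable cancelJob) {
        boolean met;
        try {
            met = conditionMet.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            met = false;
        }
        cancelJob.run();
        return met;
    }
}
{code}

The test then asserts on the returned flag rather than relying on an exception escaping from the job.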
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16123262#comment-16123262 ]

Gábor Hermann commented on FLINK-3133:
--

What's the status of this? We already have collect() implemented ([FLINK-1670|https://issues.apache.org/jira/browse/FLINK-1670]), but it starts the execution, so we can only use it for one DataStream. For testing, it would be great to have the output of multiple DataStreams, like the design [~kenmy] proposed. Another design I can imagine:

{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSink printSink = env.addSource(..).print();
DataStreamSink otherSink = env.addSource(..).map(..).filter(..).print();
Future<List<?>> printSinkResults = printSink.collect();
Future<List<?>> otherSinkResults = otherSink.collect();
env.execute();
{code}

Here the Futures would complete when the execution finishes. However, this would require users to know how to use Futures.

Do you know of any other effort to make testing easier (apart from external libraries such as https://github.com/ottogroup/flink-spector)?

I see that [~kenmy] has already put effort into this, but I'd also be happy to take up this issue if nobody is willing to work on it.
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15821899#comment-15821899 ]

Evgeny Kincharov commented on FLINK-3133:
-

Hi [~mxm], I have assigned this issue to myself again. I'll try to explain how I implemented this and some problems I ran into.
1. I planned to create a new sink that only saves the records passing through it. This is not hard.
2. We need the ability to execute the stream pipeline in nonblocking mode. You proposed adding the method executeWithControl() instead of execute(). I know this is described in subtasks of FLINK-4272, but I implemented executeWithControl() in this issue only for debugging the sink from step 1 (of course, I will remove it before opening a PR).
3. We need to insert this sink into the pipeline. This is the hardest part of the implementation. If I add the sink after executeWithControl(), addSink doesn't change the pipeline because of conversion transformations performed during executeWithControl().

My current state is here: https://github.com/kenmy/flink/tree/FLINK-3133_temp
The commit is here: https://github.com/kenmy/flink/commit/136d51c79008126ed586f2b50a997b03c84b21b3

As a result, I don't see a simple way to add "sample" without changing the pipeline before execution starts. This may be a problem.

Possible solutions (IMHO):
1. Change the design to
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSink printSink = env.addSource(..).print();
ResultQueryable queryObject = env.executeWithResultQueryable();
List sampled = queryObject.retrieve(printSink, Time.seconds(5));
{code}
The main idea is to change the pipeline before execution starts, not after. Another variant of the same idea:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Sampler sampler = new Sampler();
DataStream streamData = env.addSource(..).map(..).sample(sampler);
JobClient jobClient = env.executeWithControl();
Iterable sampled = sampler.retrieve(Time.seconds(5));
{code}
2. Don't use a sink; use another mechanism to intercept the DataStream (e.g. extend DataStream with a method getSampler() that returns an object allowing storage of transferred data to be enabled/disabled for any DataStream). IMHO the "sink" approach looks more lightweight.

Which solution do you prefer? I recommend [1], but I may not know all the business needs. Or perhaps you know a better solution? Thanks.
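The interception idea (a tap on a DataStream whose storage of passing records can be switched on and off) can be modelled with a pass-through {{java.util.function.Function}} standing in for a Flink operator. {{SamplingTap}}, {{enable()}}, {{disable()}} and {{drain()}} are hypothetical names, and a real operator would run in parallel on the TaskManagers rather than in the client.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical pass-through operator: forwards records, optionally keeps copies. */
class SamplingTap<T> implements Function<T, T> {
    private final List<T> sampled = new ArrayList<>();
    private volatile boolean enabled;

    void enable() {
        enabled = true;
    }

    void disable() {
        enabled = false;
    }

    @Override
    public T apply(T record) {
        if (enabled) {
            synchronized (sampled) {
                sampled.add(record);
            }
        }
        return record; // the record continues downstream unchanged
    }

    /** Returns the records captured so far and clears the buffer. */
    List<T> drain() {
        synchronized (sampled) {
            List<T> copy = new ArrayList<>(sampled);
            sampled.clear();
            return copy;
        }
    }
}
{code}

Because the tap forwards every record unchanged, it can be attached before execution and left in place; only the enabled flag changes at runtime.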
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15741541#comment-15741541 ]

Maximilian Michels commented on FLINK-3133:
---

Sure, no problem. I appreciate the discussion :)
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735742#comment-15735742 ]

Alexander Shoshin commented on FLINK-3133:
--

Sorry, I wanted to take this issue, but I am not able to at the moment. Nevertheless, I am sure our discussion will be very helpful for the next assignee.
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15714701#comment-15714701 ]

Maximilian Michels commented on FLINK-3133:
---

Sorry, I missed your reply. The sampling method would block for as long as it needs to assemble the output of the last 5 seconds. Note that the time domain needs to be taken into account as well: 5 seconds in processing time are not the same as 5 seconds in event time.

The implementation is not blocked on the JobClient. If you want to get started, I would suggest setting up a design document that lays out which features you want to implement and how you would gather data from the TaskManagers.
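To illustrate the time-domain remark: given records that carry both an event timestamp and their arrival time at the client, "the last 5 seconds" selects different records depending on which clock is used. The {{Event}} class, its fields and the millisecond timestamps are assumptions for illustration, not Flink types.

{code:java}
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical record carrying an event timestamp and an arrival time (ms). */
final class Event {
    final String payload;
    final long eventTime;   // when the event happened at the source
    final long arrivalTime; // when the record reached the client

    Event(String payload, long eventTime, long arrivalTime) {
        this.payload = payload;
        this.eventTime = eventTime;
        this.arrivalTime = arrivalTime;
    }
}

class WindowSelection {
    /** Records that arrived during [end - windowMs, end): processing time. */
    static List<Event> byProcessingTime(List<Event> events, long end, long windowMs) {
        return events.stream()
                .filter(e -> e.arrivalTime >= end - windowMs && e.arrivalTime < end)
                .collect(Collectors.toList());
    }

    /** Records whose own timestamp lies in [end - windowMs, end): event time. */
    static List<Event> byEventTime(List<Event> events, long end, long windowMs) {
        return events.stream()
                .filter(e -> e.eventTime >= end - windowMs && e.eventTime < end)
                .collect(Collectors.toList());
    }
}
{code}

A late record (old event timestamp, recent arrival) falls into the processing-time sample but not into the event-time one.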
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15695133#comment-15695133 ]

Alexander Shoshin commented on FLINK-3133:
--

Hi [~mxm]. Could you please answer my previous question? It seems that you missed it :)
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15676175#comment-15676175 ]

Alexander Shoshin commented on FLINK-3133:
--

Hi :) Thanks, Maximilian. I think I now understand this issue. One more question: when we call {{jobClient.sampleStream(..)}}, should we immediately receive the result of the last 5 seconds (and then what if the job has just started and there are no results yet?), or should we block for 5 seconds until the result has been accumulated?

Also, I see that {{JobClient}} is going to be implemented in FLINK-4272. Should we wait until it is merged?
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674912#comment-15674912 ]

Maximilian Michels commented on FLINK-3133:
---

Hi [~AlexanderShoshin]! I've adjusted the description a bit. In general, we face the problem that the output is potentially infinite, so we can't store it the way count/collect/print do in the DataSet API. The idea here is to sample parts of the stream from the client while the job is executing.
[jira] [Commented] (FLINK-3133) Introduce collect()/count()/print() methods in DataStream API
[ https://issues.apache.org/jira/browse/FLINK-3133?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15674899#comment-15674899 ]

Maximilian Michels commented on FLINK-3133:
---

I've made some improvements to the description.