Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-05-17 Thread AJAY GUPTA
Hi all, As per the discussion on this thread, we can conclude with the following points. 1) Batch control tuples will need to be handled separately from watermark tuples. 2) Add support for start batch and stop batch control tuples. 3) Add support for reset control tuples to indicate to windowed
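The three points above can be sketched roughly as follows. This is a minimal illustration only: `ControlKind`, `ControlTuple` and `ToyWindowedOperator` are hypothetical names and are not part of the Apex or Malhar API. The sketch also assumes that a reset returns the watermark to its initial value, as suggested earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchControlSketch {
    // Hypothetical control signals; batch boundaries are kept separate from watermarks.
    public enum ControlKind { START_BATCH, STOP_BATCH, WATERMARK, RESET }

    public static final class ControlTuple {
        public final ControlKind kind;
        // Batch id for START_BATCH/STOP_BATCH, timestamp for WATERMARK, ignored for RESET.
        public final long value;

        public ControlTuple(ControlKind kind, long value) {
            this.kind = kind;
            this.value = value;
        }
    }

    // Toy stand-in for a windowed operator that counts the tuples in each batch.
    public static final class ToyWindowedOperator {
        private static final long INITIAL_WATERMARK = Long.MIN_VALUE; // assumed initial value
        private long watermark = INITIAL_WATERMARK;
        private long count = 0;
        public final List<String> emitted = new ArrayList<>();

        public void processData(Object tuple) {
            count++;
        }

        public void processControl(ControlTuple c) {
            switch (c.kind) {
                case START_BATCH:
                    count = 0; // start each batch with fresh state
                    break;
                case WATERMARK:
                    watermark = Math.max(watermark, c.value); // watermarks only move forward
                    break;
                case STOP_BATCH:
                    emitted.add("batch " + c.value + " count=" + count);
                    break;
                case RESET:
                    watermark = INITIAL_WATERMARK; // assumption: reset restores the initial watermark
                    count = 0;
                    break;
            }
        }

        public long currentWatermark() {
            return watermark;
        }
    }
}
```

Keeping the batch signals apart from `WATERMARK` means a second batch can pass through the same operator after a `RESET` without the earlier batch's watermark marking its windows as late.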

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-05-11 Thread Bhupesh Chawda
I think we should have the support to allow running multiple batches through the same DAG. Resetting the watermark to the initial watermark seems like a good idea. The windowed operator needs to understand the start/end batch control tuple and reset the watermark. ~ Bhupesh

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-05-11 Thread Thomas Weise
Usually batches are processed by different instances of a topology. First of all we should agree that running multiple batches through the same DAG *in sequence* is something that we want to address. If yes, then there is the reset problem you are referring to, and it only occurs when you want to

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-05-09 Thread AJAY GUPTA
After some discussion and trying out the approach discussed above, it seems we would need to separate out the concepts of Watermarks and Batch Control tuples. The windowed operator needs to be modified to understand batch control tuples. Even if we have watermark tuples which also include batch

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-04-29 Thread Vlad Rozov
public static class Pojo implements Tuple {
  @Override
  public Object getValue() {
    return this;
  }
}

@Override
public void populateDAG(DAG dag, Configuration conf) {
  CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
  WindowedOperatorImpl

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-04-29 Thread AJAY GUPTA
Even this will not work because the output port of CsvParser is of type Object. Even though Customer extends Tuple, it will still fail to work since Tuple gets output as Object. *DefaultOutputPort output = new DefaultOutputPort();* The input port type at windowed operator with InputT = Object :

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-04-29 Thread Vlad Rozov
Use Object in place of InputT in the WindowedOperatorImpl. Cast Object to the actual type of InputT at runtime. Introducing an operator just to do a cast is not a good design decision, IMO. Thank you, Vlad Sent from iPhone > On Apr 29, 2017, at 02:50, AJAY GUPTA
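The suggestion above, accepting Object and casting to the real type at runtime, can be illustrated in plain Java without any Apex classes. `ObjectInputStage` and `Customer` are hypothetical stand-ins for illustration; this is not how WindowedOperatorImpl itself is written.

```java
import java.util.function.Consumer;

public class RuntimeCastSketch {
    // Hypothetical POJO, standing in for the class emitted by the parser.
    public static final class Customer {
        public final String name;

        public Customer(String name) {
            this.name = name;
        }
    }

    // Hypothetical stage whose input is typed Object, like a port fed by an Object-typed output port.
    public static final class ObjectInputStage<T> {
        private final Class<T> type;
        private final Consumer<T> handler;

        public ObjectInputStage(Class<T> type, Consumer<T> handler) {
            this.type = type;
            this.handler = handler;
        }

        public void process(Object raw) {
            // Class.cast throws ClassCastException if raw is not an instance of T.
            handler.accept(type.cast(raw));
        }
    }
}
```

The cast happens inside the consuming stage, so no extra operator is needed between the parser and the consumer. The trade-off is that a type mismatch shows up at runtime rather than at compile time.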

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-04-29 Thread AJAY GUPTA
I am using WindowedOperatorImpl and it is declared as follows. WindowedOperatorImpl windowedOperator = new WindowedOperatorImpl<>(); In my application scenario, the InputT is Customer POJO which is getting output as an Object by CsvParser. Ajay On Fri,

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-04-28 Thread Vlad Rozov
How do you declare WindowedOperator? Thank you, Vlad On 4/28/17 10:35, AJAY GUPTA wrote: Vlad, The approach you suggested doesn't work because the CSVParser outputs Object Data Type irrespective of the POJO class being emitted. Ajay On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-04-28 Thread AJAY GUPTA
Vlad, The approach you suggested doesn't work because the CSVParser outputs Object Data Type irrespective of the POJO class being emitted. Ajay On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov wrote: > Make your POJO class implement WindowedOperator Tuple interface (it

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-04-28 Thread Vlad Rozov
Make your POJO class implement WindowedOperator Tuple interface (it may return itself in getValue()). Thank you, Vlad On 4/28/17 02:44, AJAY GUPTA wrote: Hi All, I am creating an application which is using Windowed Operator. This application involves CsvParser operator emitting a POJO

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-04-28 Thread AJAY GUPTA
Hi All, I am creating an application which is using Windowed Operator. This application involves CsvParser operator emitting a POJO object which is to be passed as input to WindowedOperator. The WindowedOperator requires an instance of Tuple class as input : *public final transient

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-03-23 Thread Bhupesh Chawda
Hi All, I think we have some agreement on the way we should use control tuples for File I/O operators to support batch. In order to have more operators in Malhar support this paradigm, I think we should also look at store operators - JDBC, Cassandra, HBase etc. The case with these operators is

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-03-17 Thread Pramod Immaneni
Yes having this type of window not tied to timestamp will work out better. On Mon, Feb 27, 2017 at 10:58 AM, David Yan wrote: > I now see your rationale on putting the filename in the window. > As far as I understand, the reasons why the filename is not part of the key > and

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-28 Thread Bhupesh Chawda
Hi David, I went through the discussion, but it seems like it is more on the event time watermark handling as opposed to batches. What we are trying to do is have watermarks serve the purpose of demarcating batches using control tuples. Since each batch is separate from others, we would like to

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-28 Thread Bhupesh Chawda
Hi David, If using time window does not seem appropriate, we can have another class which is more suited for such sequential and distinct windows. Perhaps, a CustomWindow option can be introduced which takes in a window id. The purpose of this window option could be to translate the window id

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-27 Thread David Yan
I now see your rationale on putting the filename in the window. As far as I understand, the reasons why the filename is not part of the key and the Global Window is not used are: 1) The files are processed in sequence, not in parallel 2) The windowed operator should not keep the state associated

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-27 Thread Thomas Weise
On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda wrote: > I think my comments related to count based windows might be causing > confusion. Let's not discuss count based scenarios for now. > > Just want to make sure we are on the same page wrt. the "each file is a > batch"

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-27 Thread Bhupesh Chawda
I think my comments related to count based windows might be causing confusion. Let's not discuss count based scenarios for now. Just want to make sure we are on the same page wrt. the "each file is a batch" use case. As mentioned by Thomas, each tuple from the same file has the same timestamp

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-27 Thread Thomas Weise
I don't think this is a use case for count based window. We have multiple files that are retrieved in a sequence and there is no knowledge of the number of records per file. The requirement is to aggregate each file separately and emit the aggregate when the file is read fully. There is no

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-27 Thread David Yan
I don't think this is the way to go. Global Window only means the timestamp does not matter (or that there is no timestamp). It does not necessarily mean it's a large batch. Unless there is some notion of event time for each file, you don't want to embed the file into the window itself. If you

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-27 Thread Bhupesh Chawda
Hi David, Thanks for your comments. The wordcount example that I created based on the windowed operator does processing of word counts per file (each file as a separate batch), i.e. process counts for each file and dump into separate files. As I understand Global window is for one large batch;

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-26 Thread David Yan
I'm worried that we are making the watermark concept too complicated. Watermarks should simply just tell you what windows can be considered complete. Point 2 is basically a count-based window. Watermarks do not play a role here because the window is always complete at the n-th tuple. If I

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-26 Thread David Yan
> > > Understood all but this line: > > windowedOperator.setWindowOption(new > WindowOption.TimeWindows(Duration.millis(2))); > > I wonder if there is an option to control this from the source, maybe David > can take a look? > > It actually does not make sense to have windows at the source. The

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-24 Thread Thomas Weise
On Thu, Feb 23, 2017 at 12:02 AM, Bhupesh Chawda wrote: > Hi Thomas, > > My response inline: > > On Wed, Feb 22, 2017 at 10:17 PM, Thomas Weise wrote: > > > Hi Bhupesh, > > > > This looks great. You use the watermark as measure of completeness and

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-23 Thread Bhupesh Chawda
Hi Thomas, My response inline: On Wed, Feb 22, 2017 at 10:17 PM, Thomas Weise wrote: > Hi Bhupesh, > > This looks great. You use the watermark as measure of completeness and the > window to isolate the state, which is how it should work. > > Questions/comments: > > Why does

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-22 Thread Thomas Weise
Hi Bhupesh, This looks great. You use the watermark as measure of completeness and the window to isolate the state, which is how it should work. Questions/comments: Why does the count operator have a 2ms window when this should be driven by the watermark from the input operator? I don't think

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-22 Thread Bhupesh Chawda
Yes Amol, Watermarks, in general, solve a different issue and we should not mix it with data association. If needed, the data association can be solved by the user on the operator/ application layer. The engine should not worry about it. Regarding your point on event time based watermarks, I think

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-22 Thread Bhupesh Chawda
Hi Thomas, Sorry for the delay. I agree that the watermark concept is general and is understood by intermediate transformations. File name is some additional information in the watermark which helps the start and end operators do stuff related to batch. As suggested, I have created a wordcount

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-18 Thread Amol Kekre
Bhupesh, That is true, but in reality watermarks do not solve a design problem in the DAG where data is getting mixed up. All the watermarks do is to convey "start" and "end" within the stream. The start and end control tuples should have the physical operator id, + a monotonically increasing

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-18 Thread Amol Kekre
Thomas, I believe Bhupesh's proposal is to have a monotonically increasing watermark and filename as extra information. The usage of "file start" may have caused confusion. I agree, we do not need explicit "file start" watermark. I am at a loss for words, maybe "start "->"end "; and then a "final-all

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-18 Thread Thomas Weise
Hi Bhupesh, I think this needs a generic watermark concept that is independent of source and destination and can be understood by intermediate transformations. File names don't meet these criteria. One possible approach is to have a monotonically increasing file sequence (instead of time, if it is
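A rough sketch of the file-sequence idea, under the assumption that every tuple carries the sequence number of the file it came from and that a watermark with sequence n means all files up to and including n have been read completely. The class and method names are hypothetical and do not come from Malhar.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class FileSequenceWatermarkSketch {
    // One open "window" per file sequence number, holding a record count.
    private final TreeMap<Long, Long> countsBySequence = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    public void onTuple(long fileSequence, String record) {
        countsBySequence.merge(fileSequence, 1L, Long::sum);
    }

    // Emits and discards every window whose sequence is <= completedSequence.
    public void onWatermark(long completedSequence) {
        Map<Long, Long> done = countsBySequence.headMap(completedSequence, true);
        for (Map.Entry<Long, Long> e : done.entrySet()) {
            output.add(e.getKey() + ":" + e.getValue());
        }
        done.clear(); // headMap is a view, so this removes the entries from the backing map
    }

    public List<String> output() {
        return output;
    }

    public int openWindows() {
        return countsBySequence.size();
    }
}
```

An intermediate transformation only needs to compare sequence numbers, so it does not have to know that they originated from file names.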

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-18 Thread Bhupesh Chawda
Amol, agreed. We can address event time based watermarks once file batch is done. Regarding file batch support: by allowing to partition an input (file) operator, we are implicitly mixing multiple batches. Even if the user does not do any transformations, we should be able to write the correct

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-18 Thread Bhupesh Chawda
Hi Thomas, For an input operator which is supposed to generate watermarks for downstream operators, I can think about the following watermarks that the operator can emit: 1. Time based watermarks (the high watermark / low watermark) 2. Number of tuple based watermarks (Every n tuples) 3. File

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-17 Thread Amol Kekre
Thomas, The watermarks we have in Apex (start-window and end-window) are working well. It is fine to take a look at event time, but basic file I/O does not need anything more than start and end. Let's say they are start-something, end-something. The main difference here is that the tuples are user

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-15 Thread Thomas Weise
I don't think this should be designed based on a simplistic file input-output scenario. It would be good to include a stateful transformation based on event time. More complex pipelines contain stateful transformations that depend on windowing and watermarks. I think we need a watermark concept

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-02-15 Thread Bhupesh Chawda
To better understand the use case for control tuples in batch, I am creating a prototype for a batch application using File Input and File Output operators. To enable basic batch processing for File IO operators, I am proposing the following changes to File input and output operators: 1.

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-01-17 Thread Bhupesh Chawda
Yes, this can be part of operator configuration. Given this, defining a batch application would mean configuring the connectors (mostly the input operator) in the application for the desired behavior. Similarly, there can be other use cases that can be achieved other than batch. We

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-01-17 Thread Thomas Weise
The HDFS source can operate in two modes, bounded or unbounded. If you scan only once, then it should emit the final watermark after it is done. Otherwise it would emit watermarks based on a policy (file names etc.). The mechanism to generate the marks may depend on the type of source and the
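A minimal sketch of the two modes, assuming a "one watermark per file" policy for the unbounded case and using `Long.MAX_VALUE` as the final watermark. Both choices are assumptions made for illustration, and `ScanModeSketch` is a hypothetical name. A real unbounded source would keep scanning; the sketch shows a single pass only.

```java
import java.util.ArrayList;
import java.util.List;

public class ScanModeSketch {
    // Assumed convention: the largest possible value marks "no more data will ever arrive".
    public static final long FINAL_WATERMARK = Long.MAX_VALUE;

    // Returns the stream of events one scan pass would emit, rendered as strings.
    public static List<String> scan(List<String> files, boolean bounded) {
        List<String> events = new ArrayList<>();
        long sequence = 0;
        for (String file : files) {
            events.add("data:" + file);
            sequence++;
            if (!bounded) {
                // Unbounded mode: policy-driven watermark, here one per completed file.
                events.add("watermark:" + sequence);
            }
        }
        if (bounded) {
            // Bounded mode: a single final watermark once the scan is done.
            events.add("watermark:" + FINAL_WATERMARK);
        }
        return events;
    }
}
```

With this shape, downstream operators treat both modes identically: they react to watermarks and never need to know whether the source was bounded.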

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-01-17 Thread Bhupesh Chawda
Hi Thomas, I am not sure that I completely understand your suggestion. Are you suggesting to broaden the scope of the proposal to treat all sources as bounded as well as unbounded? In case of Apex, we treat all sources as unbounded sources. Even a bounded source like the HDFS file source is treated

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-01-16 Thread Thomas Weise
Bhupesh, Please see how that can be solved in a unified way using windows and watermarks. It is bounded data vs. unbounded data. In Beam for example, you can use the "global window" and the final watermark to accomplish what you are looking for. Batch is just a special case of streaming where the

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-01-16 Thread Bhupesh Chawda
Yes, if the user needs to develop a batch application, then batch aware operators need to be used in the application. The nature of the application is mostly controlled by the input and the output operators used in the application. For example, consider an application which needs to filter

Re: [DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-01-16 Thread Priyanka Gugale
Will it give the user the impression that, if they have a batch use case, they have to use batch aware operators only? If so, is that what we expect? I am not aware of how we implement the batch scenario, so this might be a basic question. -Priyanka On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda

[DISCUSS] Proposal for adapting Malhar operators for batch use cases

2017-01-15 Thread Bhupesh Chawda
Hi All, While design / implementation for custom control tuples is ongoing, I thought it would be a good idea to consider its usefulness in one of the use cases - batch applications. This is a proposal to adapt / extend existing operators in the Apache Apex Malhar library so that it is easy to