Hi all,
Based on the discussion in this thread, we can conclude the following
points:
1) Batch control tuples will need to be handled separately from watermark
tuples.
2) Add support for start batch and stop batch control tuples.
3) Add support for reset control tuples to indicate to windowed
I think we should support running multiple batches through the same DAG.
Resetting the watermark to the initial watermark seems like a good idea.
The windowed operator needs to understand the start/end batch control tuples
and reset the watermark.
~ Bhupesh
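The points above can be illustrated with a small, framework-free Java sketch. It is not the Malhar WindowedOperator: `BatchAwareWordCount`, its `Control` enum and all method names are hypothetical, and a plain per-batch word count stands in for the windowed operator's accumulation. It assumes start/end batch control tuples arrive on the same stream as data and watermarks, and shows the watermark being reset to its initial value when a batch ends.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the Apex Malhar API: a per-batch word count that
// handles start/end batch control tuples separately from watermarks.
class BatchAwareWordCount {
  // Hypothetical control tuple kinds, modeled on the start/end batch tuples proposed above.
  enum Control { START_BATCH, END_BATCH }

  private final long initialWatermark;
  private long watermark;
  private final Map<String, Long> counts = new HashMap<>();
  private final List<Map<String, Long>> emittedBatches = new ArrayList<>();

  BatchAwareWordCount(long initialWatermark) {
    this.initialWatermark = initialWatermark;
    this.watermark = initialWatermark;
  }

  // Data tuples only update the state of the current batch.
  void processData(String word) {
    counts.merge(word, 1L, Long::sum);
  }

  // Watermarks only signal event-time progress; here they never move backwards within a batch.
  void processWatermark(long timestamp) {
    watermark = Math.max(watermark, timestamp);
  }

  // Batch control tuples delimit batches independently of the watermark.
  void processControl(Control control) {
    switch (control) {
      case START_BATCH:
        counts.clear();
        break;
      case END_BATCH:
        // Emit the finished batch, then reset state and the watermark so the
        // next batch can run through the same DAG.
        emittedBatches.add(new HashMap<>(counts));
        counts.clear();
        watermark = initialWatermark;
        break;
      default:
        throw new IllegalArgumentException("Unknown control tuple: " + control);
    }
  }

  long currentWatermark() {
    return watermark;
  }

  List<Map<String, Long>> emittedBatches() {
    return emittedBatches;
  }
}
```

Keeping batch control separate from the watermark lets the watermark keep its usual meaning (event-time completeness), while the control tuples decide when per-batch state is emitted and cleared.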
Usually batches are processed by different instances of a topology. First
of all we should agree that running multiple batches through the same DAG
*in sequence* is something that we want to address.
If yes, then there is the reset problem you are referring to, and it only
occurs when you want to
After some discussion and after trying out the approach described above, it seems
we would need to separate the concepts of watermarks and batch control
tuples.
The windowed operator needs to be modified to understand batch control
tuples.
Even if we have watermark tuples which also include batch
public static class Pojo implements Tuple<Pojo>
{
  @Override
  public Pojo getValue()
  {
    return this;
  }
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class);
  WindowedOperatorImpl
Even this will not work because the output port of CsvParser is of type
Object. Even though Customer implements Tuple, it will still fail to
work, since the tuple is emitted as Object.
*DefaultOutputPort<Object> output = new DefaultOutputPort<Object>();*
The input port type of the windowed operator, with InputT = Object:
Use Object in place of InputT in the WindowedOperatorImpl. Cast Object to the
actual type of InputT at runtime. Introducing an operator just to do a cast is
not a good design decision, IMO.
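A minimal sketch of the cast-at-runtime approach, assuming the expected POJO class is supplied when the operator is configured. `ObjectInputAdapter` and `Customer` are hypothetical stand-ins, not Malhar classes; in a real operator, `process` would sit behind an `Object`-typed input port.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: accept Object (matching an Object-typed upstream port)
// and cast to the expected type at runtime instead of adding a cast-only operator.
class ObjectInputAdapter<T> {
  private final Class<T> inputClass;
  private final List<T> accepted = new ArrayList<>();

  ObjectInputAdapter(Class<T> inputClass) {
    this.inputClass = inputClass;
  }

  // Called for every tuple arriving on the Object-typed port.
  void process(Object tuple) {
    // Class.cast throws ClassCastException if upstream emits an unexpected type.
    accepted.add(inputClass.cast(tuple));
  }

  List<T> accepted() {
    return accepted;
  }
}

// Hypothetical POJO standing in for the Customer class mentioned in the thread.
class Customer {
  final String name;

  Customer(String name) {
    this.name = name;
  }
}
```

`Class.cast` fails fast when the upstream operator emits something unexpected, which is the runtime cost of giving up the compile-time `InputT` check.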
Thank you,
Vlad
Sent from iPhone
> On Apr 29, 2017, at 02:50, AJAY GUPTA
I am using WindowedOperatorImpl and it is declared as follows.
WindowedOperatorImpl windowedOperator
= new WindowedOperatorImpl<>();
In my application, InputT is the Customer POJO, which CsvParser emits as
an Object.
Ajay
On Fri,
How do you declare WindowedOperator?
Thank you,
Vlad
On 4/28/17 10:35, AJAY GUPTA wrote:
Vlad,
The approach you suggested doesn't work because CsvParser outputs the
Object data type irrespective of the POJO class being emitted.
Ajay
On Fri, Apr 28, 2017 at 8:13 PM, Vlad Rozov wrote:
Make your POJO class implement the WindowedOperator Tuple interface (it may
return itself in getValue()).
Thank you,
Vlad
On 4/28/17 02:44, AJAY GUPTA wrote:
Hi All,
I am creating an application which uses the Windowed Operator. This
application involves a CsvParser operator emitting a POJO object which is to
be passed as input to the WindowedOperator. The WindowedOperator requires an
instance of the Tuple class as input:
*public final transient
Hi All,
I think we have some agreement on the way we should use control tuples for
File I/O operators to support batch.
In order to have more operators in Malhar support this paradigm, I think
we should also look at store operators: JDBC, Cassandra, HBase, etc.
The case with these operators is
Yes, having this type of window not tied to a timestamp will work out better.
On Mon, Feb 27, 2017 at 10:58 AM, David Yan wrote:
> I now see your rationale on putting the filename in the window.
> As far as I understand, the reasons why the filename is not part of the key
> and
Hi David,
I went through the discussion, but it seems to focus more on event-time
watermark handling than on batches. What we are trying to do is
have watermarks serve the purpose of demarcating batches using control
tuples. Since each batch is separate from others, we would like to
Hi David,
If using a time window does not seem appropriate, we can have another class
which is more suited for such sequential and distinct windows. Perhaps, a
CustomWindow option can be introduced which takes in a window id. The
purpose of this window option could be to translate the window id
I now see your rationale on putting the filename in the window.
As far as I understand, the reasons why the filename is not part of the key
and the Global Window is not used are:
1) The files are processed in sequence, not in parallel
2) The windowed operator should not keep the state associated
On Mon, Feb 27, 2017 at 8:50 AM, Bhupesh Chawda
wrote:
> I think my comments related to count based windows might be causing
> confusion. Let's not discuss count based scenarios for now.
>
> Just want to make sure we are on the same page wrt. the "each file is a
> batch"
I think my comments related to count based windows might be causing
confusion. Let's not discuss count based scenarios for now.
I just want to make sure we are on the same page regarding the "each file is a
batch" use case. As mentioned by Thomas, each tuple from the same file
has the same timestamp
I don't think this is a use case for count-based windows.
We have multiple files that are retrieved in a sequence and there is no
knowledge of the number of records per file. The requirement is to
aggregate each file separately and emit the aggregate when the file is read
fully. There is no
I don't think this is the way to go. Global Window only means the timestamp
does not matter (or that there is no timestamp). It does not necessarily
mean it's a large batch. Unless there is some notion of event time for each
file, you don't want to embed the file into the window itself.
If you
Hi David,
Thanks for your comments.
The wordcount example that I created based on the windowed operator
computes word counts per file (each file as a separate batch), i.e., it
processes counts for each file and writes them to separate files.
As I understand it, the Global Window is for one large batch;
I'm worried that we are making the watermark concept too complicated.
Watermarks should simply tell you which windows can be considered
complete.
Point 2 is basically a count-based window. Watermarks do not play a role
here because the window is always complete at the n-th tuple.
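To make the distinction concrete, here is a hypothetical count-based window in plain Java (`CountWindowSum` is an illustrative name, not a Malhar class). The window closes on the n-th tuple, so nothing in it consults a watermark.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a count-based window: every n tuples form one window,
// which is complete as soon as the n-th tuple arrives.
class CountWindowSum {
  private final int windowSize;
  private int seen;
  private long sum;
  private final List<Long> results = new ArrayList<>();

  CountWindowSum(int windowSize) {
    if (windowSize <= 0) {
      throw new IllegalArgumentException("windowSize must be positive");
    }
    this.windowSize = windowSize;
  }

  void process(long value) {
    sum += value;
    seen++;
    if (seen == windowSize) {
      // Window complete at the n-th tuple: emit and start a new window.
      results.add(sum);
      sum = 0;
      seen = 0;
    }
  }

  List<Long> results() {
    return results;
  }
}
```

With a window size of 3 and inputs 1 through 7, the emitted sums are 6 and 15; the seventh tuple stays in an open window until two more tuples arrive.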
If I
>
>
> Understood all but this line:
>
> windowedOperator.setWindowOption(new
> WindowOption.TimeWindows(Duration.millis(2)));
>
> I wonder if there is an option to control this from the source, maybe David
> can take a look?
>
>
It actually does not make sense to have windows at the source.
The
On Thu, Feb 23, 2017 at 12:02 AM, Bhupesh Chawda
wrote:
> Hi Thomas,
>
> My response inline:
>
> On Wed, Feb 22, 2017 at 10:17 PM, Thomas Weise wrote:
>
> > Hi Bhupesh,
> >
> > This looks great. You use the watermark as measure of completeness and
Hi Thomas,
My response inline:
On Wed, Feb 22, 2017 at 10:17 PM, Thomas Weise wrote:
> Hi Bhupesh,
>
> This looks great. You use the watermark as measure of completeness and the
> window to isolate the state, which is how it should work.
>
> Questions/comments:
>
> Why does
Hi Bhupesh,
This looks great. You use the watermark as measure of completeness and the
window to isolate the state, which is how it should work.
Questions/comments:
Why does the count operator have a 2ms window when this should be driven by
the watermark from the input operator?
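A sketch of the watermark-driven alternative, assuming a watermark W means no more tuples with a timestamp below W will arrive. It shows the two roles described above: each window isolates its own state, and a window is emitted only when the upstream watermark reaches its end. `WatermarkTriggeredCount` is a hypothetical, framework-free class, not the Malhar API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: per-window counts kept separately; a window is emitted
// only when watermark >= window end.
class WatermarkTriggeredCount {
  private final long windowSizeMillis;
  // Window start -> count; TreeMap keeps windows ordered by start time.
  private final TreeMap<Long, Long> openWindows = new TreeMap<>();
  private final List<String> emitted = new ArrayList<>();

  WatermarkTriggeredCount(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  void process(long eventTimeMillis) {
    long start = eventTimeMillis - Math.floorMod(eventTimeMillis, windowSizeMillis);
    openWindows.merge(start, 1L, Long::sum);
  }

  void processWatermark(long watermarkMillis) {
    // Emit every window whose (exclusive) end is at or before the watermark.
    while (!openWindows.isEmpty()) {
      Map.Entry<Long, Long> first = openWindows.firstEntry();
      long end = first.getKey() + windowSizeMillis;
      if (end > watermarkMillis) {
        break;
      }
      emitted.add("[" + first.getKey() + "," + end + ")=" + first.getValue());
      openWindows.pollFirstEntry();
    }
  }

  List<String> emitted() {
    return emitted;
  }
}
```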
I don't think
Yes Amol,
Watermarks, in general, solve a different issue, and we should not mix them
with data association. If needed, the data association can be solved by the
user on the operator/ application layer. The engine should not worry about
it.
Regarding your point on event time based watermarks, I think
Hi Thomas,
Sorry for the delay.
I agree that the watermark concept is general and is understood by
intermediate transformations. The file name is additional information in
the watermark which helps the start and end operators perform batch-related
processing.
As suggested, I have created a wordcount
Bhupesh,
That is true, but in reality watermarks do not solve a design problem in
the DAG where data is getting mixed up. All the watermarks do is convey
"start" and "end" within the stream. The start and end control tuples
should have the physical operator id plus a monotonically increasing
Thomas,
I believe Bhupesh's proposal is to have a monotonically increasing
watermark and filename as extra information. The usage of "file start" may
have caused confusion. I agree, we do not need explicit "file start"
watermark. I am at a loss for words, maybe "start "->"end
"; and then a "final-all
Hi Bhupesh,
I think this needs a generic watermark concept that is independent of
source and destination and can be understood by intermediate
transformations. File names don't meet this criterion.
One possible approach is to have a monotonically increasing file sequence
(instead of time, if it is
Amol, agreed. We can address event time based watermarks once file batch is
done.
Regarding file batch support: by allowing an input (file) operator to be
partitioned, we are implicitly mixing multiple batches. Even if the user does
not do any transformations, we should be able to write the correct
Hi Thomas,
For an input operator which is supposed to generate watermarks for
downstream operators, I can think of the following watermarks that the
operator can emit:
1. Time-based watermarks (the high watermark / low watermark)
2. Tuple-count-based watermarks (every n tuples)
3. File
Thomas,
The watermarks we have in Apex (start-window and end-window) are working
well. It is fine to take a look at event time, but basic file I/O does not
need anything more than start and end. Let's say they are start-something,
end-something. The main difference here is that the tuples are user
I don't think this should be designed based on a simplistic file
input-output scenario. It would be good to include a stateful
transformation based on event time.
More complex pipelines contain stateful transformations that depend on
windowing and watermarks. I think we need a watermark concept
To better understand the use case for control tuples in batch, I am
creating a prototype for a batch application using File Input and File
Output operators.
To enable basic batch processing for File IO operators, I am proposing the
following changes to File input and output operators:
1.
Yes, this can be part of operator configuration. Given this, defining a
batch application would mean configuring the connectors (mostly the input
operator) in the application for the desired behavior. Similarly, use cases
other than batch can be supported.
We
The HDFS source can operate in two modes, bounded or unbounded. If you scan
only once, then it should emit the final watermark after it is done.
Otherwise it would emit watermarks based on a policy (file names, etc.).
The mechanism to generate the marks may depend on the type of source and
the
Hi Thomas,
I am not sure that I completely understand your suggestion. Are you
suggesting that we broaden the scope of the proposal to handle bounded as
well as unbounded sources?
In the case of Apex, we treat all sources as unbounded sources. Even bounded
sources like the HDFS file source are treated
Bhupesh,
Please see how that can be solved in a unified way using windows and
watermarks. It is bounded data vs. unbounded data. In Beam for example, you
can use the "global window" and the final watermark to accomplish what you
are looking for. Batch is just a special case of streaming where the
Yes, if the user needs to develop a batch application, then batch aware
operators need to be used in the application.
The nature of the application is mostly controlled by the input and the
output operators used in the application.
For example, consider an application which needs to filter
Will this give users the impression that, if they have a batch use case, they
have to use only batch-aware operators? If so, is that what we expect? I am not
aware of how we implement the batch scenario, so this might be a basic
question.
-Priyanka
On Mon, Jan 16, 2017 at 12:02 PM, Bhupesh Chawda
Hi All,
While design / implementation for custom control tuples is ongoing, I
thought it would be a good idea to consider its usefulness in one of the
use cases - batch applications.
This is a proposal to adapt / extend existing operators in the Apache Apex
Malhar library so that it is easy to