Re: Efficient Batch Operator in Streaming

2016-10-20 Till Rohrmann
Hi Xiaowei,

thanks for sharing this proposal. How would fault tolerance work with the
BatchFunction? Since the batch function seems to manage its own buffer,
users would also have to make sure that in-flight elements which are
buffered but not yet processed are checkpointed, wouldn't they?
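Here is a minimal, dependency-free Java sketch of this concern. It is illustrative only. `Collector` is a simplified stand-in for Flink's `org.apache.flink.util.Collector`. `CheckpointableBatchFunction`, `snapshotBuffer`, `restoreBuffer` and `SumEveryN` are hypothetical names that appear neither in the proposal nor in Flink. The sketch shows that records sitting in the function's buffer when a checkpoint is taken must be saved with that checkpoint and reloaded on recovery.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.flink.util.Collector (assumption).
interface Collector<O> {
    void collect(O record);
}

// The proposed interface plus two HYPOTHETICAL checkpointing hooks.
interface CheckpointableBatchFunction<T, O> {
    boolean addRecord(T record);
    void flush(Collector<O> collector);

    List<T> snapshotBuffer();          // hypothetical: copy of buffered, unflushed records
    void restoreBuffer(List<T> saved); // hypothetical: reload them after a failure
}

// Hypothetical example function: emits the sum of every `batchSize` integers.
class SumEveryN implements CheckpointableBatchFunction<Integer, Integer> {
    private final int batchSize;
    private final List<Integer> buffer = new ArrayList<>();

    SumEveryN(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public boolean addRecord(Integer record) {
        buffer.add(record);
        return buffer.size() >= batchSize;
    }

    @Override
    public void flush(Collector<Integer> collector) {
        int sum = 0;
        for (int value : buffer) {
            sum += value;
        }
        collector.collect(sum);
        buffer.clear();
    }

    @Override
    public List<Integer> snapshotBuffer() {
        return new ArrayList<>(buffer); // defensive copy, so later adds do not alter the snapshot
    }

    @Override
    public void restoreBuffer(List<Integer> saved) {
        buffer.clear();
        buffer.addAll(saved);
    }
}

class RecoveryDemo {
    public static void main(String[] args) {
        SumEveryN before = new SumEveryN(3);
        before.addRecord(1);
        before.addRecord(2);                     // 1 and 2 are buffered, not yet flushed
        List<Integer> checkpoint = before.snapshotBuffer();

        // Simulated failure: a fresh instance replaces the old one.
        SumEveryN after = new SumEveryN(3);
        after.restoreBuffer(checkpoint);         // without this, 1 and 2 would be lost
        List<Integer> out = new ArrayList<>();
        if (after.addRecord(3)) {
            after.flush(out::add);
        }
        System.out.println(out);                 // prints [6]
    }
}
```

Replaying input does not cover this case by itself. Records 1 and 2 were consumed before the checkpoint, so after recovery the source resumes after them. Only the snapshotted buffer still holds them.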

Cheers,
Till

On Thu, Oct 20, 2016 at 9:50 AM, Xiaowei Jiang wrote:

> [...]


Re: Efficient Batch Operator in Streaming

2016-10-20 Chesnay Schepler
Could you not do the same thing today with a FlatMap function that stores
incoming elements and only computes and collects a result when a certain
threshold is reached?
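This alternative can be sketched in the same dependency-free way. `FlatMapFunction` and `Collector` below are simplified stand-ins for Flink's interfaces of the same names. `BufferingFlatMap` is a hypothetical name. Emitting the batch as a `List` is just one illustrative choice of "result".

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for org.apache.flink.util.Collector (assumption).
interface Collector<O> {
    void collect(O record);
}

// Simplified stand-in for Flink's FlatMapFunction (assumption).
interface FlatMapFunction<IN, OUT> {
    void flatMap(IN value, Collector<OUT> out) throws Exception;
}

// Stores incoming elements and only emits a result once `threshold` of them have arrived.
class BufferingFlatMap<T> implements FlatMapFunction<T, List<T>> {
    private final int threshold;
    private final List<T> buffer = new ArrayList<>();

    BufferingFlatMap(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void flatMap(T value, Collector<List<T>> out) {
        buffer.add(value);
        if (buffer.size() >= threshold) {
            out.collect(new ArrayList<>(buffer)); // emit a copy of the full batch
            buffer.clear();
        }
    }
}

class FlatMapDemo {
    public static void main(String[] args) {
        BufferingFlatMap<String> fm = new BufferingFlatMap<>(2);
        List<List<String>> results = new ArrayList<>();
        for (String s : new String[] {"a", "b", "c"}) {
            fm.flatMap(s, results::add);
        }
        System.out.println(results); // prints [[a, b]]; "c" is still buffered
    }
}
```

As with the proposed BatchFunction, the buffer here is a plain field. The fault-tolerance question raised earlier in this thread therefore applies equally. The sketch also never emits a trailing partial batch such as `c`.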

On 20.10.2016 09:50, Xiaowei Jiang wrote:

> [...]


Efficient Batch Operator in Streaming

2016-10-20 Xiaowei Jiang
Very often, it's more efficient to process a batch of records at once
instead of processing them one by one. We can use windows to achieve this
functionality. However, a window stores all records in state, which can
be costly. It's desirable to have an efficient implementation of a batch
operator. The batch operator works per task and behaves similarly to aligned
windows. Here is an example of what the interface looks like to a user.

interface BatchFunction<T, O> {
    // add the record to the buffer
    // returns true if the batch is ready to be flushed
    boolean addRecord(T record);

    // process all pending records in the buffer
    void flush(Collector<O> collector);
}

DataStream<T> ds = ...
BatchFunction<T, O> func = ...
ds.batch(func); // proposed new DataStream method

The operator calls addRecord for each record, and the batch function saves
the record in its own buffer. addRecord returns true if the pending buffer
should be flushed, in which case the operator invokes flush.
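The call sequence described above can be sketched as a per-task loop. This is a minimal, dependency-free sketch under several assumptions. `Collector` is a simplified stand-in for Flink's interface. The output type parameter `O` is assumed. `BatchOperatorSketch.run` and `ConcatEveryN` are hypothetical names that only mimic what the proposed operator and a user function might do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for org.apache.flink.util.Collector (assumption).
interface Collector<O> {
    void collect(O record);
}

// The proposed interface, with an assumed output type parameter O.
interface BatchFunction<T, O> {
    boolean addRecord(T record);        // buffer the record; true when the batch should be flushed
    void flush(Collector<O> collector); // process all pending records, then clear the buffer
}

// Hypothetical user function: concatenates every n strings into one output string.
class ConcatEveryN implements BatchFunction<String, String> {
    private final int n;
    private final List<String> buffer = new ArrayList<>();

    ConcatEveryN(int n) {
        this.n = n;
    }

    @Override
    public boolean addRecord(String record) {
        buffer.add(record);
        return buffer.size() >= n;
    }

    @Override
    public void flush(Collector<String> collector) {
        collector.collect(String.join("", buffer));
        buffer.clear();
    }
}

// Hypothetical per-task driver mimicking the proposed batch operator.
final class BatchOperatorSketch {
    static <T, O> List<O> run(Iterable<T> input, BatchFunction<T, O> func) {
        List<O> output = new ArrayList<>();
        Collector<O> collector = output::add;
        for (T record : input) {
            if (func.addRecord(record)) { // the function decides when a batch is complete
                func.flush(collector);
            }
        }
        return output; // records still buffered at the end are NOT flushed
    }

    public static void main(String[] args) {
        List<String> out = run(Arrays.asList("a", "b", "c", "d", "e"), new ConcatEveryN(2));
        System.out.println(out); // prints [ab, cd]
    }
}
```

The proposal does not say what happens to records that are still buffered at the end of input (here `e`) or when a checkpoint is taken. The sketch simply leaves them in the buffer.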

Please share your thoughts. The corresponding JIRA is
https://issues.apache.org/jira/browse/FLINK-4854

Xiaowei


[jira] [Created] (FLINK-4854) Efficient Batch Operator in Streaming

2016-10-18 Xiaowei Jiang (JIRA)
Xiaowei Jiang created FLINK-4854:

         Summary: Efficient Batch Operator in Streaming
             Key: FLINK-4854
             URL: https://issues.apache.org/jira/browse/FLINK-4854
         Project: Flink
      Issue Type: Improvement
        Reporter: Xiaowei Jiang
        Assignee: MaGuowei


Very often, it's more efficient to process a batch of records at once instead
of processing them one by one. We can use windows to achieve this functionality.
However, a window stores all records in state, which can be costly. It's
desirable to have an efficient implementation of a batch operator. The batch
operator works per task and behaves similarly to aligned windows. Here is an
example of what the interface looks like to a user.

interface BatchFunction<T, O> {
    // add the record to the buffer
    // returns true if the batch is ready to be flushed
    boolean addRecord(T record);

    // process all pending records in the buffer
    void flush(Collector<O> collector);
}

DataStream<T> ds = ...
BatchFunction<T, O> func = ...
ds.batch(func); // proposed new DataStream method

The operator calls addRecord for each record, and the batch function saves the
record in its own buffer. addRecord returns true if the pending buffer should be
flushed, in which case the operator invokes flush.


