[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352095#comment-15352095
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r68678627

[GitHub] apex-malhar pull request #326: APEXMALHAR-2116 Added FS record reader operat...

2016-06-27 Thread amberarrow
Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r68678627
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem in
+ * parallel (without ordering guarantees between tuples). Records can be
+ * delimited (e.g. newline) or fixed width records. Output tuples are byte[].
+ *
+ * Typically, this operator will be connected to output of FileSplitterInput to
+ * read records in parallel.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReader extends FSSliceReader
+{
+  /**
+   * Record reader mode decides how to split the records.
+   */
+  public static enum RECORD_READER_MODE
+  {
+    DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+  }
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  /**
+   * Port to emit individual records/tuples as byte[]
+   */
+  public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+  /**
+   * Initialize appropriate reader context based on mode selection
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+      ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext =
+          new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+      fixedBytesReaderContext.setLength(recordLength);
+      readerContext = fixedBytesReaderContext;
+    } else {
+      readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+    }
+  }
+
+  /**
+   * Read the block data and emit records based on reader context
+   *
+   * @param blockMetadata
+   *  block
+   * @throws IOException
+   */
+  protected void readBlock(BlockMetadata blockMetadata) throws IOException
+  {
+    readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+    ReaderContext.Entity entity;
+    while ((entity = readerContext.next()) != null) {
+
+      counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+      byte[] record = entity.getRecord();
+
+      if (record != null) {
+        counters.getCounter(ReaderCounterKeys.RECORDS).increment();
+        records.emit(record);
+      }
+    }
+  }
+
+  /**
+   * Criteria for record split
+   * 
+   * @param mode
+   *  Mode
+   */
+  public void setMode(RECORD_READER_MODE mode)
+  {
+    this.mode = mode;
+  }
+
+  /**
+   * Criteria for record split
+   * 
+   * @return mode
+   */
+  public RECORD_READER_MODE getMode()
+  {
+    return mode;
+  }
+
+  /**
+   * Length for fixed width record
+   * 
+   * @param recordLength
+   */
+  public void setRecordLength(int recordLength)
+  {
+    this.recordLength = recordLength;
+  }
+
+  /**
+   * Length for fixed width record
+   * 
+   * @return record length
+   */
+  
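
For readers trying the operator out, a minimal application sketch wiring FileSplitterInput to
this FSRecordReader could look like the following. This is illustrative only: the class and
port names taken from existing Malhar (FileSplitterInput with its blocksMetadataOutput port,
the blocksMetadataInput port inherited from the block reader, ConsoleOutputOperator) and the
scanner configuration call are assumptions, not something defined by this PR.

import org.apache.hadoop.conf.Configuration;

import org.apache.apex.malhar.lib.fs.FSRecordReader;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.FileSplitterInput;

public class RecordReaderDemo implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // splits files under the monitored location into blocks
    FileSplitterInput splitter = dag.addOperator("Splitter", new FileSplitterInput());
    splitter.getScanner().setFiles("/path/to/input");

    // reads the blocks in parallel and emits one byte[] per record
    FSRecordReader reader = dag.addOperator("Reader", new FSRecordReader());
    reader.setMode(FSRecordReader.RECORD_READER_MODE.DELIMITED_RECORD);

    ConsoleOutputOperator console = dag.addOperator("Console", new ConsoleOutputOperator());

    dag.addStream("blocks", splitter.blocksMetadataOutput, reader.blocksMetadataInput);
    dag.addStream("records", reader.records, console.input);
  }
}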

[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352075#comment-15352075
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68677141
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param <InputT> The type of the input tuple
+ * @param <OutputT> The type of the output tuple
+ * @param <DataStorageT> The type of the data storage
+ * @param <AccumulationT> The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, AccumulationT extends Accumulation>
+    extends BaseOperator implements WindowedOperator<InputT>
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage windowStateMap;
+
+  private Function<InputT, Long> timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()
+  {
+    @Override
+    public void process(Tuple<InputT> tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ControlTuple> controlInput = new DefaultInputPort<ControlTuple>()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
+  };

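A side note on the watermark handling in the diff above: ControlTuple.Watermark only needs to
expose a timestamp, so a concrete watermark tuple can be as small as the following sketch. The
class name WatermarkImpl and its placement are hypothetical; only the getTimestamp() accessor
comes from the nested interface used in the diff.

package org.apache.apex.malhar.lib.window.impl;

import org.apache.apex.malhar.lib.window.ControlTuple;

// Hypothetical concrete watermark: carries only the event-time timestamp that
// the windowed operator uses to decide which windows are late.
public class WatermarkImpl implements ControlTuple.Watermark
{
  private final long timestamp;

  public WatermarkImpl(long timestamp)
  {
    this.timestamp = timestamp;
  }

  @Override
  public long getTimestamp()
  {
    return timestamp;
  }
}

An upstream operator would emit such a tuple on the stream connected to controlInput to
advance the operator's notion of event time.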
Re: [DISCUSSION] Custom Control Tuples

2016-06-27 Thread Thomas Weise
The windowing we discuss here is in general event time based, arrival time
is a special case of it.

I don't think state changes can be made independent of the streaming window
boundary, as that would prevent idempotent processing and, transitively,
exactly-once semantics. For that to work, tuples need to be presented to the operator in a
guaranteed order *within* the streaming window, which is not possible with
multiple ports (and partitions).

Thomas

On Mon, Jun 27, 2016 at 2:53 PM, David Yan  wrote:

> I think for session tracking, if the session boundaries are allowed to be
> not aligned with the streaming window boundaries, the user will have a much
> bigger problem with idempotency. And in most cases, session tracking is
> event time based, not ingression time or processing time based, so this may
> never be a problem. But if that ever happens, the user can always alter the
> default 500ms width.
>
> David
>
> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov 
> wrote:
>
> > Ability to send custom control tuples within window may be useful, for
> > example, for sessions tracking, where session boundaries are not aligned
> > with window boundaries and 500 ms latency is not acceptable for an
> > application.
> >
> > Thank you,
> >
> > Vlad
> >
> >
> > On 6/25/16 10:52, Thomas Weise wrote:
> >
> >> It should not matter from where the control tuple is triggered. It will
> be
> >> good to have a generic mechanism to propagate it and other things can be
> >> accomplished outside the engine. For example, the new comprehensive
> >> support
> >> for windowing will all be in Malhar, nothing that the engine needs to
> know
> >> about it except that we need the control tuple for watermark propagation
> >> and idempotent processing.
> >>
> >> I also think the main difference to other tuples is the need to send it
> to
> >> all partitions. Which is similar to checkpoint window tuples, but not
> the
> >> same. Here, we probably also need the ability for the user to control
> >> whether such tuple should traverse the entire DAG or not. For a batch
> use
> >> case, for example, we may want to send the end of file to the next
> >> operator, but not beyond, if the operator has asynchronous processing
> >> logic
> >> in it.
> >>
> >> For any logic to be idempotent, the control tuple needs to be processed
> at
> >> a window boundary. Receiving the control tuple in the window callback
> >> would
> >> avoid having to track extra state in the operator. I don't think that's
> a
> >> major issue, but what is the use case for processing a control tuple
> >> within
> >> the window?
> >>
> >> Thomas
> >>
> >>
> >>
> >> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <
> pra...@datatorrent.com>
> >> wrote:
> >>
> >> For the use cases you mentioned, I think 1) and 2) are more likely to
> >>> be controlled directly by the application, 3) and 4) are more likely
> >>> going to be triggered externally and directly handled by the engine
> >>> and 3) is already being implemented that way (apexcore-163).
> >>>
> >>> The control tuples emitted by an operator would be sent to all
> >>> downstream partitions isn't it and that would be the chief distinction
> >>> compared to data (apart from the payload) which would get partitioned
> >>> under normal circumstances. It would also be guaranteed that
> >>> downstream partitions will receive control tuples only after the data
> >>> that was sent before it so we could send it immediately when it is
> >>> emitted as opposed to window boundaries.
> >>>
> >>> However during unification it is important to know if these control
> >>> tuples have been received from all upstream partitions before
> >>> proceeding with a control operation. One could wait till end of the
> >>> window but that introduces a delay however small, I would like to add
> >>> to the proposal that the platform only hand over the control tuple to
> >>> the unifier when it has been received from all upstream partitions
> >>> much like how end window is processed but not wait till the actual end
> >>> of the window.
> >>>
> >>> Regd your concern about idempotency, we typically care about
> >>> idempotency at a window level and doing the above will still allow the
> >>> operators to preserve that easily.
> >>>
> >>> Thanks
> >>>
> >>> On Jun 24, 2016, at 11:22 AM, David Yan  wrote:
> 
>  Hi all,
> 
>  I would like to propose a new feature to the Apex core engine -- the
>  support of custom control tuples. Currently, we have control tuples
> such
> 
> >>> as
> >>>
>  BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have the
>  support for applications to insert their own control tuples. The way
>  currently to get around this is to use data tuples and have a separate
> 
> >>> port
> >>>
>  for such tuples that sends tuples to all partitions of the downstream
>  operators, which is not exactly developer friendly.
> 
> 

[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352055#comment-15352055
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68675191
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/Window.java ---
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface describes the individual window.
+ */
+@InterfaceStability.Evolving
+public interface Window
--- End diff --

Session window can be extended, and two session windows can be merged into one.


> Implement Windowed Operators
> 
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Siyuan Hua
>Assignee: David Yan
>
> As per several recent discussions in the community, a group of Windowed 
> Operators that delivers window semantics following the Google Dataflow 
> model (https://cloud.google.com/dataflow/) is very important. 
> The operators should be designed and implemented to support:
> High-level API
> Beam translation
> Easy use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require 
> a window; batch processing is also considered a special window, called the 
> global window
> {code}
>                            +------------------+
>           +--------------> | WindowedOperator | <--------------+
>           |                +------------------+                |
>           |                     ^         ^                    |
>           |                     |         |                    |
> +---------+------+  +----------+--+  +---+---------+  +-------+----+
> |CombineOperator |  |GroupOperator|  |KeyedOperator|  |JoinOperator|
> +----------------+  +-------------+  +--+-------+--+  +-----+------+
>                                         ^       ^           ^
>                                         |       |           |
>                            +------------+  +----+-----+  +--+------+
>                            |KeyedCombine|  |KeyedGroup|  | CoGroup |
>                            +------------+  +----------+  +---------+
> {code}
> The Combine operation includes all operations that combine the tuples in one 
> window into one or a small number of tuples; the Group operation groups all tuples 
> in one window; Join and CoGroup are used to join and group tuples from 
> different inputs.
> {panel}
> {panel:title=Components}
> * Window Component
> It includes configuration, window state that should be checkpointed, etc. It 
> should support NonMergibleWindow (fixed or sliding) and MergibleWindow (session)
> * Trigger
> It should support early triggers and late triggers with customizable trigger 
> behaviour 
> * Other related components:
> ** Watermark generator, which can be plugged into the input source to generate watermarks
> ** Tuple schema support:
> It should handle either a predefined tuple type or provide a declarative API to 
> describe the user-defined tuple class
> {panel}
> Most component APIs should be reusable in the High-Level API
> This is the umbrella ticket; separate tickets will be created for different 
> components and operators respectively 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...

2016-06-27 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68675229
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java ---
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * This class describes how triggers should be fired for each window.
+ * For each window, a trigger can be fired before the watermark (EARLY), at the watermark (ON_TIME), or after the watermark (LATE).
+ * If a LATE trigger is specified and the accumulation mode is ACCUMULATING, it is important for the WindowOption to
+ * specify the allowed lateness because otherwise, all states must be kept in storage.
+ *
+ */
+@InterfaceStability.Evolving
+public class TriggerOption
+{
+
+  public enum AccumulationMode
+  {
+    DISCARDING,
+    ACCUMULATING,
+    ACCUMULATING_AND_RETRACTING
+  }
+
+  private AccumulationMode accumulationMode = AccumulationMode.DISCARDING;
+  private boolean firingOnlyUpdatedPanes = false;
+
+  /**
+   * Whether the trigger should be fired before the watermark, at the watermark, or after the watermark
+   */
+  public enum WatermarkOpt
+  {
+    EARLY,
--- End diff --

We can. @siyuanh any problem with that?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
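
To make the trigger discussion concrete, here is a usage sketch for TriggerOption: fire at the
watermark, add an early firing every second before it, and accumulate pane state across
firings. The factory/builder names (AtWatermark, withEarlyFiringsAtEvery,
accumulatingFiredPanes) are assumptions modeled on the Beam-style triggering API this class
mirrors; treat them as illustrative rather than the PR's final API.

import org.joda.time.Duration;

import org.apache.apex.malhar.lib.window.TriggerOption;

public class TriggerOptionExample
{
  public static TriggerOption watermarkWithEarlyFirings()
  {
    // one ON_TIME firing at the watermark, EARLY firings every second before
    // it, and ACCUMULATING mode so each firing includes prior pane state
    return TriggerOption.AtWatermark()
        .withEarlyFiringsAtEvery(Duration.standardSeconds(1))
        .accumulatingFiredPanes();
  }
}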


[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...

2016-06-27 Thread davidyan74
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68675191

[jira] [Commented] (APEXMALHAR-2126) Suggest: Share Slice Buffer

2016-06-27 Thread Munagala V. Ramanath (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351984#comment-15351984
 ] 

Munagala V. Ramanath commented on APEXMALHAR-2126:
--

+1 for this idea


> Suggest: Share Slice Buffer
> ---
>
> Key: APEXMALHAR-2126
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>
> I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to 
> share the buffer and avoid unnecessary memory allocation/deallocation. But 
> the intention is not self-explanatory, and the class lacks methods to share the memory. And 
> the util class org.apache.apex.malhar.lib.utils.serde.SliceUtils also creates 
> new memory and copies the data.
> I suggest implementing another class (say BufferSlice), which 
> - initializes with a relatively large buffer
> - supports append(byte[] data, int offset, int length)
> - dynamically reallocates the buffer or throws an exception when the buffer is full (based 
> on the management strategy)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2126) Suggest: Share Slice Buffer

2016-06-27 Thread bright chen (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351979#comment-15351979
 ] 

bright chen commented on APEXMALHAR-2126:
-

Let me take endWindow() of 
org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl as an 
example:

  public void endWindow()
  {
    for (K key : cache.getChangedKeys()) {
      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
          SliceUtils.concatenate(identifier, serdeValue.serialize(cache.get(key))));
    }
    …
  }

  public static Slice concatenate(byte[] a, Slice b)
  {
    int size = a.length + b.length;
    byte[] bytes = new byte[size];

    System.arraycopy(a, 0, bytes, 0, a.length);
    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);

    return new Slice(bytes);
  }

So each time concatenate() is called, a new memory block is allocated.

But with the new approach, these allocations can be avoided:
  private transient BufferSlice slice = new BufferSlice(SIZE);

  public void endWindow()
  {
    slice.clear();
    for (K key : cache.getChangedKeys()) {
      store.put(this.bucket,
          slice.append(identifier).append(serdeKey.serialize(key)).endSlice(),
          slice.append(identifier).append(serdeValue.serialize(cache.get(key))).endSlice());
    }
    …
  }

The class can be implemented like the following:
public class BufferSlice
{
  protected byte[] buffer;
  protected int offset;
  protected int length;

  public BufferSlice(int size)
  {
    buffer = new byte[size];
  }

  // reset the slice so the buffer can be reused (e.g. once per window)
  public void clear()
  {
    offset = 0;
    length = 0;
  }

  // returns this so calls can be chained as in the endWindow() example above
  public BufferSlice append(byte[] data)
  {
    //TODO: handle over sized
    // copy the incoming bytes into the shared buffer: arraycopy(src, srcPos, dest, destPos, len)
    System.arraycopy(data, 0, buffer, offset + length, data.length);
    length += data.length;
    return this;
  }

  public Slice endSlice()
  {
    Slice slice = new Slice(buffer, offset, length);
    offset += length;
    length = 0;
    return slice;
  }
}
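
The proposal also lists an append(byte[] data, int offset, int length) variant plus a
reallocation strategy. Here is a sketch of that overload under the same assumptions as the
class above; growth by doubling is just one possible management strategy:

  // Sketch of the proposed overload: append a sub-range of data, growing the
  // backing buffer instead of throwing when it is full.
  public BufferSlice append(byte[] data, int dataOffset, int dataLength)
  {
    int required = offset + length + dataLength;
    if (required > buffer.length) {
      byte[] newBuffer = new byte[Math.max(buffer.length * 2, required)];
      System.arraycopy(buffer, 0, newBuffer, 0, offset + length);
      buffer = newBuffer;
    }
    System.arraycopy(data, dataOffset, buffer, offset + length, dataLength);
    length += dataLength;
    return this;
  }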


> Suggest: Share Slice Buffer
> ---
>
> Key: APEXMALHAR-2126
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>
> I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to 
> share the buffer and avoid unnecessary memory allocation/deallocation. But 
> the intention is not self-explanatory, and the class lacks methods to share the memory. And 
> the util class org.apache.apex.malhar.lib.utils.serde.SliceUtils also creates 
> new memory and copies the data.
> I suggest implementing another class (say BufferSlice), which 
> - initializes with a relatively large buffer
> - supports append(byte[] data, int offset, int length)
> - dynamically reallocates the buffer or throws an exception when the buffer is full (based 
> on the management strategy)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [DISCUSSION] Custom Control Tuples

2016-06-27 Thread Chinmay Kolhatkar
I hope I'm not commenting too late on this thread.

From the above discussion, it looks like the requirement is to have a
custom tuple with the following 2 capabilities to influence the streaming
engine:
1. When to send it (between windows/within window)
2. Where to send it (all partitions, some partition(s), one partition etc)

I think if we design the custom tuple keeping the above 2 capabilities in mind,
the user will have all the flexibility to make the right use of it as needed.

Thoughts?

- Chinmay.




On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov  wrote:

> Ability to send custom control tuples within window may be useful, for
> example, for sessions tracking, where session boundaries are not aligned
> with window boundaries and 500 ms latency is not acceptable for an
> application.
>
> Thank you,
>
> Vlad
>
>
> On 6/25/16 10:52, Thomas Weise wrote:
>
>> It should not matter from where the control tuple is triggered. It will be
>> good to have a generic mechanism to propagate it and other things can be
>> accomplished outside the engine. For example, the new comprehensive
>> support
>> for windowing will all be in Malhar, nothing that the engine needs to know
>> about it except that we need the control tuple for watermark propagation
>> and idempotent processing.
>>
>> I also think the main difference to other tuples is the need to send it to
>> all partitions. Which is similar to checkpoint window tuples, but not the
>> same. Here, we probably also need the ability for the user to control
>> whether such tuple should traverse the entire DAG or not. For a batch use
>> case, for example, we may want to send the end of file to the next
>> operator, but not beyond, if the operator has asynchronous processing
>> logic
>> in it.
>>
>> For any logic to be idempotent, the control tuple needs to be processed at
>> a window boundary. Receiving the control tuple in the window callback
>> would
>> avoid having to track extra state in the operator. I don't think that's a
>> major issue, but what is the use case for processing a control tuple
>> within
>> the window?
>>
>> Thomas
>>
>>
>>
>> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni 
>> wrote:
>>
>> For the use cases you mentioned, I think 1) and 2) are more likely to
>>> be controlled directly by the application, 3) and 4) are more likely
>>> going to be triggered externally and directly handled by the engine
>>> and 3) is already being implemented that way (apexcore-163).
>>>
>>> The control tuples emitted by an operator would be sent to all
>>> downstream partitions isn't it and that would be the chief distinction
>>> compared to data (apart from the payload) which would get partitioned
>>> under normal circumstances. It would also be guaranteed that
>>> downstream partitions will receive control tuples only after the data
>>> that was sent before it so we could send it immediately when it is
>>> emitted as opposed to window boundaries.
>>>
>>> However during unification it is important to know if these control
>>> tuples have been received from all upstream partitions before
>>> proceeding with a control operation. One could wait till end of the
>>> window but that introduces a delay however small, I would like to add
>>> to the proposal that the platform only hand over the control tuple to
>>> the unifier when it has been received from all upstream partitions
>>> much like how end window is processed but not wait till the actual end
>>> of the window.
>>>
>>> Regd your concern about idempotency, we typically care about
>>> idempotency at a window level and doing the above will still allow the
>>> operators to preserve that easily.
>>>
>>> Thanks
>>>
>>> On Jun 24, 2016, at 11:22 AM, David Yan  wrote:

 Hi all,

 I would like to propose a new feature to the Apex core engine -- the
 support of custom control tuples. Currently, we have control tuples such

>>> as
>>>
 BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have the
 support for applications to insert their own control tuples. The way
 currently to get around this is to use data tuples and have a separate

>>> port
>>>
 for such tuples that sends tuples to all partitions of the downstream
 operators, which is not exactly developer friendly.

 We have already seen a number of use cases that can use this feature:

 1) Batch support: We need to tell all operators of the physical DAG when

>>> a
>>>
 batch starts and ends, so the operators can do whatever that is needed

>>> upon
>>>
 the start or the end of a batch.

 2) Watermark: To support the concepts of event time windowing, the
 watermark control tuple is needed to tell which windows should be
 considered late.

 3) Changing operator properties: We do have the support of changing
 operator properties on the fly, but with a custom control tuple, the
 command to 

Re: [DISCUSSION] Custom Control Tuples

2016-06-27 Thread Vlad Rozov
The ability to send custom control tuples within a window may be useful, for 
example, for session tracking, where session boundaries are not aligned 
with window boundaries and 500 ms latency is not acceptable for an 
application.


Thank you,

Vlad

On 6/25/16 10:52, Thomas Weise wrote:

It should not matter from where the control tuple is triggered. It will be
good to have a generic mechanism to propagate it and other things can be
accomplished outside the engine. For example, the new comprehensive support
for windowing will all be in Malhar, nothing that the engine needs to know
about it except that we need the control tuple for watermark propagation
and idempotent processing.

I also think the main difference to other tuples is the need to send it to
all partitions. Which is similar to checkpoint window tuples, but not the
same. Here, we probably also need the ability for the user to control
whether such tuple should traverse the entire DAG or not. For a batch use
case, for example, we may want to send the end of file to the next
operator, but not beyond, if the operator has asynchronous processing logic
in it.

For any logic to be idempotent, the control tuple needs to be processed at
a window boundary. Receiving the control tuple in the window callback would
avoid having to track extra state in the operator. I don't think that's a
major issue, but what is the use case for processing a control tuple within
the window?

Thomas



On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni 
wrote:


For the use cases you mentioned, I think 1) and 2) are more likely to
be controlled directly by the application, 3) and 4) are more likely
going to be triggered externally and directly handled by the engine
and 3) is already being implemented that way (apexcore-163).

The control tuples emitted by an operator would be sent to all
downstream partitions isn't it and that would be the chief distinction
compared to data (apart from the payload) which would get partitioned
under normal circumstances. It would also be guaranteed that
downstream partitions will receive control tuples only after the data
that was sent before it so we could send it immediately when it is
emitted as opposed to window boundaries.

However during unification it is important to know if these control
tuples have been received from all upstream partitions before
proceeding with a control operation. One could wait till end of the
window but that introduces a delay however small, I would like to add
to the proposal that the platform only hand over the control tuple to
the unifier when it has been received from all upstream partitions
much like how end window is processed but not wait till the actual end
of the window.

Regd your concern about idempotency, we typically care about
idempotency at a window level and doing the above will still allow the
operators to preserve that easily.

Thanks


On Jun 24, 2016, at 11:22 AM, David Yan  wrote:

Hi all,

I would like to propose a new feature to the Apex core engine -- the
support of custom control tuples. Currently, we have control tuples such

as

BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have the
support for applications to insert their own control tuples. The way
currently to get around this is to use data tuples and have a separate

port

for such tuples that sends tuples to all partitions of the downstream
operators, which is not exactly developer friendly.

We have already seen a number of use cases that can use this feature:

1) Batch support: We need to tell all operators of the physical DAG when

a

batch starts and ends, so the operators can do whatever that is needed

upon

the start or the end of a batch.

2) Watermark: To support the concepts of event time windowing, the
watermark control tuple is needed to tell which windows should be
considered late.

3) Changing operator properties: We do have the support of changing
operator properties on the fly, but with a custom control tuple, the
command to change operator properties can be window aligned for all
partitions and also across the DAG.

4) Recording tuples: Like changing operator properties, we do have this
support now but only at the individual physical operator level, and

without

control of which window to record tuples for. With a custom control

tuple,

because a control tuple must belong to a window, all operators in the DAG
can start (and stop) recording for the same windows.

I can think of two options to achieve this:

1) new custom control tuple type that takes user's serializable object.

2) piggy back the current BEGIN_WINDOW and END_WINDOW control tuples.

Please provide your feedback. Thank you.

David
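
To make option 1 from the proposal concrete, here is a minimal sketch of what a custom
control tuple type could look like. Everything here is hypothetical; no such interface
exists in Apex Core today. The propagation flag reflects the earlier point about whether a
control tuple should traverse the entire DAG or stop at the next operator:

import java.io.Serializable;

public interface CustomControlTuple extends Serializable
{
  // the user's serializable payload carried with the control tuple
  Object getUserObject();

  // whether downstream operators should forward this tuple past the next operator
  boolean propagateBeyondNextOperator();
}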




[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351837#comment-15351837
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68658531

[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351851#comment-15351851
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68659163
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java ---
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * This class describes how triggers should be fired for each window.
+ * For each window, a trigger can be fired before the watermark (EARLY), 
at the watermark (ON_TIME), or after the watermark (LATE).
+ * If a LATE trigger is specified and the accumulation mode is 
ACCUMULATING, it is important for the WindowOption to
+ * specify the allowed lateness because otherwise, all states must be kept 
in storage.
+ *
+ */
+@InterfaceStability.Evolving
+public class TriggerOption
+{
+
+  public enum AccumulationMode
+  {
+    DISCARDING,
+    ACCUMULATING,
+    ACCUMULATING_AND_RETRACTING
+  }
+
+  private AccumulationMode accumulationMode = AccumulationMode.DISCARDING;
+  private boolean firingOnlyUpdatedPanes = false;
+
+  /**
+   * Whether the trigger should be fired before the watermark, at the watermark, or after the watermark
+   */
+  public enum WatermarkOpt
+  {
+    EARLY,
--- End diff --

If I understand correctly from the Beam model, EARLY, ON-TIME & LATE are 
attributes of the data and/or of the results to be sent downstream.

They are not attributes of a trigger. A trigger is an action which sends 
updates downstream. The data it carries should be marked late, early or 
on-time.

Shouldn't this enum be moved out, as it's not specific to triggers?

Also, the terminology of WatermarkOpt is confusing. As I said before, these 
are not attributes of the watermark or the trigger.


> Implement Windowed Operators
> 
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Siyuan Hua
>Assignee: David Yan
>
> As per several recent discussions in the community, a group of Windowed 
> Operators that delivers window semantics following the Google Dataflow 
> model (https://cloud.google.com/dataflow/) is very important. 
> The operators should be designed and implemented to support:
> High-level API
> Beam translation
> Easy use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require 
> a window; batch processing is also considered a special window, called the 
> global window
> {code}
>                            +------------------+
>           +--------------> | WindowedOperator | <--------------+
>           |                +------------------+                |
>           |                     ^         ^                    |
>           |                     |         |                    |
> +---------+------+  +----------+--+  +---+---------+  +-------+----+
> |CombineOperator |  |GroupOperator|  |KeyedOperator|  |JoinOperator|
> +----------------+  +-------------+  +--+-------+--+  +-----+------+
>                                         ^       ^           ^
>  

[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...

2016-06-27 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68659163

[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...

2016-06-27 Thread chinmaykolhatkar
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68658531
  
--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param <InputT> The type of the input tuple
+ * @param <OutputT> The type of the output tuple
+ * @param <DataStorageT> The type of the data storage
+ * @param <AccumulationT> The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, AccumulationT extends Accumulation>
+    extends BaseOperator implements WindowedOperator<InputT>
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage windowStateMap;
+
+  private Function<InputT, Long> timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()
+  {
+    @Override
+    public void process(Tuple<InputT> tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ControlTuple> controlInput = new DefaultInputPort<ControlTuple>()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
+  };
+
+
+  // TODO: multiple input ports for join operations
+
+  public final transient DefaultOutputPort<Tuple<OutputT>> output = new DefaultOutputPort<>();
+
+  // TODO: This port should be removed when Apex Core has native 

[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351836#comment-15351836
 ] 

ASF GitHub Bot commented on APEXMALHAR-2085:


Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68658247
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java
 ---
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param <InputT> The type of the input tuple
+ * @param <OutputT> The type of the output tuple
+ * @param <DataStorageT> The type of the data storage
+ * @param <AccumulationT> The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, AccumulationT extends Accumulation>
+    extends BaseOperator implements WindowedOperator<InputT>
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage<WindowState> windowStateMap;
+
+  private Function<InputT, Long> timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort<Tuple<InputT>> input = new DefaultInputPort<Tuple<InputT>>()
+  {
+    @Override
+    public void process(Tuple<InputT> tuple)
+{
+  processTuple(tuple);
+}
+  };
+
+  // TODO: This port should be removed when Apex Core has native support 
for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort<ControlTuple> controlInput = new DefaultInputPort<ControlTuple>()
+  {
+@Override
+public void process(ControlTuple tuple)
+{
+  if (tuple instanceof ControlTuple.Watermark) {
+processWatermark((ControlTuple.Watermark)tuple);
+  }
+}
   

[jira] [Commented] (APEXMALHAR-2126) Suggest: Share Slice Buffer

2016-06-27 Thread Vlad Rozov (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351786#comment-15351786
 ] 

Vlad Rozov commented on APEXMALHAR-2126:


The intention of com.datatorrent.netlet.util.Slice was to avoid a memory copy 
when raw data is received into a buffer and then parsed into messages, not to 
avoid memory allocation/deallocation. Memory for data structures/tuples still 
needs to be allocated, and allocating a large buffer may lead to excessive 
memory usage when the buffer is not fully used.
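
For illustration, a Slice can already reference a sub-range of a shared buffer
without copying (receive() below is a hypothetical source of raw bytes):

    byte[] raw = receive();                               // raw bytes from the wire
    Slice header = new Slice(raw, 0, 16);                 // first 16 bytes, no copy
    Slice payload = new Slice(raw, 16, raw.length - 16);  // rest, same backing array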

> Suggest: Share Slice Buffer
> ---
>
> Key: APEXMALHAR-2126
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: bright chen
>
> I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to 
> share the buffer and avoid unnecessary memory allocation/deallocation. But 
> the intention is not self-explanatory, and there is no method for sharing the 
> memory. The util class org.apache.apex.malhar.lib.utils.serde.SliceUtils also 
> allocates new memory and copies the data.
> I suggest implementing another class (say BufferSlice), which would:
> - initialize with a relatively large buffer
> - support append(byte[] data, int offset, int length)
> - dynamically reallocate the buffer or throw an exception when the buffer is 
> full (based on the management strategy)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXMALHAR-2126) Suggest: Share Slice Buffer

2016-06-27 Thread bright chen (JIRA)
bright chen created APEXMALHAR-2126:
---

 Summary: Suggest: Share Slice Buffer
 Key: APEXMALHAR-2126
 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
 Project: Apache Apex Malhar
  Issue Type: Improvement
Reporter: bright chen


I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to share 
the buffer and avoid unnecessary memory allocation/deallocation. But the 
intention is not self-explanatory, and there is no method for sharing the 
memory. The util class org.apache.apex.malhar.lib.utils.serde.SliceUtils also 
allocates new memory and copies the data.

I suggest implementing another class (say BufferSlice, sketched below), which would:
- initialize with a relatively large buffer
- support append(byte[] data, int offset, int length)
- dynamically reallocate the buffer or throw an exception when the buffer is 
full (based on the management strategy)
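
A minimal sketch of such a BufferSlice (hypothetical; doubling is one possible
growth strategy, throwing when full would be the other):

    public class BufferSlice
    {
      private byte[] buffer;
      private int length;

      public BufferSlice(int initialCapacity)
      {
        buffer = new byte[initialCapacity];
      }

      /** Appends the given bytes, growing the backing buffer when it is full. */
      public void append(byte[] data, int offset, int len)
      {
        if (length + len > buffer.length) {
          byte[] grown = new byte[Math.max(buffer.length * 2, length + len)];
          System.arraycopy(buffer, 0, grown, 0, length);
          buffer = grown;
        }
        System.arraycopy(data, offset, buffer, length, len);
        length += len;
      }
    }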



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-core pull request #350: APEXCORE-222 Purging the stale data present in ...

2016-06-27 Thread sandeshh
Github user sandeshh commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/350#discussion_r68643226
  
--- Diff: 
engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
@@ -769,7 +769,11 @@ public void 
processHeartbeatResponse(ContainerHeartbeatResponse rsp)
 }
 
 if (rsp.committedWindowId != lastCommittedWindowId) {
+
   lastCommittedWindowId = rsp.committedWindowId;
+
+  bufferServer.purge(lastCommittedWindowId);
--- End diff --

I have created a jira for the extra window. Will update this PR accordingly.
https://issues.apache.org/jira/browse/APEXCORE-479


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: Getting various statistics from StreamingContainers

2016-06-27 Thread Sandesh Hegde
Looked around to see how others are implementing this, and found the feature
implemented in Storm.

Storm supports a whitelist of commands to run heap dumps, stack traces, and a
few others.

Looking at their implementation, they also launch a new process to run the
JDK tools and collect the information.
https://issues.apache.org/jira/browse/STORM-1157
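
A rough sketch of that approach (class and method names here are made up;
jstack is just one JDK tool that could be launched this way):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    public class JdkToolRunner
    {
      /** Launches a JDK tool as a child process and returns its combined output. */
      public static String run(String... command) throws IOException, InterruptedException
      {
        Process process = new ProcessBuilder(command).redirectErrorStream(true).start();
        StringBuilder output = new StringBuilder();
        try (BufferedReader reader =
            new BufferedReader(new InputStreamReader(process.getInputStream()))) {
          String line;
          while ((line = reader.readLine()) != null) {
            output.append(line).append('\n');
          }
        }
        process.waitFor();
        return output.toString();
      }
    }

    // e.g. String stackTrace = JdkToolRunner.run("jstack", "12345");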


On Mon, May 23, 2016 at 11:31 AM Sandesh Hegde 
wrote:

> After connecting to the app, the user will run the following command.
>
> Users will select the container id, the JDK tool, and the arguments to the tool.
>
> Apex CLI api
> run-jdkTools  "Container-id" "Tool-name" "Arguments"
>
> Output of the command is interpreted by the user.
>
>
>
> On Mon, May 23, 2016 at 11:31 AM Thomas Weise 
> wrote:
>
>> I think it is appropriate to collect the information that the JVM provides
>> using the available API instead of running external processes.
>>
>> For other information, how do you suggest that will be provided to the
>> user?
>>
>> Thanks,
>> Thomas
>>
>>
>>
>> On Mon, May 23, 2016 at 11:27 AM, Sandesh Hegde 
>> wrote:
>>
>> > Users can pass the arguments to the JDK tools. So it exposes all the
>> power
>> > of those tools. If we have to write the code we are doing the duplicate
>> > work.
>> > Also it doesn't evolve with the new features of the JVM, but the tools
>> will
>> > and we just have to change the arguments that we pass.
>> >
>> > On Mon, May 23, 2016 at 11:15 AM Vlad Rozov 
>> > wrote:
>> >
>> > > What is the purpose of the new process? Why that information can't be
>> > > collected directly from JVM and passed to app master using heartbeat?
>> > >
>> > > Thank you,
>> > > Vlad
>> > >
>> > > On 5/23/16 10:57, Sandesh Hegde wrote:
>> > > > Hello All,
>> > > >
>> > > > Getting various information from the StreamingConatainers is a
>> useful
>> > > > feature to have.
>> > > > As StreamingContainers are JVMs, various JDK tools can be used to
>> get
>> > the
>> > > > information.
>> > > >
>> > > > So the idea is to spawn the new process from the streaming
>> containers
>> > and
>> > > > return the information via Stram.
>> > > >
>> > > > Recently we have added the feature to get stack trace I have
>> modified
>> > > that
>> > > > to show the idea I am talking about.
>> > > >
>> > > > Here is the pull request, the purpose of that is to show the idea,
>> let
>> > me
>> > > > know your thoughts.
>> > > > https://github.com/apache/incubator-apex-core/pull/340
>> > > >
>> > > > I have not created a jira yet, wanted to check the viability of the
>> > idea.
>> > > >
>> > > > Thanks
>> > > > Sandesh
>> > > >
>> > >
>> > >
>> >
>>
>


[jira] [Created] (APEXCORE-478) SerDe based on the types

2016-06-27 Thread Sandesh (JIRA)
Sandesh created APEXCORE-478:


 Summary: SerDe based on the types
 Key: APEXCORE-478
 URL: https://issues.apache.org/jira/browse/APEXCORE-478
 Project: Apache Apex Core
  Issue Type: Improvement
Reporter: Sandesh


At deploy time, Apex already knows the type of the object to be serialized 
and deserialized. I am quoting Vlad here:
"
+1 for type based serialization. Tuples in most cases are flat
records/pojo and it should be possible programmatically construct a
codec that will significantly outperform Kryo. It should also reduce
amount of data passed over the wire. I started to look in that direction
as well as Kryo serialization is one of bottlenecks that limits Apex
throughput when operators are deployed into different containers
including NODE_LOCAL case."

Link to mail thread on this discussion
http://mail-archives.apache.org/mod_mbox/apex-dev/201605.mbox/%3CCAJhKGzqxknF_7Wj2T2Vfp%3D35cziT0hTJoQTnWopET%2Bj8OgpUJw%40mail.gmail.com%3E
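
To illustrate the idea (MyTuple and its field layout are made up; a real codec
would be generated from the deploy-time type information rather than written
by hand):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    class MyTuple
    {
      long id;
      String name;
    }

    class MyTupleCodec
    {
      /** Writes the fields in a fixed, type-specific layout; no reflection. */
      static byte[] serialize(MyTuple t) throws IOException
      {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeLong(t.id);
        out.writeUTF(t.name);
        out.flush();
        return bos.toByteArray();
      }

      static MyTuple deserialize(byte[] bytes) throws IOException
      {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        MyTuple t = new MyTuple();
        t.id = in.readLong();
        t.name = in.readUTF();
        return t;
      }
    }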




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (APEXCORE-478) SerDe based on the types

2016-06-27 Thread Sandesh (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXCORE-478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandesh updated APEXCORE-478:
-
Description: 
At deploy time, Apex already knows the type of the object to be serialized 
and deserialized, so it can optimize the SerDe used. I am quoting Vlad here:
"
+1 for type based serialization. Tuples in most cases are flat
records/pojo and it should be possible programmatically construct a
codec that will significantly outperform Kryo. It should also reduce
amount of data passed over the wire. I started to look in that direction
as well as Kryo serialization is one of bottlenecks that limits Apex
throughput when operators are deployed into different containers
including NODE_LOCAL case."

Link to mail thread on this discussion
http://mail-archives.apache.org/mod_mbox/apex-dev/201605.mbox/%3CCAJhKGzqxknF_7Wj2T2Vfp%3D35cziT0hTJoQTnWopET%2Bj8OgpUJw%40mail.gmail.com%3E


  was:
At deploy time, Apex already knows the type of the object to be serialized 
and deserialized. I am quoting Vlad here:
"
+1 for type based serialization. Tuples in most cases are flat
records/pojo and it should be possible programmatically construct a
codec that will significantly outperform Kryo. It should also reduce
amount of data passed over the wire. I started to look in that direction
as well as Kryo serialization is one of bottlenecks that limits Apex
throughput when operators are deployed into different containers
including NODE_LOCAL case."

Link to mail thread on this discussion
http://mail-archives.apache.org/mod_mbox/apex-dev/201605.mbox/%3CCAJhKGzqxknF_7Wj2T2Vfp%3D35cziT0hTJoQTnWopET%2Bj8OgpUJw%40mail.gmail.com%3E



> SerDe based on the types
> 
>
> Key: APEXCORE-478
> URL: https://issues.apache.org/jira/browse/APEXCORE-478
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Sandesh
>
> At deploy time, Apex already knows the type of the object to be 
> serialized and deserialized, so it can optimize the SerDe used. I am quoting 
> Vlad here:
> "
> +1 for type based serialization. Tuples in most cases are flat
> records/pojo and it should be possible programmatically construct a
> codec that will significantly outperform Kryo. It should also reduce
> amount of data passed over the wire. I started to look in that direction
> as well as Kryo serialization is one of bottlenecks that limits Apex
> throughput when operators are deployed into different containers
> including NODE_LOCAL case."
> Link to mail thread on this discussion
> http://mail-archives.apache.org/mod_mbox/apex-dev/201605.mbox/%3CCAJhKGzqxknF_7Wj2T2Vfp%3D35cziT0hTJoQTnWopET%2Bj8OgpUJw%40mail.gmail.com%3E



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (APEXCORE-477) Unifier as an Attribute

2016-06-27 Thread Sandesh (JIRA)
Sandesh created APEXCORE-477:


 Summary: Unifier as an Attribute
 Key: APEXCORE-477
 URL: https://issues.apache.org/jira/browse/APEXCORE-477
 Project: Apache Apex Core
  Issue Type: Improvement
Reporter: Sandesh


There is a parallel between StreamCodec and Unifier. Since a StreamCodec can be 
applied to input ports as an Attribute, allowing a Unifier to be set as an 
attribute on output ports completes the symmetry and makes things easier for 
app developers.

Another reason for making it an Attribute: a Unifier could then be applied 
without extending the existing operators.
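
Hypothetical usage, by analogy with the existing STREAM_CODEC port attribute
(the UNIFIER attribute below is the proposal, not an existing API; op, dag,
MyStreamCodec, and MyUnifier are illustrative names):

    // today: a StreamCodec can be set as an input port attribute
    dag.setInputPortAttribute(op.input, Context.PortContext.STREAM_CODEC, new MyStreamCodec());

    // proposed: a Unifier set as an output port attribute
    dag.setOutputPortAttribute(op.output, Context.PortContext.UNIFIER, new MyUnifier());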



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68560022
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl<Slice, Integer> map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl<>(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
+  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+
+if (spillableArrayList == null) {
+  Slice keyPrefix = serdeKey.serialize(key);
+  Integer size = map.get(SliceUtils.concatenate(keyPrefix, 
SIZE_KEY_SUFFIX));
+
--- End diff --

Accessing the value from map should be as follows: 
map.get(SliceUtils.concatenate().buffer)




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559313
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---
@@ -0,0 +1,52 @@
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SliceUtils
+{
+  private SliceUtils()
+  {
+  }
+
+  public static byte[] concatenate(byte[] a, byte[] b)
+  {
+byte[] output = new byte[a.length + b.length];
+
+System.arraycopy(a, 0, output, 0, a.length);
+System.arraycopy(b, 0, output, a.length, b.length);
+return output;
+  }
+
+  public static Slice concatenate(Slice a, Slice b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+return new Slice(bytes);
+  }
+
+  public static Slice concatenate(byte[] a, Slice b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a, 0, bytes, 0, a.length);
+System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+return new Slice(bytes);
+  }
+
+  public static Slice concatenate(Slice a, byte[] b)
+  {
+int size = a.length + b.length;
+byte[] bytes = new byte[size];
+
+System.arraycopy(a, a.offset, bytes, 0, a.length);
--- End diff --

a.buffer instead of a.




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559266
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
 ---
@@ -0,0 +1,191 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent
+{
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private byte[] identifier;
+  private long bucket;
+  @NotNull
+  private Serde<K, Slice> serdeKey;
+  @NotNull
+  private Serde<V, Slice> serdeValue;
+
+  private int size = 0;
+
+  private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
+  private transient MutableInt tempOffset = new MutableInt();
+
+  private SpillableByteMapImpl()
+  {
+//for kryo
+  }
+
+  public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+  }
+
+  @Override
+  public int size()
+  {
+return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+return size == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object o)
+  {
+return get(o) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object o)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public V get(Object o)
+  {
+K key = (K)o;
+
+if (cache.getRemovedKeys().contains(key)) {
+  return null;
+}
+
+V val = cache.get(key);
+
+if (val != null) {
+  return val;
+}
+
+Slice valSlice = store.getSync(bucket, 
SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+if (valSlice == null || valSlice == BucketedState.EXPIRED || 
valSlice.length == 0) {
+  return null;
+}
+
+tempOffset.setValue(valSlice.offset + identifier.length);
+return serdeValue.deserialize(valSlice, tempOffset);
+  }
+
+  @Override
+  public V put(K k, V v)
+  {
+V value = get(k);
+
+if (value == null) {
+  size++;
+}
+
+cache.put(k, v);
+
+return value;
+  }
+
+  @Override
+  public V remove(Object o)
+  {
+V value = get(o);
+
+if (value != null) {
+  size--;
+}
+
+cache.remove((K)o);
+
+return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> map)
+  {
+    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
+  put(entry.getKey(), entry.getValue());
+}
+  }
+
+  @Override
+  public void clear()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<Map.Entry<K, V>> entrySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
--- End diff --

I think store.setup() has to be called in the setup method because 
SpillableStateStore extends the Component interface. The same applies to the 
other lifecycle methods: beginWindow, endWindow, and teardown.
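
That is, roughly the following delegation (sketch only; assumes the store
exposes the usual Component and window-listener lifecycle methods):

    @Override
    public void setup(Context.OperatorContext context)
    {
      store.setup(context);  // delegate lifecycle to the underlying store
    }

    @Override
    public void beginWindow(long windowId)
    {
      store.beginWindow(windowId);
    }

    @Override
    public void endWindow()
    {
      store.endWindow();
    }

    @Override
    public void teardown()
    {
      store.teardown();
    }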




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-27 Thread chaithu14
Github user chaithu14 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/324#discussion_r68559252
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
 ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl<Slice, Integer> map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+this.store = Preconditions.checkNotNull(store);
+this.identifier = Preconditions.checkNotNull(identifier);
+this.bucket = bucket;
+this.serdeKey = Preconditions.checkNotNull(serdeKey);
+this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl<>(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+return getHelper(key);
+  }
+
+  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+
+if (spillableArrayList == null) {
+  Slice keyPrefix = serdeKey.serialize(key);
+  Integer size = map.get(SliceUtils.concatenate(keyPrefix, 
SIZE_KEY_SUFFIX));
+
+  if (size == null) {
+return null;
+  }
+
+      spillableArrayList = new SpillableArrayListImpl<>(bucket, keyPrefix.buffer, store, serdeValue);
+  spillableArrayList.setSize(size);
+
+  cache.put(key, spillableArrayList);
+}
+
+return spillableArrayList;
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object key)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+return 
map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), 
SIZE_KEY_SUFFIX));
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object 
value)
+  {
+throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
+


[jira] [Commented] (APEXCORE-222) Delegate Buffer Server purge to StreamingContainer

2016-06-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXCORE-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350459#comment-15350459
 ] 

ASF GitHub Bot commented on APEXCORE-222:
-

Github user vrozov commented on a diff in the pull request:

https://github.com/apache/apex-core/pull/350#discussion_r68526285
  
--- Diff: 
engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
@@ -769,7 +769,11 @@ public void 
processHeartbeatResponse(ContainerHeartbeatResponse rsp)
 }
 
 if (rsp.committedWindowId != lastCommittedWindowId) {
+
   lastCommittedWindowId = rsp.committedWindowId;
+
+  bufferServer.purge(lastCommittedWindowId);
--- End diff --

I would prefer to have a separate bug and PR for changing what window Id is 
safe to purge.


> Delegate Buffer Server purge to StreamingContainer
> --
>
> Key: APEXCORE-222
> URL: https://issues.apache.org/jira/browse/APEXCORE-222
> Project: Apache Apex Core
>  Issue Type: Improvement
>Reporter: Thomas Weise
>Assignee: Sandesh
>
> Currently the purge requests are sent to the buffer servers from the app 
> master. This interaction exists parallel to the heartbeat protocol. Instead, 
> the committed window ID that is propagated through the heartbeat response can 
> be used in StreamingContainer to initiate the purge with the local buffer 
> server, similar to how the committed callback on the operator occurs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


APEXMALHAR-1701 Deduper in Malhar

2016-06-27 Thread Bhupesh Chawda
Hi All,

I am working on adding a de-duplication operator to the Malhar library based on
the managed state APIs. I will be working off the already created JIRA -
https://issues.apache.org/jira/browse/APEXMALHAR-1701 - and the initial pull
request for an AbstractDeduper here:
https://github.com/apache/apex-malhar/pull/260/files

I am planning to include the following features in the first version:
1. Time-based de-duplication. Assumption: the Tuple_Key -> Tuple_Time
correlation holds.
2. Option to maintain the order of incoming tuples.
3. Duplicate and Expired ports to emit duplicate and expired tuples,
respectively (a toy sketch of the expiry/duplicate decision follows).
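
A toy sketch of that expiry/duplicate decision (hypothetical names; eviction of
old keys and persistence to managed state are omitted):

    import java.util.HashMap;
    import java.util.Map;

    class TimeDeduper
    {
      enum Decision { UNIQUE, DUPLICATE, EXPIRED }

      private final long expiryMillis;
      private final Map<String, Long> seen = new HashMap<>();  // key -> tuple time
      private long latestTime;

      TimeDeduper(long expiryMillis)
      {
        this.expiryMillis = expiryMillis;
      }

      Decision process(String key, long tupleTime)
      {
        latestTime = Math.max(latestTime, tupleTime);
        if (tupleTime < latestTime - expiryMillis) {
          return Decision.EXPIRED;  // older than the retention window
        }
        Long previous = seen.put(key, tupleTime);
        return previous == null ? Decision.UNIQUE : Decision.DUPLICATE;
      }
    }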

Thanks.

~ Bhupesh