[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352095#comment-15352095 ]

ASF GitHub Bot commented on APEXMALHAR-2116:

Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r68678627

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem in
+ * parallel (without ordering guarantees between tuples). Records can be
+ * delimited (e.g. newline) or fixed width records. Output tuples are byte[].
+ *
+ * Typically, this operator will be connected to output of FileSplitterInput to
+ * read records in parallel.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReader extends FSSliceReader
+{
+  /**
+   * Record reader mode decides how to split the records.
+   */
+  public static enum RECORD_READER_MODE
+  {
+    DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+  }
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  /**
+   * Port to emit individual records/tuples as byte[]
+   */
+  public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+  /**
+   * Initialize appropriate reader context based on mode selection
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+      ReaderContext.FixedBytesReaderContext fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext();
+      fixedBytesReaderContext.setLength(recordLength);
+      readerContext = fixedBytesReaderContext;
+    } else {
+      readerContext = new ReaderContext.ReadAheadLineReaderContext();
+    }
+  }
+
+  /**
+   * Read the block data and emit records based on reader context
+   *
+   * @param blockMetadata
+   *          block
+   * @throws IOException
+   */
+  protected void readBlock(BlockMetadata blockMetadata) throws IOException
+  {
+    readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+    ReaderContext.Entity entity;
+    while ((entity = readerContext.next()) != null) {
+
+      counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+      byte[] record = entity.getRecord();
+
+      if (record != null) {
+        counters.getCounter(ReaderCounterKeys.RECORDS).increment();
+        records.emit(record);
+      }
+    }
+  }
+
+  /**
+   * Criteria for record split
+   *
+   * @param mode
+   *          Mode
+   */
+  public void setMode(RECORD_READER_MODE mode)
+  {
+    this.mode = mode;
+  }
+
+  /**
+   * Criteria for record split
+   *
+   * @return mode
+   */
+  public RECORD_READER_MODE getMode()
+  {
+    return mode;
+  }
+
+  /**
+   * Length for fixed width record
+   *
+   * @param
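The two modes chosen in `setup()` amount to two ways of cutting a block of bytes into records. A minimal plain-Java sketch of both, assuming the whole block is already in memory and ignoring records that straddle block boundaries (which the real `ReaderContext` implementations must handle via `consecutiveBlock`). `RecordSplitSketch` and its methods are hypothetical names for illustration, not Malhar API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of the two FSRecordReader modes; not Malhar API. */
final class RecordSplitSketch
{
  /** FIXED_WIDTH_RECORD: every recordLength bytes form one record; a short tail is dropped here. */
  static List<byte[]> splitFixedWidth(byte[] block, int recordLength)
  {
    if (recordLength <= 0) {
      throw new IllegalArgumentException("recordLength must be positive");
    }
    List<byte[]> records = new ArrayList<>();
    for (int start = 0; start + recordLength <= block.length; start += recordLength) {
      records.add(Arrays.copyOfRange(block, start, start + recordLength));
    }
    return records;
  }

  /** DELIMITED_RECORD: records are separated by '\n'; the delimiter itself is not emitted. */
  static List<byte[]> splitDelimited(byte[] block)
  {
    List<byte[]> records = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < block.length; i++) {
      if (block[i] == '\n') {
        records.add(Arrays.copyOfRange(block, start, i));
        start = i + 1;
      }
    }
    if (start < block.length) {
      // trailing record without a final newline
      records.add(Arrays.copyOfRange(block, start, block.length));
    }
    return records;
  }
}
```

In the operator, a short tail in fixed-width mode would instead be completed from the next block rather than dropped; the sketch only shows the splitting rule itself.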
[GitHub] apex-malhar pull request #326: APEXMALHAR-2116 Added FS record reader operat...
Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r68678627

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem in
+ * parallel (without ordering guarantees between tuples). Records can be
+ * delimited (e.g. newline) or fixed width records. Output tuples are byte[].
+ *
+ * Typically, this operator will be connected to output of FileSplitterInput to
+ * read records in parallel.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReader extends FSSliceReader
+{
+  /**
+   * Record reader mode decides how to split the records.
+   */
+  public static enum RECORD_READER_MODE
+  {
+    DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+  }
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+
+  /**
+   * Port to emit individual records/tuples as byte[]
+   */
+  public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+  /**
+   * Initialize appropriate reader context based on mode selection
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+      ReaderContext.FixedBytesReaderContext fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext();
+      fixedBytesReaderContext.setLength(recordLength);
+      readerContext = fixedBytesReaderContext;
+    } else {
+      readerContext = new ReaderContext.ReadAheadLineReaderContext();
+    }
+  }
+
+  /**
+   * Read the block data and emit records based on reader context
+   *
+   * @param blockMetadata
+   *          block
+   * @throws IOException
+   */
+  protected void readBlock(BlockMetadata blockMetadata) throws IOException
+  {
+    readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+    ReaderContext.Entity entity;
+    while ((entity = readerContext.next()) != null) {
+
+      counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+      byte[] record = entity.getRecord();
+
+      if (record != null) {
+        counters.getCounter(ReaderCounterKeys.RECORDS).increment();
+        records.emit(record);
+      }
+    }
+  }
+
+  /**
+   * Criteria for record split
+   *
+   * @param mode
+   *          Mode
+   */
+  public void setMode(RECORD_READER_MODE mode)
+  {
+    this.mode = mode;
+  }
+
+  /**
+   * Criteria for record split
+   *
+   * @return mode
+   */
+  public RECORD_READER_MODE getMode()
+  {
+    return mode;
+  }
+
+  /**
+   * Length for fixed width record
+   *
+   * @param recordLength
+   */
+  public void setRecordLength(int recordLength)
+  {
+    this.recordLength = recordLength;
+  }
+
+  /**
+   * Length for fixed width record
+   *
+   * @return record length
+   */
+
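`readBlock()` adds `entity.getUsedBytes()` to the BYTES counter for every entity, but emits and counts under RECORDS only those entities whose record is non-null. A self-contained sketch of that loop, with a hypothetical `Entity` stand-in for `ReaderContext.Entity`, an `Iterator` in place of `readerContext.next()` returning null at the end, and plain `long` fields in place of the operator's counters. It assumes, purely for illustration, that a reader can consume bytes without producing a record (for example, a fragment it decides to skip):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of the readBlock() loop; Entity and the counters are stand-ins, not Malhar classes. */
final class ReadBlockSketch
{
  /** Stand-in for ReaderContext.Entity: bytes consumed plus an optional record. */
  static final class Entity
  {
    final long usedBytes;
    final byte[] record; // null when bytes were consumed but nothing is emitted

    Entity(long usedBytes, byte[] record)
    {
      this.usedBytes = usedBytes;
      this.record = record;
    }
  }

  long bytesCounter;
  long recordsCounter;
  final List<byte[]> emitted = new ArrayList<>();

  /** Mirrors the loop: count every consumed byte, emit and count only non-null records. */
  void readBlock(Iterator<Entity> readerContext)
  {
    while (readerContext.hasNext()) {
      Entity entity = readerContext.next();
      bytesCounter += entity.usedBytes;
      if (entity.record != null) {
        recordsCounter++;
        emitted.add(entity.record); // stands in for records.emit(record)
      }
    }
  }
}
```

Keeping the two counters separate means BYTES reflects how much of the block was actually read, while RECORDS reflects only what went downstream.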
[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352075#comment-15352075 ]

ASF GitHub Bot commented on APEXMALHAR-2085:

Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68677141

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param The type of the input tuple
+ * @param The type of the output tuple
+ * @param The type of the data storage
+ * @param The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator
+    extends BaseOperator implements WindowedOperator
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage windowStateMap;
+
+  private Function timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public void process(Tuple tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort controlInput = new DefaultInputPort()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
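The diff shows `currentWatermark`, `allowedLatenessMillis = -1` and a `processWatermark(...)` hook, but the lateness rules themselves sit in the truncated part of the file. A minimal sketch of how such checks commonly work under Dataflow-style semantics; `LatenessSketch` is hypothetical, and reading a negative lateness as "no bound" is an assumption (it matches the TriggerOption javadoc in this thread, which warns that without an allowed lateness all state must be kept):

```java
/**
 * Hypothetical sketch of watermark-based lateness checks (Dataflow-style semantics assumed;
 * not the actual AbstractWindowedOperator logic). A negative allowedLatenessMillis, like the
 * -1 default in the diff, is read here as "no bound": nothing is ever dropped as too late.
 */
final class LatenessSketch
{
  private final long allowedLatenessMillis;
  private long currentWatermark;
  private boolean watermarkSeen;

  LatenessSketch(long allowedLatenessMillis)
  {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  /** Watermarks only move forward; an older watermark is ignored. */
  void processWatermark(long watermarkMillis)
  {
    if (!watermarkSeen || watermarkMillis > currentWatermark) {
      currentWatermark = watermarkMillis;
      watermarkSeen = true;
    }
  }

  /** Once the watermark reaches a window's end, anything still arriving for it is late. */
  boolean isLate(long windowEndMillis)
  {
    return watermarkSeen && windowEndMillis <= currentWatermark;
  }

  /** Too late: a bound is configured and the tuple is older than (watermark - bound). */
  boolean isTooLate(long tupleTimestampMillis)
  {
    return watermarkSeen && allowedLatenessMillis >= 0
        && tupleTimestampMillis < currentWatermark - allowedLatenessMillis;
  }
}
```

A late tuple can still be accumulated and fire a LATE trigger; only a tuple that is too late can be dropped, along with the state of its window.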
Re: [DISCUSSION] Custom Control Tuples
The windowing we discuss here is in general event-time based; arrival time is a special case of it.

I don't think state changes can be made independent of the streaming window boundary, as that would prevent idempotent processing and, transitively, exactly-once. For that to work, tuples would need to be presented to the operator in a guaranteed order *within* the streaming window, which is not possible with multiple ports (and partitions).

Thomas

On Mon, Jun 27, 2016 at 2:53 PM, David Yan wrote:

> I think for session tracking, if the session boundaries are allowed to be
> not aligned with the streaming window boundaries, the user will have a much
> bigger problem with idempotency. And in most cases, session tracking is
> event time based, not ingestion time or processing time based, so this may
> never be a problem. But if that ever happens, the user can always alter the
> default 500ms width.
>
> David
>
> On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov wrote:
>
> > Ability to send custom control tuples within a window may be useful, for
> > example, for session tracking, where session boundaries are not aligned
> > with window boundaries and 500 ms latency is not acceptable for an
> > application.
> >
> > Thank you,
> >
> > Vlad
> >
> > On 6/25/16 10:52, Thomas Weise wrote:
> >
> >> It should not matter from where the control tuple is triggered. It would be
> >> good to have a generic mechanism to propagate it, and other things can be
> >> accomplished outside the engine. For example, the new comprehensive support
> >> for windowing will all be in Malhar; the engine does not need to know
> >> about it, except that we need the control tuple for watermark propagation
> >> and idempotent processing.
> >>
> >> I also think the main difference from other tuples is the need to send it to
> >> all partitions, which is similar to checkpoint window tuples, but not the
> >> same. Here, we probably also need the ability for the user to control
> >> whether such a tuple should traverse the entire DAG or not. For a batch use
> >> case, for example, we may want to send the end of file to the next
> >> operator, but not beyond, if the operator has asynchronous processing logic
> >> in it.
> >>
> >> For any logic to be idempotent, the control tuple needs to be processed at
> >> a window boundary. Receiving the control tuple in the window callback would
> >> avoid having to track extra state in the operator. I don't think that's a
> >> major issue, but what is the use case for processing a control tuple within
> >> the window?
> >>
> >> Thomas
> >>
> >> On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni <pra...@datatorrent.com>
> >> wrote:
> >>
> >>> For the use cases you mentioned, I think 1) and 2) are more likely to
> >>> be controlled directly by the application, while 3) and 4) are more likely
> >>> to be triggered externally and handled directly by the engine;
> >>> 3) is already being implemented that way (apexcore-163).
> >>>
> >>> The control tuples emitted by an operator would be sent to all
> >>> downstream partitions, wouldn't they? That would be the chief distinction
> >>> compared to data (apart from the payload), which would get partitioned
> >>> under normal circumstances. It would also be guaranteed that
> >>> downstream partitions receive control tuples only after the data
> >>> that was sent before them, so we could send them immediately when they are
> >>> emitted, as opposed to at window boundaries.
> >>>
> >>> However, during unification it is important to know whether these control
> >>> tuples have been received from all upstream partitions before
> >>> proceeding with a control operation. One could wait till the end of the
> >>> window, but that introduces a delay, however small. I would like to add
> >>> to the proposal that the platform only hand over the control tuple to
> >>> the unifier when it has been received from all upstream partitions,
> >>> much like how end window is processed, but without waiting till the actual
> >>> end of the window.
> >>>
> >>> Regarding your concern about idempotency, we typically care about
> >>> idempotency at a window level, and doing the above will still allow the
> >>> operators to preserve that easily.
> >>>
> >>> Thanks
> >>>
> >>> On Jun 24, 2016, at 11:22 AM, David Yan wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> I would like to propose a new feature to the Apex core engine -- the
> >>>> support of custom control tuples. Currently, we have control tuples such as
> >>>> BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but we don't have
> >>>> support for applications to insert their own control tuples. The current
> >>>> workaround is to use data tuples and have a separate port
> >>>> for such tuples that sends tuples to all partitions of the downstream
> >>>> operators, which is not exactly developer friendly.
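Pramod's proposal is that the platform hand a control tuple to the unifier as soon as every upstream partition has delivered it, much like END_WINDOW alignment but without waiting for the window to end. A minimal sketch of that bookkeeping; `ControlTupleAligner`, integer partition ids and string control-tuple ids are all hypothetical choices for illustration, not Apex engine code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of the proposed unifier behaviour: a control tuple is handed over
 * once every upstream partition has delivered it, without waiting for END_WINDOW.
 * Identifying control tuples by a String id is an assumption made for the example.
 */
final class ControlTupleAligner
{
  private final Set<Integer> upstreamPartitions;
  private final Map<String, Set<Integer>> seen = new HashMap<>();

  ControlTupleAligner(Set<Integer> upstreamPartitions)
  {
    this.upstreamPartitions = new HashSet<>(upstreamPartitions);
  }

  /**
   * Records that a partition delivered the control tuple.
   * Returns true when the last outstanding partition delivers it, i.e. when the
   * unifier may now process it; duplicates from the same partition are ignored.
   */
  boolean receive(String controlTupleId, int partitionId)
  {
    if (!upstreamPartitions.contains(partitionId)) {
      throw new IllegalArgumentException("unknown partition " + partitionId);
    }
    Set<Integer> received = seen.computeIfAbsent(controlTupleId, k -> new HashSet<>());
    received.add(partitionId);
    if (received.containsAll(upstreamPartitions)) {
      seen.remove(controlTupleId); // fully aligned, stop tracking it
      return true;
    }
    return false;
  }
}
```

Because data and control tuples travel in order on each stream, releasing the control tuple only after the last partition delivers it guarantees that all data sent before it, from every partition, has already reached the unifier.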
[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15352055#comment-15352055 ]

ASF GitHub Bot commented on APEXMALHAR-2085:

Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68675191

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/Window.java ---
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This interface describes the individual window.
+ */
+@InterfaceStability.Evolving
+public interface Window
--- End diff --

Session window can be extended, and two session windows can be merged into one.

> Implement Windowed Operators
>
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Siyuan Hua
> Assignee: David Yan
>
> As per our recent discussions in the community, a group of Windowed
> Operators that deliver window semantics following the Google Dataflow
> model (https://cloud.google.com/dataflow/) is very important.
> The operators should be designed and implemented in a way that supports:
> High-level API
> Beam translation
> Easy use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require
> a window, and batch processing is also considered a special window, called
> the global window
> {code}
> WindowedOperator
>   <- CombineOperator, GroupOperator, KeyedOperator, JoinOperator
>     <- KeyedCombine, KeyedGroup, CoGroup
> {code}
> Combine operations include all operations that combine all tuples in one
> window into one or a small number of tuples; Group operations group all tuples
> in one window; Join and CoGroup are used to join and group tuples from
> different inputs.
> {panel}
> {panel:title=Components}
> * Window Component
> It includes configuration, window state that should be checkpointed, etc. It
> should support NonMergibleWindow (fixed or sliding) and MergibleWindow (Session)
> * Trigger
> It should support early trigger and late trigger with customizable trigger
> behaviour
> * Other related components:
> ** Watermark generator, which can be plugged into an input source to generate watermarks
> ** Tuple schema support:
> It should handle either a predefined tuple type or give a declarative API to
> describe the user-defined tuple class
> {panel}
> Most component APIs should be reused in the High-Level API
> This is the umbrella ticket; separate tickets will be created for different
> components and operators respectively

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...
Github user davidyan74 commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68675229

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java ---
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * This class describes how triggers should be fired for each window.
+ * For each window, a trigger can be fired before the watermark (EARLY), at the watermark (ON_TIME), or after the watermark (LATE).
+ * If a LATE trigger is specified and the accumulation mode is ACCUMULATING, it is important for the WindowOption to
+ * specify the allowed lateness because otherwise, all states must be kept in storage.
+ *
+ */
+@InterfaceStability.Evolving
+public class TriggerOption
+{
+
+  public enum AccumulationMode
+  {
+    DISCARDING,
+    ACCUMULATING,
+    ACCUMULATING_AND_RETRACTING
+  }
+
+  private AccumulationMode accumulationMode = AccumulationMode.DISCARDING;
+  private boolean firingOnlyUpdatedPanes = false;
+
+  /**
+   * Whether the trigger should be fired before the watermark, at the watermark, or after the watermark
+   */
+  public enum WatermarkOpt
+  {
+    EARLY,
--- End diff --

We can. @siyuanh any problem with that?

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
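`TriggerOption.AccumulationMode` has three values, and the difference shows up in what each trigger firing emits. A sketch for a per-window sum under Dataflow-model semantics; `AccumulationModeSketch` is hypothetical, the enum is redeclared so the example compiles on its own, and representing a retraction as a string prefixed with `retract:` is purely an illustrative assumption:

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the three accumulation modes for a per-window sum.
 * Each fire() returns what the trigger would emit; retractions are modelled as
 * strings prefixed with "retract:", purely for illustration.
 */
final class AccumulationModeSketch
{
  enum AccumulationMode
  {
    DISCARDING, ACCUMULATING, ACCUMULATING_AND_RETRACTING
  }

  private final AccumulationMode mode;
  private long paneSum;     // values since the last firing
  private long totalSum;    // all values seen for the window
  private Long lastEmitted; // previously emitted total, needed for retraction

  AccumulationModeSketch(AccumulationMode mode)
  {
    this.mode = mode;
  }

  void accumulate(long value)
  {
    paneSum += value;
    totalSum += value;
  }

  List<String> fire()
  {
    List<String> out = new ArrayList<>();
    switch (mode) {
      case DISCARDING:
        out.add(Long.toString(paneSum)); // only data since the last firing
        break;
      case ACCUMULATING:
        out.add(Long.toString(totalSum)); // running result; supersedes earlier output
        break;
      case ACCUMULATING_AND_RETRACTING:
        if (lastEmitted != null) {
          out.add("retract:" + lastEmitted); // ask downstream to undo the previous result
        }
        out.add(Long.toString(totalSum));
        lastEmitted = totalSum;
        break;
      default:
        throw new IllegalStateException("unknown mode " + mode);
    }
    paneSum = 0;
    return out;
  }
}
```

With values 1 and 2 before the first firing and 4 before the second, DISCARDING emits 3 then 4, ACCUMULATING emits 3 then 7, and ACCUMULATING_AND_RETRACTING emits 3, then a retraction of 3 followed by 7. Only the accumulating modes need the window's full state kept around, which is why the javadoc asks for an allowed lateness when LATE triggers are combined with ACCUMULATING.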
[jira] [Commented] (APEXMALHAR-2126) Suggest: Share Slice Buffer
[ https://issues.apache.org/jira/browse/APEXMALHAR-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351984#comment-15351984 ]

Munagala V. Ramanath commented on APEXMALHAR-2126:
--

+1 for this idea

> Suggest: Share Slice Buffer
> ---
>
> Key: APEXMALHAR-2126
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: bright chen
>
> I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to
> share the buffer and avoid unnecessary memory allocation/deallocation. But
> the intention is not self-explanatory, and there is no method for sharing the
> memory. Also, the utility class org.apache.apex.malhar.lib.utils.serde.SliceUtils
> allocates new memory and copies the data.
> I suggest implementing another class (say, BufferSlice), which would
> - initialize its buffer with a relatively large size
> - support append(byte[] data, int offset, int length)
> - dynamically reallocate the buffer or throw an exception when the buffer is full
> (based on the management strategy)
[jira] [Commented] (APEXMALHAR-2126) Suggest: Share Slice Buffer
[ https://issues.apache.org/jira/browse/APEXMALHAR-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351979#comment-15351979 ]

bright chen commented on APEXMALHAR-2126:
-

Let me take endWindow() of org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl as an example.

{code}
public void endWindow()
{
  for (K key : cache.getChangedKeys()) {
    store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
        SliceUtils.concatenate(identifier, serdeValue.serialize(cache.get(key))));
  }
  …
}

public static Slice concatenate(byte[] a, Slice b)
{
  int size = a.length + b.length;
  byte[] bytes = new byte[size];
  System.arraycopy(a, 0, bytes, 0, a.length);
  System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
  return new Slice(bytes);
}
{code}

So each call to concatenate() allocates a new memory block. With the new approach, these allocations can be avoided:

{code}
private transient BufferSlice slice = new BufferSlice(SIZE);

public void endWindow()
{
  slice.clear();
  for (K key : cache.getChangedKeys()) {
    store.put(this.bucket, slice.append(identifier).append(serdeKey.serialize(key)).endSlice(),
        slice.append(identifier).append(serdeValue.serialize(cache.get(key))).endSlice());
  }
  …
}
{code}

The class can be implemented as follows:

{code}
public class BufferSlice
{
  protected byte[] buffer;
  protected int offset; // start of the slice currently being built
  protected int length; // bytes appended to the current slice so far

  public BufferSlice(int size)
  {
    buffer = new byte[size];
  }

  public BufferSlice append(byte[] data)
  {
    return append(data, 0, data.length);
  }

  public BufferSlice append(Slice data)
  {
    return append(data.buffer, data.offset, data.length);
  }

  public BufferSlice append(byte[] data, int dataOffset, int dataLength)
  {
    //TODO: handle over sized
    // copy from the source data into the shared buffer, after the bytes already appended
    System.arraycopy(data, dataOffset, buffer, offset + length, dataLength);
    length += dataLength;
    return this; // allows append(...).append(...).endSlice()
  }

  public Slice endSlice()
  {
    Slice slice = new Slice(buffer, offset, length);
    offset += length;
    length = 0;
    return slice;
  }

  public void clear()
  {
    offset = 0;
    length = 0;
  }
}
{code}

> Suggest: Share Slice Buffer
> ---
>
> Key: APEXMALHAR-2126
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: bright chen
>
> I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to
> share the buffer and avoid unnecessary memory allocation/deallocation. But
> the intention is not self-explanatory, and there is no method for sharing the
> memory. Also, the utility class org.apache.apex.malhar.lib.utils.serde.SliceUtils
> allocates new memory and copies the data.
> I suggest implementing another class (say, BufferSlice), which would
> - initialize its buffer with a relatively large size
> - support append(byte[] data, int offset, int length)
> - dynamically reallocate the buffer or throw an exception when the buffer is full
> (based on the management strategy)
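The `BufferSlice` in the comment leaves the full-buffer case as a TODO, while the issue description asks for either reallocation or an exception, depending on a management strategy. A self-contained sketch covering both, assuming a hypothetical `FullBufferStrategy` enum and using `SliceStandIn` as a minimal stand-in for `com.datatorrent.netlet.util.Slice` (buffer, offset, length) so that it compiles without netlet on the classpath:

```java
import java.util.Arrays;

/** Minimal stand-in for com.datatorrent.netlet.util.Slice, for this example only. */
final class SliceStandIn
{
  final byte[] buffer;
  final int offset;
  final int length;

  SliceStandIn(byte[] buffer, int offset, int length)
  {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }

  byte[] toByteArray()
  {
    return Arrays.copyOfRange(buffer, offset, offset + length);
  }
}

/** Hypothetical sketch of the proposed BufferSlice with a configurable full-buffer strategy. */
final class BufferSlice
{
  /** What to do when the buffer is full; both options come from the issue description. */
  enum FullBufferStrategy
  {
    GROW, THROW
  }

  private final FullBufferStrategy strategy;
  private byte[] buffer;
  private int offset; // start of the slice currently being built
  private int length; // bytes appended to the current slice so far

  BufferSlice(int initialSize, FullBufferStrategy strategy)
  {
    this.buffer = new byte[initialSize];
    this.strategy = strategy;
  }

  BufferSlice append(byte[] data, int dataOffset, int dataLength)
  {
    ensureCapacity(dataLength);
    System.arraycopy(data, dataOffset, buffer, offset + length, dataLength);
    length += dataLength;
    return this; // allows append(...).append(...).endSlice()
  }

  BufferSlice append(byte[] data)
  {
    return append(data, 0, data.length);
  }

  /** Closes the current slice; later appends start a new slice right after it. */
  SliceStandIn endSlice()
  {
    SliceStandIn slice = new SliceStandIn(buffer, offset, length);
    offset += length;
    length = 0;
    return slice;
  }

  /** Reuses the buffer from the start; earlier slices must no longer be in use. */
  void clear()
  {
    offset = 0;
    length = 0;
  }

  private void ensureCapacity(int extra)
  {
    int needed = offset + length + extra;
    if (needed <= buffer.length) {
      return;
    }
    if (strategy == FullBufferStrategy.THROW) {
      throw new IllegalStateException("buffer full: need " + needed + " bytes, have " + buffer.length);
    }
    // Move only the in-progress slice into a larger array; slices returned earlier keep
    // referencing the old array, so they stay valid.
    byte[] bigger = new byte[Math.max(buffer.length * 2, length + extra)];
    System.arraycopy(buffer, offset, bigger, 0, length);
    buffer = bigger;
    offset = 0;
  }
}
```

Growing copies only the bytes of the slice still being built, so already-returned slices are untouched. `clear()`, by contrast, overwrites the shared array, so it is only safe once nothing (for example, the store) still holds references to earlier slices.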
Re: [DISCUSSION] Custom Control Tuples
I hope I'm not commenting too late on this thread. From the above discussion, it looks like the requirement is to have a custom tuple with the following two capabilities for influencing the streaming engine:

1. When to send it (between windows or within a window)
2. Where to send it (all partitions, some partition(s), one partition, etc.)

I think if we design the custom tuple with these two capabilities in mind, users will have all the flexibility to use it as needed. Thoughts?

- Chinmay.

On Mon, Jun 27, 2016 at 2:35 PM, Vlad Rozov wrote:

> Ability to send custom control tuples within window may be useful, for
> example, for sessions tracking, where session boundaries are not aligned
> with window boundaries and 500 ms latency is not acceptable for an
> application.
>
> Thank you,
>
> Vlad
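The two capabilities listed above (when and where a control tuple is delivered) can be modeled as fields carried by the tuple itself. The sketch below is hypothetical: `CustomControlTuple`, `Timing`, `Scope` and `targetPartitions` are illustrative names, not Apex Core API, and routing a single-partition tuple by `hashCode` modulo the partition count is an assumption standing in for whatever partitioning the engine actually applies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: none of these types exist in Apex Core. They only model
// the two capabilities discussed in the thread: when and where to deliver.
final class CustomControlTuple
{
  enum Timing { IMMEDIATE, WINDOW_BOUNDARY }   // within a window vs. between windows
  enum Scope { ALL_PARTITIONS, ONE_PARTITION }  // broadcast vs. routed by key

  final Object payload;
  final Timing timing;
  final Scope scope;
  final Object routingKey;                      // must be non-null when scope == ONE_PARTITION

  CustomControlTuple(Object payload, Timing timing, Scope scope, Object routingKey)
  {
    this.payload = payload;
    this.timing = timing;
    this.scope = scope;
    this.routingKey = routingKey;
  }

  /** Returns the indices of the downstream partitions (partitionCount > 0) that receive this tuple. */
  List<Integer> targetPartitions(int partitionCount)
  {
    if (scope == Scope.ALL_PARTITIONS) {
      List<Integer> all = new ArrayList<>();
      for (int i = 0; i < partitionCount; i++) {
        all.add(i);
      }
      return all;
    }
    // Route like a keyed data tuple: non-negative hash modulo the partition count.
    int index = Math.floorMod(routingKey.hashCode(), partitionCount);
    return Collections.singletonList(index);
  }
}
```

Only `Scope` affects routing in this sketch; honoring `Timing` (delivering immediately versus at the next window boundary) would be the engine's responsibility.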
Re: [DISCUSSION] Custom Control Tuples
Ability to send custom control tuples within a window may be useful, for example, for session tracking, where session boundaries are not aligned with window boundaries and 500 ms latency is not acceptable for an application.

Thank you,

Vlad

On 6/25/16 10:52, Thomas Weise wrote:

It should not matter from where the control tuple is triggered. It will be good to have a generic mechanism to propagate it, and other things can be accomplished outside the engine. For example, the new comprehensive support for windowing will all be in Malhar; the engine does not need to know anything about it, except that we need the control tuple for watermark propagation and idempotent processing.

I also think the main difference from other tuples is the need to send it to all partitions. This is similar to checkpoint window tuples, but not the same. Here, we probably also need the ability for the user to control whether such a tuple should traverse the entire DAG or not. For a batch use case, for example, we may want to send the end of file to the next operator, but not beyond, if the operator has asynchronous processing logic in it.

For any logic to be idempotent, the control tuple needs to be processed at a window boundary. Receiving the control tuple in the window callback would avoid having to track extra state in the operator. I don't think that's a major issue, but what is the use case for processing a control tuple within the window?

Thomas

On Sat, Jun 25, 2016 at 6:19 AM, Pramod Immaneni wrote:

For the use cases you mentioned, I think 1) and 2) are more likely to be controlled directly by the application, while 3) and 4) are more likely to be triggered externally and handled directly by the engine, and 3) is already being implemented that way (APEXCORE-163).

The control tuples emitted by an operator would be sent to all downstream partitions, wouldn't they? That would be the chief distinction compared to data (apart from the payload), which would get partitioned under normal circumstances. It would also be guaranteed that downstream partitions receive control tuples only after the data that was sent before them, so we could send a control tuple immediately when it is emitted, as opposed to at window boundaries.

However, during unification it is important to know whether these control tuples have been received from all upstream partitions before proceeding with a control operation. One could wait until the end of the window, but that introduces a delay, however small. I would like to add to the proposal that the platform hand over the control tuple to the unifier only when it has been received from all upstream partitions, much like how end window is processed, but without waiting for the actual end of the window.

Regarding your concern about idempotency, we typically care about idempotency at the window level, and doing the above will still allow operators to preserve it easily.

Thanks

On Jun 24, 2016, at 11:22 AM, David Yan wrote:

Hi all,

I would like to propose a new feature for the Apex core engine: support for custom control tuples. Currently, we have control tuples such as BEGIN_WINDOW, END_WINDOW, CHECKPOINT, and so on, but applications cannot insert their own control tuples. The current workaround is to use data tuples on a separate port that sends tuples to all partitions of the downstream operators, which is not exactly developer friendly.

We have already seen a number of use cases that can use this feature:

1) Batch support: We need to tell all operators of the physical DAG when a batch starts and ends, so the operators can do whatever is needed at the start or the end of a batch.

2) Watermark: To support event-time windowing, the watermark control tuple is needed to tell which windows should be considered late.

3) Changing operator properties: We already support changing operator properties on the fly, but with a custom control tuple, the command to change operator properties can be window aligned for all partitions and also across the DAG.

4) Recording tuples: As with changing operator properties, we support this now, but only at the individual physical operator level and without control over which window to record tuples for. With a custom control tuple, because a control tuple must belong to a window, all operators in the DAG can start (and stop) recording for the same windows.

I can think of two options to achieve this:

1) A new custom control tuple type that takes the user's serializable object.

2) Piggybacking on the current BEGIN_WINDOW and END_WINDOW control tuples.

Please provide your feedback.

Thank you.

David
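A minimal sketch of the unifier rule Pramod proposes above: a control tuple is handed to the unifier once every upstream partition has delivered it, without waiting for END_WINDOW. `ControlTupleBarrier` and identifying control tuples by a `long` id are assumptions for illustration; this is not how the Apex engine implements it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: release a control tuple to the unifier only after all
// upstream partitions have delivered it (similar to END_WINDOW handling, but
// without waiting for the end of the window).
final class ControlTupleBarrier
{
  private final int upstreamPartitions;
  // control tuple id -> upstream partitions that have delivered it so far
  private final Map<Long, Set<Integer>> arrivals = new HashMap<>();

  ControlTupleBarrier(int upstreamPartitions)
  {
    this.upstreamPartitions = upstreamPartitions;
  }

  /**
   * Records that upstreamPartition delivered control tuple controlId.
   * Returns true when the last outstanding upstream partition delivers it.
   */
  boolean receive(int upstreamPartition, long controlId)
  {
    Set<Integer> seen = arrivals.computeIfAbsent(controlId, id -> new HashSet<>());
    seen.add(upstreamPartition);  // duplicates from the same partition are ignored
    if (seen.size() == upstreamPartitions) {
      arrivals.remove(controlId);  // forget completed ids so state does not grow
      return true;
    }
    return false;
  }
}
```

Tracking arrivals per partition (rather than counting) keeps a duplicate delivery from one partition from releasing the tuple early.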
[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351837#comment-15351837 ]

ASF GitHub Bot commented on APEXMALHAR-2085:

Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68658531

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,490 @@
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param The type of the input tuple
+ * @param The type of the output tuple
+ * @param The type of the data storage
+ * @param The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator
+    extends BaseOperator implements WindowedOperator
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage windowStateMap;
+
+  private Function timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public void process(Tuple tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort controlInput = new DefaultInputPort()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
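A simplified sketch of what handling the watermark that arrives on `controlInput` might involve, loosely following the fields in the diff (`currentWatermark`, `allowedLatenessMillis`, `windowWidthMillis`). It assumes fixed-width windows aligned to timestamp 0 and a non-negative allowed lateness; in the diff `allowedLatenessMillis` defaults to -1, whose meaning is not shown, and the body of `processWatermark` is not part of the excerpt. `WatermarkTracker` is a hypothetical name, not the code under review.

```java
// Hypothetical sketch, not the operator under review: tracks the watermark and
// decides lateness for fixed-width windows aligned to timestamp 0.
final class WatermarkTracker
{
  private final long windowWidthMillis;
  private final long allowedLatenessMillis;  // assumed non-negative
  private long currentWatermark = Long.MIN_VALUE;

  WatermarkTracker(long windowWidthMillis, long allowedLatenessMillis)
  {
    this.windowWidthMillis = windowWidthMillis;
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  /** Watermarks only move forward; an older watermark is ignored. */
  void processWatermark(long watermarkMillis)
  {
    currentWatermark = Math.max(currentWatermark, watermarkMillis);
  }

  /** Exclusive end timestamp of the fixed window containing the event time. */
  long windowEnd(long eventTimeMillis)
  {
    return Math.floorDiv(eventTimeMillis, windowWidthMillis) * windowWidthMillis + windowWidthMillis;
  }

  /** A tuple is late once the watermark has reached the end of its window. */
  boolean isLate(long eventTimeMillis)
  {
    return currentWatermark >= windowEnd(eventTimeMillis);
  }

  /** A late tuple can be dropped once the allowed lateness has also elapsed. */
  boolean isTooLate(long eventTimeMillis)
  {
    return currentWatermark >= windowEnd(eventTimeMillis) + allowedLatenessMillis;
  }
}
```

With 1000 ms windows, 500 ms allowed lateness and a watermark of 2200, an event at 1500 (window [1000, 2000)) is late but still accepted, while an event at 500 is past the lateness horizon.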
[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351851#comment-15351851 ]

ASF GitHub Bot commented on APEXMALHAR-2085:

Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68659163

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java ---
@@ -0,0 +1,278 @@
+package org.apache.apex.malhar.lib.window;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * This class describes how triggers should be fired for each window.
+ * For each window, a trigger can be fired before the watermark (EARLY), at the watermark (ON_TIME), or after the watermark (LATE).
+ * If a LATE trigger is specified and the accumulation mode is ACCUMULATING, it is important for the WindowOption to
+ * specify the allowed lateness because otherwise, all states must be kept in storage.
+ *
+ */
+@InterfaceStability.Evolving
+public class TriggerOption
+{
+
+  public enum AccumulationMode
+  {
+    DISCARDING,
+    ACCUMULATING,
+    ACCUMULATING_AND_RETRACTING
+  }
+
+  private AccumulationMode accumulationMode = AccumulationMode.DISCARDING;
+  private boolean firingOnlyUpdatedPanes = false;
+
+  /**
+   * Whether the trigger should be fired before the watermark, at the watermark, or after the watermark
+   */
+  public enum WatermarkOpt
+  {
+    EARLY,
--- End diff --

If I understand the Beam model correctly, EARLY, ON_TIME and LATE are attributes of the data and/or of the results to be sent downstream. They are not attributes of a trigger. A trigger is an action that sends updates downstream; the data it sends is what should be marked early, on-time or late. Shouldn't this enum be moved out, since it is not specific to triggers?

Also, the name WatermarkOpt is confusing. As I said before, these are not attributes of a watermark or of a trigger.

> Implement Windowed Operators
>
>
> Key: APEXMALHAR-2085
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2085
> Project: Apache Apex Malhar
> Issue Type: New Feature
> Reporter: Siyuan Hua
> Assignee: David Yan
>
> As per our several recent discussions in the community, a group of Windowed
> Operators that delivers window semantics following the Google Dataflow
> model (https://cloud.google.com/dataflow/) is very important.
> The operators should be designed and implemented with the following in mind:
> High-level API
> Beam translation
> Easy to use with other popular operators
> {panel:title=Operator Hierarchy}
> Hierarchy of the operators:
> The windowed operators should cover all possible transformations that require
> a window; batch processing is also considered a special window, called the
> global window.
> [Operator Hierarchy diagram, garbled in the archive: CombineOperator, GroupOperator, KeyedOperator and JoinOperator are drawn beneath WindowedOperator with arrows pointing up to it; the rest of the diagram is truncated.]
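To make the three `AccumulationMode` values from the diff concrete, the sketch below shows what a per-window sum would emit when a trigger fires twice. Only the enum constants come from `TriggerOption`; the class, the method, and representing a retraction as the negated previous result are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: emissions of a per-window sum under each accumulation mode.
final class AccumulationModeDemo
{
  enum AccumulationMode { DISCARDING, ACCUMULATING, ACCUMULATING_AND_RETRACTING }

  /**
   * newValuesPerFiring.get(i) holds the inputs that arrived before firing i.
   * Returns the values emitted downstream, in order; a retraction is modeled
   * as the negated previously emitted result.
   */
  static List<Long> emissions(AccumulationMode mode, List<List<Long>> newValuesPerFiring)
  {
    List<Long> out = new ArrayList<>();
    long total = 0;            // accumulated sum across all firings
    Long lastEmitted = null;   // previous accumulated result, for retraction
    for (List<Long> batch : newValuesPerFiring) {
      long paneSum = 0;
      for (long v : batch) {
        paneSum += v;
      }
      total += paneSum;
      switch (mode) {
        case DISCARDING:
          out.add(paneSum);    // only what arrived since the last firing
          break;
        case ACCUMULATING:
          out.add(total);      // running result; state is kept between firings
          break;
        case ACCUMULATING_AND_RETRACTING:
          if (lastEmitted != null) {
            out.add(-lastEmitted);  // retract the previously emitted result
          }
          out.add(total);
          lastEmitted = total;
          break;
        default:
          throw new IllegalStateException("unknown mode " + mode);
      }
    }
    return out;
  }
}
```

For inputs {1, 2} before the first firing and {4} before the second, DISCARDING emits 3 then 4, ACCUMULATING emits 3 then 7, and ACCUMULATING_AND_RETRACTING emits 3, then -3 (the retraction) and 7.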
[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68659163

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/TriggerOption.java ---
@@ -0,0 +1,278 @@
+package org.apache.apex.malhar.lib.window;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.joda.time.Duration;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
+/**
+ * This class describes how triggers should be fired for each window.
+ * For each window, a trigger can be fired before the watermark (EARLY), at the watermark (ON_TIME), or after the watermark (LATE).
+ * If a LATE trigger is specified and the accumulation mode is ACCUMULATING, it is important for the WindowOption to
+ * specify the allowed lateness because otherwise, all states must be kept in storage.
+ *
+ */
+@InterfaceStability.Evolving
+public class TriggerOption
+{
+
+  public enum AccumulationMode
+  {
+    DISCARDING,
+    ACCUMULATING,
+    ACCUMULATING_AND_RETRACTING
+  }
+
+  private AccumulationMode accumulationMode = AccumulationMode.DISCARDING;
+  private boolean firingOnlyUpdatedPanes = false;
+
+  /**
+   * Whether the trigger should be fired before the watermark, at the watermark, or after the watermark
+   */
+  public enum WatermarkOpt
+  {
+    EARLY,
--- End diff --

If I understand the Beam model correctly, EARLY, ON_TIME and LATE are attributes of the data and/or of the results to be sent downstream. They are not attributes of a trigger. A trigger is an action that sends updates downstream; the data it sends is what should be marked early, on-time or late. Shouldn't this enum be moved out, since it is not specific to triggers?

Also, the name WatermarkOpt is confusing. As I said before, these are not attributes of a watermark or of a trigger.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
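The review comment above argues that EARLY, ON_TIME and LATE describe each emitted result rather than the trigger. A hypothetical sketch of that reading classifies each firing of one window from the watermark at firing time; `PaneTiming` and `classify` are illustrative names, not part of the PR. Apache Beam attaches a comparable timing to each emitted pane (`PaneInfo.Timing`).

```java
// Hypothetical sketch of the reviewer's suggestion: timing is a property of each
// emitted result (pane), decided when the result is emitted. One instance per window.
final class PaneTiming
{
  enum Timing { EARLY, ON_TIME, LATE }

  private boolean onTimeEmitted = false;

  /**
   * Classifies a firing for a window ending at windowEndMillis (exclusive).
   * Before the watermark reaches the window end the result is EARLY; the first
   * firing after that is ON_TIME; every later firing is LATE.
   */
  Timing classify(long windowEndMillis, long watermarkMillis)
  {
    if (watermarkMillis < windowEndMillis) {
      return Timing.EARLY;
    }
    if (!onTimeEmitted) {
      onTimeEmitted = true;
      return Timing.ON_TIME;
    }
    return Timing.LATE;
  }
}
```

Under this reading, the trigger only decides *when* to emit, and the label travels with the emitted data, which is the separation the reviewer is asking for.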
[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68658531

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,490 @@
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param The type of the input tuple
+ * @param The type of the output tuple
+ * @param The type of the data storage
+ * @param The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator
+    extends BaseOperator implements WindowedOperator
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage windowStateMap;
+
+  private Function timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public void process(Tuple tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort controlInput = new DefaultInputPort()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
+  };
+
+
+  // TODO: multiple input ports for join operations
+
+  public final transient DefaultOutputPort output = new DefaultOutputPort<>();
+
+  // TODO: This port should be removed when Apex Core has native
[jira] [Commented] (APEXMALHAR-2085) Implement Windowed Operators
[ https://issues.apache.org/jira/browse/APEXMALHAR-2085?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351836#comment-15351836 ]

ASF GitHub Bot commented on APEXMALHAR-2085:

Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68658247

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,490 @@
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param The type of the input tuple
+ * @param The type of the output tuple
+ * @param The type of the data storage
+ * @param The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator
+    extends BaseOperator implements WindowedOperator
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage windowStateMap;
+
+  private Function timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public void process(Tuple tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort controlInput = new DefaultInputPort()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
[GitHub] apex-malhar pull request #319: APEXMALHAR-2085: REVIEW ONLY: Operator suppor...
Github user chinmaykolhatkar commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/319#discussion_r68658247

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java ---
@@ -0,0 +1,490 @@
+package org.apache.apex.malhar.lib.window.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.ValidationException;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.ControlTuple;
+import org.apache.apex.malhar.lib.window.TriggerOption;
+import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.WindowOption;
+import org.apache.apex.malhar.lib.window.WindowState;
+import org.apache.apex.malhar.lib.window.WindowedOperator;
+import org.apache.apex.malhar.lib.window.WindowedStorage;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Function;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * This is the abstract windowed operator class that implements most of the windowing, triggering, and accumulating
+ * concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
+ * values are stored in the storage.
+ *
+ * @param The type of the input tuple
+ * @param The type of the output tuple
+ * @param The type of the data storage
+ * @param The type of the accumulation
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractWindowedOperator
+    extends BaseOperator implements WindowedOperator
+{
+
+  protected WindowOption windowOption;
+  protected TriggerOption triggerOption;
+  protected long allowedLatenessMillis = -1;
+  protected WindowedStorage windowStateMap;
+
+  private Function timestampExtractor;
+
+  private long currentWatermark;
+  private boolean triggerAtWatermark;
+  private long earlyTriggerCount;
+  private long earlyTriggerMillis;
+  private long lateTriggerCount;
+  private long lateTriggerMillis;
+  private long currentDerivedTimestamp = -1;
+  private long windowWidthMillis;
+  protected DataStorageT dataStorage;
+  protected DataStorageT retractionStorage;
+  protected AccumulationT accumulation;
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(AbstractWindowedOperator.class);
+
+  public final transient DefaultInputPort input = new DefaultInputPort()
+  {
+    @Override
+    public void process(Tuple tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  // TODO: This port should be removed when Apex Core has native support for custom control tuples
+  @InputPortFieldAnnotation(optional = true)
+  public final transient DefaultInputPort controlInput = new DefaultInputPort()
+  {
+    @Override
+    public void process(ControlTuple tuple)
+    {
+      if (tuple instanceof ControlTuple.Watermark) {
+        processWatermark((ControlTuple.Watermark)tuple);
+      }
+    }
+  };
+
+
+  // TODO: multiple input ports for join operations
+
+  public final transient DefaultOutputPort output = new DefaultOutputPort<>();
+
+  // TODO: This port should be removed when Apex Core has native
[jira] [Commented] (APEXMALHAR-2126) Suggest: Share Slice Buffer
[ https://issues.apache.org/jira/browse/APEXMALHAR-2126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15351786#comment-15351786 ]

Vlad Rozov commented on APEXMALHAR-2126:

The intention of com.datatorrent.netlet.util.Slice was to avoid a memory copy when raw data is received into a buffer and then parsed into messages, not to avoid memory allocation/deallocation. Memory for data structures/tuples still needs to be allocated, and allocating a large buffer may lead to excessive memory usage when the buffer is allocated but not fully used.

> Suggest: Share Slice Buffer
> ---
>
> Key: APEXMALHAR-2126
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
> Project: Apache Apex Malhar
> Issue Type: Improvement
> Reporter: bright chen
>
> I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to
> share the buffer and avoid unnecessary memory allocation/deallocation. But
> the intention is not self-explanatory, and there is no method to share the memory. Also,
> the utility class org.apache.apex.malhar.lib.utils.serde.SliceUtils allocates
> new memory and copies the data.
> I suggest implementing another class (say, BufferSlice), which would:
> - initialize with a relatively large buffer
> - support append(byte[] data, int offset, int length)
> - dynamically reallocate the buffer or throw an exception when the buffer is full (based
> on the management strategy)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
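Vlad's point is that `Slice` avoids copying: raw bytes are received into one buffer, and each parsed message only refers to a region of it. The sketch below illustrates that idea with a hypothetical `ByteView` class (a simplified stand-in, not netlet's `Slice`) that splits newline-delimited messages without copying bytes; the newline framing is an assumption for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of zero-copy parsing: each message is an (offset, length)
// view into the shared receive buffer rather than a new byte array.
final class ByteView
{
  final byte[] buffer;
  final int offset;
  final int length;

  ByteView(byte[] buffer, int offset, int length)
  {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }

  @Override
  public String toString()
  {
    return new String(buffer, offset, length, StandardCharsets.UTF_8);
  }

  /** Splits newline-delimited messages into views that share the receive buffer. */
  static List<ByteView> splitLines(byte[] received, int validBytes)
  {
    List<ByteView> views = new ArrayList<>();
    int start = 0;
    for (int i = 0; i < validBytes; i++) {
      if (received[i] == '\n') {
        views.add(new ByteView(received, start, i - start));
        start = i + 1;
      }
    }
    return views;  // a trailing partial message (after the last '\n') is left unparsed
  }
}
```

Parsing is copy-free, but tuples built from these views still have to be allocated, and every view keeps the whole receive buffer reachable for as long as it lives.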
[jira] [Created] (APEXMALHAR-2126) Suggest: Share Slice Buffer
bright chen created APEXMALHAR-2126:
---

Summary: Suggest: Share Slice Buffer
Key: APEXMALHAR-2126
URL: https://issues.apache.org/jira/browse/APEXMALHAR-2126
Project: Apache Apex Malhar
Issue Type: Improvement
Reporter: bright chen

I think the intention of Slice (com.datatorrent.netlet.util.Slice) was to share the buffer and avoid unnecessary memory allocation/deallocation. But the intention is not self-explanatory, and there is no method to share the memory. Also, the utility class org.apache.apex.malhar.lib.utils.serde.SliceUtils allocates new memory and copies the data.

I suggest implementing another class (say, BufferSlice), which would:
- initialize with a relatively large buffer
- support append(byte[] data, int offset, int length)
- dynamically reallocate the buffer or throw an exception when the buffer is full (based on the management strategy)
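A minimal sketch of the proposed BufferSlice. The three bullet points come from the issue; the `FullStrategy` enum, the doubling growth policy, and returning the start offset from `append` are assumptions made for illustration, not an agreed design.

```java
import java.util.Arrays;

// Hypothetical sketch of the proposed BufferSlice: a large initial buffer,
// append(byte[], int, int), and grow-or-throw when the buffer is full.
final class BufferSlice
{
  enum FullStrategy { GROW, THROW }

  private byte[] buffer;
  private int size;
  private final FullStrategy strategy;

  BufferSlice(int initialCapacity, FullStrategy strategy)
  {
    this.buffer = new byte[initialCapacity];
    this.strategy = strategy;
  }

  /** Appends data[offset, offset + length) and returns the position where it starts. */
  int append(byte[] data, int offset, int length)
  {
    if (size + length > buffer.length) {
      if (strategy == FullStrategy.THROW) {
        throw new IllegalStateException("buffer full");
      }
      // Double the capacity (or more if needed); this copies the existing bytes once.
      int newCapacity = Math.max(buffer.length * 2, size + length);
      buffer = Arrays.copyOf(buffer, newCapacity);
    }
    System.arraycopy(data, offset, buffer, size, length);
    int start = size;
    size += length;
    return start;
  }

  int size()
  {
    return size;
  }

  /** Returns a copy of the bytes appended so far. */
  byte[] toByteArray()
  {
    return Arrays.copyOf(buffer, size);
  }
}
```

With the GROW strategy, reallocation moves the data to a new array, so anything still referring to the old array no longer sees later appends; sharing the buffer safely would need a policy for that case.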
[GitHub] apex-core pull request #350: APEXCORE-222 Purging the stale data present in ...
Github user sandeshh commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/350#discussion_r68643226

--- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
@@ -769,7 +769,11 @@ public void processHeartbeatResponse(ContainerHeartbeatResponse rsp)
     }
     if (rsp.committedWindowId != lastCommittedWindowId) {
+      lastCommittedWindowId = rsp.committedWindowId;
+
+      bufferServer.purge(lastCommittedWindowId);
--- End diff --

I have created a JIRA for the extra window and will update this PR accordingly.
https://issues.apache.org/jira/browse/APEXCORE-479

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
Re: Getting various statistics from StreamingContainers
I looked around to see how others are implementing this and found the feature implemented in Storm. Storm supports a whitelist of commands for taking heap dumps, stack traces and a few others. Looking at their implementation, they also use a new process to launch the JDK tools to get the information.
https://issues.apache.org/jira/browse/STORM-1157

On Mon, May 23, 2016 at 11:31 AM Sandesh Hegde wrote:

> After connecting to the app, the user will run the following command.
>
> Users will select the container id, the JDK tool and the arguments to the tool.
>
> Apex CLI API
> run-jdkTools "Container-id" "Tool-name" "Arguments"
>
> The output of the command is interpreted by the user.
>
> On Mon, May 23, 2016 at 11:31 AM Thomas Weise wrote:
>
>> I think it is appropriate to collect the information that the JVM provides
>> using the available API instead of running external processes.
>>
>> For other information, how do you suggest that will be provided to the
>> user?
>>
>> Thanks,
>> Thomas
>>
>> On Mon, May 23, 2016 at 11:27 AM, Sandesh Hegde wrote:
>>
>> > Users can pass the arguments to the JDK tools, so it exposes all the power
>> > of those tools. If we have to write the code, we are duplicating work.
>> > Also, such code doesn't evolve with the new features of the JVM, but the
>> > tools will, and we just have to change the arguments that we pass.
>> >
>> > On Mon, May 23, 2016 at 11:15 AM Vlad Rozov wrote:
>> >
>> > > What is the purpose of the new process? Why can't that information be
>> > > collected directly from the JVM and passed to the app master using the
>> > > heartbeat?
>> > >
>> > > Thank you,
>> > > Vlad
>> > >
>> > > On 5/23/16 10:57, Sandesh Hegde wrote:
>> > > > Hello All,
>> > > >
>> > > > Getting various information from the StreamingContainers is a useful
>> > > > feature to have.
>> > > > As StreamingContainers are JVMs, various JDK tools can be used to get
>> > > > the information.
>> > > >
>> > > > So the idea is to spawn a new process from the streaming containers
>> > > > and return the information via Stram.
>> > > >
>> > > > Recently we added the feature to get the stack trace; I have modified
>> > > > that to show the idea I am talking about.
>> > > >
>> > > > Here is the pull request. Its purpose is to show the idea; let me
>> > > > know your thoughts.
>> > > > https://github.com/apache/incubator-apex-core/pull/340
>> > > >
>> > > > I have not created a JIRA yet; I wanted to check the viability of the
>> > > > idea first.
>> > > >
>> > > > Thanks
>> > > > Sandesh
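A minimal sketch of the whitelisted-command approach discussed in this thread (and in STORM-1157): only a fixed set of JDK tools may be launched, and the container's own PID is appended after the user's arguments. JdkToolCommand and the particular whitelist are hypothetical; ProcessHandle requires Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical helper that builds a whitelisted JDK tool command line. */
final class JdkToolCommand
{
  // Assumed whitelist; a real deployment would decide which tools to expose.
  private static final Set<String> ALLOWED_TOOLS = Set.of("jstack", "jmap", "jstat");

  private JdkToolCommand()
  {
  }

  /** Returns [tool, args..., pid], rejecting tools outside the whitelist. */
  static List<String> build(String tool, List<String> args, long pid)
  {
    if (!ALLOWED_TOOLS.contains(tool)) {
      throw new IllegalArgumentException("JDK tool not allowed: " + tool);
    }
    List<String> command = new ArrayList<>();
    command.add(tool);
    command.addAll(args);
    command.add(Long.toString(pid));
    return command;
  }

  /** Same as build(), targeting the JVM this code runs in. */
  static List<String> forCurrentJvm(String tool, List<String> args)
  {
    return build(tool, args, ProcessHandle.current().pid());
  }
}
```

The resulting list could be passed to new ProcessBuilder(command).redirectErrorStream(true).start(), with the process output returned via Stram. ProcessBuilder does not go through a shell, so arguments are not shell-interpreted, but the arguments themselves (for example a jmap heap-dump file path) may still need vetting, and tools such as jstat that take further arguments after the PID would need a different layout. For stack traces specifically, the in-process alternative suggested in the thread is the java.lang.management API (e.g. ThreadMXBean.dumpAllThreads), which needs no external process.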
[jira] [Created] (APEXCORE-478) SerDe based on the types
Sandesh created APEXCORE-478:
---

Summary: SerDe based on the types
Key: APEXCORE-478
URL: https://issues.apache.org/jira/browse/APEXCORE-478
Project: Apache Apex Core
Issue Type: Improvement
Reporter: Sandesh

At deploy time, Apex already knows the type of the object to be serialized and deserialized.

I am quoting Vlad here:
"+1 for type based serialization. Tuples in most cases are flat records/POJOs, and it should be possible to programmatically construct a codec that will significantly outperform Kryo. It should also reduce the amount of data passed over the wire. I started to look in that direction as well, as Kryo serialization is one of the bottlenecks that limits Apex throughput when operators are deployed into different containers, including the NODE_LOCAL case."

Link to the mail thread on this discussion:
http://mail-archives.apache.org/mod_mbox/apex-dev/201605.mbox/%3CCAJhKGzqxknF_7Wj2T2Vfp%3D35cziT0hTJoQTnWopET%2Bj8OgpUJw%40mail.gmail.com%3E
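A minimal sketch of what a codec constructed programmatically from the tuple type might look like, assuming a flat POJO with only int, long, double and String fields. Field metadata is resolved once per type with reflection, and each tuple is then written as bare field values with no class names or field tags. FlatPojoCodec and the Trade example class are hypothetical; this is not an Apex or Kryo API, and no performance claim is made for it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical example tuple: a flat POJO. */
final class Trade
{
  String symbol;
  long timestamp;
  double price;
  int quantity;
}

/** Hypothetical type-specific codec for flat POJOs (int, long, double, String fields). */
final class FlatPojoCodec<T>
{
  private final Class<T> type;
  private final List<Field> fields = new ArrayList<>();

  FlatPojoCodec(Class<T> type)
  {
    this.type = type;
    for (Field f : type.getDeclaredFields()) {
      int mod = f.getModifiers();
      if (Modifier.isStatic(mod) || Modifier.isTransient(mod) || f.isSynthetic()) {
        continue;
      }
      Class<?> t = f.getType();
      if (t != int.class && t != long.class && t != double.class && t != String.class) {
        throw new IllegalArgumentException("Unsupported field: " + f);
      }
      f.setAccessible(true);
      fields.add(f);
    }
    // getDeclaredFields() order is unspecified, so fix an order both sides agree on.
    fields.sort(Comparator.comparing(Field::getName));
  }

  byte[] serialize(T tuple)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      for (Field f : fields) {
        Class<?> t = f.getType();
        if (t == int.class) {
          out.writeInt(f.getInt(tuple));
        } else if (t == long.class) {
          out.writeLong(f.getLong(tuple));
        } else if (t == double.class) {
          out.writeDouble(f.getDouble(tuple));
        } else {
          String s = (String)f.get(tuple);
          out.writeBoolean(s != null); // null marker
          if (s != null) {
            out.writeUTF(s);
          }
        }
      }
      out.flush();
      return bytes.toByteArray();
    } catch (IOException | IllegalAccessException e) {
      throw new IllegalStateException(e);
    }
  }

  T deserialize(byte[] data)
  {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      T tuple = type.getDeclaredConstructor().newInstance();
      for (Field f : fields) {
        Class<?> t = f.getType();
        if (t == int.class) {
          f.setInt(tuple, in.readInt());
        } else if (t == long.class) {
          f.setLong(tuple, in.readLong());
        } else if (t == double.class) {
          f.setDouble(tuple, in.readDouble());
        } else {
          f.set(tuple, in.readBoolean() ? in.readUTF() : null);
        }
      }
      return tuple;
    } catch (IOException | ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

Because both sides derive the same field order from the class, the wire format carries only values (27 bytes for the Trade example), which is the kind of saving the quoted comment alludes to. A production codec might generate code instead of using reflection per field, and would also need to handle nested types, boxed fields and schema changes.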
[jira] [Updated] (APEXCORE-478) SerDe based on the types
[ https://issues.apache.org/jira/browse/APEXCORE-478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sandesh updated APEXCORE-478:
-
Description:
At deploy time, Apex already knows the type of the object to be serialized and deserialized, so it can optimize the SerDe used.

I am quoting Vlad here:
"+1 for type based serialization. Tuples in most cases are flat records/POJOs, and it should be possible to programmatically construct a codec that will significantly outperform Kryo. It should also reduce the amount of data passed over the wire. I started to look in that direction as well, as Kryo serialization is one of the bottlenecks that limits Apex throughput when operators are deployed into different containers, including the NODE_LOCAL case."

Link to the mail thread on this discussion:
http://mail-archives.apache.org/mod_mbox/apex-dev/201605.mbox/%3CCAJhKGzqxknF_7Wj2T2Vfp%3D35cziT0hTJoQTnWopET%2Bj8OgpUJw%40mail.gmail.com%3E

was:
At deploy time, Apex already knows the type of the object to be serialized and deserialized.

I am quoting Vlad here:
"+1 for type based serialization. Tuples in most cases are flat records/POJOs, and it should be possible to programmatically construct a codec that will significantly outperform Kryo. It should also reduce the amount of data passed over the wire. I started to look in that direction as well, as Kryo serialization is one of the bottlenecks that limits Apex throughput when operators are deployed into different containers, including the NODE_LOCAL case."

Link to the mail thread on this discussion:
http://mail-archives.apache.org/mod_mbox/apex-dev/201605.mbox/%3CCAJhKGzqxknF_7Wj2T2Vfp%3D35cziT0hTJoQTnWopET%2Bj8OgpUJw%40mail.gmail.com%3E
[jira] [Created] (APEXCORE-477) Unifier as an Attribute
Sandesh created APEXCORE-477:
---

Summary: Unifier as an Attribute
Key: APEXCORE-477
URL: https://issues.apache.org/jira/browse/APEXCORE-477
Project: Apache Apex Core
Issue Type: Improvement
Reporter: Sandesh

There is a parallel between StreamCodec and Unifier. Just as a StreamCodec can be applied to input ports as an Attribute, making the Unifier an attribute of output ports completes the symmetry and makes things easier for app developers.

Another reason for making it an Attribute: a Unifier can then be applied without extending the existing operators.
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68560022

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImplimplements Spillable.SpillableByteArrayListMultimap ,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keyPrefix = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
+
--- End diff --

Accessing the value from the map should be done as follows:

map.get(SliceUtils.concatenate().buffer)
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68559313

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---
@@ -0,0 +1,52 @@
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SliceUtils
+{
+  private SliceUtils()
+  {
+  }
+
+  public static byte[] concatenate(byte[] a, byte[] b)
+  {
+    byte[] output = new byte[a.length + b.length];
+
+    System.arraycopy(a, 0, output, 0, a.length);
+    System.arraycopy(b, 0, output, a.length, b.length);
+    return output;
+  }
+
+  public static Slice concatenate(Slice a, Slice b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+    return new Slice(bytes);
+  }
+
+  public static Slice concatenate(byte[] a, Slice b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a, 0, bytes, 0, a.length);
+    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+    return new Slice(bytes);
+  }
+
+  public static Slice concatenate(Slice a, byte[] b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a, a.offset, bytes, 0, a.length);
--- End diff --

This should be a.buffer instead of a.
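For reference, a corrected version of the last overload, using a minimal stand-in for com.datatorrent.netlet.util.Slice (only the buffer, offset and length fields and two constructors are modeled). Because System.arraycopy takes its source as an Object, the original call compiles but fails at runtime with an ArrayStoreException, since a Slice is not an array. SliceConcat is a hypothetical holder class.

```java
/** Minimal stand-in for com.datatorrent.netlet.util.Slice (fields only). */
final class Slice
{
  final byte[] buffer;
  final int offset;
  final int length;

  Slice(byte[] buffer, int offset, int length)
  {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }

  Slice(byte[] buffer)
  {
    this(buffer, 0, buffer.length);
  }
}

final class SliceConcat
{
  private SliceConcat()
  {
  }

  /** Copies a's visible bytes followed by all of b into a new array. */
  static Slice concatenate(Slice a, byte[] b)
  {
    byte[] bytes = new byte[a.length + b.length];
    // Copy from the backing array a.buffer, starting at a.offset (not from a itself).
    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
    System.arraycopy(b, 0, bytes, a.length, b.length);
    return new Slice(bytes);
  }
}
```

Since the result always starts at offset 0 and spans its whole array, result.buffer holds exactly the concatenated bytes.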
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68559266

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java ---
@@ -0,0 +1,191 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+public class SpillableByteMapImplimplements Spillable.SpillableByteMap , Spillable.SpillableComponent
+{
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private byte[] identifier;
+  private long bucket;
+  @NotNull
+  private Serde serdeKey;
+  @NotNull
+  private Serde serdeValue;
+
+  private int size = 0;
+
+  private transient WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+  private transient MutableInt tempOffset = new MutableInt();
+
+  private SpillableByteMapImpl()
+  {
+    //for kryo
+  }
+
+  public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+  }
+
+  @Override
+  public int size()
+  {
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return size == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object o)
+  {
+    return get(o) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public V get(Object o)
+  {
+    K key = (K)o;
+
+    if (cache.getRemovedKeys().contains(key)) {
+      return null;
+    }
+
+    V val = cache.get(key);
+
+    if (val != null) {
+      return val;
+    }
+
+    Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+    if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
+      return null;
+    }
+
+    tempOffset.setValue(valSlice.offset + identifier.length);
+    return serdeValue.deserialize(valSlice, tempOffset);
+  }
+
+  @Override
+  public V put(K k, V v)
+  {
+    V value = get(k);
+
+    if (value == null) {
+      size++;
+    }
+
+    cache.put(k, v);
+
+    return value;
+  }
+
+  @Override
+  public V remove(Object o)
+  {
+    V value = get(o);
+
+    if (value != null) {
+      size--;
+    }
+
+    cache.remove((K)o);
+
+    return value;
+  }
+
+  @Override
+  public void putAll(Map map)
+  {
+    for (Map.Entry entry : map.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set > entrySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
--- End diff --

I think store.setup() has to be called in the setup method, because SpillableStateStore extends the Component interface. The same applies to the other lifecycle methods: beginWindow, endWindow and teardown.
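A minimal sketch of the delegation this comment asks for. LifecycleComponent is a hypothetical stand-in that bundles the four methods the comment names; in Apex, setup/teardown come from the Component interface and the window callbacks from other interfaces, which this sketch does not model. StoreBackedStructure and RecordingStore are likewise hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the lifecycle methods named in the review comment. */
interface LifecycleComponent
{
  void setup(Object context);

  void beginWindow(long windowId);

  void endWindow();

  void teardown();
}

/** Sketch of a spillable structure that forwards every lifecycle call to its store. */
final class StoreBackedStructure implements LifecycleComponent
{
  private final LifecycleComponent store;

  StoreBackedStructure(LifecycleComponent store)
  {
    this.store = store;
  }

  @Override
  public void setup(Object context)
  {
    store.setup(context);
  }

  @Override
  public void beginWindow(long windowId)
  {
    store.beginWindow(windowId);
  }

  @Override
  public void endWindow()
  {
    // A real structure would presumably flush its window cache to the store first.
    store.endWindow();
  }

  @Override
  public void teardown()
  {
    store.teardown();
  }
}

/** Test double that records the calls it receives. */
final class RecordingStore implements LifecycleComponent
{
  final List<String> calls = new ArrayList<>();

  @Override
  public void setup(Object context)
  {
    calls.add("setup");
  }

  @Override
  public void beginWindow(long windowId)
  {
    calls.add("beginWindow:" + windowId);
  }

  @Override
  public void endWindow()
  {
    calls.add("endWindow");
  }

  @Override
  public void teardown()
  {
    calls.add("teardown");
  }
}
```

One thing to watch: in SpillableByteArrayListMultimapImpl, the multimap and its inner SpillableByteMapImpl are built on the same store, so if both forwarded these calls the store's setup would run twice. Presumably only one owner should drive the store's lifecycle.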
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68559252

--- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
@@ -0,0 +1,241 @@
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Created by tfarkas on 6/12/16.
+ */
+public class SpillableByteArrayListMultimapImplimplements Spillable.SpillableByteArrayListMultimap ,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+
+  private WindowBoundedMapCache cache = new WindowBoundedMapCache<>();
+
+  @NotNull
+  private SpillableByteMapImpl map;
+
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde serdeKey;
+  private Serde serdeValue;
+
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde serdeKey,
+      Serde serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  @Override
+  public List get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keyPrefix = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
+
+      if (size == null) {
+        return null;
+      }
+
+      spillableArrayList = new SpillableArrayListImpl(bucket, keyPrefix.buffer, store, serdeValue);
+      spillableArrayList.setSize(size);
+
+      cache.put(key, spillableArrayList);
+    }
+
+    return spillableArrayList;
+  }
+
+  @Override
+  public Set keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection > entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List removeAll(@Nullable Object key)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+    return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+    return map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), SIZE_KEY_SUFFIX));
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
+    SpillableArrayListImpl spillableArrayList = getHelper(key);
+
[GitHub] apex-core pull request #350: APEXCORE-222 Purging the stale data present in ...
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/350#discussion_r68526285

--- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
@@ -769,7 +769,11 @@ public void processHeartbeatResponse(ContainerHeartbeatResponse rsp)
     }
     if (rsp.committedWindowId != lastCommittedWindowId) {
+      lastCommittedWindowId = rsp.committedWindowId;
+
+      bufferServer.purge(lastCommittedWindowId);
--- End diff --

I would prefer to have a separate bug and PR for changing which window ID is safe to purge.
[jira] [Commented] (APEXCORE-222) Delegate Buffer Server purge to StreamingContainer
[ https://issues.apache.org/jira/browse/APEXCORE-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15350459#comment-15350459 ]

ASF GitHub Bot commented on APEXCORE-222:
-

Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/apex-core/pull/350#discussion_r68526285

--- Diff: engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---
@@ -769,7 +769,11 @@ public void processHeartbeatResponse(ContainerHeartbeatResponse rsp)
     }
     if (rsp.committedWindowId != lastCommittedWindowId) {
+      lastCommittedWindowId = rsp.committedWindowId;
+
+      bufferServer.purge(lastCommittedWindowId);
--- End diff --

I would prefer to have a separate bug and PR for changing which window ID is safe to purge.

> Delegate Buffer Server purge to StreamingContainer
> --
>
> Key: APEXCORE-222
> URL: https://issues.apache.org/jira/browse/APEXCORE-222
> Project: Apache Apex Core
> Issue Type: Improvement
> Reporter: Thomas Weise
> Assignee: Sandesh
>
> Currently the purge requests are sent to the buffer servers from the app
> master. This interaction exists in parallel to the heartbeat protocol. Instead,
> the committed window ID that is propagated through the heartbeat response can
> be used in StreamingContainer to initiate the purge with the local buffer
> server, similar to how the committed callback on the operator occurs.
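A minimal sketch of the container-side purge described in this issue and in the diff under review: the committed window ID from each heartbeat response is compared with the last one seen, and only a change triggers a purge on the local buffer server. CommittedWindowPurger, its BufferServer interface and the "nothing committed yet" sentinel are hypothetical stand-ins, not the StreamingContainer code; which window ID is actually safe to purge is left to the separate JIRA mentioned in the review.

```java
/** Hypothetical stand-in for the container's heartbeat handling. */
final class CommittedWindowPurger
{
  /** Hypothetical view of the local buffer server. */
  interface BufferServer
  {
    void purge(long windowId);
  }

  // Sentinel meaning "no committed window seen yet" (an assumption of this sketch).
  private static final long NONE = Long.MIN_VALUE;

  private final BufferServer bufferServer;
  private long lastCommittedWindowId = NONE;

  CommittedWindowPurger(BufferServer bufferServer)
  {
    this.bufferServer = bufferServer;
  }

  /** Called with the committed window ID carried by each heartbeat response. */
  void onHeartbeatResponse(long committedWindowId)
  {
    if (committedWindowId != lastCommittedWindowId) {
      lastCommittedWindowId = committedWindowId;
      bufferServer.purge(lastCommittedWindowId);
    }
  }
}
```

Because the purge piggybacks on the heartbeat response, no separate app-master-to-buffer-server request is needed, and repeated heartbeats carrying the same committed window cause no extra purges.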
APEXMALHAR-1701 Deduper in Malhar
Hi All,

I am working on adding a de-duplication operator to the Malhar library based on the managed state APIs. I will be working off the already created JIRA, https://issues.apache.org/jira/browse/APEXMALHAR-1701, and the initial pull request for an AbstractDeduper here: https://github.com/apache/apex-malhar/pull/260/files

I am planning to include the following features in the first version:
1. Time-based de-duplication. Assumption: the Tuple_Key -> Tuple_Time correlation holds.
2. An option to maintain the order of incoming tuples.
3. Duplicate and Expired ports to emit duplicate and expired tuples, respectively.

Thanks.
~ Bhupesh
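A minimal in-memory sketch of the time-based de-duplication rule in item 1, assuming that a key always arrives with the same time (the stated Tuple_Key -> Tuple_Time correlation) and a fixed expiry period. TimeBasedDeduper and its expiry handling are hypothetical; the planned operator is built on managed state rather than a HashMap, and ordering (item 2) is not modeled. DUPLICATE and EXPIRED correspond to the Duplicate and Expired ports; UNIQUE would presumably go to the regular output.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory time-based deduper (not the Malhar operator). */
final class TimeBasedDeduper
{
  enum Result { UNIQUE, DUPLICATE, EXPIRED }

  private final long expiryMillis;
  private final Map<String, Long> seen = new HashMap<>();
  private long maxTime = Long.MIN_VALUE;

  TimeBasedDeduper(long expiryMillis)
  {
    this.expiryMillis = expiryMillis;
  }

  Result process(String key, long time)
  {
    // Tuples older than the expiry horizon can no longer be checked: expired.
    if (maxTime != Long.MIN_VALUE && time < maxTime - expiryMillis) {
      return Result.EXPIRED;
    }
    maxTime = Math.max(maxTime, time);
    if (seen.containsKey(key)) {
      return Result.DUPLICATE;
    }
    seen.put(key, time);
    // Forget keys that fell behind the horizon; because a key always carries the
    // same time, any later copy of them is classified as EXPIRED above.
    long horizon = maxTime - expiryMillis;
    seen.values().removeIf(t -> t < horizon);
    return Result.UNIQUE;
  }
}
```

Scanning the whole map on every unique tuple keeps the sketch short but costs O(n) per tuple; bucketing keys by time would avoid that.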