[jira] [Commented] (APEXMALHAR-2121) KafkaInputOperator emitTuple method should be able to emit more than just message

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343712#comment-15343712
 ] 

ASF GitHub Bot commented on APEXMALHAR-2121:


Github user tweise commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/323#discussion_r67994905
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -203,6 +204,14 @@ public AbstractKafkaInputOperator()
*/
   protected abstract void emitTuple(Message message);
--- End diff --

I don't see a backward compatibility issue; it's a new method. And yes, the 
same change needs to be made on the 0.9 operator as well.


> KafkaInputOperator emitTuple method should be able to emit more than just 
> message
> -
>
> Key: APEXMALHAR-2121
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2121
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>Priority: Minor
>
> AbstractKafkaInputOperator in malhar-contrib has an emitTuple method 
> with "Message" as its parameter. Users can't extend this method to access 
> Kafka parameters other than the message.
> We should have a method with access to "KafkaMessage" so that the user can choose 
> which of its fields he/she wants to emit.
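
For illustration, a minimal sketch of the kind of change being discussed, assuming the existing KafkaConsumer.KafkaMessage wrapper carries the raw Message together with partition and offset details (the accessor name below is illustrative, not the final API):

    // existing hook: subclasses only see the raw payload
    protected abstract void emitTuple(Message message);

    // proposed companion hook: the default delegates to the old method, so current
    // subclasses keep working; overriding it gives access to partition id, offset, etc.
    protected void emitTuple(KafkaConsumer.KafkaMessage message)
    {
      emitTuple(message.getMsg()); // getMsg() is an assumed accessor for the wrapped Message
    }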



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #323: APEXMALHAR-2121: Updating KafkaInputOperator ...

2016-06-21 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/323#discussion_r67994905
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -203,6 +204,14 @@ public AbstractKafkaInputOperator()
*/
   protected abstract void emitTuple(Message message);
--- End diff --

I don't see a backward compatibility issue; it's a new method. And yes, the 
same change needs to be made on the 0.9 operator as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

2016-06-21 Thread Chandni Singh
Hi,
IMO scheduling a job can be independent of any operator, while
StatsListeners are not. I understand that in a lot of cases input/output
operators will decide when the job ends, but there can be cases where
scheduling can be independent of them.

Thanks,
Chandni
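
For illustration only, a purely hypothetical shape for the kind of pluggable scheduler described here; none of these types exist in Apex today:

    // A scheduler registered with the application master; it is consulted
    // independently of any operator and decides when (sub-)DAGs are deployed or undeployed.
    public interface DagScheduler
    {
      // inspect external conditions and request deploy/undeploy of parts of the DAG
      void onHeartbeat(SchedulerContext context); // SchedulerContext is hypothetical
    }
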
On Jun 21, 2016 12:12 PM, "Thomas Weise"  wrote:

> This looks like something that, coordination-wise, belongs in the master
> and can be done with a shared stats listener.
>
> The operator request/response protocol could be used to relay the data for
> the scheduling decisions.
>
> Thomas
>
>
> On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
> chandni.si...@capitalone.com> wrote:
>
> > Hi Tushar,
> >
> > I have some questions about use case 2: Batch Support.
> > I don't understand the advantages of providing batch support by having an
> > operator as a scheduler.
> >
> > An approach that seemed a little more straightforward to me was to expose
> > an API for a scheduler. If there is a scheduler set, then the master uses it
> > and schedules operators. By default there isn't any scheduler and the job is
> > run as it is now.
> >
> > Maybe this is too simplistic but can you please let me know why having an
> > operator as a scheduler is a better way?
> >
> > Thanks,
> > Chandni
> >
> >
> > On 6/21/16, 11:09 AM, "Tushar Gosavi"  wrote:
> >
> > >Hi All,
> > >
> > >We have seen a few use cases in the field which require Apex application
> > >scheduling based on some condition. This has also come up as part of
> > >Batch Support in Apex previously
> > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> > >I am proposing the following functionality in Apex to help with scheduling
> > >and better resource utilization for batch jobs. Please provide your
> > >comments.
> > >
> > >Usecase 1 - Dynamic DAG modification.
> > >
> > >Each operator in the DAG consumes YARN resources; sometimes it is
> > >desirable to return the resources to YARN when no data is available
> > >for processing, and to deploy the whole DAG once data starts to appear. For
> > >this to happen automatically, we will need some data-monitoring
> > >operators running in the DAG to trigger restart and shutdown of the
> > >operators in the DAG.
> > >
> > >Apex already has such an API to dynamically change the running DAG
> > >through the CLI. We could make a similar API available to operators, which
> > >would trigger DAG modification at runtime. This information can be
> > >passed to the master using the heartbeat RPC, and the master will make the
> > >required changes to the DAG. Let me know what you think about it;
> > >something like below.
> > >Context.beginDagChange();
> > >context.addOperator("o1") <== launch operator from previous check-pointed state.
> > >context.addOperator("o2", new Operator2()) <== create new operator
> > >context.addStream("s1", "reader.output", "o1.input");
> > >context.shutdown("o3"); <== delete this and downstream operators from the DAG.
> > >context.apply();  <== dag changes will be sent to the master, and the master
> > >will apply these changes.
> > >
> > >Similar APIs for other functionality, such as locality settings,
> > >need to be provided.
> > >
> > >
> > >Usecase 2 - Classic Batch Scheduling.
> > >
> > >Provide an API callable from an operator to launch a DAG. The operator
> > >will prepare a DAG object and submit it to YARN; the DAG will be
> > >scheduled as a new application. This way complex schedulers can be
> > >written as operators.
> > >
> > >public class SchedulerOperator implements Operator {
> > >  void handleIdleTime() {
> > >    // check for conditions to start a job (for example enough files are
> > >    // available, enough items are available in kafka, or a time has been reached)
> > >    Dag dag = context.createDAG();
> > >    dag.addOperator();
> > >    dag.addOperator();
> > >    LaunchOptions lOptions = new LaunchOptions();
> > >    lOptions.oldId = ""; // start from this checkpoint.
> > >    DagHandler dagHandler = context.submit(dag, lOptions);
> > >  }
> > >}
> > >
> > >DagHandler will have methods to monitor the final state of the
> > >application, or to kill the DAG:
> > >dagHandler.waitForCompletion() <== wait till the DAG terminates
> > >dagHandler.status()  <== get the status of the application.
> > >dagHandler.kill() <== kill the running application.
> > >dagHandler.shutdown() <== shutdown the application.
> > >
> > >More complex scheduler operators could be written to manage
> > >workflows, i.e. DAGs of DAGs, using these APIs.
> > >
> > >Regards,
> > >-Tushar.
> >

[jira] [Closed] (APEXMALHAR-2119) Make DirectoryScanner in AbstractFileInputOperator inheritance friendly.

2016-06-21 Thread Munagala V. Ramanath (JIRA)

 [ 
https://issues.apache.org/jira/browse/APEXMALHAR-2119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Munagala V. Ramanath closed APEXMALHAR-2119.

   Resolution: Fixed
Fix Version/s: 3.5.0

> Make DirectoryScanner in AbstractFileInputOperator inheritance friendly. 
> -
>
> Key: APEXMALHAR-2119
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2119
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Tushar Gosavi
>Assignee: Tushar Gosavi
> Fix For: 3.5.0
>
>
> The DirectoryScanner has partitionIndex and partitionCount declared as 
> private without any setters. An inherited DirectoryScanner cannot set them and 
> hence cannot call most of the methods in DirectoryScanner that depend on 
> these fields (e.g. acceptFile). 
> Also, a new DirectoryScanner has to implement createPartition, as the default one 
> creates an instance of DirectoryScanner.
> Make the class inheritance friendly by adding setters and using a Kryo clone in 
> createPartition.
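
As a rough sketch of the resulting usage, assuming the setters described above, a plain Kryo copy for cloning, and that createPartition keeps an (int, int) signature; the class name is illustrative, not the committed patch:

    import com.esotericsoftware.kryo.Kryo;

    public class MyScanner extends AbstractFileInputOperator.DirectoryScanner
    {
      @Override
      public MyScanner createPartition(int partitionIndex, int partitionCount)
      {
        MyScanner clone = new Kryo().copy(this);   // Kryo clone keeps subclass fields intact
        clone.setPartitionIndex(partitionIndex);   // setters added by this change
        clone.setPartitionCount(partitionCount);
        return clone;
      }
    }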





[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343027#comment-15343027
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67971098
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java 
---
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import javax.validation.constraints.Min;
+
+import javax.validation.constraints.NotNull;
+import javax.validation.constraints.Size;
+
+import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.Module;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
+import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.fs.FileSplitterInput;
+
+/**
+ * This module is used for reading records/tuples from FileSystem.
+ * Records can be read in parallel using multiple partitions of record 
reader operator.
+ * (Ordering is not guaranteed when records are read in parallel)
+ *
+ * Input directory is scanned at specified interval to poll for new data.
+ * 
+ * The module reads data in parallel, following parameters can be 
configured
+ * 1. files: list of file(s)/directories to read
+ * 2. filePatternRegularExp: Files names matching given regex will be 
read
+ * 3. scanIntervalMillis: interval between two scans to discover new files 
in input directory
+ * 4. recursive: if scan recursively input directories
+ * 5. blockSize: block size used to read input blocks of file
+ * 6. readersCount: count of readers to read input file
+ * 7. sequencialFileRead: If emit file blocks in sequence?
+ * 8. blocksThreshold: number of blocks emitted per window
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReaderModule implements Module
+{
+  @NotNull
+  @Size(min = 1)
+  private String files;
+  private String filePatternRegularExp;
+  @Min(0)
+  private long scanIntervalMillis;
+  private boolean recursive = true;
+  private boolean sequencialFileRead = false;
+  private int readersCount;
+  @Min(1)
+  protected int blocksThreshold;
+  
+  public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>();
+  
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode;
+  
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+  
+  public FileSplitterInput createFileSplitter()
+  {
+    return new FileSplitterInput();
+  }
+
+  public FSRecordReader createBlockReader()
+  {
+    FSRecordReader recordReader = new FSRecordReader();
+    recordReader.setMode(mode);
+    recordReader.setRecordLength(recordLength);
+
+    return recordReader;
+  }
+
+  @Override
+  public void populateDAG(DAG dag, Configuration configuration)
+  {
+    FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter());
+    FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader());
+
+    dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput);
+
+    if (sequencialFileRead) {
+      dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC,
+          new SequentialFileBlockMetadataCodec());
+    }
+
+

[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343016#comment-15343016
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67970457
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem 
+ * in parallel (without ordering guarantees between tuples).
+ * Records can be delimited (e.g. newline) or fixed with records.
+ * Output tuples are byte[]. 
+ * 
+ * Typically, this operator will be connected to output of 
FileSplitterInput 
+ * to read records in parallel.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+public class FSRecordReader extends FSSliceReader
+{
+  /**
+   * Record reader mode decides how to split the records.
+   */
+  public static enum RECORD_READER_MODE
+  {
+DELIMITED_RECORD, FIXED_WIDTH_RECORD;
+  }
+
+  /**
+   * Criteria for record split
+   */
+  private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD;
+  
+  /**
+   * Length for fixed width record
+   */
+  private int recordLength;
+  
+  /**
+   * Port to emit individual records/tuples as byte[]
+   */
+  public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>();
+
+  /**
+   * Initialize appropriate reader context based on mode selection
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) {
+      ReaderContext.FixedBytesReaderContext<FSDataInputStream> fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>();
+      fixedBytesReaderContext.setLength(recordLength);
+      readerContext = fixedBytesReaderContext;
+    } else {
+      readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>();
+    }
+  }
+  
+  /**
+   * Read the block data and emit records based on reader context
+   *
+   * @param blockMetadata block
+   * @throws IOException
+   */
+  protected void readBlock(BlockMetadata blockMetadata) throws IOException
+  {
+    readerContext.initialize(stream, blockMetadata, consecutiveBlock);
+    ReaderContext.Entity entity;
+    while ((entity = readerContext.next()) != null) {
+
+      counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes());
+
+      byte[] record = entity.getRecord();
+
+      //If the record is partial then ignore the record.
--- End diff --

why 'partial' rather than 'null' ?


> File Record reader module
> -
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>
> This will be useful for use cases which involve reading files "line 
> by line" in parallel and emitting each line as a separate tuple.
> Proposal is to have a new Module which would allow users to monitor 
> 

[GitHub] apex-malhar pull request #326: APEXMALHAR-2116 Added FS record reader operat...

2016-06-21 Thread amberarrow
Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67969125
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem 
+ * in parallel (without ordering guarantees between tuples).
+ * Records can be delimited (e.g. newline) or fixed with records.
--- End diff --

fixed with => fixed width




[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342995#comment-15342995
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67969125
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem 
+ * in parallel (without ordering guarantees between tuples).
+ * Records can be delimited (e.g. newline) or fixed with records.
--- End diff --

fixed with => fixed width


> File Record reader module
> -
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>
> This will be useful for use cases which involve reading files "line 
> by line" in parallel and emitting each line as a separate tuple.
> Proposal is to have a new Module which would allow users to monitor 
> directories, read files and emit data records (tuples). Records are based on 
> a record separator (e.g. newline) or a fixed size (number of bytes). 
> Plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have a configuration option to select the mode: 
> FIXED_LENGTH or SEPARATOR_BASED records. 
> 3. Use the appropriate ReaderContext based on the mode.
> 4. New module FileRecordReaderModule which wraps (FileSplitter (existing) + 
> FileRecordReader operator).
> The reason for having a different operator than BlockReader is that the output port 
> signature is different from BlockReader's.
> 
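
As a rough usage sketch of the proposed module (the setter names are assumed from the fields shown in the pull request above, and the downstream "parser" operator is just a placeholder):

    FSRecordReaderModule reader = dag.addModule("RecordReader", new FSRecordReaderModule());
    reader.setFiles("/data/input");                        // file or directory to monitor
    reader.setMode(RECORD_READER_MODE.DELIMITED_RECORD);   // newline-delimited records
    // the 'records' port emits one byte[] per record; 'parser' is a hypothetical downstream operator
    dag.addStream("Records", reader.records, parser.input);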





[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342992#comment-15342992
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


Github user amberarrow commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/326#discussion_r67968963
  
--- Diff: 
library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.fs;
+
+import java.io.IOException;
+
+import org.apache.commons.beanutils.ConvertUtils;
+import org.apache.commons.beanutils.converters.AbstractConverter;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.io.block.BlockMetadata;
+import com.datatorrent.lib.io.block.FSSliceReader;
+import com.datatorrent.lib.io.block.ReaderContext;
+
+/**
+ * This operator can be used for reading records/tuples from Filesystem 
--- End diff --

This line and several others have trailing whitespace; this is generally 
considered not a good thing -- see, for instance: 
http://programmers.stackexchange.com/questions/121555/why-is-trailing-whitespace-a-big-deal
Suggest removing all trailing whitespace.


> File Record reader module
> -
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>
> This will be useful for use cases which involve reading files "line 
> by line" in parallel and emitting each line as a separate tuple.
> Proposal is to have a new Module which would allow users to monitor 
> directories, read files and emit data records (tuples). Records are based on 
> a record separator (e.g. newline) or a fixed size (number of bytes). 
> Plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have a configuration option to select the mode: 
> FIXED_LENGTH or SEPARATOR_BASED records. 
> 3. Use the appropriate ReaderContext based on the mode.
> 4. New module FileRecordReaderModule which wraps (FileSplitter (existing) + 
> FileRecordReader operator).
> The reason for having a different operator than BlockReader is that the output port 
> signature is different from BlockReader's.
> 





[jira] [Commented] (APEXMALHAR-2119) Make DirectoryScanner in AbstractFileInputOperator inheritance friendly.

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342899#comment-15342899
 ] 

ASF GitHub Bot commented on APEXMALHAR-2119:


Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/318


> Make DirectoryScanner in AbstractFileInputOperator inheritance friendly. 
> -
>
> Key: APEXMALHAR-2119
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2119
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Tushar Gosavi
>Assignee: Tushar Gosavi
>
> The DirectoryScanner has partitionIndex and partitionCount declared as 
> private without any setters. An inherited DirectoryScanner cannot set them and 
> hence cannot call most of the methods in DirectoryScanner that depend on 
> these fields (e.g. acceptFile). 
> Also, a new DirectoryScanner has to implement createPartition, as the default one 
> creates an instance of DirectoryScanner.
> Make the class inheritance friendly by adding setters and using a Kryo clone in 
> createPartition.





[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...

2016-06-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/apex-malhar/pull/318




[jira] [Commented] (APEXMALHAR-2121) KafkaInputOperator emitTuple method should be able to emit more than just message

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342652#comment-15342652
 ] 

ASF GitHub Bot commented on APEXMALHAR-2121:


Github user siyuanh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/323#discussion_r67949068
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -203,6 +204,14 @@ public AbstractKafkaInputOperator()
*/
   protected abstract void emitTuple(Message message);
--- End diff --

That's why I'm asking whether we can do that. It does have a compatibility issue, but 
on the other hand, if someone wants to emit the tuple along with the partition id, 
etc., he/she is forced to implement the emitTuple method as well. Well, we can 
keep it as it is now. @DT-Priyanka, can you file a JIRA to make this change in the 
0.9 Kafka input operator? We can change the existing interface for that one.


> KafkaInputOperator emitTuple method should be able to emit more than just 
> message
> -
>
> Key: APEXMALHAR-2121
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2121
> Project: Apache Apex Malhar
>  Issue Type: Improvement
>Reporter: Priyanka Gugale
>Assignee: Priyanka Gugale
>Priority: Minor
>
> AbstractKafkaInputOperator in malhar-contrib has an emitTuple method 
> with "Message" as its parameter. Users can't extend this method to access 
> Kafka parameters other than the message.
> We should have a method with access to "KafkaMessage" so that the user can choose 
> which of its fields he/she wants to emit.





[GitHub] apex-malhar pull request #323: APEXMALHAR-2121: Updating KafkaInputOperator ...

2016-06-21 Thread siyuanh
Github user siyuanh commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/323#discussion_r67949068
  
--- Diff: 
contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 ---
@@ -203,6 +204,14 @@ public AbstractKafkaInputOperator()
*/
   protected abstract void emitTuple(Message message);
--- End diff --

That's why I'm asking whether we can do that. It does have a compatibility issue, but 
on the other hand, if someone wants to emit the tuple along with the partition id, 
etc., he/she is forced to implement the emitTuple method as well. Well, we can 
keep it as it is now. @DT-Priyanka, can you file a JIRA to make this change in the 
0.9 Kafka input operator? We can change the existing interface for that one.




[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342485#comment-15342485
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user tweise commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67934148
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort output = new 
DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
+  {
+public final DefaultInputPort input = new 
DefaultInputPort()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public PmfSink() {}
+  }
+
+  public static class QuantileSink extends BaseOperator
+  {
+public final DefaultInputPort input = new 
DefaultInputPort()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public QuantileSink() {}
+  }
+
+  public static class CdfSink extends BaseOperator
+  {
+public final DefaultInputPort input = new 
DefaultInputPort()
+{
+  @Override
+  public void process(Double tuple) {}
+};
+
+public CdfSink() {}
+  }
+
+  @Test
+  public void testQuantiles()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.quantilesOutput, sink);
+
+Random rand = new Random(1234L);
+ArrayList randArray = new ArrayList<>();
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 101; i++) {
+  double r = rand.nextGaussian();
+  quantilesOp.data.process(r);
+  randArray.add(r);
+}
+
+quantilesOp.endWindow();
+
+Collections.sort(randArray);
+
+Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
+Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
+  }
+
+  @Test
+  public void testCDF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+

[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342484#comment-15342484
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user tweise commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67933939
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort output = new 
DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  @Test
+  public void testQuantiles()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.quantilesOutput, sink);
+
+Random rand = new Random(1234L);
+ArrayList randArray = new ArrayList<>();
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 101; i++) {
+  double r = rand.nextGaussian();
+  quantilesOp.data.process(r);
+  randArray.add(r);
+}
+
+quantilesOp.endWindow();
+
+Collections.sort(randArray);
+
+Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
+Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
+  }
+
+  @Test
+  public void testCDF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.cdfOutput, sink);
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 1001; i++) {
+  double r = 0.001 * i;
+  quantilesOp.data.process(r);
+}
+quantilesOp.endWindow();
+
+List cdfValues = sink.collectedTuples;
+Assert.assertTrue("Highest CDF value is approx 1.0", 
cdfValues.get(cdfValues.size() - 1) >= 0.99 &&
+cdfValues.get(cdfValues.size() - 1) <= 1.0);
+Assert.assertTrue("Lowest CDF value is approx 0.0", cdfValues.get(0) 
>= 0.0 &&
+cdfValues.get(0) <= 0.01);
+  }
+
+  @Test
+  public void testPMF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+double[] intervals = {0.0, 0.20, 0.40, 0.60, 0.80, 1.0};
+quantilesOp.setPmfIntervals(intervals);
+
+CollectorTestSink sink = new 

[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-21 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67933939
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort output = new 
DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  @Test
+  public void testQuantiles()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.quantilesOutput, sink);
+
+Random rand = new Random(1234L);
+ArrayList randArray = new ArrayList<>();
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 101; i++) {
+  double r = rand.nextGaussian();
+  quantilesOp.data.process(r);
+  randArray.add(r);
+}
+
+quantilesOp.endWindow();
+
+Collections.sort(randArray);
+
+Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
+Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
+  }
+
+  @Test
+  public void testCDF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.cdfOutput, sink);
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 1001; i++) {
+  double r = 0.001 * i;
+  quantilesOp.data.process(r);
+}
+quantilesOp.endWindow();
+
+List cdfValues = sink.collectedTuples;
+Assert.assertTrue("Highest CDF value is approx 1.0", 
cdfValues.get(cdfValues.size() - 1) >= 0.99 &&
+cdfValues.get(cdfValues.size() - 1) <= 1.0);
+Assert.assertTrue("Lowest CDF value is approx 0.0", cdfValues.get(0) 
>= 0.0 &&
+cdfValues.get(0) <= 0.01);
+  }
+
+  @Test
+  public void testPMF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+double[] intervals = {0.0, 0.20, 0.40, 0.60, 0.80, 1.0};
+quantilesOp.setPmfIntervals(intervals);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.pmfOutput, sink);
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+for (int i = 0; i < 1000; i++) {
+  quantilesOp.data.process(0.001 * i);
+}
+

Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

2016-06-21 Thread Thomas Weise
This looks like something that, coordination-wise, belongs in the master
and can be done with a shared stats listener.

The operator request/response protocol could be used to relay the data for
the scheduling decisions.

Thomas


On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <
chandni.si...@capitalone.com> wrote:

> Hi Tushar,
>
> I have some questions about use case 2: Batch Support.
> I don't understand the advantages of providing batch support by having an
> operator as a scheduler.
>
> An approach that seemed a little more straightforward to me was to expose
> an API for a scheduler. If there is a scheduler set, then the master uses it
> and schedules operators. By default there isn't any scheduler and the job is
> run as it is now.
>
> Maybe this is too simplistic but can you please let me know why having an
> operator as a scheduler is a better way?
>
> Thanks,
> Chandni
>
>
> On 6/21/16, 11:09 AM, "Tushar Gosavi"  wrote:
>
> >Hi All,
> >
> >We have seen a few use cases in the field which require Apex application
> >scheduling based on some condition. This has also come up as part of
> >Batch Support in Apex previously
> >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> >I am proposing the following functionality in Apex to help with scheduling
> >and better resource utilization for batch jobs. Please provide your
> >comments.
> >
> >Usecase 1 - Dynamic DAG modification.
> >
> >Each operator in the DAG consumes YARN resources; sometimes it is
> >desirable to return the resources to YARN when no data is available
> >for processing, and to deploy the whole DAG once data starts to appear. For
> >this to happen automatically, we will need some data-monitoring
> >operators running in the DAG to trigger restart and shutdown of the
> >operators in the DAG.
> >
> >Apex already has such an API to dynamically change the running DAG
> >through the CLI. We could make a similar API available to operators, which
> >would trigger DAG modification at runtime. This information can be
> >passed to the master using the heartbeat RPC, and the master will make the
> >required changes to the DAG. Let me know what you think about it;
> >something like below.
> >Context.beginDagChange();
> >context.addOperator("o1") <== launch operator from previous check-pointed state.
> >context.addOperator("o2", new Operator2()) <== create new operator
> >context.addStream("s1", "reader.output", "o1.input");
> >context.shutdown("o3"); <== delete this and downstream operators from the DAG.
> >context.apply();  <== dag changes will be sent to the master, and the master
> >will apply these changes.
> >
> >Similar APIs for other functionality, such as locality settings,
> >need to be provided.
> >
> >
> >Usecase 2 - Classic Batch Scheduling.
> >
> >Provide an API callable from an operator to launch a DAG. The operator
> >will prepare a DAG object and submit it to YARN; the DAG will be
> >scheduled as a new application. This way complex schedulers can be
> >written as operators.
> >
> >public class SchedulerOperator implements Operator {
> >  void handleIdleTime() {
> >    // check for conditions to start a job (for example enough files are
> >    // available, enough items are available in kafka, or a time has been reached)
> >    Dag dag = context.createDAG();
> >    dag.addOperator();
> >    dag.addOperator();
> >    LaunchOptions lOptions = new LaunchOptions();
> >    lOptions.oldId = ""; // start from this checkpoint.
> >    DagHandler dagHandler = context.submit(dag, lOptions);
> >  }
> >}
> >
> >DagHandler will have methods to monitor the final state of the
> >application, or to kill the DAG:
> >dagHandler.waitForCompletion() <== wait till the DAG terminates
> >dagHandler.status()  <== get the status of the application.
> >dagHandler.kill() <== kill the running application.
> >dagHandler.shutdown() <== shutdown the application.
> >
> >More complex scheduler operators could be written to manage
> >workflows, i.e. DAGs of DAGs, using these APIs.
> >
> >Regards,
> >-Tushar.
>
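
For illustration, a minimal sketch of the shared stats listener direction mentioned above, assuming only StatsListener.processStats() and getTuplesProcessedPSMA() from the Apex API; the idle-timeout bookkeeping and any resulting scheduling action are placeholders:

    import com.datatorrent.api.StatsListener;

    // A single listener instance can be shared by several operators; the master calls
    // processStats() for each of them, so the listener sees a DAG-wide view and can
    // decide when, for example, an idle sub-DAG should be shut down or redeployed.
    public class IdleAwareListener implements StatsListener
    {
      private long idleSinceMillis = -1;

      @Override
      public Response processStats(BatchedOperatorStats stats)
      {
        if (stats.getTuplesProcessedPSMA() == 0) {
          if (idleSinceMillis < 0) {
            idleSinceMillis = System.currentTimeMillis();
          }
          // after a configured idle period, a scheduling request could be relayed here,
          // e.g. via the operator request/response protocol discussed in this thread (omitted)
        } else {
          idleSinceMillis = -1;
        }
        Response response = new Response();
        response.repartitionRequired = false; // this sketch never asks for repartitioning
        return response;
      }
    }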


Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application

2016-06-21 Thread Sandesh Hegde
For use case 1, is it possible to avoid changing the Context? Can we
have something along the lines of "StramToNodeRequest"?

On Tue, Jun 21, 2016 at 11:09 AM Tushar Gosavi 
wrote:

> Hi All,
>
> We have seen a few use cases in the field which require Apex application
> scheduling based on some condition. This has also come up as part of
> Batch Support in Apex previously
> (http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
> I am proposing the following functionality in Apex to help with scheduling
> and better resource utilization for batch jobs. Please provide your
> comments.
>
> Usecase 1 - Dynamic DAG modification.
>
> Each operator in the DAG consumes YARN resources; sometimes it is
> desirable to return the resources to YARN when no data is available
> for processing, and to deploy the whole DAG once data starts to appear. For
> this to happen automatically, we will need some data-monitoring
> operators running in the DAG to trigger restart and shutdown of the
> operators in the DAG.
>
> Apex already has such an API to dynamically change the running DAG
> through the CLI. We could make a similar API available to operators, which
> would trigger DAG modification at runtime. This information can be
> passed to the master using the heartbeat RPC, and the master will make the
> required changes to the DAG. Let me know what you think about it;
> something like below.
> Context.beginDagChange();
> context.addOperator("o1") <== launch operator from previous check-pointed state.
> context.addOperator("o2", new Operator2()) <== create new operator
> context.addStream("s1", "reader.output", "o1.input");
> context.shutdown("o3"); <== delete this and downstream operators from the DAG.
> context.apply();  <== dag changes will be sent to the master, and the master
> will apply these changes.
>
> Similar APIs for other functionality, such as locality settings,
> need to be provided.
>
>
> Usecase 2 - Classic Batch Scheduling.
>
> Provide an API callable from an operator to launch a DAG. The operator
> will prepare a DAG object and submit it to YARN; the DAG will be
> scheduled as a new application. This way complex schedulers can be
> written as operators.
>
> public class SchedulerOperator implements Operator {
>   void handleIdleTime() {
>     // check for conditions to start a job (for example enough files are
>     // available, enough items are available in kafka, or a time has been reached)
>     Dag dag = context.createDAG();
>     dag.addOperator();
>     dag.addOperator();
>     LaunchOptions lOptions = new LaunchOptions();
>     lOptions.oldId = ""; // start from this checkpoint.
>     DagHandler dagHandler = context.submit(dag, lOptions);
>   }
> }
>
> DagHandler will have methods to monitor the final state of the
> application, or to kill the DAG:
> dagHandler.waitForCompletion() <== wait till the DAG terminates
> dagHandler.status()  <== get the status of the application.
> dagHandler.kill() <== kill the running application.
> dagHandler.shutdown() <== shutdown the application.
>
> More complex scheduler operators could be written to manage
> workflows, i.e. DAGs of DAGs, using these APIs.
>
> Regards,
> -Tushar.
>


[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342403#comment-15342403
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user sandeep-n commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67925991
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort output = new 
DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
+  {
+public final DefaultInputPort input = new 
DefaultInputPort()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public PmfSink() {}
+  }
+
+  public static class QuantileSink extends BaseOperator
+  {
+public final DefaultInputPort input = new 
DefaultInputPort()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public QuantileSink() {}
+  }
+
+  public static class CdfSink extends BaseOperator
+  {
+public final DefaultInputPort input = new 
DefaultInputPort()
+{
+  @Override
+  public void process(Double tuple) {}
+};
+
+public CdfSink() {}
+  }
+
+  @Test
+  public void testQuantiles()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.quantilesOutput, sink);
+
+Random rand = new Random(1234L);
+ArrayList randArray = new ArrayList<>();
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 101; i++) {
+  double r = rand.nextGaussian();
+  quantilesOp.data.process(r);
+  randArray.add(r);
+}
+
+quantilesOp.endWindow();
+
+Collections.sort(randArray);
+
+Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
+Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
+  }
+
+  @Test
+  public void testCDF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink sink = new CollectorTestSink<>();
+

[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-21 Thread sandeep-n
Github user sandeep-n commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67925991
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public PmfSink() {}
+  }
+
+  public static class QuantileSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public QuantileSink() {}
+  }
+
+  public static class CdfSink extends BaseOperator
+  {
+public final DefaultInputPort<Double> input = new DefaultInputPort<Double>()
+{
+  @Override
+  public void process(Double tuple) {}
+};
+
+public CdfSink() {}
+  }
+
+  @Test
+  public void testQuantiles()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink<double[]> sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.quantilesOutput, sink);
+
+Random rand = new Random(1234L);
+ArrayList<Double> randArray = new ArrayList<>();
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 101; i++) {
+  double r = rand.nextGaussian();
+  quantilesOp.data.process(r);
+  randArray.add(r);
+}
+
+quantilesOp.endWindow();
+
+Collections.sort(randArray);
+
+Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
+Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
+  }
+
+  @Test
+  public void testCDF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink<Double> sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.cdfOutput, sink);
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 1001; i++) {
+  double r = 0.001 * i;
+  quantilesOp.data.process(r);
+

[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-21 Thread sandeep-n
Github user sandeep-n commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67925558
  
--- Diff: pom.xml ---
@@ -212,6 +212,7 @@
 library
 contrib
 demos
+sketches
--- End diff --

Thanks for the heads-up!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


APEXCORE-408 : Ability to schedule Sub-DAG from running application

2016-06-21 Thread Tushar Gosavi
Hi All,

We have seen a few use cases in the field which require Apex application
scheduling based on some condition. This has also come up as part of
Batch Support in Apex previously
(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E).
I am proposing the following functionality in Apex to help with scheduling
and better resource utilization for batch jobs. Please provide your
comments.

Usecase 1 - Dynamic DAG modification.

Each operator in the DAG consumes YARN resources. Sometimes it is
desirable to return the resources to YARN when no data is available
for processing, and to redeploy the whole DAG once data starts to
appear. For this to happen automatically, we will need some
data-monitoring operators running in the DAG to trigger restart and
shutdown of the operators in the DAG.

Apex already has such an API to dynamically change the running DAG
through the CLI. We could make a similar API available to operators,
which would trigger DAG modification at runtime. This information can
be passed to the master using the heartbeat RPC, and the master will
make the required changes to the DAG. Let me know what you think
about it; something like below:

Context.beginDagChange();
context.addOperator("o1");  <== launch operator from previous check-pointed state.
context.addOperator("o2", new Operator2());  <== create new operator
context.addStream("s1", "reader.output", "o1.input");
context.shutdown("o3");  <== delete this and downstream operators from the DAG.
context.apply();  <== DAG changes will be sent to the master, and the
master will apply these changes.

Similarly, APIs for other functionality such as locality settings
need to be provided. A rough, hypothetical sketch of how an operator
might drive these changes is shown below.
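
To make the idea concrete, here is a minimal sketch of a data-monitoring
operator using the proposed API. DagChangeContext, checkSource() and the
idle-detection logic are placeholders, not existing Apex classes:

// Hypothetical sketch only: the dag-change API shown here is the proposal
// above, not an existing Apex interface; names are placeholders.
public class DataMonitorOperator extends BaseOperator implements InputOperator
{
  private transient DagChangeContext context;  // assumed handle provided by the engine
  private boolean deployed = true;             // whether the processing operators are in the DAG
  private long idleSince = -1;

  @Override
  public void emitTuples()
  {
    boolean dataAvailable = checkSource();     // placeholder: files present, Kafka lag > 0, ...
    long now = System.currentTimeMillis();

    if (dataAvailable && !deployed) {
      // Data is back: re-add the processing operators from their checkpointed state.
      context.beginDagChange();
      context.addOperator("o1");
      context.addStream("s1", "reader.output", "o1.input");
      context.apply();
      deployed = true;
      idleSince = -1;
    } else if (!dataAvailable && deployed) {
      if (idleSince == -1) {
        idleSince = now;
      } else if (now - idleSince > 60 * 1000) {
        // Idle for a minute: remove idle operators and return containers to YARN.
        context.beginDagChange();
        context.shutdown("o1");
        context.apply();
        deployed = false;
      }
    } else if (dataAvailable) {
      idleSince = -1;
    }
  }

  private boolean checkSource()
  {
    return false;  // placeholder for the real data-availability check
  }
}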


Usecase 2 - Classic Batch Scheduling.

Provide an API callable from an operator to launch a DAG. The operator
will prepare a DAG object and submit it to YARN, and the DAG will be
scheduled as a new application. This way complex schedulers can be
written as operators.

public class SchedulerOperator implements Operator {
   void handleIdleTime() {
      // check conditions to start a job (for example, enough files are
      // available, enough items are available in Kafka, or a time
      // threshold has been reached)
      DAG dag = context.createDAG();
      dag.addOperator(...);
      dag.addOperator(...);
      LaunchOptions lOptions = new LaunchOptions();
      lOptions.oldId = "";  // start from this checkpoint.
      DagHandler dagHandler = context.submit(dag, lOptions);
   }
}

DagHandler will have methods to monitor the final state of the
application, or to kill the DAG:
dagHandler.waitForCompletion() <== wait till the DAG terminates
dagHandler.status()  <== get the status of the application.
dagHandler.kill() <== kill the running application.
dagHandler.shutdown() <== shut down the application.

More complex scheduler operators could be written to manage
workflows, i.e. DAGs of DAGs, using these APIs. A rough sketch of
chaining two batch DAGs with these APIs is shown below.
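
As an illustration, a minimal sketch that runs two batch stages
sequentially, assuming the createDAG/submit/DagHandler API proposed
above (status semantics are part of the proposal, not existing code):

// Hypothetical sketch: a sequential "DAG of DAGs" using the proposed API.
public class SequentialBatchScheduler implements Operator
{
  void handleIdleTime()
  {
    DAG extract = context.createDAG();
    // ... add operators/streams for the extract stage ...
    DagHandler h1 = context.submit(extract, new LaunchOptions());
    h1.waitForCompletion();                  // block until the first stage terminates

    DAG load = context.createDAG();
    // ... add operators/streams for the load stage ...
    DagHandler h2 = context.submit(load, new LaunchOptions());
    if (!"SUCCEEDED".equals(h2.status())) {  // assumed status value, per the proposal
      h2.kill();
    }
  }
}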

Regards,
-Tushar.


[jira] [Commented] (APEXMALHAR-2116) File Record reader module

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341888#comment-15341888
 ] 

ASF GitHub Bot commented on APEXMALHAR-2116:


GitHub user yogidevendra opened a pull request:

https://github.com/apache/apex-malhar/pull/326

APEXMALHAR-2116 Added FS record reader operator, module, test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/apex-malhar 
APEXMALHAR-2116-record-reader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #326


commit aaeb7fe34dc69de71dae120ea01452149389854f
Author: yogidevendra 
Date:   2016-06-20T05:47:08Z

Added FS record reader operator, module, test




> File Record reader module
> -
>
> Key: APEXMALHAR-2116
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Yogi Devendra
>Assignee: Yogi Devendra
>
> This will be useful for use cases that involve reading from files "line 
> by line" in parallel and emitting each line as a separate tuple.
> The proposal is to have a new Module which would allow users to monitor 
> directories, read files and emit data records (tuples). Records are based on a 
> record separator (e.g. newline) or a fixed size (number of bytes). 
> The plan is as follows:
> 1. New operator FileRecordReader which will extend BlockReader.
> 2. This operator will have a configuration option to select the mode: 
> FIXED_LENGTH or SEPARATOR_BASED records. 
> 3. Use the appropriate ReaderContext based on the mode.
> 4. New module FileRecordReaderModule which wraps FileSplitter (existing) + 
> the FileRecordReader operator.
> The reason for a separate operator rather than reusing BlockReader directly is 
> that its output port signature differs from BlockReader's.
>  
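
For illustration, a rough sketch of how an application could wire the
proposed module; the setter and port names used here (setFiles, setMode,
records) are assumptions, since the actual API is defined by the pull request:

// Sketch only: assumes FileRecordReaderModule exposes setFiles/setMode and an
// output port named "records"; the real names come from PR #326.
public class RecordReaderApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    FileRecordReaderModule reader = dag.addModule("recordReader", new FileRecordReaderModule());
    reader.setFiles("/data/input");        // directory to monitor (assumed setter)
    reader.setMode("SEPARATOR_BASED");     // newline-separated records (assumed setter)

    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("records", reader.records, console.input);   // assumed port name
  }
}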



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #326: APEXMALHAR-2116 Added FS record reader operat...

2016-06-21 Thread yogidevendra
GitHub user yogidevendra opened a pull request:

https://github.com/apache/apex-malhar/pull/326

APEXMALHAR-2116 Added FS record reader operator, module, test



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yogidevendra/apex-malhar 
APEXMALHAR-2116-record-reader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #326


commit aaeb7fe34dc69de71dae120ea01452149389854f
Author: yogidevendra 
Date:   2016-06-20T05:47:08Z

Added FS record reader operator, module, test




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-21 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67852697
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public PmfSink() {}
+  }
+
+  public static class QuantileSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public QuantileSink() {}
+  }
+
+  public static class CdfSink extends BaseOperator
+  {
+public final DefaultInputPort<Double> input = new DefaultInputPort<Double>()
+{
+  @Override
+  public void process(Double tuple) {}
+};
+
+public CdfSink() {}
+  }
+
+  @Test
+  public void testQuantiles()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink<double[]> sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.quantilesOutput, sink);
+
+Random rand = new Random(1234L);
+ArrayList<Double> randArray = new ArrayList<>();
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 101; i++) {
+  double r = rand.nextGaussian();
+  quantilesOp.data.process(r);
+  randArray.add(r);
+}
+
+quantilesOp.endWindow();
+
+Collections.sort(randArray);
+
+Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
+Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
+  }
+
+  @Test
+  public void testCDF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink<Double> sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.cdfOutput, sink);
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 1001; i++) {
+  double r = 0.001 * i;
+  quantilesOp.data.process(r);
+ 

[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341617#comment-15341617
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67852390
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public PmfSink() {}
+  }
+
+  public static class QuantileSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public QuantileSink() {}
+  }
+
+  public static class CdfSink extends BaseOperator
+  {
+public final DefaultInputPort<Double> input = new DefaultInputPort<Double>()
+{
+  @Override
+  public void process(Double tuple) {}
+};
+
+public CdfSink() {}
+  }
+
+  @Test
+  public void testQuantiles()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink<double[]> sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.quantilesOutput, sink);
+
+Random rand = new Random(1234L);
+ArrayList<Double> randArray = new ArrayList<>();
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 101; i++) {
+  double r = rand.nextGaussian();
+  quantilesOp.data.process(r);
+  randArray.add(r);
+}
+
+quantilesOp.endWindow();
+
+Collections.sort(randArray);
+
+Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
+Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
+  }
+
+  @Test
+  public void testCDF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink<Double> sink = new CollectorTestSink<>();
+  

[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-21 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67852390
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public PmfSink() {}
+  }
+
+  public static class QuantileSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public QuantileSink() {}
+  }
+
+  public static class CdfSink extends BaseOperator
+  {
+public final DefaultInputPort<Double> input = new DefaultInputPort<Double>()
+{
+  @Override
+  public void process(Double tuple) {}
+};
+
+public CdfSink() {}
+  }
+
+  @Test
+  public void testQuantiles()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink<double[]> sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.quantilesOutput, sink);
+
+Random rand = new Random(1234L);
+ArrayList<Double> randArray = new ArrayList<>();
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 101; i++) {
+  double r = rand.nextGaussian();
+  quantilesOp.data.process(r);
+  randArray.add(r);
+}
+
+quantilesOp.endWindow();
+
+Collections.sort(randArray);
+
+Assert.assertEquals("Captures all computed quantiles", 
sink.collectedTuples.size(), 101);
+Assert.assertTrue("Computes median correctly", randArray.get(50) == 
sink.collectedTuples.get(100)[2]);
+  }
+
+  @Test
+  public void testCDF()
+  {
+QuantilesEstimator quantilesOp = new QuantilesEstimator(128, 
(short)12345);
+
+CollectorTestSink<Double> sink = new CollectorTestSink<>();
+TestUtils.setSink(quantilesOp.cdfOutput, sink);
+
+quantilesOp.setup(null);
+quantilesOp.beginWindow(0);
+
+for (int i = 0; i < 1001; i++) {
+  double r = 0.001 * i;
+  quantilesOp.data.process(r);
+ 

[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341612#comment-15341612
 ] 

ASF GitHub Bot commented on APEXMALHAR-2094:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67852225
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public PmfSink() {}
+  }
+
+  public static class QuantileSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public QuantileSink() {}
+  }
+
+  public static class CdfSink extends BaseOperator
--- End diff --

Not used


> Quantiles sketch operator
> -
>
> Key: APEXMALHAR-2094
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Sandeep Narayanaswami
>Assignee: Sandeep Narayanaswami
>Priority: Minor
>
> An operator that "sketches" in an online fashion the probability distribution 
> of an input (numeric) data stream, enabling computation of quantiles and 
> cumulative distribution functions.
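
For context, a minimal wiring sketch using the port names that appear in the
PR's test code (data, quantilesOutput); the rest is illustrative only:

// Illustrative wiring only; constructor arguments mirror the PR's test code.
public class QuantilesApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    QuantilesEstimatorTest.NumberSource source =
        dag.addOperator("source", new QuantilesEstimatorTest.NumberSource());
    QuantilesEstimator estimator =
        dag.addOperator("estimator", new QuantilesEstimator(128, (short)12345));
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());

    dag.addStream("numbers", source.output, estimator.data);                // raw values in
    dag.addStream("quantiles", estimator.quantilesOutput, console.input);   // quantile arrays out
  }
}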



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-21 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67852225
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public PmfSink() {}
+  }
+
+  public static class QuantileSink extends BaseOperator
+  {
+public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>()
+{
+  @Override
+  public void process(double[] tuple) {}
+};
+
+public QuantileSink() {}
+  }
+
+  public static class CdfSink extends BaseOperator
--- End diff --

Not used


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator

2016-06-21 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/301#discussion_r67852192
  
--- Diff: 
sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java
 ---
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.sketches;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.TestUtils;
+
+public class QuantilesEstimatorTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(QuantilesEstimatorTest.class);
+
+  public static class NumberSource extends BaseOperator implements 
InputOperator
+  {
+public final DefaultOutputPort<Double> output = new DefaultOutputPort<>();
+
+private Random rand = new Random(1234L);
+
+public NumberSource() {}
+
+@Override
+public void emitTuples()
+{
+  output.emit(rand.nextGaussian());
+}
+  }
+
+  public static class PmfSink extends BaseOperator
--- End diff --

This is not used anywhere.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341509#comment-15341509
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r67842738
  
--- Diff: 
library/src/test/java/com/datatorrent/lib/stream/MultipleStreamMergerTest.java 
---
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.testbench.RandomWordGenerator;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class MultipleStreamMergerTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(MultipleStreamMergerTest.class);
+
+  private ArrayList<MultipleStreamMerger<byte[]>.Stream> streamsToAddToDag;
+  private ArrayList<MultipleStreamMerger<byte[]>.NamedMerger> operatorsToAdd;
+
+  private static Counter counterOp = new Counter();
+
+  private static int tuplesToSend = 173;
+  private static int streamsToMerge = 15;
+
+  private static Map tuplesReceived = new HashMap<>();
+
+  @Before
+  public void setUp() throws Exception
+  {
+streamsToAddToDag = new ArrayList<>();
+operatorsToAdd = new ArrayList<>();
+  }
+
+  @Test
+  public void mergeTwoStreams()
+  {
+RandomWordGenerator randomWordGenerator = new RandomWordGenerator();
+RandomWordGenerator randomWordGenerator2 = new RandomWordGenerator();
+
+randomWordGenerator.setTuplesPerWindow(1);
+randomWordGenerator2.setTuplesPerWindow(1);
+
+MultipleStreamMerger<byte[]> merger = new MultipleStreamMerger<>();
+merger.merge(randomWordGenerator.output)
+.merge(randomWordGenerator2.output);
+
+merger.constructMergeTree(streamsToAddToDag, operatorsToAdd);
+
+assertEquals("Count of created streams", 2, streamsToAddToDag.size());
+assertEquals("Count of created operators", 1, operatorsToAdd.size());
+
+// Next check actual connections
+assertEquals("Generator 1 stream", randomWordGenerator.output,
+streamsToAddToDag.get(0).sourcePort.port);
+
+assertEquals("Generator 2 stream", randomWordGenerator2.output,
+streamsToAddToDag.get(1).sourcePort.port);
+
+assertEquals("Final operator input_1", 
operatorsToAdd.get(0).merger.data1, streamsToAddToDag.get(0).destPort);
+assertEquals("Final operator input_2", 
operatorsToAdd.get(0).merger.data2, streamsToAddToDag.get(1).destPort);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void mergeOneStream()
+  {
+RandomWordGenerator randomWordGenerator = new RandomWordGenerator();
+MultipleStreamMerger<byte[]> merger = new MultipleStreamMerger<>();
+merger.merge(randomWordGenerator.output);
+merger.constructMergeTree(streamsToAddToDag, operatorsToAdd);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void mergeZeroStream()

[GitHub] apex-malhar pull request #309: [APEXMALHAR-2106] Support multiple streams in...

2016-06-21 Thread bhupeshchawda
Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r67842738
  
--- Diff: 
library/src/test/java/com/datatorrent/lib/stream/MultipleStreamMergerTest.java 
---
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.testbench.RandomWordGenerator;
+
+import static junit.framework.TestCase.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+public class MultipleStreamMergerTest
+{
+  private static Logger LOG = 
LoggerFactory.getLogger(MultipleStreamMergerTest.class);
+
+  private ArrayList<MultipleStreamMerger<byte[]>.Stream> streamsToAddToDag;
+  private ArrayList<MultipleStreamMerger<byte[]>.NamedMerger> operatorsToAdd;
+
+  private static Counter counterOp = new Counter();
+
+  private static int tuplesToSend = 173;
+  private static int streamsToMerge = 15;
+
+  private static Map tuplesReceived = new HashMap<>();
+
+  @Before
+  public void setUp() throws Exception
+  {
+streamsToAddToDag = new ArrayList<>();
+operatorsToAdd = new ArrayList<>();
+  }
+
+  @Test
+  public void mergeTwoStreams()
+  {
+RandomWordGenerator randomWordGenerator = new RandomWordGenerator();
+RandomWordGenerator randomWordGenerator2 = new RandomWordGenerator();
+
+randomWordGenerator.setTuplesPerWindow(1);
+randomWordGenerator2.setTuplesPerWindow(1);
+
+MultipleStreamMerger<byte[]> merger = new MultipleStreamMerger<>();
+merger.merge(randomWordGenerator.output)
+.merge(randomWordGenerator2.output);
+
+merger.constructMergeTree(streamsToAddToDag, operatorsToAdd);
+
+assertEquals("Count of created streams", 2, streamsToAddToDag.size());
+assertEquals("Count of created operators", 1, operatorsToAdd.size());
+
+// Next check actual connections
+assertEquals("Generator 1 stream", randomWordGenerator.output,
+streamsToAddToDag.get(0).sourcePort.port);
+
+assertEquals("Generator 2 stream", randomWordGenerator2.output,
+streamsToAddToDag.get(1).sourcePort.port);
+
+assertEquals("Final operator input_1", 
operatorsToAdd.get(0).merger.data1, streamsToAddToDag.get(0).destPort);
+assertEquals("Final operator input_2", 
operatorsToAdd.get(0).merger.data2, streamsToAddToDag.get(1).destPort);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void mergeOneStream()
+  {
+RandomWordGenerator randomWordGenerator = new RandomWordGenerator();
+MultipleStreamMerger<byte[]> merger = new MultipleStreamMerger<>();
+merger.merge(randomWordGenerator.output);
+merger.constructMergeTree(streamsToAddToDag, operatorsToAdd);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void mergeZeroStream()
+  {
+MultipleStreamMerger<byte[]> merger = new MultipleStreamMerger<>();
+merger.constructMergeTree(streamsToAddToDag, operatorsToAdd);
+  }
+
+  public static class Application implements StreamingApplication
+  {
  

[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341487#comment-15341487
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r67841249
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Queue;
+
+import org.eclipse.jetty.util.ArrayQueue;
--- End diff --

Why not use the java.util implementations instead? May be LinkedList?
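
For reference, either standard collection would be a drop-in replacement;
the element type below is illustrative, since the queued type depends on
what constructMergeTree stores:

// java.util alternatives to org.eclipse.jetty.util.ArrayQueue:
Queue<DefaultOutputPort<T>> pending = new LinkedList<>();
// or, if nulls are never stored:
Queue<DefaultOutputPort<T>> pending2 = new ArrayDeque<>();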


> Support merging multiple streams with StreamMerger 
> ---
>
> Key: APEXMALHAR-2106
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106
> Project: Apache Apex Malhar
>  Issue Type: New Feature
>Reporter: Ilya Ganelin
>Assignee: Ilya Ganelin
>
> To properly implement the Flatten transformation (and other Stream 
> combination operations), Apex must support merging data from multiple 
> sources. The StreamMerger operator can be improved to merge multiple streams, 
> rather than just the two streams it can handle in the present implementation. 
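
For a sense of the intended usage, the pattern below mirrors the class
javadoc of the MultipleStreamMerger proposed in the corresponding pull
request; the String tuple type is illustrative, and op1/op2/op3/counterOp
are operators already added to the DAG:

// Usage pattern from the proposed MultipleStreamMerger's javadoc.
dag.addOperator("Stream_1", op1);
dag.addOperator("Stream_2", op2);
dag.addOperator("Stream_3", op3);

MultipleStreamMerger<String> merger = new MultipleStreamMerger<>();
DefaultOutputPort<String> streamOut = merger.merge(op1.out)
    .merge(op2.out)
    .merge(op3.out)
    .mergeStreams(dag, conf);

dag.addStream("merger-counter", streamOut, counterOp.counter);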



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341485#comment-15341485
 ] 

ASF GitHub Bot commented on APEXMALHAR-2106:


Github user bhupeshchawda commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/309#discussion_r67841160
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java ---
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.stream;
+
+import java.util.ArrayList;
+import java.util.Queue;
+
+import org.eclipse.jetty.util.ArrayQueue;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+
+/**
+ * A helper class  that adds functionality to bypass the platform 
limitations of combining more than two
+ * streams at a time with Stream Merger.
+ *
+ * Usage:
+ *
+ * dag.addOperator("Stream_1", op1);
+ * dag.addOperator("Stream_2", op2);
+ * dag.addOperator("Stream_3", op3);
+ *
+ * MultipleStreamMerger merger = new MultipleStreamMerger();
+ * DefaultOutputPort streamOut = merger.merge(op1.out)
+ * .merge(op2.out)
+ * .merge(op3.out)
+ * .mergeStreams(dag, conf);
+ *
+ * dag.addStream("merger-counter", streamOut, counterOp.counter);
+ *
+ * @param <T>
+ */
+public class MultipleStreamMerger<T>
+{
+  public class Stream
+  {
+DefaultInputPort<T> destPort;
+SourcedOutputPort sourcePort;
+String name;
+
+public Stream(String name, SourcedOutputPort sourcePort, DefaultInputPort<T> destPort)
+{
+  this.destPort = destPort;
+  this.sourcePort = sourcePort;
+  this.name = name;
+}
+  }
+
+  public class NamedMerger
+  {
+StreamMerger<T> merger;
+String name;
+
+public NamedMerger(String name, StreamMerger<T> merger)
+{
+  this.merger = merger;
+  this.name = name;
+}
+  }
+
+  /**
+   * A simple class to allow us to track whether the port to be merged is 
internal (allowing it to be thread local)
+   * or external
+   */
+  public class SourcedOutputPort
+  {
+boolean internal;
+DefaultOutputPort<T> port;
+
+public SourcedOutputPort(DefaultOutputPort<T> port)
+{
+  this.internal = false;
+  this.port = port;
+}
+
+public SourcedOutputPort(boolean internal, DefaultOutputPort<T> port)
+{
+  this.internal = internal;
+  this.port = port;
+}
+  }
+
+  ArrayList<DefaultOutputPort<T>> streamsToMerge = new ArrayList<>();
+
+  private DefaultOutputPort<T> streamOutput = new DefaultOutputPort<>();
+
+  /**
+   * Used to define all the sources to be merged into a single stream.
+   *
+   * @param sourcePort - The output port from the upstream operator that 
provides data
+   * @return The updated MultipleStreamMerger object that tracks which 
streams should be unified.
+   */
+  public MultipleStreamMerger<T> merge(DefaultOutputPort<T> sourcePort)
+  {
+streamsToMerge.add(sourcePort);
+return this;
+  }
+
+
+  /**
+   * Given the streams to merge have been selected with {@link 
#merge(DefaultOutputPort)}, create a subDAG and add it
+   * to an existing DAG.
+   *
+   * To merge more than two streams at a time, we construct a tiered 
hierarchy of thread-local StreamMerger operators
+   * E.g.
+   *
+   * Stream 0 ->
+   *StreamMerger_1 ->
+   * Stream 1 ->
+   *   StreamMerger_Final -> Out
+   * Stream 2 ->
+   *StreamMerger_2 ->
+   * Stream 3 ->
+   * Note that we don't use the populateDAG function because that is only 
used to 

[jira] [Commented] (APEXMALHAR-2122) Upgrading elasticsearch java api from 1.1.2 to 2.3.3.

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341286#comment-15341286
 ] 

ASF GitHub Bot commented on APEXMALHAR-2122:


GitHub user akshay-harale opened a pull request:

https://github.com/apache/apex-malhar/pull/325

APEXMALHAR-2122 Upgrading elasticsearch java api from 1.1.2 to 2.3.3. 

Upgrading elasticsearch java api from 1.1.2 to 2.3.3.  Made necessary 
changes in operator. Fixed build issues recognised while migration.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/akshay-harale/apex-malhar master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/325.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #325


commit ff5dbaad0ae349377ae9e20f19275e54189a5e07
Author: Akshay Harale 
Date:   2016-06-21T07:03:36Z

Upgrading elasticsearch java api from 1.1.2 to 2.3.3. Made necessary 
changes in operator. Fixed build issues recognised while migration.




> Upgrading elasticsearch java api from 1.1.2 to 2.3.3.
> -
>
> Key: APEXMALHAR-2122
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2122
> Project: Apache Apex Malhar
>  Issue Type: Bug
>  Components: query operators
>Affects Versions: 3.4.0
>Reporter: Akshay Harale
> Fix For: 3.4.1
>
>
> The current Elasticsearch version is 2.3.3, so we need to upgrade the 
> corresponding Java API to 2.3.3.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #325: APEXMALHAR-2122 Upgrading elasticsearch java ...

2016-06-21 Thread akshay-harale
GitHub user akshay-harale opened a pull request:

https://github.com/apache/apex-malhar/pull/325

APEXMALHAR-2122 Upgrading elasticsearch java api from 1.1.2 to 2.3.3. 

Upgrading elasticsearch java api from 1.1.2 to 2.3.3.  Made necessary 
changes in operator. Fixed build issues recognised while migration.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/akshay-harale/apex-malhar master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/325.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #325


commit ff5dbaad0ae349377ae9e20f19275e54189a5e07
Author: Akshay Harale 
Date:   2016-06-21T07:03:36Z

Upgrading elasticsearch java api from 1.1.2 to 2.3.3. Made necessary 
changes in operator. Fixed build issues recognised while migration.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

2016-06-21 Thread ilooner
GitHub user ilooner opened a pull request:

https://github.com/apache/apex-malhar/pull/324

Spillable Datastructures PR for review only



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ilooner/incubator-apex-malhar 
APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #324


commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
Author: Timothy Farkas 
Date:   2016-06-05T00:11:20Z

- Intermediate commit.

commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
Author: Timothy Farkas 
Date:   2016-06-13T06:03:21Z

Intermediate commit

commit 60acf68b96f2145af3d90b410c6b20613347f881
Author: Timothy Farkas 
Date:   2016-06-21T06:58:09Z

Intermediate commit




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (APEXMALHAR-2119) Make DirectoryScanner in AbstractFileInputOperator inheritance friendly.

2016-06-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/APEXMALHAR-2119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341202#comment-15341202
 ] 

ASF GitHub Bot commented on APEXMALHAR-2119:


Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/318#discussion_r67813977
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java 
---
@@ -1110,6 +1116,16 @@ public String toString()
   return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + 
" partitionIndex=" +
   partitionIndex + " partitionCount=" + partitionCount + "]";
 }
+
+protected void setPartitionIndex(int partitionIndex)
+{
+  this.partitionIndex = partitionIndex;
+}
+
+protected void setPartitionCount(int partitionCount)
+{
+  this.partitionCount = partitionCount;
+}
--- End diff --

Added a test for setPartition and verified that the base functionality uses 
the values set.


> Make DirectoryScanner in AbstractFileInputOperator inheritance friendly. 
> -
>
> Key: APEXMALHAR-2119
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-2119
> Project: Apache Apex Malhar
>  Issue Type: Bug
>Reporter: Tushar Gosavi
>Assignee: Tushar Gosavi
>
> The DirectoryScanner has partitionIndex and partitionCount declared as 
> private without any setters. An inherited DirectoryScanner cannot set them and 
> hence cannot use most of the methods in DirectoryScanner that depend on 
> these fields (e.g. acceptFile). 
> Also, a new DirectoryScanner has to implement createPartition, as the default one 
> creates an instance of DirectoryScanner by default.
> Make the class inheritance friendly by adding setters and using a kryo clone in 
> createPartition.
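
For illustration, a short sketch of the kind of subclass this change enables,
assuming acceptFile keeps its current protected signature:

// Sketch: a scanner subclass that reuses the partition-aware base logic.
public class CsvOnlyScanner extends AbstractFileInputOperator.DirectoryScanner
{
  @Override
  protected boolean acceptFile(String filePathStr)
  {
    // The base method consults partitionIndex/partitionCount, which a subclass
    // can now also adjust via the new setPartitionIndex/setPartitionCount.
    return super.acceptFile(filePathStr) && filePathStr.endsWith(".csv");
  }
}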



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...

2016-06-21 Thread tushargosavi
Github user tushargosavi commented on a diff in the pull request:

https://github.com/apache/apex-malhar/pull/318#discussion_r67813977
  
--- Diff: 
library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java 
---
@@ -1110,6 +1116,16 @@ public String toString()
   return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + 
" partitionIndex=" +
   partitionIndex + " partitionCount=" + partitionCount + "]";
 }
+
+protected void setPartitionIndex(int partitionIndex)
+{
+  this.partitionIndex = partitionIndex;
+}
+
+protected void setPartitionCount(int partitionCount)
+{
+  this.partitionCount = partitionCount;
+}
--- End diff --

Added a test for setPartition and verified that the base functionality uses 
the values set.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---