[jira] [Commented] (APEXMALHAR-2121) KafkaInputOperator emitTuple method should be able to emit more than just message
[ https://issues.apache.org/jira/browse/APEXMALHAR-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343712#comment-15343712 ] ASF GitHub Bot commented on APEXMALHAR-2121: Github user tweise commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/323#discussion_r67994905 --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java --- @@ -203,6 +204,14 @@ public AbstractKafkaInputOperator() */ protected abstract void emitTuple(Message message); --- End diff -- I don't see a backward compatibility issue; it's a new method. And yes, the same change needs to be made on the 0.9 operator as well. > KafkaInputOperator emitTuple method should be able to emit more than just > message > - > > Key: APEXMALHAR-2121 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2121 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale >Priority: Minor > > AbstractKafkaInputOperator in malhar-contrib has an emitTuple method > whose only parameter is a "Message". Users extending this method can't access > Kafka fields other than the message. > We should have a method that receives the "KafkaMessage" so that users can choose > which of its fields they want to emit. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
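One way to read tweise's point that the new method poses no compatibility issue: add an overload whose default implementation delegates to the existing abstract method, so old subclasses keep working while new ones can override the richer hook. This is a minimal sketch under stated assumptions: `Message`, `KafkaMessage`, and `KafkaInputSketch` are simplified hypothetical stand-ins, not the Kafka client or Malhar classes, and the `partitionId`/`offset` fields are assumed for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Kafka client's Message: just a payload here.
final class Message {
  final byte[] payload;

  Message(byte[] payload) {
    this.payload = payload;
  }
}

// Hypothetical stand-in for Malhar's KafkaMessage: the message plus metadata.
final class KafkaMessage {
  final Message msg;
  final int partitionId;
  final long offset;

  KafkaMessage(Message msg, int partitionId, long offset) {
    this.msg = msg;
    this.partitionId = partitionId;
    this.offset = offset;
  }
}

abstract class KafkaInputSketch {
  // Existing hook: subclasses only ever see the message.
  protected abstract void emitTuple(Message message);

  // New hook: receives the whole KafkaMessage. The default delegates to the
  // existing method, so subclasses written against the old API behave as before.
  protected void emitTuple(KafkaMessage message) {
    emitTuple(message.msg);
  }

  // The operator's consume loop calls only the richer hook.
  void process(KafkaMessage message) {
    emitTuple(message);
  }
}

// A subclass that also wants the partition id overrides the new hook.
class PartitionAwareInput extends KafkaInputSketch {
  final List<String> emitted = new ArrayList<>();

  @Override
  protected void emitTuple(Message message) {
    emitted.add(new String(message.payload, StandardCharsets.UTF_8));
  }

  @Override
  protected void emitTuple(KafkaMessage message) {
    emitted.add(message.partitionId + ":" + new String(message.msg.payload, StandardCharsets.UTF_8));
  }
}
```

The catch, raised by siyuanh in this thread, is visible in `PartitionAwareInput`: because `emitTuple(Message)` stays abstract, a subclass that only cares about `KafkaMessage` must still implement it. Removing that obligation means changing the existing signature, which is why the 0.9 operator was suggested as the place to do so.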
[GitHub] apex-malhar pull request #323: APEXMALHAR-2121: Updating KafkaInputOperator ...
Github user tweise commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/323#discussion_r67994905 --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java --- @@ -203,6 +204,14 @@ public AbstractKafkaInputOperator() */ protected abstract void emitTuple(Message message); --- End diff -- I don't see a backward compatibility issue; it's a new method. And yes, the same change needs to be made on the 0.9 operator as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
Hi, IMO scheduling a job can be independent of any operator, while StatsListeners are not. I understand that in a lot of cases input/output operators will decide when the job ends, but there can be cases where scheduling is independent of them. Thanks, Chandni On Jun 21, 2016 12:12 PM, "Thomas Weise" wrote: > This looks like something that coordination-wise belongs in the master > and can be done with a shared stats listener. > > The operator request/response protocol could be used to relay the data for > the scheduling decisions. > > Thomas > > > On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni < > chandni.si...@capitalone.com> wrote: > > > Hi Tushar, > > > > I have some questions about use case 2: Batch Support > > I don't understand the advantages of providing batch support by having an > > operator as a scheduler. > > > > An approach that seemed a little more straightforward to me was to expose > > an API for a scheduler. If a scheduler is set, then the master uses it > > to schedule operators. By default there isn't any scheduler and the job is > > run as it is now. > > > > Maybe this is too simplistic, but can you please let me know why having an > > operator as a scheduler is a better way? > > > > Thanks, > > Chandni > > > > > > On 6/21/16, 11:09 AM, "Tushar Gosavi" wrote: > > > > >Hi All, > > > > > >We have seen a few use cases in the field which require Apex application > > >scheduling based on some condition. This has also come up as part of > > >Batch Support in Apex previously > > >(http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E). > > >I am proposing the following functionality in Apex to help with scheduling > > >and better resource utilization for batch jobs. Please provide your > > >comments. > > > > > >Use case 1 - Dynamic DAG modification.
> > > > > >Each operator in the DAG consumes YARN resources; sometimes it is > > >desirable to return the resources to YARN when no data is available > > >for processing, and to redeploy the whole DAG once data starts to appear. For > > >this to happen automatically, we will need some data-monitoring > > >operators running in the DAG to trigger restart and shutdown of the > > >operators in the DAG. > > > > > >Apex already has the ability to dynamically change the running DAG > > >through the CLI. We could make a similar API available to operators, which > > >would trigger DAG modification at runtime. This information can be > > >passed to the master using the heartbeat RPC, and the master will make the > > >required changes to the DAG. Let me know what you think about it; > > >something like below. > > >context.beginDagChange(); > > >context.addOperator("o1"); <== launch operator from previous check-pointed > > >state. > > >context.addOperator("o2", new Operator2()); <== create new operator > > >context.addStream("s1", "reader.output", "o1.input"); > > >context.shutdown("o3"); <== delete this and downstream operators from the > > >DAG. > > >context.apply(); <== DAG changes will be sent to the master, and the master > > >will apply these changes. > > > > > >A similar API for other functionality, such as locality settings, > > >needs to be provided. > > > > > > > > >Use case 2 - Classic Batch Scheduling. > > > > > >Provide an API callable from an operator to launch a DAG. The operator > > >will prepare a DAG object and submit it to YARN; the DAG will be > > >scheduled as a new application. This way, complex schedulers can be > > >written as operators.
> > > > > >public class SchedulerOperator implements Operator { > > > void handleIdleTime() { > > > // check for conditions to start a job (for example, enough files > > >available, enough items available in Kafka, or a scheduled time has been reached) > > > Dag dag = context.createDAG(); > > > dag.addOperator(); > > > dag.addOperator(); > > > LaunchOptions lOptions = new LaunchOptions(); > > > lOptions.oldId = ""; // start for this checkpoint. > > > DagHandler dagHandler = context.submit(dag, lOptions); > > > } > > >} > > > > > >DagHandler will have methods to monitor the final state of the > > >application, or to kill the DAG: > > >dagHandler.waitForCompletion() <== wait until the DAG terminates > > >dagHandler.status() <== get the status of the application. > > >dagHandler.kill() <== kill the running application. > > >dagHandler.shutdown() <== shut down the application. > > > > > >More complex scheduler operators could be written using these APIs to manage > > >workflows, i.e. DAGs of DAGs. > > > > > >Regards, > > >-Tushar. > > > > > > > > The information contained in this e-mail is confidential and/or > > proprietary to Capital One and/or its affiliates and may only be used > > solely in performance of work or services for Capital One. The > information > > transmitted herewith is intended only for use by the individual or
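The begin/apply pattern proposed for use case 1 can be sketched as a batch that records changes locally and hands them over as one unit, which is what would let the master apply them together after a heartbeat. This is a hypothetical sketch: `DagChangeBatch` and its methods only mirror the proposal's pseudocode. They are not an existing Apex API, and the heartbeat/RPC step is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical change batch modelled on the proposal's
// beginDagChange()/addOperator()/addStream()/shutdown()/apply() calls.
final class DagChangeBatch {
  private final List<String> pending = new ArrayList<>();
  private boolean open;

  void begin() {
    if (open) {
      throw new IllegalStateException("change already in progress");
    }
    open = true;
    pending.clear();
  }

  DagChangeBatch addOperator(String name) {
    return enqueue("ADD_OPERATOR " + name);
  }

  DagChangeBatch addStream(String name, String source, String sink) {
    return enqueue("ADD_STREAM " + name + " " + source + " -> " + sink);
  }

  DagChangeBatch shutdown(String name) {
    return enqueue("SHUTDOWN " + name);
  }

  // Closes the batch and returns the ordered, immutable change list that would
  // be sent to the master in a single message; nothing is applied piecemeal.
  List<String> apply() {
    if (!open) {
      throw new IllegalStateException("apply() without begin()");
    }
    open = false;
    List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(pending));
    pending.clear();
    return snapshot;
  }

  private DagChangeBatch enqueue(String change) {
    if (!open) {
      throw new IllegalStateException("call begin() first");
    }
    pending.add(change);
    return this;
  }
}
```

Batching matters here because `s1` refers to `o1`, which is added in the same change; handing the master the whole list at once lets it validate and apply the changes together rather than exposing a half-modified DAG.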
[jira] [Closed] (APEXMALHAR-2119) Make DirectoryScanner in AbstractFileInputOperator inheritance friendly.
[ https://issues.apache.org/jira/browse/APEXMALHAR-2119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Munagala V. Ramanath closed APEXMALHAR-2119. Resolution: Fixed Fix Version/s: 3.5.0 > Make DirectoryScanner in AbstractFileInputOperator inheritance friendly. > - > > Key: APEXMALHAR-2119 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2119 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Tushar Gosavi >Assignee: Tushar Gosavi > Fix For: 3.5.0 > > > The DirectoryScanner has partitionIndex and partitionCount declared as > private without any setters. A subclass of DirectoryScanner cannot set them and > hence cannot call most of the methods in DirectoryScanner that depend on > these fields (e.g. acceptFile). > Also, a new DirectoryScanner subclass has to implement createPartition, because the default one > always creates an instance of the base DirectoryScanner. > Make the class inheritance friendly by adding setters and using a Kryo clone in > createPartition.
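The fix described in this issue can be sketched in plain Java. The class names are hypothetical simplifications, not Malhar's `DirectoryScanner`. The hash-based file assignment in `acceptFile` is an assumption for illustration, and `Object.clone()` stands in for the Kryo clone the issue proposes. Both produce a copy that keeps the subclass's runtime type and settings, which constructing a new base-class instance cannot do.

```java
// Hypothetical, simplified scanner showing the inheritance-friendly shape.
class ScannerSketch implements Cloneable {
  private int partitionIndex;
  private int partitionCount = 1;

  // Setters let subclasses (and createPartition) configure the partition fields.
  public void setPartitionIndex(int partitionIndex) {
    this.partitionIndex = partitionIndex;
  }

  public void setPartitionCount(int partitionCount) {
    this.partitionCount = partitionCount;
  }

  // Depends on the partition fields: each partition accepts a disjoint subset of files.
  protected boolean acceptFile(String filePath) {
    return Math.floorMod(filePath.hashCode(), partitionCount) == partitionIndex;
  }

  // Copying `this` preserves the runtime subclass and its configuration,
  // unlike `new ScannerSketch()`.
  public ScannerSketch createPartition(int index, int count) {
    try {
      ScannerSketch copy = (ScannerSketch) clone();
      copy.setPartitionIndex(index);
      copy.setPartitionCount(count);
      return copy;
    } catch (CloneNotSupportedException e) {
      throw new AssertionError(e); // unreachable: this class implements Cloneable
    }
  }
}

// A subclass that adds its own filter without reimplementing createPartition.
class CsvScanner extends ScannerSketch {
  @Override
  protected boolean acceptFile(String filePath) {
    return filePath.endsWith(".csv") && super.acceptFile(filePath);
  }
}
```

Because `CsvScanner` calls `super.acceptFile`, it inherits the partition filtering for free, which is exactly what private fields without setters prevented.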
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343027#comment-15343027 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67971098 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java --- @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.lib.fs; + +import javax.validation.constraints.Min; + +import javax.validation.constraints.NotNull; +import javax.validation.constraints.Size; + +import org.apache.apex.malhar.lib.fs.FSRecordReader.RECORD_READER_MODE; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.Module; +import com.datatorrent.common.partitioner.StatelessPartitioner; +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.fs.FileSplitterInput; + +/** + * This module is used for reading records/tuples from FileSystem. + * Records can be read in parallel using multiple partitions of the record reader operator. + * (Ordering is not guaranteed when records are read in parallel) + * + * The input directory is scanned at a specified interval to poll for new data. + * + * The module reads data in parallel; the following parameters can be configured: + * 1. files: list of file(s)/directories to read + * 2. filePatternRegularExp: file names matching the given regex will be read + * 3. scanIntervalMillis: interval between two scans to discover new files in the input directory + * 4. recursive: whether to scan input directories recursively + * 5. blockSize: block size used to read input blocks of file + * 6. readersCount: number of readers used to read input files + * 7. sequencialFileRead: whether to emit file blocks in sequence + * 8.
blocksThreshold: number of blocks emitted per window + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class FSRecordReaderModule implements Module +{ + @NotNull + @Size(min = 1) + private String files; + private String filePatternRegularExp; + @Min(0) + private long scanIntervalMillis; + private boolean recursive = true; + private boolean sequencialFileRead = false; + private int readersCount; + @Min(1) + protected int blocksThreshold; + + public final transient ProxyOutputPort<byte[]> records = new ProxyOutputPort<byte[]>(); + + /** + * Criteria for record split + */ + private RECORD_READER_MODE mode; + + /** + * Length for fixed width record + */ + private int recordLength; + + public FileSplitterInput createFileSplitter() + { +return new FileSplitterInput(); + } + + public FSRecordReader createBlockReader() + { +FSRecordReader recordReader = new FSRecordReader(); +recordReader.setMode(mode); +recordReader.setRecordLength(recordLength); + +return recordReader; + } + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { +FileSplitterInput fileSplitter = dag.addOperator("FileSplitter", createFileSplitter()); +FSRecordReader recordReader = dag.addOperator("BlockReader", createBlockReader()); + +dag.addStream("BlockMetadata", fileSplitter.blocksMetadataOutput, recordReader.blocksMetadataInput); + +if (sequencialFileRead) { + dag.setInputPortAttribute(recordReader.blocksMetadataInput, Context.PortContext.STREAM_CODEC, + new SequentialFileBlockMetadataCodec()); +} + +
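When `sequencialFileRead` is set, the module installs `SequentialFileBlockMetadataCodec` on the reader's input port. The intent, presumably, is a partition key derived from the file path, so every block of a given file is routed to the same reader partition and read in order. The sketch below shows only that routing rule under that assumption. `BlockInfo`, `partitionKey`, and `partitionFor` are hypothetical names, and `Math.floorMod` is a simplification of Apex's mask-based partition-key matching; the real codec's key and serialization may differ.

```java
// Hypothetical, simplified block metadata: which file and which block within it.
final class BlockInfo {
  final String filePath;
  final long blockId;

  BlockInfo(String filePath, long blockId) {
    this.filePath = filePath;
    this.blockId = blockId;
  }
}

final class SequentialRouting {
  // The key depends only on the file path, never on the block id,
  // so all blocks of one file land on the same downstream partition.
  static int partitionKey(BlockInfo block) {
    return block.filePath.hashCode();
  }

  // Roughly how a key would map onto a fixed number of reader partitions.
  static int partitionFor(BlockInfo block, int readerCount) {
    return Math.floorMod(partitionKey(block), readerCount);
  }
}
```

The trade-off is that one large file is then read by a single partition, so a higher `readersCount` only helps when several files are being read at once.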
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15343016#comment-15343016 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67970457 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java --- @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.IOException; + +import org.apache.commons.beanutils.ConvertUtils; +import org.apache.commons.beanutils.converters.AbstractConverter; +import org.apache.hadoop.fs.FSDataInputStream; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.block.ReaderContext; + +/** + * This operator can be used for reading records/tuples from Filesystem + * in parallel (without ordering guarantees between tuples). + * Records can be delimited (e.g. newline) or fixed with records. + * Output tuples are byte[]. 
+ * + * Typically, this operator will be connected to output of FileSplitterInput + * to read records in parallel. + */ +@org.apache.hadoop.classification.InterfaceStability.Evolving +public class FSRecordReader extends FSSliceReader +{ + /** + * Record reader mode decides how to split the records. + */ + public static enum RECORD_READER_MODE + { +DELIMITED_RECORD, FIXED_WIDTH_RECORD; + } + + /** + * Criteria for record split + */ + private RECORD_READER_MODE mode = RECORD_READER_MODE.DELIMITED_RECORD; + + /** + * Length for fixed width record + */ + private int recordLength; + + /** + * Port to emit individual records/tuples as byte[] + */ + public final transient DefaultOutputPort<byte[]> records = new DefaultOutputPort<byte[]>(); + + /** + * Initialize appropriate reader context based on mode selection + */ + @Override + public void setup(OperatorContext context) + { +super.setup(context); +if (mode == RECORD_READER_MODE.FIXED_WIDTH_RECORD) { + ReaderContext.FixedBytesReaderContext fixedBytesReaderContext = new ReaderContext.FixedBytesReaderContext(); + fixedBytesReaderContext.setLength(recordLength); + readerContext = fixedBytesReaderContext; +} else { + readerContext = new ReaderContext.ReadAheadLineReaderContext(); +} + } + + /** + * Read the block data and emit records based on reader context + * + * @param blockMetadata block + * @throws IOException + */ + protected void readBlock(BlockMetadata blockMetadata) throws IOException + { +readerContext.initialize(stream, blockMetadata, consecutiveBlock); +ReaderContext.Entity entity; +while ((entity = readerContext.next()) != null) { + + counters.getCounter(ReaderCounterKeys.BYTES).add(entity.getUsedBytes()); + + byte[] record = entity.getRecord(); + + //If the record is partial then ignore the record. --- End diff -- why 'partial' rather than 'null'?
> File Record reader module > - > > Key: APEXMALHAR-2116 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Yogi Devendra >Assignee: Yogi Devendra > > This will be useful for use cases that involve reading files "line > by line" in parallel and emitting each line as a separate tuple. > The proposal is to have a new Module which would allow users to monitor >
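The two `RECORD_READER_MODE` values boil down to two ways of cutting a block into records. This is a simplified, in-memory sketch of the general technique, not Malhar's `ReaderContext` implementations. Fixed-width mode slices every `recordLength` bytes from the file start. Delimited mode uses the common read-ahead convention in which a record belongs to the block it starts in: a block other than the first skips its leading partial record (the previous block's reader finishes it), and the last record is completed by reading past the block end. The class and method names, the newline delimiter, and dropping an incomplete trailing fixed-width record are assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: records owned by the block [start, end) of `file`.
final class RecordSplitSketch {
  // Delimited mode: a record starts at offset 0 or right after a '\n'.
  static List<String> delimitedRecords(byte[] file, int start, int end) {
    List<String> out = new ArrayList<>();
    int pos = start;
    if (start > 0) {
      // Skip the partial record that began in the previous block.
      while (pos < file.length && file[pos - 1] != '\n') {
        pos++;
      }
    }
    // Emit every record starting before `end`, reading ahead to finish the last one.
    while (pos < end && pos < file.length) {
      int nl = pos;
      while (nl < file.length && file[nl] != '\n') {
        nl++;
      }
      out.add(new String(file, pos, nl - pos, StandardCharsets.UTF_8));
      pos = nl + 1;
    }
    return out;
  }

  // Fixed-width mode: records are aligned to the file start, not the block start.
  static List<String> fixedWidthRecords(byte[] file, int start, int end, int recordLength) {
    if (recordLength <= 0) {
      throw new IllegalArgumentException("recordLength must be positive");
    }
    List<String> out = new ArrayList<>();
    // First record boundary at or after the block start.
    int pos = (start + recordLength - 1) / recordLength * recordLength;
    // A record may extend past `end`; an incomplete trailing record is dropped here.
    while (pos < end && pos + recordLength <= file.length) {
      out.add(new String(file, pos, recordLength, StandardCharsets.UTF_8));
      pos += recordLength;
    }
    return out;
  }
}
```

With the ownership rule "a record belongs to the block it starts in", every record is emitted exactly once even when it straddles a block boundary, which is what allows blocks to be read by independent partitions.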
[GitHub] apex-malhar pull request #326: APEXMALHAR-2116 Added FS record reader operat...
Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67969125 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java --- @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.IOException; + +import org.apache.commons.beanutils.ConvertUtils; +import org.apache.commons.beanutils.converters.AbstractConverter; +import org.apache.hadoop.fs.FSDataInputStream; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.block.ReaderContext; + +/** + * This operator can be used for reading records/tuples from Filesystem + * in parallel (without ordering guarantees between tuples). + * Records can be delimited (e.g. newline) or fixed with records. --- End diff -- fixed with => fixed width
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342995#comment-15342995 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67969125 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java --- @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.IOException; + +import org.apache.commons.beanutils.ConvertUtils; +import org.apache.commons.beanutils.converters.AbstractConverter; +import org.apache.hadoop.fs.FSDataInputStream; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.block.ReaderContext; + +/** + * This operator can be used for reading records/tuples from Filesystem + * in parallel (without ordering guarantees between tuples). + * Records can be delimited (e.g. newline) or fixed with records. 
--- End diff -- fixed with => fixed width > File Record reader module > - > > Key: APEXMALHAR-2116 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Yogi Devendra >Assignee: Yogi Devendra > > This will be useful for use cases that involve reading files "line > by line" in parallel and emitting each line as a separate tuple. > The proposal is to have a new Module which would allow users to monitor > directories, read files and emit data records (tuples). Records are based on a > record separator (e.g. newline) or a fixed size (number of bytes). > The plan is as follows: > 1. A new operator, FileRecordReader, which will extend BlockReader. > 2. This operator will have a configuration option to select the mode: > FIXED_LENGTH or SEPARATOR_BASED records. > 3. Use the appropriate ReaderContext based on the mode. > 4. A new module, FileRecordReaderModule, which wraps FileSplitter (existing) + > the FileRecordReader operator. > The reason for having a different operator than BlockReader is that the output port > signature is different from BlockReader's.
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342992#comment-15342992 ] ASF GitHub Bot commented on APEXMALHAR-2116: Github user amberarrow commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/326#discussion_r67968963 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java --- @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.malhar.lib.fs; + +import java.io.IOException; + +import org.apache.commons.beanutils.ConvertUtils; +import org.apache.commons.beanutils.converters.AbstractConverter; +import org.apache.hadoop.fs.FSDataInputStream; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.lib.io.block.BlockMetadata; +import com.datatorrent.lib.io.block.FSSliceReader; +import com.datatorrent.lib.io.block.ReaderContext; + +/** + * This operator can be used for reading records/tuples from Filesystem --- End diff -- This line and several others have trailing whitespace; this is generally considered bad practice -- see for instance: http://programmers.stackexchange.com/questions/121555/why-is-trailing-whitespace-a-big-deal Suggest removing all trailing whitespace. > File Record reader module > - > > Key: APEXMALHAR-2116 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Yogi Devendra >Assignee: Yogi Devendra > > This will be useful for use cases that involve reading files "line > by line" in parallel and emitting each line as a separate tuple. > The proposal is to have a new Module which would allow users to monitor > directories, read files and emit data records (tuples). Records are based on a > record separator (e.g. newline) or a fixed size (number of bytes). > The plan is as follows: > 1. A new operator, FileRecordReader, which will extend BlockReader. > 2. This operator will have a configuration option to select the mode: > FIXED_LENGTH or SEPARATOR_BASED records. > 3. Use the appropriate ReaderContext based on the mode. > 4. A new module, FileRecordReaderModule, which wraps FileSplitter (existing) + > the FileRecordReader operator. > The reason for having a different operator than BlockReader is that the output port > signature is different from BlockReader's.
[jira] [Commented] (APEXMALHAR-2119) Make DirectoryScanner in AbstractFileInputOperator inheritance friendly.
[ https://issues.apache.org/jira/browse/APEXMALHAR-2119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342899#comment-15342899 ] ASF GitHub Bot commented on APEXMALHAR-2119: Github user asfgit closed the pull request at: https://github.com/apache/apex-malhar/pull/318 > Make DirectoryScanner in AbstractFileInputOperator inheritance friendly. > - > > Key: APEXMALHAR-2119 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2119 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Tushar Gosavi >Assignee: Tushar Gosavi > > The DirectoryScanner has partitionIndex and partitionCount declared as > private without any setters. A subclass of DirectoryScanner cannot set them and > hence cannot call most of the methods in DirectoryScanner that depend on > these fields (e.g. acceptFile). > Also, a new DirectoryScanner subclass has to implement createPartition, because the default one > always creates an instance of the base DirectoryScanner. > Make the class inheritance friendly by adding setters and using a Kryo clone in > createPartition.
[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...
Github user asfgit closed the pull request at: https://github.com/apache/apex-malhar/pull/318
[jira] [Commented] (APEXMALHAR-2121) KafkaInputOperator emitTuple method should be able to emit more than just message
[ https://issues.apache.org/jira/browse/APEXMALHAR-2121?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342652#comment-15342652 ] ASF GitHub Bot commented on APEXMALHAR-2121: Github user siyuanh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/323#discussion_r67949068 --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java --- @@ -203,6 +204,14 @@ public AbstractKafkaInputOperator() */ protected abstract void emitTuple(Message message); --- End diff -- That's why I'm asking whether we can do that. It does have a compatibility issue, but on the other hand, if someone wants to emit the tuple along with the partition id etc., he/she is forced to implement the abstract emitTuple method as well. Well, we can keep it as it is now. @DT-Priyanka, can you file a JIRA to make this change in the 0.9 Kafka input operator? We can change the existing interface for that one. > KafkaInputOperator emitTuple method should be able to emit more than just > message > - > > Key: APEXMALHAR-2121 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2121 > Project: Apache Apex Malhar > Issue Type: Improvement >Reporter: Priyanka Gugale >Assignee: Priyanka Gugale >Priority: Minor > > AbstractKafkaInputOperator in malhar-contrib has an emitTuple method > whose only parameter is a "Message". Users extending this method can't access > Kafka fields other than the message. > We should have a method that receives the "KafkaMessage" so that users can choose > which of its fields they want to emit.
[GitHub] apex-malhar pull request #323: APEXMALHAR-2121: Updating KafkaInputOperator ...
Github user siyuanh commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/323#discussion_r67949068 --- Diff: contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java --- @@ -203,6 +204,14 @@ public AbstractKafkaInputOperator() */ protected abstract void emitTuple(Message message); --- End diff -- That's why I'm asking whether we can do that. It does have a compatibility issue, but on the other hand, if someone wants to emit the tuple along with the partition id etc., he/she is forced to implement the abstract emitTuple method as well. Well, we can keep it as it is now. @DT-Priyanka, can you file a JIRA to make this change in the 0.9 Kafka input operator? We can change the existing interface for that one.
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342485#comment-15342485 ] ASF GitHub Bot commented on APEXMALHAR-2094: Github user tweise commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67934148 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort<Double> output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator + { +public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>() +{ + @Override + public void process(double[] tuple) {} +}; + +public PmfSink() {} + } + + public static class QuantileSink extends BaseOperator + { +public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>() +{ + @Override + public void process(double[] tuple) {} +}; + +public QuantileSink() {} + } + + public static class CdfSink extends BaseOperator + { +public final DefaultInputPort<Double> input = new DefaultInputPort<Double>() +{ + @Override + public void process(Double tuple) {} +}; + +public CdfSink() {} + } + + @Test + public void testQuantiles() + { +QuantilesEstimator
quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<double[]> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.quantilesOutput, sink); + +Random rand = new Random(1234L); +ArrayList<Double> randArray = new ArrayList<>(); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 101; i++) { + double r = rand.nextGaussian(); + quantilesOp.data.process(r); + randArray.add(r); +} + +quantilesOp.endWindow(); + +Collections.sort(randArray); + +Assert.assertEquals("Captures all computed quantiles", sink.collectedTuples.size(), 101); +Assert.assertTrue("Computes median correctly", randArray.get(50) == sink.collectedTuples.get(100)[2]); + } + + @Test + public void testCDF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<Double> sink = new CollectorTestSink<>(); +
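The tests in this diff compare the sketch's output with values computed directly from the input: with 101 sorted samples, the exact median is the element at index 50. A small exact reference like the one below (a hypothetical helper, not part of the PR) can produce expected quantile, CDF and PMF values for such checks. It assumes nearest-rank quantiles (index `floor(rank * (n - 1))`) and, for the PMF, that m split points define m + 1 buckets (-inf, s0), [s0, s1), ..., [s_{m-1}, +inf); whether `QuantilesEstimator` follows the same conventions is not shown in the diff. Because a sketch's answers are approximate in general, comparing within a tolerance is safer than `==`.

```java
import java.util.Arrays;

/** Hypothetical exact reference for checking approximate quantile-sketch output. */
final class ExactQuantiles
{
  private final double[] sorted;

  ExactQuantiles(double[] values)
  {
    if (values.length == 0) {
      throw new IllegalArgumentException("need at least one value");
    }
    // Sort a copy once; every query below reads the sorted copy.
    sorted = values.clone();
    Arrays.sort(sorted);
  }

  /** Nearest-rank quantile for a normalized rank in [0, 1]. */
  double quantile(double rank)
  {
    if (rank < 0.0 || rank > 1.0) {
      throw new IllegalArgumentException("rank must be in [0, 1]: " + rank);
    }
    return sorted[(int)Math.floor(rank * (sorted.length - 1))];
  }

  /** Fraction of items strictly less than x. */
  double cdf(double x)
  {
    return (double)countLessThan(x) / sorted.length;
  }

  /** Masses of (-inf, s0), [s0, s1), ..., [s_{m-1}, +inf) for ascending split points. */
  double[] pmf(double[] splitPoints)
  {
    double[] masses = new double[splitPoints.length + 1];
    int previous = 0;
    for (int i = 0; i < splitPoints.length; i++) {
      int below = countLessThan(splitPoints[i]);
      masses[i] = (double)(below - previous) / sorted.length;
      previous = below;
    }
    masses[splitPoints.length] = (double)(sorted.length - previous) / sorted.length;
    return masses;
  }

  /** Binary search for the number of items strictly less than x. */
  private int countLessThan(double x)
  {
    int lo = 0;
    int hi = sorted.length;
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (sorted[mid] < x) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo;
  }
}
```

For the 101-sample case, `new ExactQuantiles(samples).quantile(0.5)` reads sorted index 50, the same element as `randArray.get(50)` in `testQuantiles`.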
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342484#comment-15342484 ] ASF GitHub Bot commented on APEXMALHAR-2094: Github user tweise commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67933939 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort<Double> output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + @Test + public void testQuantiles() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<double[]> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.quantilesOutput, sink); + +Random rand = new Random(1234L); +ArrayList<Double> randArray = new ArrayList<>(); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 101; i++) { + double r = rand.nextGaussian(); + quantilesOp.data.process(r); + randArray.add(r); +} + +quantilesOp.endWindow(); + +Collections.sort(randArray); + +Assert.assertEquals("Captures all computed quantiles", sink.collectedTuples.size(), 101); +Assert.assertTrue("Computes median correctly", randArray.get(50) == sink.collectedTuples.get(100)[2]); 
+ } + + @Test +
public void testCDF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<Double> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.cdfOutput, sink); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 1001; i++) { + double r = 0.001 * i; + quantilesOp.data.process(r); +} +quantilesOp.endWindow(); + +List<Double> cdfValues = sink.collectedTuples; +Assert.assertTrue("Highest CDF value is approx 1.0", cdfValues.get(cdfValues.size() - 1) >= 0.99 && +cdfValues.get(cdfValues.size() - 1) <= 1.0); +Assert.assertTrue("Lowest CDF value is approx 0.0", cdfValues.get(0) >= 0.0 && +cdfValues.get(0) <= 0.01); + } + + @Test + public void testPMF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); +double[] intervals = {0.0, 0.20, 0.40, 0.60, 0.80, 1.0}; +quantilesOp.setPmfIntervals(intervals); + +CollectorTestSink<double[]> sink = new
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user tweise commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67933939 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort<Double> output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + @Test + public void testQuantiles() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<double[]> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.quantilesOutput, sink); + +Random rand = new Random(1234L); +ArrayList<Double> randArray = new ArrayList<>(); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 101; i++) { + double r = rand.nextGaussian(); + quantilesOp.data.process(r); + randArray.add(r); +} + +quantilesOp.endWindow(); + +Collections.sort(randArray); + +Assert.assertEquals("Captures all computed quantiles", sink.collectedTuples.size(), 101); +Assert.assertTrue("Computes median correctly", randArray.get(50) == sink.collectedTuples.get(100)[2]); 
+ } + + @Test +
public void testCDF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<Double> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.cdfOutput, sink); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 1001; i++) { + double r = 0.001 * i; + quantilesOp.data.process(r); +} +quantilesOp.endWindow(); + +List<Double> cdfValues = sink.collectedTuples; +Assert.assertTrue("Highest CDF value is approx 1.0", cdfValues.get(cdfValues.size() - 1) >= 0.99 && +cdfValues.get(cdfValues.size() - 1) <= 1.0); +Assert.assertTrue("Lowest CDF value is approx 0.0", cdfValues.get(0) >= 0.0 && +cdfValues.get(0) <= 0.01); + } + + @Test + public void testPMF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); +double[] intervals = {0.0, 0.20, 0.40, 0.60, 0.80, 1.0}; +quantilesOp.setPmfIntervals(intervals); + +CollectorTestSink<double[]> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.pmfOutput, sink); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); +for (int i = 0; i < 1000; i++) { + quantilesOp.data.process(0.001 * i); +} +
Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
This looks like something that, coordination-wise, belongs in the master and can be done with a shared stats listener. The operator request/response protocol could be used to relay the data for the scheduling decisions.

Thomas

On Tue, Jun 21, 2016 at 11:38 AM, Singh, Chandni <chandni.si...@capitalone.com> wrote:

> Hi Tushar,
>
> I have some questions about use case 2: Batch Support.
> I don't understand the advantages of providing batch support by having an
> operator as a scheduler.
>
> An approach that seemed a little more straightforward to me was to expose
> an API for a scheduler. If a scheduler is set, the master uses it to
> schedule operators. By default there isn't any scheduler and the job runs
> as it does now.
>
> Maybe this is too simplistic, but can you please let me know why having an
> operator as a scheduler is a better way?
>
> Thanks,
> Chandni
>
>
> On 6/21/16, 11:09 AM, "Tushar Gosavi" wrote:
>
> > [...]
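Chandni's alternative, a scheduler hook in the master that is absent by default, combined with Thomas's point that scheduling decisions can be driven by operator statistics, might look roughly like this. All names here (`OperatorScheduler`, `Master`, and the per-operator idle-window counts standing in for heartbeat stats) are hypothetical; this is not the Apex `StatsListener` API, only an illustration that leaving the scheduler unset preserves current behavior.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical master-side scheduling hook (not an Apex interface). */
interface OperatorScheduler
{
  /** Chooses which operators to keep deployed, given the latest idle-window count per operator. */
  Set<String> operatorsToDeploy(Map<String, Integer> idleWindowsByOperator);
}

/** Hypothetical, heavily simplified stand-in for the application master. */
final class Master
{
  private final Map<String, Integer> idleWindowsByOperator = new LinkedHashMap<>();
  private OperatorScheduler scheduler; // null: no scheduler, keep today's behavior

  void addOperator(String name)
  {
    idleWindowsByOperator.put(name, 0);
  }

  /** Stands in for per-operator stats that arrive with container heartbeats. */
  void reportIdleWindows(String name, int idleWindows)
  {
    idleWindowsByOperator.put(name, idleWindows);
  }

  void setScheduler(OperatorScheduler scheduler)
  {
    this.scheduler = scheduler;
  }

  /** Operators the master would keep deployed, in sorted order. */
  Set<String> deployedOperators()
  {
    if (scheduler == null) {
      // Default: the whole DAG stays deployed, as it does now.
      return new TreeSet<>(idleWindowsByOperator.keySet());
    }
    // The scheduler gets a read-only view so it cannot alter the reported stats.
    return new TreeSet<>(scheduler.operatorsToDeploy(Collections.unmodifiableMap(idleWindowsByOperator)));
  }
}
```

In a usage such as a scheduler that keeps only operators idle for fewer than 10 windows (an arbitrary threshold), a writer reported idle for 50 windows drops out of the deployed set while the reader stays.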
Re: APEXCORE-408 : Ability to schedule Sub-DAG from running application
For use case 1, is it possible to avoid changing the Context? Can we have something along the lines of "StramToNodeRequest"?

On Tue, Jun 21, 2016 at 11:09 AM Tushar Gosavi wrote:

> [...]
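One reading of the reply's suggestion, a request object along the lines of `StramToNodeRequest` instead of new methods on `Context`, is that the batched DAG change from use case 1 becomes a self-contained value the master applies. The sketch below is hypothetical throughout: `PlanModel` and `DagChange` are illustrative names, not Apex engine classes, ports are omitted, and streams connect operators directly. It records the steps, resolves "shut down this operator and everything downstream" with a breadth-first walk over streams, and applies the whole batch to a copy so that a failing step leaves the plan untouched.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical in-memory DAG: operator names and streams (ports omitted). */
final class PlanModel
{
  final Set<String> operators = new LinkedHashSet<>();
  final Map<String, String[]> streams = new HashMap<>(); // stream id -> {source, sink}

  /** The given operator plus everything reachable from it through streams (breadth-first). */
  Set<String> downstreamOf(String root)
  {
    Set<String> seen = new LinkedHashSet<>();
    Deque<String> queue = new ArrayDeque<>();
    queue.add(root);
    while (!queue.isEmpty()) {
      String op = queue.poll();
      if (seen.add(op)) {
        for (String[] stream : streams.values()) {
          if (stream[0].equals(op)) {
            queue.add(stream[1]);
          }
        }
      }
    }
    return seen;
  }
}

/** Hypothetical change request, mirroring beginDagChange() ... apply() from use case 1. */
final class DagChange
{
  private final List<Consumer<PlanModel>> steps = new ArrayList<>();

  DagChange addOperator(String name)
  {
    steps.add(plan -> plan.operators.add(name));
    return this;
  }

  DagChange addStream(String id, String source, String sink)
  {
    steps.add(plan -> {
      if (!plan.operators.contains(source) || !plan.operators.contains(sink)) {
        throw new IllegalArgumentException("stream " + id + " references an unknown operator");
      }
      plan.streams.put(id, new String[] {source, sink});
    });
    return this;
  }

  /** Removes the operator, everything downstream of it, and all streams touching them. */
  DagChange shutdown(String name)
  {
    steps.add(plan -> {
      Set<String> removed = plan.downstreamOf(name);
      plan.operators.removeAll(removed);
      plan.streams.values().removeIf(s -> removed.contains(s[0]) || removed.contains(s[1]));
    });
    return this;
  }

  /** Runs every step on a copy and swaps it in only if all steps succeed. */
  void apply(PlanModel plan)
  {
    PlanModel copy = new PlanModel();
    copy.operators.addAll(plan.operators);
    copy.streams.putAll(plan.streams);
    for (Consumer<PlanModel> step : steps) {
      step.accept(copy);
    }
    plan.operators.clear();
    plan.operators.addAll(copy.operators);
    plan.streams.clear();
    plan.streams.putAll(copy.streams);
  }
}
```

For example, with streams `reader -> o3 -> o4`, a batch that adds stream `s1` (`reader -> o1`) and shuts down `o3` removes `o3`, `o4` and both streams touching them, while `s1` from the same batch survives.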
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15342403#comment-15342403 ] ASF GitHub Bot commented on APEXMALHAR-2094: Github user sandeep-n commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67925991 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort<Double> output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator + { +public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>() +{ + @Override + public void process(double[] tuple) {} +}; + +public PmfSink() {} + } + + public static class QuantileSink extends BaseOperator + { +public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>() +{ + @Override + public void process(double[] tuple) {} +}; + +public QuantileSink() {} + } + + public static class CdfSink extends BaseOperator + { +public final DefaultInputPort<Double> input = new DefaultInputPort<Double>() +{ + @Override + public void process(Double tuple) {} +}; 
+ +public CdfSink() {} + } + + @Test + public void testQuantiles() + { +QuantilesEstimator
quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<double[]> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.quantilesOutput, sink); + +Random rand = new Random(1234L); +ArrayList<Double> randArray = new ArrayList<>(); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 101; i++) { + double r = rand.nextGaussian(); + quantilesOp.data.process(r); + randArray.add(r); +} + +quantilesOp.endWindow(); + +Collections.sort(randArray); + +Assert.assertEquals("Captures all computed quantiles", sink.collectedTuples.size(), 101); +Assert.assertTrue("Computes median correctly", randArray.get(50) == sink.collectedTuples.get(100)[2]); + } + + @Test + public void testCDF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<Double> sink = new CollectorTestSink<>(); +
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user sandeep-n commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67925991 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort<Double> output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator + { +public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>() +{ + @Override + public void process(double[] tuple) {} +}; + +public PmfSink() {} + } + + public static class QuantileSink extends BaseOperator + { +public final DefaultInputPort<double[]> input = new DefaultInputPort<double[]>() +{ + @Override + public void process(double[] tuple) {} +}; + +public QuantileSink() {} + } + + public static class CdfSink extends BaseOperator + { +public final DefaultInputPort<Double> input = new DefaultInputPort<Double>() +{ + @Override + public void process(Double tuple) {} +}; 
+ +public CdfSink() {} + } + + @Test + public void testQuantiles() + { +QuantilesEstimator
quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<double[]> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.quantilesOutput, sink); + +Random rand = new Random(1234L); +ArrayList<Double> randArray = new ArrayList<>(); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 101; i++) { + double r = rand.nextGaussian(); + quantilesOp.data.process(r); + randArray.add(r); +} + +quantilesOp.endWindow(); + +Collections.sort(randArray); + +Assert.assertEquals("Captures all computed quantiles", sink.collectedTuples.size(), 101); +Assert.assertTrue("Computes median correctly", randArray.get(50) == sink.collectedTuples.get(100)[2]); + } + + @Test + public void testCDF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink<Double> sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.cdfOutput, sink); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 1001; i++) { + double r = 0.001 * i; + quantilesOp.data.process(r); +
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user sandeep-n commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67925558 --- Diff: pom.xml --- @@ -212,6 +212,7 @@ library contrib demos +sketches --- End diff -- Thanks for the heads-up!
APEXCORE-408 : Ability to schedule Sub-DAG from running application
Hi All,

We have seen a few use cases in the field that require Apex applications to be scheduled based on some condition. This has also come up previously as part of Batch Support in Apex (http://mail-archives.apache.org/mod_mbox/apex-dev/201602.mbox/%3CCAKJfLDPXNsG1kEQs4T_2e%3DweRJLs4VeL%2B1-7LxO_bjovD9%2B-Rw%40mail.gmail.com%3E). I am proposing the following functionality in Apex to help with scheduling and better resource utilization for batch jobs. Please provide your comments.

Use case 1 - Dynamic DAG modification.

Each operator in a DAG consumes YARN resources. Sometimes it is desirable to return the resources to YARN when no data is available for processing, and to deploy the whole DAG again once data starts to appear. For this to happen automatically, we will need some data-monitoring operators running in the DAG to trigger shutdown and restart of the other operators in the DAG.

Apex already has an API to dynamically change a running DAG through the CLI. We could make a similar API available to operators to trigger DAG modification at runtime. The change information can be passed to the master using the heartbeat RPC, and the master will make the required changes to the DAG. Let me know what you think about something like the following:

context.beginDagChange();
context.addOperator("o1");                   <== launch operator from its previous checkpointed state.
context.addOperator("o2", new Operator2());  <== create a new operator.
context.addStream("s1", "reader.output", "o1.input");
context.shutdown("o3");                      <== delete this and downstream operators from the DAG.
context.apply();                             <== DAG changes are sent to the master, which applies them.

Similar APIs need to be provided for other functionality, such as locality settings.

Use case 2 - Classic Batch Scheduling.

Provide an API, callable from an operator, to launch a DAG. The operator will prepare a DAG object and submit it to YARN, where the DAG will be scheduled as a new application. This way, complex schedulers can be written as operators.
public class SchedulerOperator implements Operator, Operator.IdleTimeHandler {
  @Override
  public void handleIdleTime() {
    // Check for conditions to start a job (for example, enough files are available,
    // enough items are available in Kafka, or the scheduled time has been reached).
    Dag dag = context.createDAG();
    dag.addOperator();
    dag.addOperator();
    LaunchOptions lOptions = new LaunchOptions();
    lOptions.oldId = ""; // start from this checkpoint.
    DagHandler dagHandler = context.submit(dag, lOptions);
  }
}

DagHandler will have methods to monitor the final state of the application or to kill the DAG:

dagHandler.waitForCompletion()  <== wait until the DAG terminates.
dagHandler.status()             <== get the status of the application.
dagHandler.kill()               <== kill the running application.
dagHandler.shutdown()           <== shut down the application.

More complex scheduler operators could be written with these APIs to manage workflows, i.e. a DAG of DAGs.

Regards,
-Tushar.
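The `DagHandler` methods listed in the proposal map naturally onto a small state machine whose terminal state is recorded once. The sketch below is hypothetical (`DagHandler`, `AppState` and the simulated `finish` callback are illustrative; a real handle would track the launched YARN application). Unlike the proposal, `waitForCompletion` here takes a timeout: since `handleIdleTime()` runs on the operator's own thread, an unbounded wait there would likely stall the scheduler operator, so polling `status()` on each idle call may be the more practical pattern.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** States of a submitted DAG (hypothetical). */
enum AppState
{
  RUNNING, SUCCEEDED, FAILED, KILLED
}

/** Hypothetical handle for a DAG submitted from a scheduler operator. */
final class DagHandler
{
  // Completed exactly once; the first terminal state recorded wins.
  private final CompletableFuture<AppState> done = new CompletableFuture<>();

  /** RUNNING until a terminal state has been recorded. */
  AppState status()
  {
    return done.getNow(AppState.RUNNING);
  }

  /** Blocks until a terminal state is reached, or throws TimeoutException. */
  AppState waitForCompletion(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
  {
    try {
      return done.get(timeout, unit);
    } catch (ExecutionException e) {
      // Not reachable in this sketch: the future is never completed exceptionally.
      throw new IllegalStateException(e.getCause());
    }
  }

  /** Forced stop; has no effect if the DAG has already finished. */
  void kill()
  {
    done.complete(AppState.KILLED);
  }

  /** Graceful stop; this sketch simply records a successful finish. */
  void shutdown()
  {
    done.complete(AppState.SUCCEEDED);
  }

  /** Called by whatever monitors the launched application (simulated in this sketch). */
  void finish(AppState terminal)
  {
    if (terminal == AppState.RUNNING) {
      throw new IllegalArgumentException("RUNNING is not a terminal state");
    }
    done.complete(terminal);
  }
}
```

Because only the first completion counts, calling `kill()` after the application has already succeeded leaves its status as `SUCCEEDED`.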
[jira] [Commented] (APEXMALHAR-2116) File Record reader module
[ https://issues.apache.org/jira/browse/APEXMALHAR-2116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341888#comment-15341888 ] ASF GitHub Bot commented on APEXMALHAR-2116: GitHub user yogidevendra opened a pull request: https://github.com/apache/apex-malhar/pull/326 APEXMALHAR-2116 Added FS record reader operator, module, test You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogidevendra/apex-malhar APEXMALHAR-2116-record-reader Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #326 commit aaeb7fe34dc69de71dae120ea01452149389854f Author: yogidevendra Date: 2016-06-20T05:47:08Z Added FS record reader operator, module, test > File Record reader module > - > > Key: APEXMALHAR-2116 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2116 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Yogi Devendra >Assignee: Yogi Devendra > > This will be useful for use cases that involve reading files line by line in parallel and emitting each line as a separate tuple. > The proposal is to have a new module that allows users to monitor directories, read files and emit data records (tuples). Records are delimited either by a record separator (e.g. newline) or by a fixed size (number of bytes). > The plan is as follows: > 1. A new operator, FileRecordReader, which will extend BlockReader. > 2. This operator will have a configuration option to select the mode: FIXED_LENGTH or SEPARATOR_BASED records. > 3. The appropriate ReaderContext will be used based on the mode. > 4. A new module, FileRecordReaderModule, which wraps FileSplitter (existing) + the FileRecordReader operator. 
> The reason for having a different operator than BlockReader is that its output port signature differs from BlockReader's.
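Reading records block-parallel, as this module proposes, needs a rule for records that straddle block boundaries. A common approach (the one Hadoop's `LineRecordReader` takes) is that each block emits only the records that start inside it and reads past its own end to finish the last one. The sketch below applies that rule to both proposed modes; `RecordMode` and `RecordSplitter` are hypothetical names, not the malhar `BlockReader`/`ReaderContext` API, and the sketch works on an in-memory byte array rather than a file-system stream.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** The two record modes named in the proposal. */
enum RecordMode
{
  FIXED_LENGTH, SEPARATOR_BASED
}

/** Hypothetical splitter: emits the records whose first byte lies in [blockStart, blockEnd). */
final class RecordSplitter
{
  /** param is the record length (FIXED_LENGTH) or the separator byte (SEPARATOR_BASED). */
  static List<String> recordsInBlock(RecordMode mode, byte[] data, int blockStart, int blockEnd, int param)
  {
    switch (mode) {
      case FIXED_LENGTH:
        return fixedLength(data, blockStart, blockEnd, param);
      case SEPARATOR_BASED:
        return separatorBased(data, blockStart, blockEnd, (byte)param);
      default:
        throw new IllegalArgumentException("unknown mode " + mode);
    }
  }

  static List<String> fixedLength(byte[] data, int blockStart, int blockEnd, int recordLength)
  {
    List<String> records = new ArrayList<>();
    // First record boundary at or after blockStart.
    int start = (blockStart + recordLength - 1) / recordLength * recordLength;
    for (; start < blockEnd && start < data.length; start += recordLength) {
      // A short final record is emitted as-is in this sketch.
      int end = Math.min(start + recordLength, data.length);
      records.add(new String(data, start, end - start, StandardCharsets.UTF_8));
    }
    return records;
  }

  static List<String> separatorBased(byte[] data, int blockStart, int blockEnd, byte separator)
  {
    List<String> records = new ArrayList<>();
    int start = blockStart;
    // Skip the tail of a record that began in an earlier block.
    while (start > 0 && start < data.length && data[start - 1] != separator) {
      start++;
    }
    while (start < blockEnd && start < data.length) {
      int end = start;
      // Reading may run past blockEnd to finish a record that straddles the boundary.
      while (end < data.length && data[end] != separator) {
        end++;
      }
      records.add(new String(data, start, end - start, StandardCharsets.UTF_8));
      start = end + 1; // step over the separator
    }
    return records;
  }
}
```

With 8-byte blocks over `alpha\nbeta\ngamma\ndelta`, the first block emits `alpha` and `beta` (which crosses offset 8), and the second block skips the tail of `beta` and emits only `gamma`.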
[GitHub] apex-malhar pull request #326: APEXMALHAR-2116 Added FS record reader operat...
GitHub user yogidevendra opened a pull request: https://github.com/apache/apex-malhar/pull/326 APEXMALHAR-2116 Added FS record reader operator, module, test You can merge this pull request into a Git repository by running: $ git pull https://github.com/yogidevendra/apex-malhar APEXMALHAR-2116-record-reader Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #326 commit aaeb7fe34dc69de71dae120ea01452149389854f Author: yogidevendra Date: 2016-06-20T05:47:08Z Added FS record reader operator, module, test
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67852697 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator + { +public final DefaultInputPortinput = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public PmfSink() {} + } + + public static class QuantileSink extends BaseOperator + { +public final DefaultInputPort input = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public QuantileSink() {} + } + + public static class CdfSink extends BaseOperator + { +public final DefaultInputPort input = new DefaultInputPort() +{ + @Override + public void process(Double tuple) {} +}; + +public CdfSink() {} + } + + @Test + public void testQuantiles() + { +QuantilesEstimator 
quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.quantilesOutput, sink); + +Random rand = new Random(1234L); +ArrayList randArray = new ArrayList<>(); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 101; i++) { + double r = rand.nextGaussian(); + quantilesOp.data.process(r); + randArray.add(r); +} + +quantilesOp.endWindow(); + +Collections.sort(randArray); + +Assert.assertEquals("Captures all computed quantiles", sink.collectedTuples.size(), 101); +Assert.assertTrue("Computes median correctly", randArray.get(50) == sink.collectedTuples.get(100)[2]); + } + + @Test + public void testCDF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.cdfOutput, sink); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 1001; i++) { + double r = 0.001 * i; + quantilesOp.data.process(r); +
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341617#comment-15341617 ] ASF GitHub Bot commented on APEXMALHAR-2094: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67852390 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator + { +public final DefaultInputPortinput = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public PmfSink() {} + } + + public static class QuantileSink extends BaseOperator + { +public final DefaultInputPort input = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public QuantileSink() {} + } + + public static class CdfSink extends BaseOperator + { +public final DefaultInputPort input = new DefaultInputPort() +{ + @Override + public void process(Double tuple) {} +}; + +public CdfSink() {} + } + + @Test + public void testQuantiles() + { +QuantilesEstimator 
quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.quantilesOutput, sink); + +Random rand = new Random(1234L); +ArrayList randArray = new ArrayList<>(); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 101; i++) { + double r = rand.nextGaussian(); + quantilesOp.data.process(r); + randArray.add(r); +} + +quantilesOp.endWindow(); + +Collections.sort(randArray); + +Assert.assertEquals("Captures all computed quantiles", sink.collectedTuples.size(), 101); +Assert.assertTrue("Computes median correctly", randArray.get(50) == sink.collectedTuples.get(100)[2]); + } + + @Test + public void testCDF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink sink = new CollectorTestSink<>(); +
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67852390 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator + { +public final DefaultInputPortinput = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public PmfSink() {} + } + + public static class QuantileSink extends BaseOperator + { +public final DefaultInputPort input = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public QuantileSink() {} + } + + public static class CdfSink extends BaseOperator + { +public final DefaultInputPort input = new DefaultInputPort() +{ + @Override + public void process(Double tuple) {} +}; + +public CdfSink() {} + } + + @Test + public void testQuantiles() + { +QuantilesEstimator 
quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.quantilesOutput, sink); + +Random rand = new Random(1234L); +ArrayList randArray = new ArrayList<>(); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 101; i++) { + double r = rand.nextGaussian(); + quantilesOp.data.process(r); + randArray.add(r); +} + +quantilesOp.endWindow(); + +Collections.sort(randArray); + +Assert.assertEquals("Captures all computed quantiles", sink.collectedTuples.size(), 101); +Assert.assertTrue("Computes median correctly", randArray.get(50) == sink.collectedTuples.get(100)[2]); + } + + @Test + public void testCDF() + { +QuantilesEstimator quantilesOp = new QuantilesEstimator(128, (short)12345); + +CollectorTestSink sink = new CollectorTestSink<>(); +TestUtils.setSink(quantilesOp.cdfOutput, sink); + +quantilesOp.setup(null); +quantilesOp.beginWindow(0); + +for (int i = 0; i < 1001; i++) { + double r = 0.001 * i; + quantilesOp.data.process(r); +
[jira] [Commented] (APEXMALHAR-2094) Quantiles sketch operator
[ https://issues.apache.org/jira/browse/APEXMALHAR-2094?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341612#comment-15341612 ] ASF GitHub Bot commented on APEXMALHAR-2094: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67852225 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator + { +public final DefaultInputPortinput = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public PmfSink() {} + } + + public static class QuantileSink extends BaseOperator + { +public final DefaultInputPort input = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public QuantileSink() {} + } + + public static class CdfSink extends BaseOperator --- End diff -- Not used > Quantiles sketch operator > - > > Key: APEXMALHAR-2094 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2094 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: 
Sandeep Narayanaswami >Assignee: Sandeep Narayanaswami >Priority: Minor > > An operator that "sketches" in an online fashion the probability distribution > of an input (numeric) data stream, enabling computation of quantiles and > cumulative distribution functions.
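The operator approximates quantiles and the CDF with a bounded-memory sketch. As a reference for what those outputs mean, and for the median check in testQuantiles (with 101 sorted values the median is the element at index 50), here is an exact, store-everything Java version. The class and method names are hypothetical; it uses nearest-rank indexing into sorted data and is not how QuantilesEstimator computes its estimates.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical exact reference; a streaming sketch trades this exactness for bounded memory.
class ExactQuantilesSketch
{
  /**
   * Returns the value at rank round(q * (n - 1)) of the sorted data, so q = 0 gives the
   * minimum, q = 1 the maximum and q = 0.5 the middle element when n is odd.
   */
  public static double quantile(double[] sorted, double q)
  {
    if (sorted.length == 0 || q < 0.0 || q > 1.0) {
      throw new IllegalArgumentException("Need non-empty data and 0 <= q <= 1");
    }
    int index = (int)Math.round(q * (sorted.length - 1));
    return sorted[index];
  }

  /** Empirical CDF: the fraction of the sorted data that is <= x. */
  public static double cdf(double[] sorted, double x)
  {
    if (sorted.length == 0) {
      throw new IllegalArgumentException("Need non-empty data");
    }
    int count = 0;
    while (count < sorted.length && sorted[count] <= x) {
      count++;
    }
    return (double)count / sorted.length;
  }

  public static void main(String[] args)
  {
    // Same seed and sample size as testQuantiles: 101 Gaussian values.
    Random rand = new Random(1234L);
    double[] data = new double[101];
    for (int i = 0; i < data.length; i++) {
      data[i] = rand.nextGaussian();
    }
    double[] sorted = data.clone();
    Arrays.sort(sorted);
    // quantile(sorted, 0.5) picks sorted[50], the value the test treats as the median.
    System.out.println("median = " + quantile(sorted, 0.5) + ", sorted[50] = " + sorted[50]);
    System.out.println("CDF at 0.0 = " + cdf(sorted, 0.0));
  }
}
```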
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67852225 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator + { +public final DefaultInputPortinput = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public PmfSink() {} + } + + public static class QuantileSink extends BaseOperator + { +public final DefaultInputPort input = new DefaultInputPort () +{ + @Override + public void process(double[] tuple) {} +}; + +public QuantileSink() {} + } + + public static class CdfSink extends BaseOperator --- End diff -- Not used
[GitHub] apex-malhar pull request #301: [APEXMALHAR-2094] Quantiles operator
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/301#discussion_r67852192 --- Diff: sketches/src/test/java/org/apache/apex/malhar/sketches/QuantilesEstimatorTest.java --- @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.sketches; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.TestUtils; + +public class QuantilesEstimatorTest +{ + private static Logger LOG = LoggerFactory.getLogger(QuantilesEstimatorTest.class); + + public static class NumberSource extends BaseOperator implements InputOperator + { +public final DefaultOutputPort output = new DefaultOutputPort<>(); + +private Random rand = new Random(1234L); + +public NumberSource() {} + +@Override +public void emitTuples() +{ + output.emit(rand.nextGaussian()); +} + } + + public static class PmfSink extends BaseOperator --- End diff -- This is not used anywhere.
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341509#comment-15341509 ] ASF GitHub Bot commented on APEXMALHAR-2106: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r67842738 --- Diff: library/src/test/java/com/datatorrent/lib/stream/MultipleStreamMergerTest.java --- @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.testbench.RandomWordGenerator; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; + +public class MultipleStreamMergerTest +{ + private static Logger LOG = LoggerFactory.getLogger(MultipleStreamMergerTest.class); + + private ArrayList.Stream> streamsToAddToDag; + private ArrayList .NamedMerger> operatorsToAdd; + + private static Counter counterOp = new Counter(); + + private static int tuplesToSend = 173; + private static int streamsToMerge = 15; + + private static Map tuplesReceived = new HashMap<>(); + + @Before + public void setUp() throws Exception + { +streamsToAddToDag = new ArrayList<>(); +operatorsToAdd = new ArrayList<>(); + } + + @Test + public void mergeTwoStreams() + { +RandomWordGenerator randomWordGenerator = new RandomWordGenerator(); +RandomWordGenerator randomWordGenerator2 = new RandomWordGenerator(); + +randomWordGenerator.setTuplesPerWindow(1); +randomWordGenerator2.setTuplesPerWindow(1); + +MultipleStreamMerger merger = new MultipleStreamMerger<>(); +merger.merge(randomWordGenerator.output) +.merge(randomWordGenerator2.output); + +merger.constructMergeTree(streamsToAddToDag, operatorsToAdd); + +assertEquals("Count of created 
streams", 2, streamsToAddToDag.size()); +assertEquals("Count of created operators", 1, operatorsToAdd.size()); + +// Next check actual connections +assertEquals("Generator 1 stream", randomWordGenerator.output, +streamsToAddToDag.get(0).sourcePort.port); + +assertEquals("Generator 2 stream", randomWordGenerator2.output, +streamsToAddToDag.get(1).sourcePort.port); + +assertEquals("Final operator input_1", operatorsToAdd.get(0).merger.data1, streamsToAddToDag.get(0).destPort); +assertEquals("Final operator input_2", operatorsToAdd.get(0).merger.data2, streamsToAddToDag.get(1).destPort); + } + + @Test(expected = IllegalArgumentException.class) + public void mergeOneStream() + { +RandomWordGenerator randomWordGenerator = new RandomWordGenerator(); +MultipleStreamMerger merger = new MultipleStreamMerger<>(); +merger.merge(randomWordGenerator.output); +merger.constructMergeTree(streamsToAddToDag, operatorsToAdd); + } + + @Test(expected = IllegalArgumentException.class) + public void mergeZeroStream()
[GitHub] apex-malhar pull request #309: [APEXMALHAR-2106] Support multiple streams in...
Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r67842738 --- Diff: library/src/test/java/com/datatorrent/lib/stream/MultipleStreamMergerTest.java --- @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.datatorrent.lib.stream; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.testbench.RandomWordGenerator; + +import static junit.framework.TestCase.assertTrue; +import static org.junit.Assert.assertEquals; + +public class MultipleStreamMergerTest +{ + private static Logger LOG = LoggerFactory.getLogger(MultipleStreamMergerTest.class); + + private ArrayList.Stream> streamsToAddToDag; + private ArrayList .NamedMerger> operatorsToAdd; + + private static Counter counterOp = new Counter(); + + private static int tuplesToSend = 173; + private static int streamsToMerge = 15; + + private static Map tuplesReceived = new HashMap<>(); + + @Before + public void setUp() throws Exception + { +streamsToAddToDag = new ArrayList<>(); +operatorsToAdd = new ArrayList<>(); + } + + @Test + public void mergeTwoStreams() + { +RandomWordGenerator randomWordGenerator = new RandomWordGenerator(); +RandomWordGenerator randomWordGenerator2 = new RandomWordGenerator(); + +randomWordGenerator.setTuplesPerWindow(1); +randomWordGenerator2.setTuplesPerWindow(1); + +MultipleStreamMerger merger = new MultipleStreamMerger<>(); +merger.merge(randomWordGenerator.output) +.merge(randomWordGenerator2.output); + +merger.constructMergeTree(streamsToAddToDag, operatorsToAdd); + +assertEquals("Count of created 
streams", 2, streamsToAddToDag.size()); +assertEquals("Count of created operators", 1, operatorsToAdd.size()); + +// Next check actual connections +assertEquals("Generator 1 stream", randomWordGenerator.output, +streamsToAddToDag.get(0).sourcePort.port); + +assertEquals("Generator 2 stream", randomWordGenerator2.output, +streamsToAddToDag.get(1).sourcePort.port); + +assertEquals("Final operator input_1", operatorsToAdd.get(0).merger.data1, streamsToAddToDag.get(0).destPort); +assertEquals("Final operator input_2", operatorsToAdd.get(0).merger.data2, streamsToAddToDag.get(1).destPort); + } + + @Test(expected = IllegalArgumentException.class) + public void mergeOneStream() + { +RandomWordGenerator randomWordGenerator = new RandomWordGenerator(); +MultipleStreamMerger merger = new MultipleStreamMerger<>(); +merger.merge(randomWordGenerator.output); +merger.constructMergeTree(streamsToAddToDag, operatorsToAdd); + } + + @Test(expected = IllegalArgumentException.class) + public void mergeZeroStream() + { +MultipleStreamMerger merger = new MultipleStreamMerger<>(); +merger.constructMergeTree(streamsToAddToDag, operatorsToAdd); + } + + public static class Application implements StreamingApplication + {
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341487#comment-15341487 ] ASF GitHub Bot commented on APEXMALHAR-2106: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r67841249 --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java --- @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.stream; + +import java.util.ArrayList; +import java.util.Queue; + +import org.eclipse.jetty.util.ArrayQueue; --- End diff -- Why not use the java.util implementations instead? Maybe LinkedList? > Support merging multiple streams with StreamMerger > --- > > Key: APEXMALHAR-2106 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2106 > Project: Apache Apex Malhar > Issue Type: New Feature >Reporter: Ilya Ganelin >Assignee: Ilya Ganelin > > To properly implement the Flatten transformation (and other Stream > combination operations), Apex must support merging data from multiple > sources.
The StreamMerger operator can be improved to merge multiple streams, > rather than just the two streams it can handle in the present implementation. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
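The review comment above suggests replacing org.eclipse.jetty.util.ArrayQueue with a java.util implementation. As a minimal sketch (the class and method names are illustrative only and are not part of the PR), code written against the java.util.Queue interface works unchanged with either LinkedList or ArrayDeque, because both provide FIFO offer/poll semantics:

```java
import java.util.ArrayDeque;
import java.util.LinkedList;
import java.util.Queue;

// Illustrative sketch: any java.util.Queue implementation can back FIFO logic
// that only uses offer/poll, so no third-party queue class is needed.
public class QueueSwapSketch
{
  // Drains the queue in FIFO order and returns the elements joined by commas.
  static String drain(Queue<String> queue)
  {
    StringBuilder sb = new StringBuilder();
    String next;
    while ((next = queue.poll()) != null) {
      if (sb.length() > 0) {
        sb.append(',');
      }
      sb.append(next);
    }
    return sb.toString();
  }

  // Enqueues three hypothetical stream names, then drains them.
  static String fill(Queue<String> queue)
  {
    queue.offer("stream0");
    queue.offer("stream1");
    queue.offer("stream2");
    return drain(queue);
  }

  public static void main(String[] args)
  {
    System.out.println(fill(new LinkedList<>()));  // stream0,stream1,stream2
    System.out.println(fill(new ArrayDeque<>()));  // stream0,stream1,stream2
  }
}
```

One difference to keep in mind when choosing between the two: ArrayDeque rejects null elements, while LinkedList accepts them. If the queue only ever holds non-null ports, either one fits.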
[jira] [Commented] (APEXMALHAR-2106) Support merging multiple streams with StreamMerger
[ https://issues.apache.org/jira/browse/APEXMALHAR-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341485#comment-15341485 ] ASF GitHub Bot commented on APEXMALHAR-2106: Github user bhupeshchawda commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/309#discussion_r67841160 --- Diff: library/src/main/java/com/datatorrent/lib/stream/MultipleStreamMerger.java --- @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.lib.stream; + +import java.util.ArrayList; +import java.util.Queue; + +import org.eclipse.jetty.util.ArrayQueue; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; + +/** + * A helper class that adds functionality to bypass the platform limitations of combining more than two + * streams at a time with Stream Merger. 
+ * + * Usage: + * + * dag.addOperator("Stream_1", op1); + * dag.addOperator("Stream_2", op2); + * dag.addOperator("Stream_3", op3); + * + * MultipleStreamMerger merger = new MultipleStreamMerger(); + * DefaultOutputPort streamOut = merger.merge(op1.out) + * .merge(op2.out) + * .merge(op3.out) + * .mergeStreams(dag, conf); + * + * dag.addStream("merger-counter", streamOut, counterOp.counter); + * + * @param + */ +public class MultipleStreamMerger +{ + public class Stream + { +DefaultInputPort destPort; +SourcedOutputPort sourcePort; +String name; + +public Stream(String name, SourcedOutputPort sourcePort, DefaultInputPort destPort) +{ + this.destPort = destPort; + this.sourcePort = sourcePort; + this.name = name; +} + } + + public class NamedMerger + { +StreamMerger merger; +String name; + +public NamedMerger(String name, StreamMerger merger) +{ + this.merger = merger; + this.name = name; +} + } + + /** + * A simple class to allow us to track whether the port to be merged is internal (allowing it to be thread local) + * or external + */ + public class SourcedOutputPort + { +boolean internal; +DefaultOutputPort port; + +public SourcedOutputPort(DefaultOutputPort port) +{ + this.internal = false; + this.port = port; +} + +public SourcedOutputPort(boolean internal, DefaultOutputPort port) +{ + this.internal = internal; + this.port = port; +} + } + + ArrayList streamsToMerge = new ArrayList<>(); + + private DefaultOutputPort streamOutput = new DefaultOutputPort<>(); + + /** + * Used to define all the sources to be merged into a single stream. + * + * @param sourcePort - The output port from the upstream operator that provides data + * @return The updated MultipleStreamMerger object that tracks which streams should be unified.
+ */ + public MultipleStreamMerger merge(DefaultOutputPort sourcePort) + { +streamsToMerge.add(sourcePort); +return this; + } + + + /** + * Given the streams to merge have been selected with {@link #merge(DefaultOutputPort)}, create a subDAG and add it + * to an existing DAG. + * + * To merge more than two streams at a time, we construct a tiered hierarchy of thread-local StreamMerger operators + * E.g. + * + * Stream 0 -> + *StreamMerger_1 -> + * Stream 1 -> + * StreamMerger_Final -> Out + * Stream 2 -> + *StreamMerger_2 -> + * Stream 3 -> + * Note that we don't use the populateDAG function because that is only used to
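The tiered hierarchy described in the Javadoc above can be illustrated without the Apex API. The following is a hypothetical sketch, not the PR's implementation. Ports are plain strings, each two-input "merger" stands in for a StreamMerger operator, and the Port.internal flag mirrors SourcedOutputPort: ports produced by intermediate mergers are internal, which in the PR is what allows them to be thread local. It uses a java.util.ArrayDeque as the work queue and rejects fewer than two streams with IllegalArgumentException, in line with the mergeOneStream and mergeZeroStream tests in the test diff earlier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical, Apex-free sketch of a tiered merge plan; not the MultipleStreamMerger code.
public class MergeTreeSketch
{
  // Stand-in for SourcedOutputPort: internal ports are produced by intermediate mergers.
  static final class Port
  {
    final String name;
    final boolean internal;

    Port(String name, boolean internal)
    {
      this.name = name;
      this.internal = internal;
    }
  }

  // A planned connection from an output port to one input of a merger.
  static final class Connection
  {
    final Port source;
    final String dest;

    Connection(Port source, String dest)
    {
      this.source = source;
      this.dest = dest;
    }
  }

  // Pairs ports in FIFO order until one remains, recording each merger and connection,
  // and returns the port that carries the fully merged stream.
  static Port buildMergeTree(List<String> sources, List<Connection> connections, List<String> mergers)
  {
    if (sources.size() < 2) {
      throw new IllegalArgumentException("At least two streams are required, got " + sources.size());
    }
    Queue<Port> pending = new ArrayDeque<>();
    for (String source : sources) {
      pending.offer(new Port(source, false));
    }
    while (pending.size() > 1) {
      String merger = "merger_" + (mergers.size() + 1);
      mergers.add(merger);
      connections.add(new Connection(pending.poll(), merger + ".data1"));
      connections.add(new Connection(pending.poll(), merger + ".data2"));
      // The merger's output feeds the next tier, so it is an internal port.
      pending.offer(new Port(merger + ".out", true));
    }
    return pending.poll();
  }

  public static void main(String[] args)
  {
    List<Connection> connections = new ArrayList<>();
    List<String> mergers = new ArrayList<>();
    Port out = buildMergeTree(Arrays.asList("s0", "s1", "s2", "s3"), connections, mergers);
    for (Connection c : connections) {
      System.out.println(c.source.name + (c.source.internal ? " [internal]" : "") + " -> " + c.dest);
    }
    System.out.println("final: " + out.name);
  }
}
```

Pairing ports in FIFO order yields the balanced tiers from the diagram. With four inputs, streams 0 and 1 feed merger_1, streams 2 and 3 feed merger_2, and both internal outputs feed merger_3. In general, N inputs produce N - 1 mergers and 2(N - 1) connections, so two inputs give one merger and two streams. Those are the same counts the test diff asserts.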
[jira] [Commented] (APEXMALHAR-2122) Upgrading elasticsearch java api from 1.1.2 to 2.3.3.
[ https://issues.apache.org/jira/browse/APEXMALHAR-2122?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341286#comment-15341286 ] ASF GitHub Bot commented on APEXMALHAR-2122: GitHub user akshay-harale opened a pull request: https://github.com/apache/apex-malhar/pull/325 APEXMALHAR-2122 Upgrading elasticsearch java api from 1.1.2 to 2.3.3. Upgrading elasticsearch java api from 1.1.2 to 2.3.3. Made necessary changes in operator. Fixed build issues recognised while migration. You can merge this pull request into a Git repository by running: $ git pull https://github.com/akshay-harale/apex-malhar master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/325.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #325 commit ff5dbaad0ae349377ae9e20f19275e54189a5e07 Author: Akshay Harale Date: 2016-06-21T07:03:36Z Upgrading elasticsearch java api from 1.1.2 to 2.3.3. Made necessary changes in operator. Fixed build issues recognised while migration. > Upgrading elasticsearch java api from 1.1.2 to 2.3.3. > - > > Key: APEXMALHAR-2122 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2122 > Project: Apache Apex Malhar > Issue Type: Bug > Components: query operators >Affects Versions: 3.4.0 >Reporter: Akshay Harale > Fix For: 3.4.1 > > > The current elasticsearch version is 2.3.3, so we need to upgrade the > corresponding java api to 2.3.3.
[GitHub] apex-malhar pull request #325: APEXMALHAR-2122 Upgrading elasticsearch java ...
GitHub user akshay-harale opened a pull request: https://github.com/apache/apex-malhar/pull/325 APEXMALHAR-2122 Upgrading elasticsearch java api from 1.1.2 to 2.3.3. Upgrading elasticsearch java api from 1.1.2 to 2.3.3. Made necessary changes in operator. Fixed build issues recognised while migration. You can merge this pull request into a Git repository by running: $ git pull https://github.com/akshay-harale/apex-malhar master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/325.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #325 commit ff5dbaad0ae349377ae9e20f19275e54189a5e07 Author: Akshay Harale Date: 2016-06-21T07:03:36Z Upgrading elasticsearch java api from 1.1.2 to 2.3.3. Made necessary changes in operator. Fixed build issues recognised while migration. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only
GitHub user ilooner opened a pull request: https://github.com/apache/apex-malhar/pull/324 Spillable Datastructures PR for review only You can merge this pull request into a Git repository by running: $ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull Alternatively you can review and apply these changes as the patch at: https://github.com/apache/apex-malhar/pull/324.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #324 commit c2c3f0acfcdf033a0e3044967ab3f8048f719259 Author: Timothy Farkas Date: 2016-06-05T00:11:20Z - Intermediate commit. commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8 Author: Timothy Farkas Date: 2016-06-13T06:03:21Z Intermediate commit commit 60acf68b96f2145af3d90b410c6b20613347f881 Author: Timothy Farkas Date: 2016-06-21T06:58:09Z Intermediate commit
[jira] [Commented] (APEXMALHAR-2119) Make DirectoryScanner in AbstractFileInputOperator inheritance friendly.
[ https://issues.apache.org/jira/browse/APEXMALHAR-2119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15341202#comment-15341202 ] ASF GitHub Bot commented on APEXMALHAR-2119: Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/318#discussion_r67813977 --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java --- @@ -1110,6 +1116,16 @@ public String toString() return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" + partitionIndex + " partitionCount=" + partitionCount + "]"; } + +protected void setPartitionIndex(int partitionIndex) +{ + this.partitionIndex = partitionIndex; +} + +protected void setPartitionCount(int partitionCount) +{ + this.partitionCount = partitionCount; +} --- End diff -- Added test for setPartition and verified that the base function uses the value set. > Make DirectoryScanner in AbstractFileInputOperator inheritance friendly. > - > > Key: APEXMALHAR-2119 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2119 > Project: Apache Apex Malhar > Issue Type: Bug >Reporter: Tushar Gosavi >Assignee: Tushar Gosavi > > The DirectoryScanner has partitionIndex and partitionCount declared as > private without any setters. An inherited DirectoryScanner cannot set them and > hence cannot call most of the methods in DirectoryScanner that depend on > these fields (acceptFile). > Also, a new DirectoryScanner has to implement createPartition, as the default one > creates an instance of DirectoryScanner. > Make the class inheritance friendly by adding setters and using kryo clone in > createPartition.
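The problem described in this issue, and the proposed fix of protected setters plus a copying createPartition, can be sketched in isolation. This is a simplified, hypothetical stand-in for AbstractFileInputOperator.DirectoryScanner, not the Malhar class. It assumes a file belongs to a partition when the hash of its path modulo partitionCount equals partitionIndex. It uses Object.clone() as a standard-library substitute for the Kryo clone mentioned in the issue, and CsvScanner is an invented subclass.

```java
// Hypothetical sketch; names other than the setters and acceptFile/createPartition are invented.
public class ScannerSketch
{
  static class DirectoryScanner implements Cloneable
  {
    private int partitionIndex;
    private int partitionCount;

    // Protected setters of the kind added in PR #318, so subclasses can set partition state.
    protected void setPartitionIndex(int partitionIndex)
    {
      this.partitionIndex = partitionIndex;
    }

    protected void setPartitionCount(int partitionCount)
    {
      this.partitionCount = partitionCount;
    }

    // Assumed rule: with more than one partition, a file belongs to exactly one of them.
    protected boolean acceptFile(String path)
    {
      if (partitionCount > 1) {
        int mod = Math.floorMod(path.hashCode(), partitionCount);
        if (mod != partitionIndex) {
          return false;
        }
      }
      return true;
    }

    // Copies "this" so partitions keep the subclass's runtime type and field values.
    // Object.clone() stands in for the Kryo copy mentioned in the issue.
    protected DirectoryScanner createPartition(int index, int count)
    {
      try {
        DirectoryScanner copy = (DirectoryScanner)clone();
        copy.setPartitionIndex(index);
        copy.setPartitionCount(count);
        return copy;
      } catch (CloneNotSupportedException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  // Hypothetical subclass that adds its own filter on top of partitioning.
  static class CsvScanner extends DirectoryScanner
  {
    @Override
    protected boolean acceptFile(String path)
    {
      return path.endsWith(".csv") && super.acceptFile(path);
    }
  }

  public static void main(String[] args)
  {
    DirectoryScanner base = new CsvScanner();
    int accepted = 0;
    for (int i = 0; i < 3; i++) {
      DirectoryScanner partition = base.createPartition(i, 3);
      System.out.println(partition.getClass().getSimpleName());  // CsvScanner
      if (partition.acceptFile("data/input.csv")) {
        accepted++;
      }
    }
    System.out.println(accepted);  // 1: exactly one partition owns the file
  }
}
```

Because clone() preserves the runtime class, every partition of a CsvScanner is itself a CsvScanner. That is the property the issue says is missing when the default createPartition instantiates DirectoryScanner directly. Object.clone() makes only a shallow copy, whereas a deep copy such as Kryo's would also duplicate mutable reference fields.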
[GitHub] apex-malhar pull request #318: APEXMALHAR-2119 add setters for partition cou...
Github user tushargosavi commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/318#discussion_r67813977 --- Diff: library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java --- @@ -1110,6 +1116,16 @@ public String toString() return "DirectoryScanner [filePatternRegexp=" + filePatternRegexp + " partitionIndex=" + partitionIndex + " partitionCount=" + partitionCount + "]"; } + +protected void setPartitionIndex(int partitionIndex) +{ + this.partitionIndex = partitionIndex; +} + +protected void setPartitionCount(int partitionCount) +{ + this.partitionCount = partitionCount; +} --- End diff -- Added test for setPartition and verified that the base function uses the value set.