Re: Multiple directories
For the time being I do not want to receive mails related to this topic, so please stop sending them. If I need your help about Apache Apex or Hadoop in future I will contact you, but for now please stop sending mails.

On 7 Jul 2016 1:56 a.m., "Mukkamula, Suryavamshivardhan (CWM-NR)" <suryavamshivardhan.mukkam...@rbc.com> wrote:
> Hi,
>
> Can you please let me know how I would add multiple directories to an
> operator which extends 'AbstractFileInputOperator'?
>
> I would like to read from multiple directories with a single operator by
> selecting multiple files using 'filePatternRegExp'.
>
> Regards,
> Surya Vamshi
>
> ___
>
> If you received this email in error, please advise the sender (by return
> email or otherwise) immediately. You have consented to receive the attached
> electronically at the above-noted email address; please retain a copy of
> this confirmation for future reference.
>
> Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur
> immédiatement, par retour de courriel ou par un autre moyen. Vous avez
> accepté de recevoir le(s) document(s) ci-joint(s) par voie électronique à
> l'adresse courriel indiquée ci-dessus; veuillez conserver une copie de
> cette confirmation pour les fins de reference future.
Re: Multiple directories
In fact you can take a look at FSInputModule in Malhar. This module will filter as well as read all file blocks (files matching the regex from all input directories) for you. Check this JSON-based application for sample usage: https://github.com/apache/apex-malhar/tree/master/apps/filecopy

The only catch is that, as I understand from your previous mail, you need to read directories serially, i.e. one after the other. This is not directly supported, but check whether you can modify the scanner thread in FileSplitterInput to serve your needs.

-Priyanka

On Fri, Jul 8, 2016 at 11:43 AM, Priyanka Gugale <priya...@datatorrent.com> wrote:
> Hi,
>
> Take a look at TimeBasedDirectoryScanner in FileSplitterInput; this
> scanner accepts a list of files/directories to scan. It also accepts a regex
> to filter on file names. I think you can pick up ideas on how to scan
> multiple directories from there.
>
> -Priyanka
>
> On Thu, Jul 7, 2016 at 6:59 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:
>> Hi Yunhan,
>>
>> This example I am already using for reading the data from multiple
>> directories in parallel. Here each directory is given to an operator in
>> parallel.
>>
>> My requirement is that I would like to add multiple directories to a
>> single operator.
>>
>> Regards,
>> Surya Vamshi
>>
>> From: Yunhan Wang [mailto:yun...@datatorrent.com]
>> Sent: 2016, July, 06 4:37 PM
>> To: users@apex.apache.org
>> Subject: Re: Multiple directories
>>
>> Hi Surya,
>>
>> Please check our fileIO-multiDir example:
>> https://github.com/DataTorrent/examples/tree/master/tutorials/fileIO-multiDir
>>
>> Hope this can help.
>>
>> Thanks,
>> Yunhan
>>
>> On Wed, Jul 6, 2016 at 1:26 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:
>>> Hi,
>>>
>>> Can you please let me know how I would add multiple directories to an
>>> operator which extends 'AbstractFileInputOperator'?
>>>
>>> I would like to read from multiple directories with a single operator by
>>> selecting multiple files using 'filePatternRegExp'.
>>>
>>> Regards,
>>> Surya Vamshi
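To make the "serial" requirement concrete, independent of Apex internals: the scanner must finish every file of one directory before emitting any file of the next. A minimal in-memory sketch of just that ordering (directory and file names are made up; this is not the Malhar scanner):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SerialScanOrder {
    // Flattens per-directory listings into one sequence in which every file of
    // a directory precedes all files of later directories -- the order a
    // serialized scanner thread would have to emit.
    static List<String> serialOrder(Map<String, List<String>> filesPerDir) {
        List<String> ordered = new ArrayList<>();
        for (Map.Entry<String, List<String>> dir : filesPerDir.entrySet()) {
            for (String f : dir.getValue()) {
                ordered.add(dir.getKey() + "/" + f);  // drain this directory fully
            }
        }
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dirs = new LinkedHashMap<>();  // preserves scan order
        dirs.put("feedA", Arrays.asList("f1.dat", "f2.dat"));
        dirs.put("feedB", Arrays.asList("f1.dat"));
        System.out.println(serialOrder(dirs)); // [feedA/f1.dat, feedA/f2.dat, feedB/f1.dat]
    }
}
```

A modified scanner thread would only need to enforce this one invariant: do not pick up entries from directory N+1 while directory N still has unconsumed files.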
Re: Multiple directories
Hi,

Take a look at TimeBasedDirectoryScanner in FileSplitterInput; this scanner accepts a list of files/directories to scan. It also accepts a regex to filter on file names. I think you can pick up ideas on how to scan multiple directories from there.

-Priyanka

On Thu, Jul 7, 2016 at 6:59 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:
> Hi Yunhan,
>
> This example I am already using for reading the data from multiple
> directories in parallel. Here each directory is given to an operator in
> parallel.
>
> My requirement is that I would like to add multiple directories to a
> single operator.
>
> Regards,
> Surya Vamshi
>
> From: Yunhan Wang [mailto:yun...@datatorrent.com]
> Sent: 2016, July, 06 4:37 PM
> To: users@apex.apache.org
> Subject: Re: Multiple directories
>
> Hi Surya,
>
> Please check our fileIO-multiDir example:
> https://github.com/DataTorrent/examples/tree/master/tutorials/fileIO-multiDir
>
> Hope this can help.
>
> Thanks,
> Yunhan
>
> On Wed, Jul 6, 2016 at 1:26 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:
>> Hi,
>>
>> Can you please let me know how I would add multiple directories to an
>> operator which extends 'AbstractFileInputOperator'?
>>
>> I would like to read from multiple directories with a single operator by
>> selecting multiple files using 'filePatternRegExp'.
>>
>> Regards,
>> Surya Vamshi
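The core contract of a scanner that accepts several directories plus a filename regex can be sketched without any Hadoop or Malhar dependency. In the sketch below, the listings stand in for real directory contents and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class MultiDirFilter {
    // Scans every listed directory and keeps only file names matching the
    // regex -- the same filtering contract a regex-based directory scanner
    // applies across its configured directories.
    static List<String> scan(Map<String, List<String>> dirListings, String regex) {
        Pattern p = Pattern.compile(regex);
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, List<String>> dir : dirListings.entrySet()) {
            for (String name : dir.getValue()) {
                if (p.matcher(name).matches()) {
                    matches.add(dir.getKey() + "/" + name);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dirs = new LinkedHashMap<>();
        dirs.put("in1", Arrays.asList("a.csv", "a.tmp"));
        dirs.put("in2", Arrays.asList("b.csv"));
        System.out.println(scan(dirs, ".*\\.csv")); // [in1/a.csv, in2/b.csv]
    }
}
```

The real TimeBasedDirectoryScanner adds recursion, modification-time windows, and deduplication on top of this, but the directory-list-plus-regex shape is the part to borrow.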
Multiple directories
Hi,

Can you please let me know how I would add multiple directories to an operator which extends 'AbstractFileInputOperator'?

I would like to read from multiple directories with a single operator by selecting multiple files using 'filePatternRegExp'.

Regards,
Surya Vamshi
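A stock AbstractFileInputOperator partition scans one directory, but note that a single 'filePatternRegExp' can already select several distinct file-name patterns within that directory via regex alternation. A quick illustration with plain java.util.regex (the feed names are made up):

```java
import java.util.regex.Pattern;

public class PatternDemo {
    // One regex covering two feed-name patterns via alternation, the way a
    // single filePatternRegExp value could.
    static final Pattern FILE_PATTERN =
        Pattern.compile("(source_123_.*\\.dat|source_124_.*\\.dat)");

    static boolean accepted(String fileName) {
        return FILE_PATTERN.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        System.out.println(accepted("source_123_20160707.dat")); // true
        System.out.println(accepted("source_999_20160707.dat")); // false
    }
}
```

Alternation solves the "multiple file patterns" half of the question; the "multiple directories" half is what the replies above address with custom partitioning or FileSplitterInput.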
RE: Reading Multiple directories in parallel
Hi,

Thank you so much Ram, it worked!!

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:r...@datatorrent.com]
Sent: 2016, June, 28 4:12 PM
To: users@apex.apache.org
Subject: Re: Reading Multiple directories in parallel

The return collection should match the function return type:

    List<Partition> newPartitions = new ArrayList(nPartitions);

But when adding the operator to the collection, use a matching type:

    AbstractFileInputOperator op = oper;
    newPartitions.add(new DefaultPartition<>(op));

Ram

On Tue, Jun 28, 2016 at 12:12 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:

Hi Ram,

I tried it the same way, but the problem that I face is that the return type of the definePartitions() method does not match the superclass. Hence I temporarily changed AbstractFileInputOperator, but I am looking for a better way to do this.

### definePartitions() method ###

    @Override
    public Collection<Partition<AbstractFileInputOperator>> definePartitions(
        Collection<Partition<AbstractFileInputOperator>> partitions,
        PartitioningContext context)
    {
      final int prevCount = partitions.size();
      if (1 < prevCount) {
        throw new RuntimeException("Error: Dynamic repartition not supported");
      }

      // compute first and last indices of partitions for each directory
      //final int numDirs = directories.length, numParCounts = partitionCounts.length;
      final int numDirs = inputDirectories.size(), numParCounts = partCounts.size();
      //final int[] sliceFirstIndex = new int[numDirs];
      Map<String, Integer> sliceFirstIndex = new HashMap<String, Integer>(numDirs);
      LOG.info("definePartitions: prevCount = {}, directories.size = {}, "
          + "partitionCounts.size = {}", prevCount, numDirs, numParCounts);

      int nPartitions = 0;  // desired number of partitions
      for (String sourceId : inputDirectories.keySet()) {
        sliceFirstIndex.put(sourceId, nPartitions);
        final int nP = Integer.parseInt(partCounts.get(sourceId));
        LOG.info("definePartitions: sourceId = {}, no of partitions = {}, dir = {}",
            sourceId, nP, inputDirectories.get(sourceId));
        nPartitions += nP;
      }

      /*if (1 == nPartitions) {
        LOG.info("definePartitions: Nothing to do in definePartitions");
        return partitions;  // nothing to do
      }*/

      if (nPartitions <= 0) {  // error
        final String msg = String.format("Error: Bad number of partitions %d%n", nPartitions);
        LOG.error(msg);
        throw new RuntimeException(msg);
      }

      this.partitionCount = nPartitions;
      LOG.debug("definePartitions: Creating {} partitions", nPartitions);

      /*
       * Create partitions of scanners; the scanner's partition method will do
       * state transfer for DirectoryScanner objects.
       */
      Kryo kryo = new Kryo();
      SlicedDirectoryScanner sds = (SlicedDirectoryScanner) scanner;
      List scanners = sds.partition(nPartitions, inputDirectories, partCounts);

      // return value: new list of partitions (includes old list)
      List<Partition> newPartitions = new ArrayList(nPartitions);
      // parallel list of storage managers
      Collection newManagers = new ArrayList(nPartitions);

      // set up new partitions
      LOG.info("definePartitions: setting up {} new partitions with {} monitored directories",
          nPartitions, numDirs);
      final IdempotentStorageManager ism = getIdempotentStorageManager();
      for (String sourceId : inputDirectories.keySet()) {
        int first = sliceFirstIndex.get(sourceId);
        int last = first + Integer.parseInt(partCounts.get(sourceId));
        String dir = Helper.changeInputDirectory(sourceId, inputDirectories.get(sourceId),
            snapDates.get(sourceId));
        //String dir = inputDirectories.get(sourceId);
        String inConfig = inputConfigFiles.get(sourceId);
        String outConfig = outputConfigFiles.get(sourceId);
        String loadDate = snapDates.get(sourceId);
        LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
        LOG.info("definePartitions: di
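The index bookkeeping buried in the code above can be pulled out and checked on its own: each source gets a contiguous slice of partition indices, starting where the previous slice ended. A self-contained sketch of just that arithmetic (source ids are hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SliceIndices {
    // For each source, the index of its first partition; slices are laid out
    // contiguously in iteration order, as sliceFirstIndex is built above.
    static Map<String, Integer> firstIndices(Map<String, Integer> partCounts) {
        Map<String, Integer> first = new LinkedHashMap<>();
        int nPartitions = 0;
        for (Map.Entry<String, Integer> e : partCounts.entrySet()) {
            first.put(e.getKey(), nPartitions);  // this slice starts where the last ended
            nPartitions += e.getValue();
        }
        return first;
    }

    // Total partitions is simply the sum of the per-source counts.
    static int totalPartitions(Map<String, Integer> partCounts) {
        int n = 0;
        for (int c : partCounts.values()) {
            n += c;
        }
        return n;
    }
}
```

For counts {source_123: 2, source_124: 3}, the first indices are 0 and 2, source_124's slice is [2, 5), and the total is 5 -- the same first/last values the loop in definePartitions() computes per directory.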
Re: Multiple directories
It looks like you may be including old Hadoop jars in your apa package, since the stack trace shows ConverterUtils.toContainerId calling ConverterUtils.toApplicationAttemptId, but recent versions don't have that call sequence. In 2.7.1 (which is what your cluster has) the function looks like this:

    public static ContainerId toContainerId(String containerIdStr) {
      return ContainerId.fromString(containerIdStr);
    }

Could you post the output of "jar tvf {your-apa-file}" as well as "mvn dependency:tree"?

Ram

On Thu, Jun 16, 2016 at 12:38 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:
> Hi Ram,
>
> Below is the information.
>
>   % Total  % Received  % Xferd  Average Speed  Time  Time  Time  Current
>                                 Dload  Upload  Total  Spent  Left  Speed
> 100  712    0  712     0  0     3807   0  --:--:-- --:--:-- --:--:-- 3807
> {
>     "clusterInfo": {
>         "haState": "ACTIVE",
>         "haZooKeeperConnectionState": "CONNECTED",
>         "hadoopBuildVersion": "2.7.1.2.3.2.0-2950 from 5cc60e0003e33aa98205f18bccaeaf36cb193c1c by jenkins source checksum 69a3bf8c667267c2c252a54fbbf23d",
>         "hadoopVersion": "2.7.1.2.3.2.0-2950",
>         "hadoopVersionBuiltOn": "2015-09-30T18:08Z",
>         "id": 1465495186350,
>         "resourceManagerBuildVersion": "2.7.1.2.3.2.0-2950 from 5cc60e0003e33aa98205f18bccaeaf36cb193c1c by jenkins source checksum 48db4b572827c2e9c2da66982d147626",
>         "resourceManagerVersion": "2.7.1.2.3.2.0-2950",
>         "resourceManagerVersionBuiltOn": "2015-09-30T18:20Z",
>         "rmStateStoreName": "org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore",
>         "startedOn": 1465495186350,
>         "state": "STARTED"
>     }
> }
>
> Regards,
> Surya Vamshi
>
> From: Munagala Ramanath [mailto:r...@datatorrent.com]
> Sent: 2016, June, 16 2:57 PM
> To: users@apex.apache.org
> Subject: Re: Multiple directories
>
> Can you ssh to one of the cluster nodes? If so, can you run this command
> and show the output (where {rm} is the host:port running the resource
> manager, aka YARN):
>
>     curl http://{rm}/ws/v1/cluster | python -mjson.tool
>
> Ram
>
> ps. You can determine the node running YARN with:
>
>     hdfs getconf -confKey yarn.resourcemanager.webapp.address
>     hdfs getconf -confKey yarn.resourcemanager.webapp.https.address
>
> On Thu, Jun 16, 2016 at 11:15 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:
>> Hi,
>>
>> I am facing a weird issue and the logs are not clear to me!!
>>
>> I have created an apa file which works fine within my local sandbox, but
>> I am facing problems when I upload it on the enterprise Hadoop cluster
>> using the DT Console.
>>
>> Below is the error message from the yarn logs. Please help in
>> understanding the issue.
>>
>> ### Error Logs ###
>>
>> Log Type: AppMaster.stderr
>> Log Upload Time: Thu Jun 16 14:07:46 -0400 2016
>> Log Length: 1259
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in [jar:file:/grid/06/hadoop/yarn/local/usercache/mukkamula/appcache/application_1465495186350_2224/filecache/36/slf4j-log4j12-1.7.19.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in [jar:file:/usr/hdp/2.3.2.0-2950/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> Exception in thread "main" java.lang.IllegalArgumentException: Invalid ContainerId: container_e35_1465495186350_2224_01_01
>>     at org.apache.hadoop.yarn.util.ConverterUtils.toContainerId(ConverterUtils.java:182)
>>     at com.datatorrent.stram.StreamingAppMaster.main(StreamingAppMaster.java:90)
>> Caused by: java.lang.NumberFormatException: For input string: "e35"
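The "e35" token in the failing container id is the YARN epoch marker that newer clusters embed in container ids; a parser compiled against older Hadoop jars tries to read that token as a number and fails exactly as in the log above. A minimal self-contained illustration of the failure mode (a hypothetical parser, not the actual ConverterUtils code):

```java
public class ContainerIdCheck {
    // Mimics the failing step in an old-style parser: it expects the token
    // after "container" to be the numeric cluster timestamp, but in
    // epoch-style ids that token is "e35".
    static String failingToken(String containerId) {
        String[] parts = containerId.split("_");
        try {
            Long.parseLong(parts[1]);  // old code path: parse token as a number
            return null;               // parsed fine: pre-epoch id format
        } catch (NumberFormatException e) {
            return parts[1];           // the token the old parser chokes on
        }
    }

    public static void main(String[] args) {
        System.out.println(failingToken("container_e35_1465495186350_2224_01_01")); // e35
        System.out.println(failingToken("container_1465495186350_2224_01_01"));     // null
    }
}
```

This is why checking "jar tvf" output for bundled hadoop-yarn jars matters: the cluster emits epoch-style ids, so the parsing must come from the cluster's own 2.7.1 classes, not from older jars packaged inside the apa.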
RE: Multiple directories
Hi Ram,

Assuming that properties are set as key/value pairs, I have used the properties as below and I can read the multiple directories in parallel. Thank you.

    dt.application.FileIO.operator.read.prop.inputDirectory(source_123)    tmp/fileIO/source_123
    dt.application.FileIO.operator.read.prop.inputConfigFile(source_123)   tmp/fileIO/config/source_123/source_123_input_config.xml
    dt.application.FileIO.operator.read.prop.partCount(source_123)         1
    dt.application.FileIO.operator.read.prop.outputDirectory(source_123)   tmp/fileIO/source_123
    dt.application.FileIO.operator.read.prop.outputConfigFile(source_123)  tmp/fileIO/config/source_123/source_123_output_config.xml

Regards,
Surya Vamshi

From: Mukkamula, Suryavamshivardhan (CWM-NR) [mailto:suryavamshivardhan.mukkam...@rbc.com]
Sent: 2016, June, 08 5:14 PM
To: users@apex.apache.org
Subject: RE: Multiple directories

Hi Ram,

Thank you. I would like to define the below class elements as a list from properties.xml. I tried creating them as below but had no luck. Can you please help on how to correctly define the list of elements below.
    public class InputValues<SOURCEID, DIRECTORY, CONFIGFILE, PARTITIONCOUNT> {

        public SOURCEID sourceId;
        public DIRECTORY directory;
        public CONFIGFILE configFile;
        public PARTITIONCOUNT partitionCount;

        public InputValues() {
        }

        public InputValues(SOURCEID sourceId, DIRECTORY directory,
            CONFIGFILE configFile, PARTITIONCOUNT partitionCount) {
            this.sourceId = sourceId;
            this.directory = directory;
            this.configFile = configFile;
            this.partitionCount = partitionCount;
        }
    }

Properties:

    dt.application.FileIO.operator.read.prop.inputValues(source_123)
        tmp/fileIO/source_123  tmp/fileIO/config/source_123_config.xml  1
    dt.application.FileIO.operator.read.prop.inputValues(source_124)
        tmp/fileIO/source_124  tmp/fileIO/config/source_124_config.xml  1

Regards,
Surya Vamshi

From: Munagala Ramanath [mailto:r...@datatorrent.com]
Sent: 2016, June, 05 10:24 PM
To: users@apex.apache.org
Subject: Re: Multiple directories

Some sample code to monitor multiple directories is now available at:
https://github.com/DataTorrent/examples/tree/master/tutorials/fileIO-multiDir

It shows how to use a custom implementation of definePartitions() to create multiple partitions of the file input operator and group them into "slices" where each slice monitors a single directory.

Ram

On Wed, May 25, 2016 at 9:55 AM, Munagala Ramanath <r...@datatorrent.com> wrote:
> I'm hoping to have a sample sometime next week.
>
> Ram
>
> On Wed, May 25, 2016 at 9:30 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:
>> Thank you so much, Ram, for your advice; option (a) would be ideal for my
>> requirement. Do you have sample usage of partitioning with individual
>> configuration setups for different partitions?
>>
>> Regards,
>> Surya Vamshi
>>
>> From: Munagala Ramanath [mailto:r...@datatorrent.com]
>> Sent: 2016, May, 25 12:11 PM
>> To: users@apex.apache.org
>> Subject: Re: Multiple directories
>>
>> You have two options: (a) AbstractFileInputOperator (b) FileSplitter/BlockReader
>>
>> For (a), each partition (i.e. replica of the operator) can scan only a
>> single directory, so if you have 100 directories, you can simply start
>> with 100 partitions; since each partition is scanning its own directory
>> you don't need to worry about which files the lines came from. This
>> approach however needs a custom definePartitions() implementation in your
>> subclass to assign the appropriate directory and XML parsing config file
>> to each partition; it also needs adequate cluster resources to be able to
>> spin up the required number of partitions.
>>
>> For (b), there is some documentation in the Operators section at
>> http://docs.datatorrent.com/ including sample code. These operators
>> support scanning multiple directories out of the box but have more
>> elaborate configuration options. Check this out and see if it works in
>> your use case.
>>
>> Ram
>>
>> On Wed, May 25, 2016 at 8:17 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <suryavamshivardhan.mukkam...@rbc.com> wrote:
>>> Hello Ram/Team,
>>>
>>> My requirement is to read input feeds from different locations on HDFS
>>> and parse those files by reading XML configuration files (each input
>>> feed has a configuration file which defines the fields inside the input
>>> feed).
>>>
>>> My approach: I would like to define a mapping file which contains an
>>> individual feed identifier, feed location, and configuration file
>>> location. I would like to read this mapping file at initial load w
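The parameterized property names used earlier in this thread, e.g. inputDirectory(source_123), pair a property name with a source id in one key. Extracting those two pieces is a one-regex job; a sketch of such a parse (a hypothetical helper, not Apex's own property handling):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeyedProp {
    // Matches names like "inputDirectory(source_123)": group 1 is the
    // property name, group 2 is the source id.
    private static final Pattern KEYED = Pattern.compile("(\\w+)\\((\\w+)\\)");

    // Returns {property, sourceId}, or null if the name is not keyed.
    static String[] parse(String name) {
        Matcher m = KEYED.matcher(name);
        if (!m.matches()) {
            return null;
        }
        return new String[] { m.group(1), m.group(2) };
    }

    public static void main(String[] args) {
        String[] parts = parse("inputDirectory(source_123)");
        System.out.println(parts[0] + " / " + parts[1]); // inputDirectory / source_123
    }
}
```

An operator with setters of the form setInputDirectory(key, value) could use exactly this kind of split to route each configured value into the per-source maps that definePartitions() iterates over.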