The return collection should match the method's return type exactly, including the type argument: *List<Partition<AbstractFileInputOperator<String>>> newPartitions = new ArrayList<>(nPartitions);*
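Java generics are invariant: a `Partition<AbstractFileReader>` is not a `Partition<AbstractFileInputOperator<String>>`, even though `AbstractFileReader` extends `AbstractFileInputOperator<String>`. That is why the list's element type has to match the return type exactly. The following is a minimal, self-contained sketch. It uses hypothetical stand-ins for the Apex `Partition`, `DefaultPartition` and operator classes (not the real API), and it assumes `AbstractFileReader` extends `AbstractFileInputOperator<String>`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class PartitionTypeDemo {
    // Hypothetical stand-ins for the Apex types discussed in this thread; not the real API.
    public interface Partition<T> {
        T getPartitionedInstance();
    }

    public static class DefaultPartition<T> implements Partition<T> {
        private final T instance;

        public DefaultPartition(T instance) { this.instance = instance; }

        @Override
        public T getPartitionedInstance() { return instance; }
    }

    public static abstract class AbstractFileInputOperator<T> { }

    // Plays the role of the user's AbstractFileReader: a subclass bound to String.
    public static class AbstractFileReader extends AbstractFileInputOperator<String> {
        final int index;

        AbstractFileReader(int index) { this.index = index; }
    }

    public static Collection<Partition<AbstractFileInputOperator<String>>> definePartitions(int nPartitions) {
        // Declaring List<Partition<AbstractFileReader>> here would fail to compile at the return:
        // Partition<AbstractFileReader> is not a subtype of Partition<AbstractFileInputOperator<String>>.
        List<Partition<AbstractFileInputOperator<String>>> newPartitions = new ArrayList<>(nPartitions);
        for (int i = 0; i < nPartitions; ++i) {
            AbstractFileReader oper = new AbstractFileReader(i);
            // Upcast so the diamond infers DefaultPartition<AbstractFileInputOperator<String>>.
            AbstractFileInputOperator<String> op = oper;
            newPartitions.add(new DefaultPartition<>(op));
        }
        return newPartitions;
    }

    public static void main(String[] args) {
        Collection<Partition<AbstractFileInputOperator<String>>> parts = definePartitions(3);
        System.out.println(parts.size()); // prints 3
    }
}
```

The intermediate variable makes the diamond infer the list's element type from the constructor argument. On Java 8 and later, target typing will typically infer the same type argument even without it.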
Then, when adding the operator to the collection, first assign it to a variable of the matching type: *AbstractFileInputOperator<String> op = oper;* *newPartitions.add(new DefaultPartition<>(op));*

Ram

On Tue, Jun 28, 2016 at 12:12 PM, Mukkamula, Suryavamshivardhan (CWM-NR) <[email protected]> wrote:

> Hi Ram,
>
> I tried it the same way, but the problem I face is that the return type of the definePartitions() method does not match the superclass. Hence I temporarily changed AbstractFileInputOperator, but I am looking for a better way to do this.
>
> #############################definePartitions() method#######################################
>
>     @Override
>     public Collection<Partition<AbstractFileInputOperator<String>>> definePartitions(
>             Collection<Partition<AbstractFileInputOperator<String>>> partitions, PartitioningContext context) {
>
>         final int prevCount = partitions.size();
>         if (1 < prevCount) {
>             throw new RuntimeException("Error: Dynamic repartition not supported");
>         }
>
>         //compute first and last indices of partitions for each directory
>         //final int numDirs = directories.length, numParCounts = partitionCounts.length;
>         final int numDirs = inputDirectories.size(), numParCounts = partCounts.size();
>         //final int[] sliceFirstIndex = new int[numDirs];
>         Map<String,Integer> sliceFirstIndex = new HashMap<String,Integer>(numDirs);
>
>         LOG.info("definePartitions: prevCount = {}, directories.size = {}, " + "partitionCounts.size = {}",
>                 prevCount, numDirs, numParCounts);
>
>         int nPartitions = 0; // desired number of partitions
>
>         for (String sourceId : inputDirectories.keySet()) {
>             sliceFirstIndex.put(sourceId, nPartitions);
>             final int nP = Integer.parseInt(partCounts.get(sourceId));
>             LOG.info("definePartitions:sourceId = {} ,no of partitions = {}, dir = {}", sourceId, nP, inputDirectories.get(sourceId));
>             nPartitions += nP;
>         }
>
>         /*if (1 == nPartitions) {
>             LOG.info("definePartitions: Nothing to do in definePartitions");
>             return partitions; // nothing to do
>         }*/
>
>         if (nPartitions <= 0) { // error
>             final String msg = String.format("Error: Bad number of partitions %d%n", nPartitions);
>             LOG.error(msg);
>             throw new RuntimeException(msg);
>         }
>         this.partitionCount = nPartitions;
>
>         LOG.debug("definePartitions: Creating {} partitions", nPartitions);
>
>         /*
>          * Create partitions of scanners, scanner's partition method will do
>          * state transfer for DirectoryScanner objects.
>          */
>         Kryo kryo = new Kryo();
>
>         SlicedDirectoryScanner sds = (SlicedDirectoryScanner) scanner;
>         List<SlicedDirectoryScanner> scanners = sds.partition(nPartitions, inputDirectories, partCounts);
>
>         // return value: new list of partitions (includes old list)
>         List<Partition<AbstractFileReader>> newPartitions = new ArrayList(nPartitions);
>
>         // parallel list of storage managers
>         Collection<IdempotentStorageManager> newManagers = new ArrayList(nPartitions);
>
>         // setup new partitions
>         LOG.info("definePartitions: setting up {} new partitions with {} monitored directories", nPartitions, numDirs);
>
>         final IdempotentStorageManager ism = getIdempotentStorageManager();
>
>         for (String sourceId : inputDirectories.keySet()) {
>             int first = sliceFirstIndex.get(sourceId);
>             int last = first + Integer.parseInt(partCounts.get(sourceId));
>             String dir = Helper.changeInputDirectory(sourceId, inputDirectories.get(sourceId), snapDates.get(sourceId));
>             //String dir = inputDirectories.get(sourceId);
>             String inConfig = inputConfigFiles.get(sourceId);
>             String outConfig = outputConfigFiles.get(sourceId);
>             String loadDate = snapDates.get(sourceId);
>             LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
>             LOG.info("definePartitions: directory = {}, inputConfigFile = {}, outputConfigFile = {} , loadDate = {}", dir, inConfig, outConfig, loadDate);
>             for (int i = first; i < last; ++i) {
>                 AbstractFileReader oper = (AbstractFileReader) cloneObject(kryo, this);
>                 oper.setDirectory(dir);
>                 oper.setInputConfFile(inConfig);
>                 oper.setOutputConfFile(outConfig);
>                 oper.setSourceId(sourceId);
>                 oper.setLoadDate(loadDate);
>                 oper.setOldInputDir(inputDirectories.get(sourceId));
>                 SlicedDirectoryScanner scn = (SlicedDirectoryScanner) scanners.get(i);
>                 scn.setStartIndex(first);
>                 scn.setEndIndex(last);
>                 scn.setDirectory(dir);
>
>                 oper.setScanner(scn);
>                 newPartitions.add(new DefaultPartition<>(oper));
>                 newManagers.add(oper.getIdempotentStorageManager());
>             }
>         }
>
>         ism.partitioned(newManagers, null);
>         LOG.info("definePartition: returning {} partitions", newPartitions.size());
>         return newPartitions;
>     }
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:[email protected]]
> *Sent:* 2016, June, 28 2:35 PM
> *To:* [email protected]
> *Subject:* Re: Reading Multiple directories in parallel
>
> You can add those properties in your super class and simply cast the clone to that class, so change:
>
> *AbstractFileInputOperator<String> oper = cloneObject(kryo, this);*
>
> to something like:
>
> MyFileInputOperator<String> oper = (MyFileInputOperator) cloneObject(kryo, this);
>
> On Tue, Jun 28, 2016 at 11:17 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <[email protected]> wrote:
>
> Hi Ram,
>
> I would like to add parameters for each partition, as shown below: each operator should be given its own configuration file and source identifier. If there is another way, please let me know.
>
> In my current definePartitions() method I am doing something similar to the code below, but I have to add setter and getter methods to the AbstractFileInputOperator class.
>
>     for (int j = 0; j < numDirs; ++j) {
>         int first = sliceFirstIndex[j];
>         int last = first + partitionCounts[j];
>         String dir = directories[j];
>         LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
>         for (int i = first; i < last; ++i) {
>             AbstractFileInputOperator<String> oper = cloneObject(kryo, this);
>             oper.setDirectory(dir);
>             oper.setSourceId(<sourceId>);
>             oper.setConfigFile(<fileName>);
>             //oper.setpIndex(i);
>             SlicedDirectoryScanner scn = (SlicedDirectoryScanner) scanners.get(i);
>             scn.setStartIndex(first);
>             scn.setEndIndex(last);
>             scn.setDirectory(dir);
>
>             oper.setScanner(scn);
>             newPartitions.add(new DefaultPartition<>(oper));
>             newManagers.add(oper.getIdempotentStorageManager());
>         }
>     }
>
> Regards,
>
> Surya Vamshi
>
> *From:* Munagala Ramanath [mailto:[email protected]]
> *Sent:* 2016, June, 28 2:03 PM
> *To:* [email protected]
> *Subject:* Re: Reading Multiple directories in parallel
>
> Not sure I fully understand the question, but you can add whatever fields you need to your class that extends *AbstractFileInputOperator*. For example,
>
> https://github.com/DataTorrent/examples/blob/master/tutorials/fileIO-multiDir/src/main/java/com/example/fileIO/FileReaderMultiDir.java
>
> defines fields *directories* and *partitionCounts*.
>
> You can then set these fields as needed in *definePartitions*.
>
> Ram
>
> On Tue, Jun 28, 2016 at 10:31 AM, Mukkamula, Suryavamshivardhan (CWM-NR) <[email protected]> wrote:
>
> Hi Ram,
>
> Can you please suggest how I would add another variable (like ‘directory’) while creating multiple partitions of AbstractFileInputOperator in the definePartitions method?
>
> I have currently added the variables to AbstractFileInputOperator, which I guess is not the best way.
>
> These variables are basically used to scan the directories in parallel, each one differently.
>
> Regards,
>
> Surya Vamshi
>
> _______________________________________________________________________
>
> If you received this email in error, please advise the sender (by return email or otherwise) immediately. You have consented to receive the attached electronically at the above-noted email address; please retain a copy of this confirmation for future reference.
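Ram's suggestion in this thread is to keep the extra per-partition properties (source id, config file, directory) in your own subclass instead of adding setters to AbstractFileInputOperator, and to set them on each clone in definePartitions. The sketch below is a minimal, dependency-free version of that pattern. `BaseFileOperator`, `MyFileInputOperator`, `copy()` and the sample paths are hypothetical. `copy()` stands in for the Kryo-based `cloneObject(kryo, this)`, and the single-pass `[first, last)` range computation is a simplification of the two-loop `sliceFirstIndex` approach in the quoted code:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionFieldsDemo {
    // Hypothetical stand-in for AbstractFileInputOperator (not the Apex class).
    public static abstract class BaseFileOperator {
        protected String directory;

        public void setDirectory(String directory) { this.directory = directory; }
        public String getDirectory() { return directory; }
    }

    // Extra per-partition properties live in the user's own subclass,
    // so the base class needs no new setters or getters.
    public static class MyFileInputOperator extends BaseFileOperator {
        private String sourceId;
        private String configFile;
        private int startIndex;
        private int endIndex;

        public void setSourceId(String sourceId) { this.sourceId = sourceId; }
        public String getSourceId() { return sourceId; }
        public void setConfigFile(String configFile) { this.configFile = configFile; }
        public String getConfigFile() { return configFile; }
        public void setRange(int startIndex, int endIndex) { this.startIndex = startIndex; this.endIndex = endIndex; }
        public int getStartIndex() { return startIndex; }
        public int getEndIndex() { return endIndex; }

        // Stand-in for cloneObject(kryo, this): a shallow copy whose static
        // type is already MyFileInputOperator, so no cast is needed.
        public MyFileInputOperator copy() {
            MyFileInputOperator c = new MyFileInputOperator();
            c.directory = directory;
            c.sourceId = sourceId;
            c.configFile = configFile;
            c.startIndex = startIndex;
            c.endIndex = endIndex;
            return c;
        }
    }

    // Builds one operator copy per partition; each source gets the contiguous
    // index range [first, last) plus its own directory and config file.
    public static List<MyFileInputOperator> definePartitions(MyFileInputOperator template,
            Map<String, String> dirs, Map<String, Integer> counts, Map<String, String> configs) {
        List<MyFileInputOperator> result = new ArrayList<>();
        int first = 0;
        for (Map.Entry<String, String> e : dirs.entrySet()) {
            String sourceId = e.getKey();
            int last = first + counts.get(sourceId);
            for (int i = first; i < last; ++i) {
                MyFileInputOperator oper = template.copy();
                oper.setDirectory(e.getValue());
                oper.setSourceId(sourceId);
                oper.setConfigFile(configs.get(sourceId));
                oper.setRange(first, last);
                result.add(oper);
            }
            first = last;
        }
        return result;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps a deterministic source order for the index ranges.
        Map<String, String> dirs = new LinkedHashMap<>();
        dirs.put("srcA", "/data/a");
        dirs.put("srcB", "/data/b");
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("srcA", 2);
        counts.put("srcB", 1);
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("srcA", "a.conf");
        configs.put("srcB", "b.conf");

        for (MyFileInputOperator op : definePartitions(new MyFileInputOperator(), dirs, counts, configs)) {
            // prints "srcA /data/a a.conf [0, 2)" twice, then "srcB /data/b b.conf [2, 3)"
            System.out.println(op.getSourceId() + " " + op.getDirectory() + " " + op.getConfigFile()
                    + " [" + op.getStartIndex() + ", " + op.getEndIndex() + ")");
        }
    }
}
```

Because `copy()` (like a generic clone helper) returns the subclass type, the setters for the new fields are available on each partition's operator without touching the base class.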
