My problem: I want to use the bulk-import method for several tables, each having only one column family, by generating one HFile per table at a different path. HFileOutputFormat works fine for generating the HFile for a single table with one column family, where the map() contains only one context.write(ImmutableBytesWritable, KeyValue);.
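For context, here is a minimal sketch of the single-table map() that works for me with HFileOutputFormat (the input parsing and the family/qualifier names are placeholders, not my real schema):

    import java.io.IOException;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class SingleTableMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
      @Override
      protected void map(LongWritable offset, Text line, Context context)
          throws IOException, InterruptedException {
        // Placeholder parsing: first field is the row key, second the value.
        String[] fields = line.toString().split(",");
        byte[] row = Bytes.toBytes(fields[0]);
        KeyValue kv = new KeyValue(row, Bytes.toBytes("family1"),
            Bytes.toBytes("qualifier1"), Bytes.toBytes(fields[1]));
        // One write per record; HFileOutputFormat rolls these up into one
        // HFile per column family rather than one HFile per record.
        context.write(new ImmutableBytesWritable(row), kv);
      }
    }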
In my case, I want to generate one HFile for each table, each having one column family, at different paths, so I have two context.write() calls in one map():

    context.write(ImmutableBytesWritable1, KeyValue1);
    context.write(ImmutableBytesWritable2, KeyValue2);

I tried editing the existing MultiHFileOutputFormat at https://gist.github.com/1237770. To generate the different paths I changed lines 48, 49, and 50, because all my tables share the same column family name but each table has a different qualifier:

    table1, family1:qualifier1  for  context.write(ImmutableBytesWritable1, KeyValue1);
    table2, family1:qualifier2  for  context.write(ImmutableBytesWritable2, KeyValue2);

    Main path:                       /user/root/URI/output/
    New path for the first context:  /user/root/URI/output/c1/family1
    New path for the second context: /user/root/URI/output/c2/family1

It works, but the problem is that it generates one HFile per record inside the column-family folder. As an overview of one path: the input file has 7 records, and there are 7 HFiles under /user/root/URI/output/c1/family1:

    Name                 Type  Size     Replication  Block Size  Modification Time  Permission  Owner  Group
    1683285817312272721  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
    2621366688675098537  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
    4010796510230526713  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
    4776537456762330874  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
    5183627073769524429  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
    8473210328361167254  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
    8665529999981215284  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup

I think this is because of lines 137 and 140, which clear the writer cache and close the writer on every context.write(). Please suggest a change to the code so that it creates one HFile per column family, the way HFileOutputFormat does; a rough sketch of what I have in mind follows the quoted code below.

> 1.  import java.io.IOException;
> 2.  import java.util.HashMap;
> 3.  import java.util.Map;
> 4.  import java.util.TreeMap;
> 5.  import org.apache.hadoop.conf.Configuration;
> 6.  import org.apache.hadoop.fs.FileSystem;
> 7.  import org.apache.hadoop.fs.Path;
> 8.  import org.apache.hadoop.hbase.HConstants;
> 9.  import org.apache.hadoop.hbase.KeyValue;
> 10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> 11. import org.apache.hadoop.hbase.io.hfile.Compression;
> 12. import org.apache.hadoop.hbase.io.hfile.HFile;
> 13. import org.apache.hadoop.hbase.regionserver.StoreFile;
> 14. import org.apache.hadoop.hbase.util.Bytes;
> 15. import org.apache.hadoop.mapreduce.RecordWriter;
> 16. import org.apache.hadoop.mapreduce.TaskAttemptContext;
> 17. import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
> 18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
> 19. public class MultiHFileOutputFormat
> 20.     extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
> 21.   Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, KeyValue>> writers =
> 22.       new HashMap<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, KeyValue>>();
> 23.   static boolean result = false;
> 24.   /* Data structure to hold a Writer and amount of data written on it. */
> 25.   static class WriterLength {
> 26.     long written = 0;
> 27.     HFile.Writer writer = null;
> 28.   }
> 29.   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
> 30.       throws IOException, InterruptedException {
> 31.     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
> 32.       @Override
> 33.       public void close(TaskAttemptContext context) throws IOException,
> 34.           InterruptedException {
> 35.         for (RecordWriter<ImmutableBytesWritable, KeyValue> writer : writers.values()) {
> 36.           writer.close(context);
> 37.         }
> 38.       }
> 39.       @Override
> 40.       public void write(ImmutableBytesWritable key, KeyValue value)
> 41.           throws IOException, InterruptedException {
> 42.         System.out.println("1st Write Function Call");
> 43.         System.out.println("Writer Size = " + writers.size());
> 44.         RecordWriter<ImmutableBytesWritable, KeyValue> writer = writers.get(key);
> 45.         System.out.println("Writers Value " + Bytes.toString(key.get()) + " & " + writers.get(key));
> 46.         if (writer == null) {
> 47.           System.out.println("1st if Section Write Function Call");
> 48.           final Path outputPath =
> 49.               new Path(FileOutputFormat.getOutputPath(context).toString() + "/" +
> 50.                   Bytes.toString(value.getQualifier()));
> 51.           writer = new RecordWriter<ImmutableBytesWritable, KeyValue>() {
> 52.             FileOutputCommitter committer = new FileOutputCommitter(outputPath, context);
> 53.             Path outputdir = committer.getWorkPath();
> 54.             Configuration conf = context.getConfiguration();
> 55.             FileSystem fs = outputdir.getFileSystem(conf);
> 56.             long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
> 57.             int blocksize =
> 58.                 conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize", 65536);
> 59.             String compression = conf.get("hfile.compression",
> 60.                 Compression.Algorithm.NONE.getName());
> 61.             // Map of families to writers and how much has been output on the writer.
> 62.             Map<byte[], WriterLength> writers =
> 63.                 new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
> 64.             byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
> 65.             byte[] now = Bytes.toBytes(System.currentTimeMillis());
> 66.             /* Create a new HFile.Writer. Close current if there is one.
> 67.              * @param writer
> 68.              * @param familydir
> 69.              * @return A new HFile.Writer.
> 70.              * @throws IOException
> 71.              */
> 72.             private HFile.Writer getNewWriter(final HFile.Writer writer,
> 73.                 final Path familydir)
> 74.                 throws IOException {
> 75.               close(writer);
> 76.               return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
> 77.                   blocksize, compression, KeyValue.KEY_COMPARATOR);
> 78.             }
> 79.             private void close(final HFile.Writer w) throws IOException {
> 80.               System.out.println("2 Close Function Start");
> 81.               if (w != null) {
> 82.                 w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
> 83.                     Bytes.toBytes(System.currentTimeMillis()));
> 84.                 w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
> 85.                     Bytes.toBytes(context.getTaskAttemptID().toString()));
> 86.                 w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
> 87.                     Bytes.toBytes(true));
> 88.                 w.close();
> 89.               }
> 90.               System.out.println("2 Close Function End");
> 91.             }
> 92.             @Override
> 93.             public void close(TaskAttemptContext context)
> 94.                 throws IOException, InterruptedException {
> 95.               System.out.println("3 Close Function Start");
> 96.               for (Map.Entry<byte[], WriterLength> e : this.writers.entrySet()) {
> 97.                 close(e.getValue().writer);
> 98.               }
> 99.               committer.commitTask(context);
> 100.              System.out.println("3 Close Function End");
> 101.            }
> 102.            @Override
> 103.            public void write(ImmutableBytesWritable key, KeyValue value)
> 104.                throws IOException, InterruptedException {
> 105.              System.out.println("2nd Write Function Call");
> 106.              long length = value.getLength();
> 107.              byte[] family = value.getFamily();
> 108.              WriterLength wl = this.writers.get(family);
> 109.             if (wl == null || ((length + wl.written) >= maxsize) &&
> 110.                 Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
> 111.                     value.getBuffer(), value.getRowOffset(),
> 112.                     value.getRowLength()) != 0) {
> 113.               // Get a new writer.
> 114.               Path basedir = new Path(outputdir, Bytes.toString(family));
> 115.               System.out.println("Creating Directory Condition");
> 116.               if (wl == null) {
> 117.                 wl = new WriterLength();
> 118.                 this.writers.put(family, wl);
> 119.                 if (this.writers.size() > 1) throw new IOException("One family only");
> 120.                 // If wl == null, first file in family. Ensure family dir exists.
> 121.                 System.out.println("Creating Directory Start");
> 122.                 if (!fs.exists(basedir)) fs.mkdirs(basedir);
> 123.                 System.out.println("Creating Directory End");
> 124.               }
> 125.               wl.writer = getNewWriter(wl.writer, basedir);
> 126.               wl.written = 0;
> 127.             }
> 128.             value.updateLatestStamp(this.now);
> 129.             wl.writer.append(value);
> 130.             wl.written += length;
> 131.             // Copy the row so we know when a row transition happens.
> 132.             this.previousRow = value.getRow();
> 133.             System.out.println("2nd Write Function Call End");
> 134.           }
> 135.         };
> 136.         writers.put(key, writer);
> 137.         writers.clear();
> 138.       }
> 139.       writer.write(new ImmutableBytesWritable(value.getRow()), value);
> 140.       writer.close(context);
> 141.       System.out.println("1st Write Function Call End");
> 142.     }
> 143.   };
> 144. }
> 145. }
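For reference, here is the rough, untested sketch of the change I have in mind for the outer write()/close() pair. It keys the cache by qualifier, since in my case the qualifier identifies the target table, and createWriterFor() is only a placeholder for the anonymous inner RecordWriter built at lines 51-135 above. Please correct me if this is the wrong direction:

    @Override
    public void write(ImmutableBytesWritable key, KeyValue value)
        throws IOException, InterruptedException {
      // Cache the inner writer per table; the qualifier identifies the table
      // in my setup, so use it (not the row key) as the cache key.
      ImmutableBytesWritable tableKey =
          new ImmutableBytesWritable(value.getQualifier());
      RecordWriter<ImmutableBytesWritable, KeyValue> writer = writers.get(tableKey);
      if (writer == null) {
        // Placeholder: builds the anonymous inner writer from lines 51-135.
        writer = createWriterFor(value, context);
        writers.put(tableKey, writer);
        // No writers.clear() here (old line 137): clearing throws the cached
        // writer away, so every record rebuilds it and opens a fresh HFile.
      }
      writer.write(new ImmutableBytesWritable(value.getRow()), value);
      // No writer.close(context) here (old line 140): closing per record
      // finalizes one HFile per record.
    }

    @Override
    public void close(TaskAttemptContext context)
        throws IOException, InterruptedException {
      // Close each table's writer exactly once, at task end, so each column
      // family folder ends up with one HFile (plus size-based rollovers).
      for (RecordWriter<ImmutableBytesWritable, KeyValue> w : writers.values()) {
        w.close(context);
      }
    }

--
Best Regards,
Arsalan Bilal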
