My problem is that I want to use the bulk import method for several tables, each
having only one column family, by generating one HFile per table at
different paths.
If I use HFileOutputFormat, it works fine for generating an HFile for one table
and one column family, with a single map that contains only one
context.write(ImmutableBytesWritable, KeyValue);
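For reference, here is roughly how I set up the single-table job that works.
This is a minimal sketch only; the class names "SingleTableBulkImport" and
"MyMapper", the table name "table1", and the input path are example values
from my setup:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    // Sketch of the single-table job that already works for me.
    // "table1" and the paths are example values.
    public class SingleTableBulkImport {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "single-table-bulk-import");
        job.setJarByClass(SingleTableBulkImport.class);
        job.setMapperClass(MyMapper.class);  // MyMapper emits (ImmutableBytesWritable, KeyValue)
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        FileInputFormat.addInputPath(job, new Path("/user/root/URI/input"));
        FileOutputFormat.setOutputPath(job, new Path("/user/root/URI/output"));
        // configureIncrementalLoad wires in HFileOutputFormat plus the
        // sorting/partitioning that the bulk load needs for this table.
        HFileOutputFormat.configureIncrementalLoad(job, new HTable(conf, "table1"));
        job.waitForCompletion(true);
      }
    }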

In my case, I want to generate one HFile for each table, each having one
column family, at different paths.

I have two different writes in one map:
context.write(ImmutableBytesWritable1, KeyValue1);
context.write(ImmutableBytesWritable2, KeyValue2);
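My map() looks roughly like the sketch below. The class name "TwoTableMapper",
the input parsing, and the cell values are illustrative; "c1"/"c2" stand in for
my two per-table qualifiers:

    import java.io.IOException;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch of my map(): each input record emits one KeyValue per target
    // table. Both cells share family1; the qualifier decides the table.
    public class TwoTableMapper
        extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
      @Override
      protected void map(LongWritable offset, Text line, Context context)
          throws IOException, InterruptedException {
        byte[] row = Bytes.toBytes(line.toString().split(",")[0]);  // example parsing
        context.write(new ImmutableBytesWritable(row),
            new KeyValue(row, Bytes.toBytes("family1"), Bytes.toBytes("c1"),
                Bytes.toBytes("value-for-table1")));
        context.write(new ImmutableBytesWritable(row),
            new KeyValue(row, Bytes.toBytes("family1"), Bytes.toBytes("c2"),
                Bytes.toBytes("value-for-table2")));
      }
    }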

I tried editing the existing MultiHFileOutputFormat at
https://gist.github.com/1237770

To generate the different paths, I changed lines 48, 49 and 50, because each
table's column family name is the same but each table's qualifier is different:
table1, family1:qualifier1 for context.write(ImmutableBytesWritable1, KeyValue1);
table2, family1:qualifier2 for context.write(ImmutableBytesWritable2, KeyValue2);

Main path: /user/root/URI/output/
Path for the first context: /user/root/URI/output/c1/family1
Path for the second context: /user/root/URI/output/c2/family1
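The edit in lines 48-50 derives that per-table directory from the qualifier,
roughly like this (equivalent to what is in the full listing below; "c1"/"c2"
are my qualifier values):

    // Lines 48-50 after my edit: the qualifier picks the per-table
    // directory under the job's output path.
    final Path outputPath = new Path(
        FileOutputFormat.getOutputPath(context),
        Bytes.toString(value.getQualifier()));  // e.g. /user/root/URI/output/c1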

This works, but the problem is that it generates one HFile per record inside
the column family folder.

As an overview of one path: the input file has 7 records, and there are
7 HFiles under /user/root/URI/output/c1/family1:

Name                 Type  Size     Replication  Block Size  Modification Time  Permission  Owner  Group
1683285817312272721  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
2621366688675098537  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
4010796510230526713  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
4776537456762330874  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
5183627073769524429  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
8473210328361167254  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
8665529999981215284  file  0.41 KB  1            64 MB       2011-09-28 15:24   rw-r--r--   root   supergroup
I think this is because of lines 137 and 140, which keep each context.write
working in isolation: line 137 clears the writer cache after every put, and
line 140 closes the writer after every record.

Please suggest a change to the code so that it creates one HFile per column
family, the way HFileOutputFormat does.
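For what it is worth, here is the direction I suspect the change takes at
lines 136-140; this is a sketch only, and I am not sure it is enough:

    // Sketch of what I think lines 136-140 should become: keep the writer
    // cached per key and let the outer close(TaskAttemptContext) close it,
    // so one HFile can accumulate all records instead of one per record.
    writers.put(key, writer);
    // writers.clear();        // line 137: dropping this keeps the cache intact
    writer.write(new ImmutableBytesWritable(value.getRow()), value);
    // writer.close(context);  // line 140: dropping this defers the close to
    //                         // the outer close(), which closes every cached writer

The full class as I currently have it follows.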


  1. import java.io.IOException;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. import java.util.TreeMap;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.FileSystem;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.hbase.HConstants;
  9. import org.apache.hadoop.hbase.KeyValue;
 10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 11. import org.apache.hadoop.hbase.io.hfile.Compression;
 12. import org.apache.hadoop.hbase.io.hfile.HFile;
 13. import org.apache.hadoop.hbase.regionserver.StoreFile;
 14. import org.apache.hadoop.hbase.util.Bytes;
 15. import org.apache.hadoop.mapreduce.RecordWriter;
 16. import org.apache.hadoop.mapreduce.TaskAttemptContext;
 17. import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 18. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 19. public class MultiHFileOutputFormat
 20.     extends FileOutputFormat<ImmutableBytesWritable, KeyValue> {
 21.   Map<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, KeyValue>> writers =
 22.     new HashMap<ImmutableBytesWritable, RecordWriter<ImmutableBytesWritable, KeyValue>>();
 23.   static boolean result = false;
 24.   /* Data structure to hold a Writer and amount of data written on it. */
 25.   static class WriterLength {
 26.     long written = 0;
 27.     HFile.Writer writer = null;
 28.   }
 29.   public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
 30.       throws IOException, InterruptedException {
 31.     return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
 32.       @Override
 33.       public void close(TaskAttemptContext context) throws IOException,
 34.           InterruptedException {
 35.         for (RecordWriter<ImmutableBytesWritable, KeyValue> writer : writers.values()) {
 36.           writer.close(context);
 37.         }
 38.       }
 39.       @Override
 40.       public void write(ImmutableBytesWritable key, KeyValue value)
 41.           throws IOException, InterruptedException {
 42.         System.out.println("1st Write Function Call");
 43.         System.out.println("Writer Size = " + writers.size());
 44.         RecordWriter<ImmutableBytesWritable, KeyValue> writer = writers.get(key);
 45.         System.out.println("Writers Value " + Bytes.toString(key.get()) + " & " + writers.get(key));
 46.         if (writer == null) {
 47.           System.out.println("1st if Section Write Function Call");
 48.           final Path outputPath =
 49.             new Path(FileOutputFormat.getOutputPath(context).toString() + "/" +
 50.               Bytes.toString(value.getQualifier()));
 51.           writer = new RecordWriter<ImmutableBytesWritable, KeyValue>() {
 52.             FileOutputCommitter committer = new FileOutputCommitter(outputPath, context);
 53.             Path outputdir = committer.getWorkPath();
 54.             Configuration conf = context.getConfiguration();
 55.             FileSystem fs = outputdir.getFileSystem(conf);
 56.             long maxsize = conf.getLong("hbase.hregion.max.filesize", 268435456);
 57.             int blocksize =
 58.               conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize", 65536);
 59.             String compression = conf.get("hfile.compression",
 60.               Compression.Algorithm.NONE.getName());
 61.             // Map of families to writers and how much has been output on the writer.
 62.             Map<byte[], WriterLength> writers =
 63.               new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
 64.             byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
 65.             byte[] now = Bytes.toBytes(System.currentTimeMillis());
 66.             /* Create a new HFile.Writer. Close current if there is one.
 67.              * @param writer
 68.              * @param familydir
 69.              * @return A new HFile.Writer.
 70.              * @throws IOException
 71.              */
 72.             private HFile.Writer getNewWriter(final HFile.Writer writer,
 73.                 final Path familydir)
 74.                 throws IOException {
 75.               close(writer);
 76.               return new HFile.Writer(fs, StoreFile.getUniqueFile(fs, familydir),
 77.                 blocksize, compression, KeyValue.KEY_COMPARATOR);
 78.             }
 79.             private void close(final HFile.Writer w) throws IOException {
 80.               System.out.println("2 Close Function Start");
 81.               if (w != null) {
 82.                 w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
 83.                     Bytes.toBytes(System.currentTimeMillis()));
 84.                 w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
 85.                     Bytes.toBytes(context.getTaskAttemptID().toString()));
 86.                 w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
 87.                     Bytes.toBytes(true));
 88.                 w.close();
 89.               }
 90.               System.out.println("2 Close Function End");
 91.             }
 92.             @Override
 93.             public void close(TaskAttemptContext context)
 94.                 throws IOException, InterruptedException {
 95.               System.out.println("3 Close Function Start");
 96.               for (Map.Entry<byte[], WriterLength> e : this.writers.entrySet()) {
 97.                 close(e.getValue().writer);
 98.               }
 99.               committer.commitTask(context);
100.               System.out.println("3 Close Function End");
101.             }
102.             @Override
103.             public void write(ImmutableBytesWritable key, KeyValue value)
104.                 throws IOException, InterruptedException {
105.               System.out.println("2nd Write Function Call");
106.               long length = value.getLength();
107.               byte[] family = value.getFamily();
108.               WriterLength wl = this.writers.get(family);
109.               if (wl == null || ((length + wl.written) >= maxsize) &&
110.                   Bytes.compareTo(this.previousRow, 0, this.previousRow.length,
111.                     value.getBuffer(), value.getRowOffset(),
112.                     value.getRowLength()) != 0) {
113.                 // Get a new writer.
114.                 Path basedir = new Path(outputdir, Bytes.toString(family));
115.                 System.out.println("Creating Directory Condition");
116.                 if (wl == null) {
117.                   wl = new WriterLength();
118.                   this.writers.put(family, wl);
119.                   if (this.writers.size() > 1) throw new IOException("One family only");
120.                   // If wl == null, first file in family. Ensure family dir exists.
121.                   System.out.println("Creating Directory Start");
122.                   if (!fs.exists(basedir)) fs.mkdirs(basedir);
123.                   System.out.println("Creating Directory End");
124.                 }
125.                 wl.writer = getNewWriter(wl.writer, basedir);
126.                 wl.written = 0;
127.               }
128.               value.updateLatestStamp(this.now);
129.               wl.writer.append(value);
130.               wl.written += length;
131.               // Copy the row so we know when a row transitions.
132.               this.previousRow = value.getRow();
133.               System.out.println("2nd Write Function Call End");
134.             }
135.           };
136.           writers.put(key, writer);
137.           writers.clear();
138.         }
139.         writer.write(new ImmutableBytesWritable(value.getRow()), value);
140.         writer.close(context);
141.         System.out.println("1st Write Function Call End");
142.       }
143.     };
144.   }
145. }

-- 
Best Regards,
Arsalan Bilal




