PHOENIX-2674 PhoenixMapReduceUtil#setInput doesn't honor condition clause. Sets the condition in PhoenixMapReduceUtil#setInput and does some slight cleanup of duplicated code across the setInput() overloads. Adds a test that covers MapReduce with and without a condition.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/8ece81b5 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/8ece81b5 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/8ece81b5 Branch: refs/heads/calcite Commit: 8ece81b5522df3e6bd9dfdb3112e101215bb49f1 Parents: 0c1fd3a Author: Jesse Yates <jya...@apache.org> Authored: Wed Feb 10 12:46:47 2016 -0800 Committer: Jesse Yates <jya...@apache.org> Committed: Fri Feb 12 12:15:42 2016 -0800 ---------------------------------------------------------------------- .../org/apache/phoenix/end2end/MapReduceIT.java | 230 +++++++++++++++++++ .../mapreduce/util/PhoenixMapReduceUtil.java | 65 +++--- 2 files changed, 264 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece81b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java new file mode 100644 index 0000000..f030701 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.mapreduce.PhoenixOutputFormat; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; +import org.apache.phoenix.schema.types.PDouble; +import org.apache.phoenix.schema.types.PhoenixArray; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.sql.*; + +import static org.junit.Assert.*; + +/** + * Test that our MapReduce basic tools work as expected + */ +public class MapReduceIT extends BaseHBaseManagedTimeIT { + + private static final String STOCK_TABLE_NAME = "stock"; + private static final String STOCK_STATS_TABLE_NAME = "stock_stats"; + private static final String STOCK_NAME = "STOCK_NAME"; + private static final String RECORDING_YEAR = "RECORDING_YEAR"; + private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER"; + private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS " + STOCK_TABLE_NAME + " ( " + + STOCK_NAME + " VARCHAR NOT NULL ," + RECORDING_YEAR + " INTEGER NOT NULL, " + RECORDINGS_QUARTER + + " DOUBLE array[] CONSTRAINT pk PRIMARY KEY (" + STOCK_NAME + " , " + RECORDING_YEAR 
+ "))"; + + private static final String MAX_RECORDING = "MAX_RECORDING"; + private static final String CREATE_STOCK_STATS_TABLE = + "CREATE TABLE IF NOT EXISTS " + STOCK_STATS_TABLE_NAME + "(" + STOCK_NAME + " VARCHAR NOT NULL , " + + MAX_RECORDING + " DOUBLE CONSTRAINT pk PRIMARY KEY (" + STOCK_NAME + "))"; + private static final String UPSERT = "UPSERT into " + STOCK_TABLE_NAME + " values (?, ?, ?)"; + + @Before + public void setupTables() throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute(CREATE_STOCK_TABLE); + conn.createStatement().execute(CREATE_STOCK_STATS_TABLE); + conn.commit(); + } + + @Test + public void testNoConditionsOnSelect() throws Exception { + final Configuration conf = getUtility().getConfiguration(); + Job job = Job.getInstance(conf); + PhoenixMapReduceUtil.setInput(job, StockWritable.class, STOCK_TABLE_NAME, null, + STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER); + testJob(job, 91.04); + } + + @Test + public void testConditionsOnSelect() throws Exception { + final Configuration conf = getUtility().getConfiguration(); + Job job = Job.getInstance(conf); + PhoenixMapReduceUtil.setInput(job, StockWritable.class, STOCK_TABLE_NAME, RECORDING_YEAR+" < 2009", + STOCK_NAME, RECORDING_YEAR, "0." 
+ RECORDINGS_QUARTER); + testJob(job, 81.04); + } + + private void testJob(Job job, double expectedMax) + throws SQLException, InterruptedException, IOException, ClassNotFoundException { + upsertData(); + + // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints + job.getConfiguration().set("mapreduce.framework.name", "local"); + setOutput(job); + + job.setMapperClass(StockMapper.class); + job.setReducerClass(StockReducer.class); + job.setOutputFormatClass(PhoenixOutputFormat.class); + + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(DoubleWritable.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(StockWritable.class); + + // run job + assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true)); + + // verify + ResultSet stats = DriverManager.getConnection(getUrl()).createStatement() + .executeQuery("SELECT * FROM " + STOCK_STATS_TABLE_NAME); + assertTrue("No data stored in stats table!", stats.next()); + String name = stats.getString(1); + double max = stats.getDouble(2); + assertEquals("Got the wrong stock name!", "AAPL", name); + assertEquals("Max value didn't match the expected!", expectedMax, max, 0); + assertFalse("Should only have stored one row in stats table!", stats.next()); + } + + /** + * Custom output setting because output upsert statement setting is broken (PHOENIX-2677) + * + * @param job to update + */ + private void setOutput(Job job) { + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setOutputTableName(configuration, STOCK_STATS_TABLE_NAME); + configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, "UPSERT into " + STOCK_STATS_TABLE_NAME + + " (" + STOCK_NAME + ", " + MAX_RECORDING + ") values (?,?)"); + job.setOutputFormatClass(PhoenixOutputFormat.class); + } + + private void upsertData() throws SQLException { + Connection conn = DriverManager.getConnection(getUrl()); 
+ PreparedStatement stmt = conn.prepareStatement(UPSERT); + upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3}); + upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3}); + conn.commit(); + } + + private void upsertData(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException { + int i = 1; + stmt.setString(i++, name); + stmt.setInt(i++, year); + Array recordings = new PhoenixArray.PrimitiveDoublePhoenixArray(PDouble.INSTANCE, data); + stmt.setArray(i++, recordings); + stmt.execute(); + } + + public static class StockWritable implements DBWritable { + + private String stockName; + private double[] recordings; + private double maxPrice; + + @Override + public void readFields(ResultSet rs) throws SQLException { + stockName = rs.getString(STOCK_NAME); + final Array recordingsArray = rs.getArray(RECORDINGS_QUARTER); + recordings = (double[]) recordingsArray.getArray(); + } + + @Override + public void write(PreparedStatement pstmt) throws SQLException { + pstmt.setString(1, stockName); + pstmt.setDouble(2, maxPrice); + } + + public double[] getRecordings() { + return recordings; + } + + public String getStockName() { + return stockName; + } + + public void setStockName(String stockName) { + this.stockName = stockName; + } + + public void setMaxPrice(double maxPrice) { + this.maxPrice = maxPrice; + } + } + + /** + * Extract the max price for each stock recording + */ + public static class StockMapper extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> { + + private Text stock = new Text(); + private DoubleWritable price = new DoubleWritable(); + + @Override + protected void map(NullWritable key, StockWritable stockWritable, Context context) + throws IOException, InterruptedException { + double[] recordings = stockWritable.getRecordings(); + final String stockName = stockWritable.getStockName(); + double maxPrice = Double.MIN_VALUE; + for (double recording : recordings) { + if (maxPrice < recording) 
{ + maxPrice = recording; + } + } + stock.set(stockName); + price.set(maxPrice); + context.write(stock, price); + } + } + + /** + * Store the max price seen for each stock + */ + public static class StockReducer extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> { + + @Override + protected void reduce(Text key, Iterable<DoubleWritable> recordings, Context context) + throws IOException, InterruptedException { + double maxPrice = Double.MIN_VALUE; + for (DoubleWritable recording : recordings) { + if (maxPrice < recording.get()) { + maxPrice = recording.get(); + } + } + final StockWritable stock = new StockWritable(); + stock.setStockName(key.toString()); + stock.setMaxPrice(maxPrice); + context.write(NullWritable.get(), stock); + } + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/8ece81b5/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java index f52c860..125c6a8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java @@ -30,43 +30,46 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; public final class PhoenixMapReduceUtil { private PhoenixMapReduceUtil() { - + } - + /** - * + * * @param job * @param inputClass DBWritable class * @param tableName Input table name - * @param conditions Condition clause to be added to the WHERE clause. + * @param conditions Condition clause to be added to the WHERE clause. Can be <tt>null</tt> if there are no conditions. * @param fieldNames fields being projected for the SELECT query. 
*/ - public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName , final String conditions, final String... fieldNames) { - job.setInputFormatClass(PhoenixInputFormat.class); - final Configuration configuration = job.getConfiguration(); - PhoenixConfigurationUtil.setInputTableName(configuration, tableName); - PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames); - PhoenixConfigurationUtil.setInputClass(configuration,inputClass); - PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE); + public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, + final String conditions, final String... fieldNames) { + final Configuration configuration = setInput(job, inputClass, tableName); + if(conditions != null) { + PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions); + } + PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames); } - + /** - * - * @param job - * @param inputClass DBWritable class + * + * @param job + * @param inputClass DBWritable class * @param tableName Input table name * @param inputQuery Select query. */ public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) { - job.setInputFormatClass(PhoenixInputFormat.class); - final Configuration configuration = job.getConfiguration(); - PhoenixConfigurationUtil.setInputTableName(configuration, tableName); - PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery); - PhoenixConfigurationUtil.setInputClass(configuration,inputClass); + final Configuration configuration = setInput(job, inputClass, tableName); PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY); - } - + + private static Configuration setInput(final Job job, final Class<? 
extends DBWritable> inputClass, final String tableName){ + job.setInputFormatClass(PhoenixInputFormat.class); + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setInputTableName(configuration, tableName); + PhoenixConfigurationUtil.setInputClass(configuration,inputClass); + return configuration; + } + /** * A method to override which HBase cluster for {@link PhoenixInputFormat} to read from * @param job MapReduce Job @@ -77,10 +80,10 @@ public final class PhoenixMapReduceUtil { PhoenixConfigurationUtil.setInputCluster(configuration, quorum); } /** - * + * * @param job - * @param outputClass - * @param tableName Output table + * @param outputClass + * @param tableName Output table * @param columns List of columns separated by , */ public static void setOutput(final Job job, final String tableName,final String columns) { @@ -89,13 +92,13 @@ public final class PhoenixMapReduceUtil { PhoenixConfigurationUtil.setOutputTableName(configuration, tableName); PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns.split(",")); } - - + + /** - * + * * @param job * @param outputClass - * @param tableName Output table + * @param tableName Output table * @param fieldNames fields */ public static void setOutput(final Job job, final String tableName , final String... fieldNames) { @@ -104,7 +107,7 @@ public final class PhoenixMapReduceUtil { PhoenixConfigurationUtil.setOutputTableName(configuration, tableName); PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames); } - + /** * A method to override which HBase cluster for {@link PhoenixOutputFormat} to write to * @param job MapReduce Job @@ -115,5 +118,5 @@ public final class PhoenixMapReduceUtil { PhoenixConfigurationUtil.setOutputCluster(configuration, quorum); } - + }