Repository: hive Updated Branches: refs/heads/master b82995517 -> b4302bb7a
http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java deleted file mode 100644 index 6aa079d..0000000 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java +++ /dev/null @@ -1,432 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hadoop.hive.metastore.messaging.json; - -import java.util.Arrays; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Predicate; -import java.util.regex.PatternSyntaxException; - -import javax.annotation.Nullable; - -import org.apache.hadoop.hive.metastore.api.Catalog; -import org.apache.hadoop.hive.metastore.api.Database; -import org.apache.hadoop.hive.metastore.api.Function; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NotificationEvent; -import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.SQLForeignKey; -import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; -import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; -import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.api.TxnToWriteId; -import org.apache.hadoop.hive.metastore.conf.MetastoreConf; -import org.apache.hadoop.hive.metastore.events.AcidWriteEvent; -import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; -import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; -import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; -import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; -import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; -import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; -import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; -import org.apache.hadoop.hive.metastore.messaging.AlterCatalogMessage; -import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; -import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; -import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; -import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; -import org.apache.hadoop.hive.metastore.messaging.CreateCatalogMessage; -import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; -import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; -import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; -import org.apache.hadoop.hive.metastore.messaging.DropCatalogMessage; -import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage; -import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; -import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; -import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; -import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; -import org.apache.hadoop.hive.metastore.messaging.InsertMessage; -import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; -import org.apache.hadoop.hive.metastore.messaging.MessageFactory; -import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; -import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage; -import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.thrift.TBase; -import org.apache.thrift.TDeserializer; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TJSONProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; - -import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.filterMapkeys; - -/** - * The JSON implementation of the MessageFactory. Constructs JSON implementations of each - * message-type. - */ -public class JSONMessageFactory extends MessageFactory { - - private static final Logger LOG = LoggerFactory.getLogger(JSONMessageFactory.class.getName()); - - private static JSONMessageDeserializer deserializer = new JSONMessageDeserializer(); - - private static List<Predicate<String>> paramsFilter; - - @Override - public void init() throws MetaException { - super.init(); - - List<String> excludePatterns = Arrays.asList(MetastoreConf - .getTrimmedStringsVar(conf, MetastoreConf.ConfVars.EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS)); - try { - paramsFilter = MetaStoreUtils.compilePatternsToPredicates(excludePatterns); - } catch (PatternSyntaxException e) { - LOG.error("Regex pattern compilation failed. Verify that " - + "metastore.notification.parameters.exclude.patterns has valid patterns."); - throw new MetaException("Regex pattern compilation failed. " + e.getMessage()); - } - } - - @Override - public MessageDeserializer getDeserializer() { - return deserializer; - } - - @Override - public String getMessageFormat() { - return "json-0.2"; - } - - @Override - public CreateDatabaseMessage buildCreateDatabaseMessage(Database db) { - return new JSONCreateDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, now()); - } - - @Override - public AlterDatabaseMessage buildAlterDatabaseMessage(Database beforeDb, Database afterDb) { - return new JSONAlterDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - beforeDb, afterDb, now()); - } - - @Override - public DropDatabaseMessage buildDropDatabaseMessage(Database db) { - return new JSONDropDatabaseMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db.getName(), now()); - } - - @Override - public CreateTableMessage buildCreateTableMessage(Table table, Iterator<String> fileIter) { - return new JSONCreateTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, fileIter, now()); - } - - @Override - public AlterTableMessage buildAlterTableMessage(Table before, Table after, boolean isTruncateOp, Long writeId) { - return new JSONAlterTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, before, after, isTruncateOp, writeId, now()); - } - - @Override - public DropTableMessage buildDropTableMessage(Table table) { - return new JSONDropTableMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, now()); - } - - @Override - public AddPartitionMessage buildAddPartitionMessage(Table table, - Iterator<Partition> partitionsIterator, Iterator<PartitionFiles> partitionFileIter) { - return new JSONAddPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, - partitionsIterator, partitionFileIter, now()); - } - - @Override - public AlterPartitionMessage buildAlterPartitionMessage(Table table, Partition before, - Partition after, boolean isTruncateOp, Long writeId) { - return new JSONAlterPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - table, before, after, isTruncateOp, writeId, now()); - } - - @Override - public DropPartitionMessage buildDropPartitionMessage(Table table, - Iterator<Partition> partitionsIterator) { - return new JSONDropPartitionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, table, - getPartitionKeyValues(table, partitionsIterator), now()); - } - - @Override - public CreateFunctionMessage buildCreateFunctionMessage(Function fn) { - return new JSONCreateFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now()); - } - - @Override - public DropFunctionMessage buildDropFunctionMessage(Function fn) { - return new JSONDropFunctionMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fn, now()); - } - - @Override - public InsertMessage buildInsertMessage(Table tableObj, Partition partObj, - boolean replace, Iterator<String> fileIter) { - return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - tableObj, partObj, replace, fileIter, now()); - } - - @Override - public AddPrimaryKeyMessage buildAddPrimaryKeyMessage(List<SQLPrimaryKey> pks) { - return new JSONAddPrimaryKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, pks, now()); - } - - @Override - public AddForeignKeyMessage buildAddForeignKeyMessage(List<SQLForeignKey> fks) { - return new JSONAddForeignKeyMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fks, now()); - } - - @Override - public AddUniqueConstraintMessage buildAddUniqueConstraintMessage(List<SQLUniqueConstraint> uks) { - return new JSONAddUniqueConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, uks, now()); - } - - @Override - public AddNotNullConstraintMessage buildAddNotNullConstraintMessage(List<SQLNotNullConstraint> nns) { - return new JSONAddNotNullConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, nns, now()); - } - - @Override - public DropConstraintMessage buildDropConstraintMessage(String dbName, String tableName, - String constraintName) { - return new JSONDropConstraintMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, dbName, tableName, - constraintName, now()); - } - - @Override - public CreateCatalogMessage buildCreateCatalogMessage(Catalog catalog) { - return new JSONCreateCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(), now()); - } - - @Override - public AlterCatalogMessage buildAlterCatalogMessage(Catalog beforeCat, Catalog afterCat) { - return new JSONAlterCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, - beforeCat, afterCat, now()); - } - - @Override - public DropCatalogMessage buildDropCatalogMessage(Catalog catalog) { - return new JSONDropCatalogMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, catalog.getName(), now()); - } - - @Override - public OpenTxnMessage buildOpenTxnMessage(Long fromTxnId, Long toTxnId) { - return new JSONOpenTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, fromTxnId, toTxnId, now()); - } - - @Override - public CommitTxnMessage buildCommitTxnMessage(Long txnId) { - return new JSONCommitTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now()); - } - - @Override - public AbortTxnMessage buildAbortTxnMessage(Long txnId) { - return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now()); - } - - @Override - public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList, - String dbName, String tableName) { - return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList, dbName, tableName, now()); - } - - @Override - public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files) { - return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent, files); - } - - private long now() { - return System.currentTimeMillis() / 1000; - } - - static Map<String, String> getPartitionKeyValues(Table table, Partition partition) { - Map<String, String> partitionKeys = new LinkedHashMap<>(); - for (int i = 0; i < table.getPartitionKeysSize(); ++i) { - partitionKeys.put(table.getPartitionKeys().get(i).getName(), partition.getValues().get(i)); - } - return partitionKeys; - } - - static List<Map<String, String>> getPartitionKeyValues(final Table table, - Iterator<Partition> iterator) { - return Lists.newArrayList(Iterators.transform(iterator, - new com.google.common.base.Function<Partition, Map<String, String>>() { - @Override - public Map<String, String> apply(@Nullable Partition partition) { - return getPartitionKeyValues(table, partition); - } - })); - } - - static String createPrimaryKeyObjJson(SQLPrimaryKey primaryKeyObj) throws TException { - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(primaryKeyObj, "UTF-8"); - } - - static String createForeignKeyObjJson(SQLForeignKey foreignKeyObj) throws TException { - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(foreignKeyObj, "UTF-8"); - } - - static String createUniqueConstraintObjJson(SQLUniqueConstraint uniqueConstraintObj) throws TException { - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(uniqueConstraintObj, "UTF-8"); - } - - static String createNotNullConstraintObjJson(SQLNotNullConstraint notNullConstaintObj) throws TException { - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(notNullConstaintObj, "UTF-8"); - } - - static String createDatabaseObjJson(Database dbObj) throws TException { - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(dbObj, "UTF-8"); - } - - static String createCatalogObjJson(Catalog catObj) throws TException { - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(catObj, "UTF-8"); - } - - static String createTableObjJson(Table tableObj) throws TException { - //Note: The parameters of the Table object will be removed in the filter if it matches - // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS - filterMapkeys(tableObj.getParameters(), paramsFilter); - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(tableObj, "UTF-8"); - } - - static String createPartitionObjJson(Partition partitionObj) throws TException { - //Note: The parameters of the Partition object will be removed in the filter if it matches - // any pattern provided through EVENT_NOTIFICATION_PARAMETERS_EXCLUDE_PATTERNS - filterMapkeys(partitionObj.getParameters(), paramsFilter); - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(partitionObj, "UTF-8"); - } - - static String createFunctionObjJson(Function functionObj) throws TException { - TSerializer serializer = new TSerializer(new TJSONProtocol.Factory()); - return serializer.toString(functionObj, "UTF-8"); - } - - public static ObjectNode getJsonTree(NotificationEvent event) throws Exception { - return getJsonTree(event.getMessage()); - } - - public static ObjectNode getJsonTree(String eventMessage) throws Exception { - JsonParser jsonParser = (new JsonFactory()).createJsonParser(eventMessage); - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(jsonParser, ObjectNode.class); - } - - public static Table getTableObj(ObjectNode jsonTree) throws Exception { - TDeserializer deSerializer = new TDeserializer(new TJSONProtocol.Factory()); - Table tableObj = new Table(); - String tableJson = jsonTree.get("tableObjJson").asText(); - deSerializer.deserialize(tableObj, tableJson, "UTF-8"); - return tableObj; - } - - /* - * TODO: Some thoughts here : We have a current todo to move some of these methods over to - * MessageFactory instead of being here, so we can override them, but before we move them over, - * we should keep the following in mind: - * - * a) We should return Iterables, not Lists. That makes sure that we can be memory-safe when - * implementing it rather than forcing ourselves down a path wherein returning List is part of - * our interface, and then people use .size() or somesuch which makes us need to materialize - * the entire list and not change. Also, returning Iterables allows us to do things like - * Iterables.transform for some of these. - * b) We should not have "magic" names like "tableObjJson", because that breaks expectation of a - * couple of things - firstly, that of serialization format, although that is fine for this - * JSONMessageFactory, and secondly, that makes us just have a number of mappings, one for each - * obj type, and sometimes, as the case is with alter, have multiples. Also, any event-specific - * item belongs in that event message / event itself, as opposed to in the factory. It's okay to - * have utility accessor methods here that are used by each of the messages to provide accessors. - * I'm adding a couple of those here. - * - */ - - public static TBase getTObj(String tSerialized, Class<? extends TBase> objClass) throws Exception{ - TDeserializer thriftDeSerializer = new TDeserializer(new TJSONProtocol.Factory()); - TBase obj = objClass.newInstance(); - thriftDeSerializer.deserialize(obj, tSerialized, "UTF-8"); - return obj; - } - - public static Iterable<? extends TBase> getTObjs( - Iterable<String> objRefStrs, final Class<? extends TBase> objClass) throws Exception { - - try { - return Iterables.transform(objRefStrs, new com.google.common.base.Function<String,TBase>(){ - @Override - public TBase apply(@Nullable String objStr){ - try { - return getTObj(objStr, objClass); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } catch (RuntimeException re){ - // We have to add this bit of exception handling here, because Function.apply does not allow us to throw - // the actual exception that might be a checked exception, so we wind up needing to throw a RuntimeException - // with the previously thrown exception as its cause. However, since RuntimeException.getCause() returns - // a throwable instead of an Exception, we have to account for the possibility that the underlying code - // might have thrown a Throwable that we wrapped instead, in which case, continuing to throw the - // RuntimeException is the best thing we can do. - Throwable t = re.getCause(); - if (t instanceof Exception){ - throw (Exception) t; - } else { - throw re; - } - } - } - - // If we do not need this format of accessor using ObjectNode, this is a candidate for removal as well - public static Iterable<? extends TBase> getTObjs( - ObjectNode jsonTree, String objRefListName, final Class<? extends TBase> objClass) throws Exception { - Iterable<JsonNode> jsonArrayIterator = jsonTree.get(objRefListName); - com.google.common.base.Function<JsonNode,String> textExtractor = - new com.google.common.base.Function<JsonNode, String>() { - @Nullable - @Override - public String apply(@Nullable JsonNode input) { - return input.asText(); - } - }; - return getTObjs(Iterables.transform(jsonArrayIterator, textExtractor), objClass); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java new file mode 100644 index 0000000..9c64e33 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/DeSerializer.java @@ -0,0 +1,181 @@ +package org.apache.hadoop.hive.metastore.messaging.json.gzip; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage; +import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage; +import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AddPrimaryKeyMessage; +import org.apache.hadoop.hive.metastore.messaging.AddUniqueConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage; +import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage; +import org.apache.hadoop.hive.metastore.messaging.DropConstraintMessage; +import org.apache.hadoop.hive.metastore.messaging.DropDatabaseMessage; +import org.apache.hadoop.hive.metastore.messaging.DropFunctionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage; +import org.apache.hadoop.hive.metastore.messaging.DropTableMessage; +import org.apache.hadoop.hive.metastore.messaging.InsertMessage; +import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage; +import org.apache.hadoop.hive.metastore.messaging.json.JSONMessageDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.zip.GZIPInputStream; + + +public class DeSerializer extends JSONMessageDeserializer { + private static final Logger LOG = LoggerFactory.getLogger(Serializer.class.getName()); + + private static String deCompress(String messageBody) { + byte[] decodedBytes = Base64.getDecoder().decode(messageBody.getBytes(StandardCharsets.UTF_8)); + try ( + ByteArrayInputStream in = new ByteArrayInputStream(decodedBytes); + GZIPInputStream is = new GZIPInputStream(in) + ) { + byte[] bytes = IOUtils.toByteArray(is); + return new String(bytes, StandardCharsets.UTF_8); + } catch (IOException e) { + LOG.error("cannot decode the stream", e); + LOG.debug("base64 encoded String", messageBody); + throw new RuntimeException("cannot decode the stream ", e); + } + } + + /** + * this is mainly as a utility to allow debugging of messages for developers by providing the + * message in a file and getting an actual message out. + * This class on a deployed hive instance will also be bundled in hive-exec jar. + * + */ + public static void main(String[] args) throws IOException { + if(args.length != 1) { + System.out.println("Usage:"); + System.out.println("java -cp [classpath] "+DeSerializer.class.getCanonicalName() +" [file_location]"); + } + System.out.print( + deCompress(FileUtils.readFileToString(new File(args[0]), StandardCharsets.UTF_8))); + } + + @Override + public CreateDatabaseMessage getCreateDatabaseMessage(String messageBody) { + return super.getCreateDatabaseMessage(deCompress(messageBody)); + } + + @Override + public AlterDatabaseMessage getAlterDatabaseMessage(String messageBody) { + return super.getAlterDatabaseMessage(deCompress(messageBody)); + } + + @Override + public DropDatabaseMessage getDropDatabaseMessage(String messageBody) { + return super.getDropDatabaseMessage(deCompress(messageBody)); + } + + @Override + public CreateTableMessage getCreateTableMessage(String messageBody) { + return super.getCreateTableMessage(deCompress(messageBody)); + } + + @Override + public AlterTableMessage getAlterTableMessage(String messageBody) { + return super.getAlterTableMessage(deCompress(messageBody)); + } + + @Override + public DropTableMessage getDropTableMessage(String messageBody) { + return super.getDropTableMessage(deCompress(messageBody)); + } + + @Override + public AddPartitionMessage getAddPartitionMessage(String messageBody) { + return super.getAddPartitionMessage(deCompress(messageBody)); + } + + @Override + public AlterPartitionMessage getAlterPartitionMessage(String messageBody) { + return super.getAlterPartitionMessage(deCompress(messageBody)); + } + + @Override + public DropPartitionMessage getDropPartitionMessage(String messageBody) { + return super.getDropPartitionMessage(deCompress(messageBody)); + } + + @Override + public CreateFunctionMessage getCreateFunctionMessage(String messageBody) { + return super.getCreateFunctionMessage(deCompress(messageBody)); + } + + @Override + public DropFunctionMessage getDropFunctionMessage(String messageBody) { + return super.getDropFunctionMessage(deCompress(messageBody)); + } + + @Override + public InsertMessage getInsertMessage(String messageBody) { + return super.getInsertMessage(deCompress(messageBody)); + } + + @Override + public AddPrimaryKeyMessage getAddPrimaryKeyMessage(String messageBody) { + return super.getAddPrimaryKeyMessage(deCompress(messageBody)); + } + + @Override + public AddForeignKeyMessage getAddForeignKeyMessage(String messageBody) { + return super.getAddForeignKeyMessage(deCompress(messageBody)); + } + + @Override + public AddUniqueConstraintMessage getAddUniqueConstraintMessage(String messageBody) { + return super.getAddUniqueConstraintMessage(deCompress(messageBody)); + } + + @Override + public AddNotNullConstraintMessage getAddNotNullConstraintMessage(String messageBody) { + return super.getAddNotNullConstraintMessage(deCompress(messageBody)); + } + + @Override + public DropConstraintMessage getDropConstraintMessage(String messageBody) { + return super.getDropConstraintMessage(deCompress(messageBody)); + } + + @Override + public OpenTxnMessage getOpenTxnMessage(String messageBody) { + return super.getOpenTxnMessage(deCompress(messageBody)); + } + + @Override + public CommitTxnMessage getCommitTxnMessage(String messageBody) { + return super.getCommitTxnMessage(deCompress(messageBody)); + } + + @Override + public AbortTxnMessage getAbortTxnMessage(String messageBody) { + return super.getAbortTxnMessage(deCompress(messageBody)); + } + + @Override + public AllocWriteIdMessage getAllocWriteIdMessage(String messageBody) { + return super.getAllocWriteIdMessage(deCompress(messageBody)); + } + + @Override + public AcidWriteMessage getAcidWriteMessage(String messageBody) { + return super.getAcidWriteMessage(deCompress(messageBody)); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java new file mode 100644 index 0000000..07b01a9 --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/GzipJSONMessageEncoder.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hive.metastore.messaging.json.gzip; + +import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer; +import org.apache.hadoop.hive.metastore.messaging.MessageEncoder; +import org.apache.hadoop.hive.metastore.messaging.MessageFactory; +import org.apache.hadoop.hive.metastore.messaging.MessageSerializer; + +/** + * This implementation gzips and then Base64 encodes the message before writing it out. + * This MessageEncoder will break the backward compatibility for hive replication v1 which uses webhcat endpoints. + */ +public class GzipJSONMessageEncoder implements MessageEncoder { + public static final String FORMAT = "gzip(json-2.0)"; + + static { + MessageFactory.register(FORMAT, GzipJSONMessageEncoder.class); + } + + private static DeSerializer deSerializer = new DeSerializer(); + private static Serializer serializer = new Serializer(); + + private static volatile MessageEncoder instance; + + public static MessageEncoder getInstance() { + if (instance == null) { + synchronized (GzipJSONMessageEncoder.class) { + if (instance == null) { + instance = new GzipJSONMessageEncoder(); + } + } + } + return instance; + } + + @Override + public MessageDeserializer getDeserializer() { + return deSerializer; + } + + @Override + public MessageSerializer getSerializer() { + return serializer; + } + + @Override + public String getMessageFormat() { + return FORMAT; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java new file mode 100644 index 0000000..c40600d --- /dev/null +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/gzip/Serializer.java @@ -0,0 +1,32 @@ +package org.apache.hadoop.hive.metastore.messaging.json.gzip; + +import org.apache.hadoop.hive.metastore.messaging.EventMessage; +import org.apache.hadoop.hive.metastore.messaging.MessageSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.zip.GZIPOutputStream; + +class Serializer implements MessageSerializer { + private static final Logger LOG = LoggerFactory.getLogger(Serializer.class.getName()); + + @Override + public String serialize(EventMessage message) { + String messageAsString = MessageSerializer.super.serialize(message); + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + GZIPOutputStream gout = new GZIPOutputStream(baos); + gout.write(messageAsString.getBytes(StandardCharsets.UTF_8)); + gout.close(); + byte[] compressed = baos.toByteArray(); + return new String(Base64.getEncoder().encode(compressed), StandardCharsets.UTF_8); + } catch (IOException e) { + LOG.error("could not use gzip output stream", e); + LOG.debug("message " + messageAsString); + throw new RuntimeException("could not use the gzip output Stream", e); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java index 3d36b60..b01a632 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java @@ -23,6 +23,7 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,6 +43,8 @@ import org.slf4j.LoggerFactory; import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME; public class MetaStoreTestUtils { + private static Map<Integer, Thread> map = new HashMap<>(); + private static final Logger LOG = LoggerFactory.getLogger(MetaStoreTestUtils.class); private static final String TMP_DIR = System.getProperty("test.tmp.dir"); public static final int RETRY_COUNT = 10; @@ -75,9 +78,17 @@ public class MetaStoreTestUtils { }, "MetaStoreThread-" + port); thread.setDaemon(true); thread.start(); + map.put(port,thread); MetaStoreTestUtils.loopUntilHMSReady(port); } + public static void close(final int port){ + Thread thread = map.get(port); + if(thread != null){ + thread.stop(); + } + } + public static int startMetaStoreWithRetry(final HadoopThriftAuthBridge bridge) throws Exception { return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, MetastoreConf.newMetastoreConf()); } http://git-wip-us.apache.org/repos/asf/hive/blob/b4302bb7/testutils/ptest2/conf/deployed/master-mr2.properties ---------------------------------------------------------------------- diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties index 23ad0f6..ad5405f 100644 --- a/testutils/ptest2/conf/deployed/master-mr2.properties +++ b/testutils/ptest2/conf/deployed/master-mr2.properties @@ -68,7 +68,7 @@ ut.service.batchSize=8 unitTests.module.itests.hive-unit=itests.hive-unit ut.itests.hive-unit.batchSize=9 -ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr +ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat unitTests.module.itests.qtest=itests.qtest ut.itests.qtest.batchSize=9