RYA-179 fixed license Updated the licenses in the rya.merger project to use the correct Apache header. Also, moved the license to above the package declaration.
closes #114 Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/1d33b435 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/1d33b435 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/1d33b435 Branch: refs/heads/master Commit: 1d33b4359dac3bc479f98919493138aaf8031129 Parents: 093e7c2 Author: isper3at <smith...@gmail.com> Authored: Thu Oct 13 21:20:55 2016 -0400 Committer: Aaron Mihalik <miha...@alum.mit.edu> Committed: Sat Oct 15 15:17:07 2016 -0400 ---------------------------------------------------------------------- .../AbstractDualInstanceAccumuloMRTool.java | 34 ++- .../apache/rya/accumulo/mr/merge/CopyTool.java | 57 ++--- .../apache/rya/accumulo/mr/merge/MergeTool.java | 188 +++++++------- .../accumulo/mr/merge/common/InstanceType.java | 36 ++- .../mr/merge/gui/DateTimePickerDialog.java | 77 +++--- .../merge/mappers/AccumuloCopyToolMapper.java | 40 ++- .../mr/merge/mappers/AccumuloRyaRuleMapper.java | 40 ++- .../mr/merge/mappers/BaseCopyToolMapper.java | 68 +++-- .../mr/merge/mappers/BaseRuleMapper.java | 78 +++--- .../mr/merge/mappers/FileCopyToolMapper.java | 50 ++-- .../mr/merge/mappers/MergeToolMapper.java | 178 +++++++------ .../mr/merge/mappers/RowRuleMapper.java | 68 +++-- .../mr/merge/reducers/MultipleFileReducer.java | 44 ++-- .../mr/merge/util/AccumuloInstanceDriver.java | 101 ++++---- .../mr/merge/util/AccumuloQueryRuleset.java | 104 ++++---- .../mr/merge/util/AccumuloRyaUtils.java | 247 +++++++++---------- .../rya/accumulo/mr/merge/util/CopyRule.java | 126 +++++----- .../rya/accumulo/mr/merge/util/GroupedRow.java | 66 +++-- .../accumulo/mr/merge/util/QueryRuleset.java | 180 +++++++------- .../rya/accumulo/mr/merge/util/TimeUtils.java | 110 ++++----- .../accumulo/mr/merge/util/ToolConfigUtils.java | 66 +++-- .../rya/accumulo/mr/merge/CopyToolTest.java | 98 ++++---- .../rya/accumulo/mr/merge/MergeToolTest.java | 138 +++++------ .../rya/accumulo/mr/merge/RulesetCopyIT.java | 156 ++++++------ .../accumulo/mr/merge/demo/CopyToolDemo.java | 92 ++++--- .../accumulo/mr/merge/demo/MergeToolDemo.java | 76 +++--- .../mr/merge/demo/util/DemoUtilities.java | 38 ++- .../driver/AccumuloDualInstanceDriver.java | 72 +++--- .../merge/driver/MiniAccumuloClusterDriver.java | 66 +++-- .../rya/accumulo/mr/merge/util/TestUtils.java | 76 +++--- 30 files changed, 1358 insertions(+), 1412 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/AbstractDualInstanceAccumuloMRTool.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/AbstractDualInstanceAccumuloMRTool.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/AbstractDualInstanceAccumuloMRTool.java index fde6257..8e9c21b 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/AbstractDualInstanceAccumuloMRTool.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/AbstractDualInstanceAccumuloMRTool.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge; import org.apache.accumulo.core.security.Authorizations; import org.apache.hadoop.util.Tool; @@ -53,7 +51,7 @@ public abstract class AbstractDualInstanceAccumuloMRTool extends AbstractAccumul if (childTablePrefix != null) { RdfCloudTripleStoreConstants.prefixTables(childTablePrefix); } - String childAuth = conf.get(MRUtils.AC_AUTH_PROP + MergeTool.CHILD_SUFFIX); + final String childAuth = conf.get(MRUtils.AC_AUTH_PROP + MergeTool.CHILD_SUFFIX); if (childAuth != null) { childAuthorizations = new Authorizations(childAuth.split(",")); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java index 87161f6..660fa19 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/CopyTool.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge; import java.io.BufferedOutputStream; import java.io.File; @@ -49,6 +47,7 @@ import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloMultiTableInputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; import org.apache.accumulo.core.client.mapreduce.InputTableConfig; import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner; import org.apache.accumulo.core.client.mock.MockInstance; @@ -74,6 +73,8 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.util.ToolRunner; @@ -373,7 +374,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { setupAccumuloInput(job); - AccumuloInputFormat.setInputTableName(job, table); + InputFormatBase.setInputTableName(job, table); // Set input output of the particular job if (useCopyFileOutput) { @@ -719,7 +720,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { protected void setupAccumuloInput(final Job job) throws AccumuloSecurityException { if (useCopyFileImport) { try { - AccumuloHDFSFileInputFormat.setInputPaths(job, localCopyFileImportDir); + FileInputFormat.setInputPaths(job, localCopyFileImportDir); } catch (final IOException e) { log.error("Failed to set copy file import directory", e); } @@ -730,25 +731,25 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { } else { job.setInputFormatClass(AccumuloHDFSFileInputFormat.class); } - AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); - AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix)); - AccumuloInputFormat.setScanAuthorizations(job, authorizations); + AbstractInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); + InputFormatBase.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix)); + AbstractInputFormat.setScanAuthorizations(job, authorizations); if (!mock) { - AccumuloInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk)); + AbstractInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk)); } else { - AccumuloInputFormat.setMockInstance(job, instance); + AbstractInputFormat.setMockInstance(job, instance); } if (ttl != null) { final IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class); AgeOffFilter.setTTL(setting, Long.valueOf(ttl)); - AccumuloInputFormat.addIterator(job, setting); + InputFormatBase.addIterator(job, setting); } if (startTime != null) { final IteratorSetting setting = getStartTimeSetting(startTime); - AccumuloInputFormat.addIterator(job, setting); + InputFormatBase.addIterator(job, setting); } for (final IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) { - AccumuloInputFormat.addIterator(job, iteratorSetting); + InputFormatBase.addIterator(job, iteratorSetting); } } } @@ -832,7 +833,7 @@ public class CopyTool extends AbstractDualInstanceAccumuloMRTool { } catch (final IOException e) { log.error("Failed to set permission for output path.", e); } - AccumuloFileOutputFormat.setOutputPath(job, filesOutputPath); + FileOutputFormat.setOutputPath(job, filesOutputPath); if (StringUtils.isNotBlank(compressionType)) { if (isValidCompressionType(compressionType)) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/MergeTool.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/MergeTool.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/MergeTool.java index f923a75..8f045ee 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/MergeTool.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/MergeTool.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge; import java.io.File; import java.io.IOException; @@ -38,7 +36,9 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.TableExistsException; import org.apache.accumulo.core.client.admin.SecurityOperations; import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mapreduce.InputFormatBase; import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.iterators.user.AgeOffFilter; @@ -160,13 +160,13 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * Sets duplicate keys in the config. * @param config the {@link Configuration}. */ - public static void setDuplicateKeys(Configuration config) { - for (Entry<String, List<String>> entry : DUPLICATE_KEY_MAP.entrySet()) { - String key = entry.getKey(); - List<String> duplicateKeys = entry.getValue(); - String value = config.get(key); + public static void setDuplicateKeys(final Configuration config) { + for (final Entry<String, List<String>> entry : DUPLICATE_KEY_MAP.entrySet()) { + final String key = entry.getKey(); + final List<String> duplicateKeys = entry.getValue(); + final String value = config.get(key); if (value != null) { - for (String duplicateKey : duplicateKeys) { + for (final String duplicateKey : duplicateKeys) { config.set(duplicateKey, value); } } @@ -179,11 +179,11 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * @param property the property to set and all its duplicates. * @param value the value to set the property to. */ - public static void setDuplicateKeysForProperty(Configuration config, String property, String value) { - List<String> duplicateKeys = DUPLICATE_KEY_MAP.get(property); + public static void setDuplicateKeysForProperty(final Configuration config, final String property, final String value) { + final List<String> duplicateKeys = DUPLICATE_KEY_MAP.get(property); config.set(property, value); if (duplicateKeys != null) { - for (String key : duplicateKeys) { + for (final String key : duplicateKeys) { config.set(key, value); } } @@ -212,26 +212,26 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { if (USE_START_TIME_DIALOG.equals(startTime)) { log.info("Select start time from dialog..."); - DateTimePickerDialog dateTimePickerDialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE); + final DateTimePickerDialog dateTimePickerDialog = new DateTimePickerDialog(DIALOG_TITLE, DIALOG_MESSAGE); dateTimePickerDialog.setVisible(true); - Date date = dateTimePickerDialog.getSelectedDateTime(); + final Date date = dateTimePickerDialog.getSelectedDateTime(); startTime = START_TIME_FORMATTER.format(date); conf.set(START_TIME_PROP, startTime); log.info("Will merge all data after " + date); } else if (startTime != null) { try { - Date date = START_TIME_FORMATTER.parse(startTime); + final Date date = START_TIME_FORMATTER.parse(startTime); log.info("Will merge all data after " + date); - } catch (ParseException e) { + } catch (final ParseException e) { throw new Exception("Unable to parse the provided start time: " + startTime, e); } } - boolean useTimeSync = conf.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false); + final boolean useTimeSync = conf.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false); if (useTimeSync) { - String tomcatUrl = conf.get(CopyTool.CHILD_TOMCAT_URL_PROP, null); - String ntpServerHost = conf.get(CopyTool.NTP_SERVER_HOST_PROP, null); + final String tomcatUrl = conf.get(CopyTool.CHILD_TOMCAT_URL_PROP, null); + final String ntpServerHost = conf.get(CopyTool.NTP_SERVER_HOST_PROP, null); Long timeOffset = null; try { log.info("Comparing child machine's time to NTP server time..."); @@ -251,7 +251,7 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { } @Override - public int run(String[] strings) throws Exception { + public int run(final String[] strings) throws Exception { useMergeFileInput = conf.getBoolean(USE_MERGE_FILE_INPUT, false); log.info("Setting up Merge Tool..."); @@ -263,14 +263,14 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { copyParentPropertiesToChild(conf); } - for (String table : tables) { - String childTable = table.replaceFirst(tablePrefix, childTablePrefix); - String jobName = "Merge Tool, merging Child Table: " + childTable + ", into Parent Table: " + table + ", " + System.currentTimeMillis(); + for (final String table : tables) { + final String childTable = table.replaceFirst(tablePrefix, childTablePrefix); + final String jobName = "Merge Tool, merging Child Table: " + childTable + ", into Parent Table: " + table + ", " + System.currentTimeMillis(); log.info("Initializing job: " + jobName); conf.set(MRUtils.JOB_NAME_PROP, jobName); conf.set(TABLE_NAME_PROP, table); - Job job = Job.getInstance(conf); + final Job job = Job.getInstance(conf); job.setJarByClass(MergeTool.class); if (useMergeFileInput) { @@ -279,7 +279,7 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { setupAccumuloInput(job); - AccumuloInputFormat.setInputTableName(job, table); + InputFormatBase.setInputTableName(job, table); // Set input output of the particular job job.setMapOutputKeyClass(Text.class); @@ -294,22 +294,22 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { job.setReducerClass(Reducer.class); // Submit the job - Date beginTime = new Date(); + final Date beginTime = new Date(); log.info("Job for table \"" + table + "\" started: " + beginTime); - int exitCode = job.waitForCompletion(true) ? 0 : 1; + final int exitCode = job.waitForCompletion(true) ? 0 : 1; if (useMergeFileInput && StringUtils.isNotBlank(tempChildAuths)) { // Clear any of the temporary child auths given to the parent - AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); + final AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); parentAccumuloRdfConfiguration.setTablePrefix(tablePrefix); - Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); - SecurityOperations secOps = parentConnector.securityOperations(); + final Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); + final SecurityOperations secOps = parentConnector.securityOperations(); AccumuloRyaUtils.removeUserAuths(userName, secOps, tempChildAuths); } if (exitCode == 0) { - Date endTime = new Date(); + final Date endTime = new Date(); log.info("Job for table \"" + table + "\" finished: " + endTime); log.info("The job took " + (endTime.getTime() - beginTime.getTime()) / 1000 + " seconds."); } else { @@ -326,27 +326,27 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * @param childTableName the name of the child table. * @throws IOException */ - public void createTempTableIfNeeded(String childTableName) throws IOException { + public void createTempTableIfNeeded(final String childTableName) throws IOException { try { - AccumuloRdfConfiguration accumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); + final AccumuloRdfConfiguration accumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); accumuloRdfConfiguration.setTablePrefix(childTablePrefix); - Connector connector = AccumuloRyaUtils.setupConnector(accumuloRdfConfiguration); + final Connector connector = AccumuloRyaUtils.setupConnector(accumuloRdfConfiguration); if (!connector.tableOperations().exists(childTableName)) { log.info("Creating table: " + childTableName); connector.tableOperations().create(childTableName); log.info("Created table: " + childTableName); log.info("Granting authorizations to table: " + childTableName); - SecurityOperations secOps = connector.securityOperations(); + final SecurityOperations secOps = connector.securityOperations(); secOps.grantTablePermission(userName, childTableName, TablePermission.WRITE); log.info("Granted authorizations to table: " + childTableName); - Authorizations parentAuths = secOps.getUserAuthorizations(userName); + final Authorizations parentAuths = secOps.getUserAuthorizations(userName); // Add child authorizations so the temp parent table can be accessed. if (!parentAuths.equals(childAuthorizations)) { - List<String> childAuthList = findUniqueAuthsFromChild(parentAuths.toString(), childAuthorizations.toString()); + final List<String> childAuthList = findUniqueAuthsFromChild(parentAuths.toString(), childAuthorizations.toString()); tempChildAuths = Joiner.on(",").join(childAuthList); log.info("Adding the authorization, \"" + tempChildAuths + "\", to the parent user, \"" + userName + "\""); - Authorizations newAuths = AccumuloRyaUtils.addUserAuths(userName, secOps, new Authorizations(tempChildAuths)); + final Authorizations newAuths = AccumuloRyaUtils.addUserAuths(userName, secOps, new Authorizations(tempChildAuths)); secOps.changeUserAuthorizations(userName, newAuths); } } @@ -361,9 +361,9 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * @param childAuths the comma-separated string of parent authorizations. * @return the unique child authorizations that are not in the parent. */ - private static List<String> findUniqueAuthsFromChild(String parentAuths, String childAuths) { - List<String> parentAuthList = AccumuloRyaUtils.convertAuthStringToList(parentAuths); - List<String> childAuthList = AccumuloRyaUtils.convertAuthStringToList(childAuths); + private static List<String> findUniqueAuthsFromChild(final String parentAuths, final String childAuths) { + final List<String> parentAuthList = AccumuloRyaUtils.convertAuthStringToList(parentAuths); + final List<String> childAuthList = AccumuloRyaUtils.convertAuthStringToList(childAuths); childAuthList.removeAll(parentAuthList); @@ -375,25 +375,25 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * @param childTableName the name of the child table to import into a temporary parent table. * @throws Exception */ - public void importChildFilesToTempParentTable(String childTableName) throws Exception { + public void importChildFilesToTempParentTable(final String childTableName) throws Exception { // Create a temporary table in the parent instance to import the child files to. Then run the merge process on the parent table and temp child table. - String tempChildTable = childTableName + TEMP_SUFFIX; + final String tempChildTable = childTableName + TEMP_SUFFIX; createTempTableIfNeeded(tempChildTable); - AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); + final AccumuloRdfConfiguration parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(conf); parentAccumuloRdfConfiguration.setTablePrefix(childTablePrefix); - Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); - TableOperations parentTableOperations = parentConnector.tableOperations(); + final Connector parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); + final TableOperations parentTableOperations = parentConnector.tableOperations(); - Path localWorkDir = CopyTool.getPath(localMergeFileImportDir, childTableName); - Path hdfsBaseWorkDir = CopyTool.getPath(baseImportDir, childTableName); + final Path localWorkDir = CopyTool.getPath(localMergeFileImportDir, childTableName); + final Path hdfsBaseWorkDir = CopyTool.getPath(baseImportDir, childTableName); CopyTool.copyLocalToHdfs(localWorkDir, hdfsBaseWorkDir, conf); - Path files = CopyTool.getPath(hdfsBaseWorkDir.toString(), "files"); - Path failures = CopyTool.getPath(hdfsBaseWorkDir.toString(), "failures"); - FileSystem fs = FileSystem.get(conf); + final Path files = CopyTool.getPath(hdfsBaseWorkDir.toString(), "files"); + final Path failures = CopyTool.getPath(hdfsBaseWorkDir.toString(), "failures"); + final FileSystem fs = FileSystem.get(conf); // With HDFS permissions on, we need to make sure the Accumulo user can read/move the files fs.setPermission(hdfsBaseWorkDir, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); if (fs.exists(failures)) { @@ -410,7 +410,7 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * Copies all the relevant parent instance config properties to the corresponding child properties. * @param config the {@link Configuration} to use. */ - public static void copyParentPropertiesToChild(Configuration config) { + public static void copyParentPropertiesToChild(final Configuration config) { // Copy the parent properties for the child to use. copyParentPropToChild(config, MRUtils.AC_MOCK_PROP); copyParentPropToChild(config, MRUtils.AC_INSTANCE_PROP); @@ -429,34 +429,34 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * @param config the {@link Configuration} to use. * @param parentPropertyName the parent property name to use. */ - public static void copyParentPropToChild(Configuration config, String parentPropertyName) { - String parentValue = config.get(parentPropertyName, ""); + public static void copyParentPropToChild(final Configuration config, final String parentPropertyName) { + final String parentValue = config.get(parentPropertyName, ""); config.set(parentPropertyName + MergeTool.CHILD_SUFFIX, parentValue); } @Override - protected void setupAccumuloInput(Job job) throws AccumuloSecurityException { + protected void setupAccumuloInput(final Job job) throws AccumuloSecurityException { // set up accumulo input if (!hdfsInput) { job.setInputFormatClass(AccumuloInputFormat.class); } else { job.setInputFormatClass(AccumuloHDFSFileInputFormat.class); } - AccumuloInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); - AccumuloInputFormat.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix)); - AccumuloInputFormat.setScanAuthorizations(job, authorizations); + AbstractInputFormat.setConnectorInfo(job, userName, new PasswordToken(pwd)); + InputFormatBase.setInputTableName(job, RdfCloudTripleStoreUtils.layoutPrefixToTable(rdfTableLayout, tablePrefix)); + AbstractInputFormat.setScanAuthorizations(job, authorizations); if (!mock) { - AccumuloInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk)); + AbstractInputFormat.setZooKeeperInstance(job, new ClientConfiguration().withInstance(instance).withZkHosts(zk)); } else { - AccumuloInputFormat.setMockInstance(job, instance); + AbstractInputFormat.setMockInstance(job, instance); } if (ttl != null) { - IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class); + final IteratorSetting setting = new IteratorSetting(1, "fi", AgeOffFilter.class); AgeOffFilter.setTTL(setting, Long.valueOf(ttl)); - AccumuloInputFormat.addIterator(job, setting); + InputFormatBase.addIterator(job, setting); } - for (IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) { - AccumuloInputFormat.addIterator(job, iteratorSetting); + for (final IteratorSetting iteratorSetting : AccumuloRyaUtils.COMMON_REG_EX_FILTER_SETTINGS) { + InputFormatBase.addIterator(job, iteratorSetting); } } @@ -465,28 +465,28 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * @param args the arguments list. * @return the execution result. */ - public static int setupAndRun(String[] args) { + public static int setupAndRun(final String[] args) { int returnCode = -1; try { - Configuration conf = new Configuration(); - Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args); + final Configuration conf = new Configuration(); + final Set<String> toolArgs = ToolConfigUtils.getUserArguments(conf, args); if (!toolArgs.isEmpty()) { - String parameters = Joiner.on("\r\n\t").join(toolArgs); + final String parameters = Joiner.on("\r\n\t").join(toolArgs); log.info("Running Merge Tool with the following parameters...\r\n\t" + parameters); } returnCode = ToolRunner.run(conf, new MergeTool(), args); - } catch (Exception e) { + } catch (final Exception e) { log.error("Error running merge tool", e); } return returnCode; } - public static void main(String[] args) { - String log4jConfiguration = System.getProperties().getProperty("log4j.configuration"); + public static void main(final String[] args) { + final String log4jConfiguration = System.getProperties().getProperty("log4j.configuration"); if (StringUtils.isNotBlank(log4jConfiguration)) { - String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:"); - File configFile = new File(parsedConfiguration); + final String parsedConfiguration = StringUtils.removeStart(log4jConfiguration, "file:"); + final File configFile = new File(parsedConfiguration); if (configFile.exists()) { DOMConfigurator.configure(parsedConfiguration); } else { @@ -497,12 +497,12 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override - public void uncaughtException(Thread thread, Throwable throwable) { + public void uncaughtException(final Thread thread, final Throwable throwable) { log.error("Uncaught exception in " + thread.getName(), throwable); } }); - int returnCode = setupAndRun(args); + final int returnCode = setupAndRun(args); log.info("Finished running Merge Tool"); @@ -516,7 +516,7 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * to use the provided {@code startDate}. * @return the formatted start time string or {@code "dialog"}. */ - public static String getStartTimeString(Date startDate, boolean isStartTimeDialogEnabled) { + public static String getStartTimeString(final Date startDate, final boolean isStartTimeDialogEnabled) { String startTimeString; if (isStartTimeDialogEnabled) { startTimeString = USE_START_TIME_DIALOG; // set start date from dialog box @@ -531,8 +531,8 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * @param date the start {@link Date} of the filter that will be formatted as a string. * @return the formatted start time string. */ - public static String convertDateToStartTimeString(Date date) { - String startTimeString = START_TIME_FORMATTER.format(date); + public static String convertDateToStartTimeString(final Date date) { + final String startTimeString = START_TIME_FORMATTER.format(date); return startTimeString; } @@ -541,11 +541,11 @@ public class MergeTool extends AbstractDualInstanceAccumuloMRTool { * @param startTimeString the formatted time string. * @return the start {@link Date}. */ - public static Date convertStartTimeStringToDate(String startTimeString) { + public static Date convertStartTimeStringToDate(final String startTimeString) { Date date; try { date = START_TIME_FORMATTER.parse(startTimeString); - } catch (ParseException e) { + } catch (final ParseException e) { log.error("Could not parse date", e); return null; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/common/InstanceType.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/common/InstanceType.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/common/InstanceType.java index cbb7652..abbdf4f 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/common/InstanceType.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/common/InstanceType.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge.common; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge.common; import org.apache.accumulo.core.client.mock.MockInstance; import org.apache.accumulo.minicluster.MiniAccumuloCluster; @@ -45,8 +43,8 @@ public enum InstanceType { * @param name the name to find. * @return the {@link InstanceType} or {@code null} if none could be found. */ - public static InstanceType fromName(String name) { - for (InstanceType instanceType : InstanceType.values()) { + public static InstanceType fromName(final String name) { + for (final InstanceType instanceType : InstanceType.values()) { if (instanceType.toString().equals(name)) { return instanceType; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java index c035915..1c12bee 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/gui/DateTimePickerDialog.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge.gui; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge.gui; import java.awt.GridBagConstraints; import java.awt.GridBagLayout; @@ -37,6 +35,7 @@ import javax.swing.JSpinner; import javax.swing.JSpinner.DateEditor; import javax.swing.SpinnerDateModel; import javax.swing.SpinnerModel; +import javax.swing.WindowConstants; import com.toedter.calendar.JCalendar; @@ -50,7 +49,7 @@ public class DateTimePickerDialog extends JDialog { private JSpinner timeSpinner; private Date selectedDateTime; - private JLabel label; + private final JLabel label; /** @@ -58,7 +57,7 @@ public class DateTimePickerDialog extends JDialog { * @param title the title to display up top. * @param message the message to display. */ - public DateTimePickerDialog(String title, String message) { + public DateTimePickerDialog(final String title, final String message) { this(null, title, message); } @@ -68,7 +67,7 @@ public class DateTimePickerDialog extends JDialog { * @param title the title to display up top. * @param message the message to display. */ - public DateTimePickerDialog(Date date, String title, String message) { + public DateTimePickerDialog(final Date date, final String title, final String message) { // Create a modal dialog super((JDialog) null); setTitle(title); @@ -76,12 +75,12 @@ public class DateTimePickerDialog extends JDialog { setType(Type.NORMAL); setLayout(new GridBagLayout()); - setDefaultCloseOperation(JDialog.DISPOSE_ON_CLOSE); + setDefaultCloseOperation(WindowConstants.DISPOSE_ON_CLOSE); - JButton okButton = new JButton("OK"); + final JButton okButton = new JButton("OK"); okButton.addActionListener (new ActionListener() { @Override - public void actionPerformed(ActionEvent event) { + public void actionPerformed(final ActionEvent event) { selectedDateTime = findSelectedDateTime(); // Hide dialog @@ -91,11 +90,11 @@ public class DateTimePickerDialog extends JDialog { getRootPane().setDefaultButton(okButton); - JPanel dateTimePanel = buildDateTimePanel(date); + final JPanel dateTimePanel = buildDateTimePanel(date); label = new JLabel (message); label.setBorder(BorderFactory.createEtchedBorder()); - GridBagConstraints c = new GridBagConstraints(); + final GridBagConstraints c = new GridBagConstraints(); c.fill = GridBagConstraints.HORIZONTAL; c.insets = new Insets(5, 5, 5, 5); c.gridx = 0; @@ -112,21 +111,21 @@ public class DateTimePickerDialog extends JDialog { pack(); } - private JPanel buildDateTimePanel(Date date) { - JPanel datePanel = new JPanel(); + private JPanel buildDateTimePanel(final Date date) { + final JPanel datePanel = new JPanel(); dateChooser = new JCalendar(); if (date != null) { - Calendar calendar = Calendar.getInstance(); + final Calendar calendar = Calendar.getInstance(); calendar.setTime(date); dateChooser.setCalendar(calendar); } datePanel.add(dateChooser); - SpinnerModel model = new SpinnerDateModel(); + final SpinnerModel model = new SpinnerDateModel(); timeSpinner = new JSpinner(model); - DateEditor editor = new DateEditor(timeSpinner, "HH:mm:ss"); + final DateEditor editor = new DateEditor(timeSpinner, "HH:mm:ss"); timeSpinner.setEditor(editor); if (date != null) { timeSpinner.setValue(date); @@ -139,19 +138,19 @@ public class DateTimePickerDialog extends JDialog { private Date findSelectedDateTime() { // Get the values from the date chooser - int day = dateChooser.getDayChooser().getDay(); - int month = dateChooser.getMonthChooser().getMonth(); - int year = dateChooser.getYearChooser().getYear(); + final int day = dateChooser.getDayChooser().getDay(); + final int month = dateChooser.getMonthChooser().getMonth(); + final int year = dateChooser.getYearChooser().getYear(); // Get the values from the time chooser - Calendar timeCalendar = Calendar.getInstance(); + final Calendar timeCalendar = Calendar.getInstance(); timeCalendar.setTime((Date) timeSpinner.getValue()); - int hour = timeCalendar.get(Calendar.HOUR_OF_DAY); - int minute = timeCalendar.get(Calendar.MINUTE); - int second = timeCalendar.get(Calendar.SECOND); + final int hour = timeCalendar.get(Calendar.HOUR_OF_DAY); + final int minute = timeCalendar.get(Calendar.MINUTE); + final int second = timeCalendar.get(Calendar.SECOND); // Combine these values into a single date object - Calendar newCalendar = Calendar.getInstance(); + final Calendar newCalendar = Calendar.getInstance(); newCalendar.set(Calendar.YEAR, year); newCalendar.set(Calendar.MONTH, month); newCalendar.set(Calendar.DATE, day); @@ -159,7 +158,7 @@ public class DateTimePickerDialog extends JDialog { newCalendar.set(Calendar.MINUTE, minute); newCalendar.set(Calendar.SECOND, second); - Date newDate = newCalendar.getTime(); + final Date newDate = newCalendar.getTime(); return newDate; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java index 1aa7fbe..f48eac4 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloCopyToolMapper.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge.mappers; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge.mappers; import java.io.IOException; @@ -38,14 +36,14 @@ public class AccumuloCopyToolMapper extends BaseCopyToolMapper<Key, Value, Text, } @Override - protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { + protected void map(final Key key, final Value value, final Context context) throws IOException, InterruptedException { //log.trace("Mapping key: " + key + " = " + value); - Mutation mutation = makeAddMutation(key, value); + final Mutation mutation = makeAddMutation(key, value); context.write(childTableNameText, mutation); } - private static Mutation makeAddMutation(Key key, Value value) { - Mutation mutation = new Mutation(key.getRow().getBytes()); + private static Mutation makeAddMutation(final Key key, final Value value) { + final Mutation mutation = new Mutation(key.getRow().getBytes()); mutation.put(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed(), key.getTimestamp(), value); return mutation; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java index badd26c..19c2032 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/AccumuloRyaRuleMapper.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge.mappers; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge.mappers; import java.io.IOException; @@ -42,11 +40,11 @@ public class AccumuloRyaRuleMapper extends BaseRuleMapper<Text, Mutation> { * @throws IOException if the DAO encounters an error adding the statement to Accumulo */ @Override - protected void copyStatement(RyaStatement rstmt, Context context) throws IOException { + protected void copyStatement(final RyaStatement rstmt, final Context context) throws IOException { try { childDao.add(rstmt); } - catch (RyaDAOException e) { + catch (final RyaDAOException e) { throw new IOException("Error inserting statement into child Rya DAO", e); } } @@ -61,8 +59,8 @@ public class AccumuloRyaRuleMapper extends BaseRuleMapper<Text, Mutation> { * @throws IOException if the framework encounters an error writing the row to Accumulo */ @Override - protected void copyRow(Key key, Value value, Context context) throws IOException, InterruptedException { - Mutation mutation = new Mutation(key.getRow().getBytes()); + protected void copyRow(final Key key, final Value value, final Context context) throws IOException, InterruptedException { + final Mutation mutation = new Mutation(key.getRow().getBytes()); mutation.put(key.getColumnFamily(), key.getColumnQualifier(), key.getColumnVisibilityParsed(), key.getTimestamp(), value); context.write(childTableNameText, mutation); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java index 6b4f082..96f0ce6 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge.mappers; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge.mappers; import java.io.IOException; import java.net.URI; @@ -96,7 +94,7 @@ public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper } @Override - protected void setup(Context context) throws IOException, InterruptedException { + protected void setup(final Context context) throws IOException, InterruptedException { super.setup(context); log.info("Setting up mapper"); @@ -109,12 +107,12 @@ public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper startTime = MergeTool.convertStartTimeStringToDate(startTimeString); } - String runTimeString = parentConfig.get(CopyTool.COPY_RUN_TIME_PROP, null); + final String runTimeString = parentConfig.get(CopyTool.COPY_RUN_TIME_PROP, null); if (runTimeString != null) { runTime = MergeTool.convertStartTimeStringToDate(runTimeString); } - String offsetString = parentConfig.get(CopyTool.PARENT_TIME_OFFSET_PROP, null); + final String offsetString = parentConfig.get(CopyTool.PARENT_TIME_OFFSET_PROP, null); if (offsetString != null) { timeOffset = Long.valueOf(offsetString); } @@ -166,24 +164,24 @@ public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper if (useCopyFileOutput) { // The "mapreduce.job.cache.local.files" property contains a comma-separated // list of cached local file paths. - String cachedLocalFiles = parentConfig.get(MRJobConfig.CACHE_LOCALFILES); + final String cachedLocalFiles = parentConfig.get(MRJobConfig.CACHE_LOCALFILES); if (cachedLocalFiles != null) { - List<String> cachedLocalFilesList = Lists.newArrayList(Splitter.on(',').split(cachedLocalFiles)); - List<String> formattedCachedLocalFilesList = new ArrayList<>(); - for (String cachedLocalFile : cachedLocalFilesList) { + final List<String> cachedLocalFilesList = Lists.newArrayList(Splitter.on(',').split(cachedLocalFiles)); + final List<String> formattedCachedLocalFilesList = new ArrayList<>(); + for (final String cachedLocalFile : cachedLocalFilesList) { String pathToAdd = cachedLocalFile; if (cachedLocalFile.endsWith("splits.txt")) { URI uri = null; try { uri = new URI(cachedLocalFiles); pathToAdd = uri.getPath(); - } catch (URISyntaxException e) { + } catch (final URISyntaxException e) { log.error("Invalid syntax in local cache file path", e); } } formattedCachedLocalFilesList.add(pathToAdd); } - String formattedCachedLocalFiles = Joiner.on(',').join(formattedCachedLocalFilesList); + final String formattedCachedLocalFiles = Joiner.on(',').join(formattedCachedLocalFilesList); if (!cachedLocalFiles.equals(formattedCachedLocalFiles)) { parentConfig.set(MRJobConfig.CACHE_LOCALFILES, formattedCachedLocalFiles); } @@ -191,7 +189,7 @@ public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper } } - protected void addMetadataKeys(Context context) throws IOException { + protected void addMetadataKeys(final Context context) throws IOException { try { if (AccumuloRyaUtils.getCopyToolRunDate(childDao) == null) { log.info("Writing copy tool run time metadata to child table: " + runTime); @@ -206,7 +204,7 @@ public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper log.info("Writing copy tool time offset metadata to child table: " + timeOffset); AccumuloRyaUtils.setTimeOffset(timeOffset, childDao); } - } catch (RyaDAOException e) { + } catch (final RyaDAOException e) { throw new IOException("Failed to set time metadata key for table: " + childTableName, e); } } @@ -228,15 +226,15 @@ public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper protected void copyAuthorizations() throws IOException { try { - SecurityOperations parentSecOps = parentConnector.securityOperations(); - SecurityOperations childSecOps = childConnector.securityOperations(); + final SecurityOperations parentSecOps = parentConnector.securityOperations(); + final SecurityOperations childSecOps = childConnector.securityOperations(); - Authorizations parentAuths = parentSecOps.getUserAuthorizations(parentUser); - Authorizations childAuths = childSecOps.getUserAuthorizations(childUser); + final Authorizations parentAuths = parentSecOps.getUserAuthorizations(parentUser); + final Authorizations childAuths = childSecOps.getUserAuthorizations(childUser); // Add any parent authorizations that the child doesn't have. if (!childAuths.equals(parentAuths)) { log.info("Adding the authorization, \"" + parentAuths.toString() + "\", to the child user, \"" + childUser + "\""); - Authorizations newChildAuths = AccumuloRyaUtils.addUserAuths(childUser, childSecOps, parentAuths); + final Authorizations newChildAuths = AccumuloRyaUtils.addUserAuths(childUser, childSecOps, parentAuths); childSecOps.changeUserAuthorizations(childUser, newChildAuths); } } catch (AccumuloException | AccumuloSecurityException e) { @@ -245,14 +243,14 @@ public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper } @Override - protected void cleanup(Context context) throws IOException, InterruptedException { + protected void cleanup(final Context context) throws IOException, InterruptedException { super.cleanup(context); log.info("Cleaning up mapper..."); try { if (childDao != null) { childDao.destroy(); } - } catch (RyaDAOException e) { + } catch (final RyaDAOException e) { log.error("Error destroying child DAO", e); } log.info("Cleaned up mapper"); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java index ab50215..aca2dbc 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge.mappers; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge.mappers; import java.io.IOException; import java.util.List; @@ -61,23 +59,23 @@ public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMappe private static final Logger log = Logger.getLogger(BaseRuleMapper.class); - private TripleRowResolver resolver = new WholeRowTripleResolver(); + private final TripleRowResolver resolver = new WholeRowTripleResolver(); private TABLE_LAYOUT parentLayout = null; private ValueExpr condition; private ParallelEvaluationStrategyImpl strategy; private RangeInputSplit split; @Override - protected void setup(Context context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); + protected void setup(final Context context) throws IOException, InterruptedException { + final Configuration conf = context.getConfiguration(); split = (RangeInputSplit) context.getInputSplit(); - Range range = split.getRange(); + final Range range = split.getRange(); // Determine the table and table layout we're scanning parentTableName = split.getTableName(); parentTablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY); - for (TABLE_LAYOUT layout : TABLE_LAYOUT.values()) { - String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, parentTablePrefix); + for (final TABLE_LAYOUT layout : TABLE_LAYOUT.values()) { + final String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, parentTablePrefix); if (tableName.equals(parentTableName)) { parentLayout = layout; } @@ -92,15 +90,15 @@ public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMappe AccumuloQueryRuleset ruleset; try { ruleset = new AccumuloQueryRuleset(new AccumuloRdfConfiguration(conf)); - } catch (QueryRulesetException e) { + } catch (final QueryRulesetException e) { throw new IOException("Error parsing the input query", e); } - List<CopyRule> rules = ruleset.getRules(parentLayout, range); + final List<CopyRule> rules = ruleset.getRules(parentLayout, range); - for (CopyRule rule : rules) { + for (final CopyRule rule : rules) { log.info("Mapper applies to rule:"); - for (String line : rule.toString().split("\n")) { + for (final String line : rule.toString().split("\n")) { log.info("\t" + line); } } @@ -108,7 +106,7 @@ public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMappe // Combine the rules' conditions so that if any of the individual conditions matches, the // composite condition will match as well. We know all the rules match all the statements // this input split will receive, so if any condition is true we'll want to copy the statement. - for (CopyRule rule : rules) { + for (final CopyRule rule : rules) { // Attach any relevant filter conditions given by this rule. // If there are no conditions, all statements read by this mapper should be accepted // (even if there are redundant rules with conditions) @@ -139,7 +137,7 @@ public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMappe } else { log.info("Condition:"); - for (String line : condition.toString().split("\n")) { + for (final String line : condition.toString().split("\n")) { log.info("\t" + line); } } @@ -151,8 +149,8 @@ public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMappe } @Override - protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { - TripleRow row = new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), + protected void map(final Key key, final Value value, final Context context) throws IOException, InterruptedException { + final TripleRow row = new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), key.getColumnQualifierData().toArray(), key.getTimestamp(), key.getColumnVisibilityData().toArray(), value == null ? null : value.get()); try { @@ -163,15 +161,15 @@ public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMappe } // If there is a layout, deserialize the statement and insert it if it meets the condition else { - RyaStatement rs = resolver.deserialize(parentLayout, row); + final RyaStatement rs = resolver.deserialize(parentLayout, row); if (condition == null || CopyRule.accept(RyaToRdfConversions.convertStatement(rs), condition, strategy)) { copyStatement(rs, context); context.getCounter(Counters.STATEMENTS_COPIED).increment(1); } } - } catch (TripleRowResolverException e) { + } catch (final TripleRowResolverException e) { throw new IOException("Error deserializing triple", e); - } catch (QueryEvaluationException e) { + } catch (final QueryEvaluationException e) { throw new IOException("Error evaluating the filter condition", e); } } @@ -202,12 +200,12 @@ public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMappe * @param max The maximum printed length of each individual portion * @return A human-readable representation of the Key */ - private static String keyToString(Key key, int max) { - StringBuilder sb = new StringBuilder(); - byte[] row = key.getRow().copyBytes(); - byte[] colFamily = key.getColumnFamily().copyBytes(); - byte[] colQualifier = key.getColumnQualifier().copyBytes(); - byte[] colVisibility = key.getColumnVisibility().copyBytes(); + private static String keyToString(final Key key, final int max) { + final StringBuilder sb = new StringBuilder(); + final byte[] row = key.getRow().copyBytes(); + final byte[] colFamily = key.getColumnFamily().copyBytes(); + final byte[] colQualifier = key.getColumnQualifier().copyBytes(); + final byte[] colVisibility = key.getColumnVisibility().copyBytes(); Key.appendPrintableString(row, 0, row.length, max, sb); sb.append(" "); Key.appendPrintableString(colFamily, 0, colFamily.length, max, sb); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/1d33b435/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java index 962138f..afb0585 100644 --- a/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java +++ b/extras/rya.merger/src/main/java/org/apache/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java @@ -1,24 +1,22 @@ -package org.apache.rya.accumulo.mr.merge.mappers; - /* - * #%L - * org.apache.rya.accumulo.mr.merge - * %% - * Copyright (C) 2014 Rya - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ +package org.apache.rya.accumulo.mr.merge.mappers; import java.io.IOException; import java.util.Map; @@ -47,23 +45,23 @@ public class FileCopyToolMapper extends BaseCopyToolMapper<Key, Value, Key, Valu } @Override - protected void addMetadataKeys(Context context) throws IOException { + protected void addMetadataKeys(final Context context) throws IOException { try { if (runTime != null) { log.info("Writing copy tool run time metadata to child table: " + runTime); - RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolRunTimeRyaStatement(runTime); + final RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolRunTimeRyaStatement(runTime); writeRyaStatement(ryaStatement, context); } if (startTime != null) { log.info("Writing copy split time metadata to child table: " + startTime); - RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolSplitTimeRyaStatement(startTime); + final RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolSplitTimeRyaStatement(startTime); writeRyaStatement(ryaStatement, context); } if (timeOffset != null) { log.info("Writing copy tool time offset metadata to child table: " + timeOffset); - RyaStatement ryaStatement = AccumuloRyaUtils.createTimeOffsetRyaStatement(timeOffset); + final RyaStatement ryaStatement = AccumuloRyaUtils.createTimeOffsetRyaStatement(timeOffset); writeRyaStatement(ryaStatement, context); } } catch (TripleRowResolverException | IOException | InterruptedException e) { @@ -71,11 +69,11 @@ public class FileCopyToolMapper extends BaseCopyToolMapper<Key, Value, Key, Valu } } - private void writeRyaStatement(RyaStatement ryaStatement, Context context) throws TripleRowResolverException, IOException, InterruptedException { - Map<TABLE_LAYOUT, TripleRow> serialize = childRyaContext.getTripleResolver().serialize(ryaStatement); - TripleRow tripleRow = serialize.get(TABLE_LAYOUT.SPO); - Key key = AccumuloRdfUtils.from(tripleRow); - Value value = AccumuloRdfUtils.extractValue(tripleRow); + private void writeRyaStatement(final RyaStatement ryaStatement, final Context context) throws TripleRowResolverException, IOException, InterruptedException { + final Map<TABLE_LAYOUT, TripleRow> serialize = childRyaContext.getTripleResolver().serialize(ryaStatement); + final TripleRow tripleRow = serialize.get(TABLE_LAYOUT.SPO); + final Key key = AccumuloRdfUtils.from(tripleRow); + final Value value = AccumuloRdfUtils.extractValue(tripleRow); context.write(key, value); } } \ No newline at end of file