kasakrisz closed pull request #4: AMBARI-24731 - Infra Manager: scheduled cleanup of old jobs
URL: https://github.com/apache/ambari-infra/pull/4
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/.gitignore b/.gitignore index 9c77d11..2dcdfea 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ derby.log pass.txt out +job-repository.db \ No newline at end of file diff --git a/ambari-infra-manager-it/pom.xml b/ambari-infra-manager-it/pom.xml index 2fac6a9..68d4352 100644 --- a/ambari-infra-manager-it/pom.xml +++ b/ambari-infra-manager-it/pom.xml @@ -45,9 +45,9 @@ <version>${solr.version}</version> </dependency> <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> - <version>1.11.5</version> + <groupId>io.minio</groupId> + <artifactId>minio</artifactId> + <version>5.0.1</version> </dependency> <dependency> <groupId>commons-io</groupId> diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java new file mode 100644 index 0000000..f0b592d --- /dev/null +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/S3Client.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; + +import io.minio.MinioClient; +import io.minio.Result; +import io.minio.messages.Item; + +public class S3Client { + private final MinioClient s3client; + private final String bucket; + + public S3Client(String host, int port, String bucket) { + try { + s3client = new MinioClient(String.format("http://%s:%d", host, port), "remote-identity", "remote-credential"); + this.bucket = bucket; + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public void putObject(String key, InputStream inputStream, long length) { + try { + s3client.putObject(bucket, key, inputStream, length, "application/octet-stream"); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + public void putObject(String key, byte[] bytes) { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream("anything".getBytes())) { + putObject(key, inputStream, bytes.length); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public List<String> listObjectKeys() { + try { + List<String> keys = new ArrayList<>(); + for (Result<Item> item : s3client.listObjects(bucket)) { + keys.add(item.get().objectName()); + } + return keys; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public List<String> listObjectKeys(String text) { + try { + List<String> keys = new ArrayList<>(); 
+ for (Result<Item> item : s3client.listObjects(bucket)) { + String objectName = item.get().objectName(); + if (objectName.contains(text)) + keys.add(objectName); + } + return keys; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void deleteObject(String key) { + try { + s3client.removeObject(bucket, key); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/Solr.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/Solr.java index 1ffdb2a..0dcc91a 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/Solr.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/Solr.java @@ -55,7 +55,7 @@ public Solr() { public Solr(String configSetPath) { this.configSetPath = configSetPath; - this.solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(String.format("http://%s:%d/solr/%s_shard1_replica1", + this.solrClient = new LBHttpSolrClient.Builder().withBaseSolrUrls(String.format("http://%s:%d/solr/%s_shard1_replica_n1", getDockerHost(), SOLR_PORT, AUDIT_LOGS_COLLECTION)).build(); diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java index f219ce5..da962b9 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/AbstractInfraSteps.java @@ -29,8 +29,10 @@ import java.net.URL; import java.time.OffsetDateTime; import java.util.Date; +import java.util.List; import org.apache.ambari.infra.InfraClient; +import org.apache.ambari.infra.S3Client; import org.apache.ambari.infra.Solr; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; @@ -44,11 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
-import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; - public abstract class AbstractInfraSteps { private static final Logger LOG = LoggerFactory.getLogger(AbstractInfraSteps.class); @@ -59,7 +56,7 @@ private String ambariFolder; private String shellScriptLocation; private String dockerHost; - private AmazonS3Client s3client; + private S3Client s3client; private int documentId = 0; private Solr solr; @@ -71,7 +68,7 @@ public Solr getSolr() { return solr; } - public AmazonS3Client getS3client() { + public S3Client getS3client() { return s3client; } @@ -86,8 +83,11 @@ public void initDockerContainer() throws Exception { URL location = AbstractInfraSteps.class.getProtectionDomain().getCodeSource().getLocation(); ambariFolder = new File(location.toURI()).getParentFile().getParentFile().getParentFile().getParent(); - LOG.info("Clean local data folder {}", getLocalDataFolder()); - FileUtils.cleanDirectory(new File(getLocalDataFolder())); + String localDataFolder = getLocalDataFolder(); + if (new File(localDataFolder).exists()) { + LOG.info("Clean local data folder {}", localDataFolder); + FileUtils.cleanDirectory(new File(localDataFolder)); + } shellScriptLocation = ambariFolder + "/ambari-infra/ambari-infra-manager/docker/infra-manager-docker-compose.sh"; LOG.info("Create new docker container for testing Ambari Infra Manager ..."); @@ -102,9 +102,7 @@ public void initDockerContainer() throws Exception { solr.createSolrCollection(HADOOP_LOGS_COLLECTION); LOG.info("Initializing s3 client"); - s3client = new AmazonS3Client(new BasicAWSCredentials("remote-identity", "remote-credential")); - s3client.setEndpoint(String.format("http://%s:%d", dockerHost, FAKE_S3_PORT)); - s3client.createBucket(S3_BUCKET_NAME); + s3client = new S3Client(dockerHost, FAKE_S3_PORT, S3_BUCKET_NAME); checkInfraManagerReachable(); } @@ 
-155,10 +153,9 @@ protected void addDocument(OffsetDateTime logtime) { @AfterStories public void shutdownContainers() throws Exception { Thread.sleep(2000); // sync with s3 server - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - ObjectListing objectListing = getS3client().listObjects(listObjectsRequest); - LOG.info("Found {} files on s3.", objectListing.getObjectSummaries().size()); - objectListing.getObjectSummaries().forEach(s3ObjectSummary -> LOG.info("Found file on s3 with key {}", s3ObjectSummary.getKey())); + List<String> objectKeys = getS3client().listObjectKeys(); + LOG.info("Found {} files on s3.", objectKeys.size()); + objectKeys.forEach(objectKey -> LOG.info("Found file on s3 with key {}", objectKey)); LOG.info("Listing files on hdfs."); try (FileSystem fileSystem = getHdfs()) { diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java index d84c23f..b1d36d1 100644 --- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java +++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/steps/ExportJobsSteps.java @@ -23,11 +23,9 @@ import static org.apache.ambari.infra.TestUtil.doWithin; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.hasProperty; import static org.hamcrest.core.IsCollectionContaining.hasItem; import static org.junit.Assert.assertThat; -import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.UncheckedIOException; @@ -35,10 +33,12 @@ import java.time.OffsetDateTime; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.ambari.infra.InfraClient; import org.apache.ambari.infra.JobExecutionInfo; +import 
org.apache.ambari.infra.S3Client; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -50,11 +50,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.ObjectMetadata; - public class ExportJobsSteps extends AbstractInfraSteps { private static final Logger LOG = LoggerFactory.getLogger(ExportJobsSteps.class); @@ -80,9 +75,7 @@ public void addDocuments(long count, OffsetDateTime startLogtime, OffsetDateTime @Given("a file on s3 with key $key") public void addFileToS3(String key) throws Exception { - try (ByteArrayInputStream inputStream = new ByteArrayInputStream("anything".getBytes())) { - getS3client().putObject(S3_BUCKET_NAME, key, inputStream, new ObjectMetadata()); - } + getS3client().putObject(key, "anything".getBytes()); } @When("start $jobName job") @@ -113,10 +106,8 @@ public void restartJob(String jobName, int waitSec) { @When("stop job $jobName after at least $count file exists in s3 with filename containing text $text within $waitSec seconds") public void stopJob(String jobName, int count, String text, int waitSec) throws Exception { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) - && fileCountOnS3(text, s3Client, listObjectsRequest) > count); + S3Client s3Client = getS3client(); + doWithin(waitSec, "check uploaded files to s3", () -> s3Client.listObjectKeys(text).size() > count); try (InfraClient httpClient = getInfraClient()) { httpClient.stopJob(launchedJobs.get(jobName).getExecutionId()); @@ -125,40 +116,29 @@ public void stopJob(String jobName, int count, String text, int waitSec) 
throws @When("delete file with key $key from s3") public void deleteFileFromS3(String key) { - getS3client().deleteObject(S3_BUCKET_NAME, key); + getS3client().deleteObject(key); } @Then("Check filenames contains the text $text on s3 server after $waitSec seconds") public void checkS3After(String text, int waitSec) { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) - && !s3Client.listObjects(listObjectsRequest).getObjectSummaries().isEmpty()); + S3Client s3Client = getS3client(); + doWithin(waitSec, "check uploaded files to s3", () -> !s3Client.listObjectKeys().isEmpty()); - ObjectListing objectListing = s3Client.listObjects(listObjectsRequest); - assertThat(objectListing.getObjectSummaries(), hasItem(hasProperty("key", containsString(text)))); + List<String> objectKeys = s3Client.listObjectKeys(text); + assertThat(objectKeys, hasItem(containsString(text))); } @Then("Check $count files exists on s3 server with filenames containing the text $text after $waitSec seconds") public void checkNumberOfFilesOnS3(long count, String text, int waitSec) { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) - && fileCountOnS3(text, s3Client, listObjectsRequest) == count); - } - - private long fileCountOnS3(String text, AmazonS3Client s3Client, ListObjectsRequest listObjectsRequest) { - return s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream() - .filter(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)) - .count(); + S3Client s3Client = getS3client(); + doWithin(waitSec, "check uploaded files to s3", () -> s3Client.listObjectKeys(text).size() == count); } @Then("Less than $count 
files exists on s3 server with filenames containing the text $text after $waitSec seconds") public void checkLessThanFileExistsOnS3(long count, String text, int waitSec) { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - doWithin(waitSec, "check uploaded files to s3", () -> s3Client.doesBucketExist(S3_BUCKET_NAME) && between( - fileCountOnS3(text, s3Client, listObjectsRequest), 1L, count - 1L)); + S3Client s3Client = getS3client(); + doWithin(waitSec, "check uploaded files to s3", () -> between( + s3Client.listObjectKeys(text).size(), 1L, count - 1L)); } private boolean between(long count, long from, long to) { @@ -167,10 +147,9 @@ private boolean between(long count, long from, long to) { @Then("No file exists on s3 server with filenames containing the text $text") public void fileNotExistOnS3(String text) { - AmazonS3Client s3Client = getS3client(); - ListObjectsRequest listObjectsRequest = new ListObjectsRequest().withBucketName(S3_BUCKET_NAME); - assertThat(s3Client.listObjects(listObjectsRequest).getObjectSummaries().stream() - .anyMatch(s3ObjectSummary -> s3ObjectSummary.getKey().contains(text)), is(false)); + S3Client s3Client = getS3client(); + assertThat(s3Client.listObjectKeys().stream() + .anyMatch(objectKey -> objectKey.contains(text)), is(false)); } @Then("solr contains $count documents between $startLogtime and $endLogtime") diff --git a/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story b/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story index 122a634..876019f 100644 --- a/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story +++ b/ambari-infra-manager-it/src/test/resources/stories/infra_api_tests.story @@ -59,9 +59,9 @@ And solr does not contain documents between 2014-02-04T05:00:00.000Z and 2014-02 Scenario: Launch Archiving job. Initiate stop and check that part of the data is archived. 
After restart all data must be extracted. -Given 200 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z +Given 500 documents in solr with logtime from 2014-03-09T05:00:00.000Z to 2014-03-09T20:00:00.000Z When start archive_audit_logs job with parameters writeBlockSize=20,start=2014-03-09T05:00:00.000Z,end=2014-03-09T20:00:00.000Z after 2 seconds And stop job archive_audit_logs after at least 1 file exists in s3 with filename containing text solr_archive_audit_logs_-_2014-03-09 within 10 seconds -Then Less than 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds +Then Less than 20 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds When restart archive_audit_logs job within 10 seconds -Then Check 10 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds +Then Check 25 files exists on s3 server with filenames containing the text solr_archive_audit_logs_-_2014-03-09 after 20 seconds diff --git a/ambari-infra-manager/.gitignore b/ambari-infra-manager/.gitignore index 94b3829..4aece78 100644 --- a/ambari-infra-manager/.gitignore +++ b/ambari-infra-manager/.gitignore @@ -2,4 +2,5 @@ out/* *.pid Profile .env -test-out \ No newline at end of file +test-out +test.db \ No newline at end of file diff --git a/ambari-infra-manager/docker/docker-compose.yml b/ambari-infra-manager/docker/docker-compose.yml index c031cd7..3fa21b2 100644 --- a/ambari-infra-manager/docker/docker-compose.yml +++ b/ambari-infra-manager/docker/docker-compose.yml @@ -45,7 +45,7 @@ services: - "-z" - ${ZOOKEEPER_CONNECTION_STRING} volumes: - - $AMBARI_LOCATION/ambari-logsearch/ambari-logsearch-server/src/main/configsets:/opt/solr/configsets + - $AMBARI_INFRA_LOCATION/ambari-infra-manager/docker/configsets:/opt/solr/configsets fakes3: image: localstack/localstack hostname: fakes3 @@ 
-53,6 +53,7 @@ services: - "4569:4569" environment: - SERVICES=s3:4569 + - DEBUG=s3 networks: infra-network: aliases: @@ -96,8 +97,8 @@ services: ZK_CONNECT_STRING: ${ZOOKEEPER_CONNECTION_STRING} DISPLAY: $DOCKERIP:0 volumes: - - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/target/package:/root/ambari-infra-manager - - $AMBARI_LOCATION/ambari-infra/ambari-infra-manager/docker/test-out:/root/archive + - $AMBARI_INFRA_LOCATION/ambari-infra-manager/target/package:/root/ambari-infra-manager + - $AMBARI_INFRA_LOCATION/ambari-infra-manager/docker/test-out:/root/archive networks: infra-network: driver: bridge diff --git a/ambari-infra-manager/docker/infra-manager-docker-compose.sh b/ambari-infra-manager/docker/infra-manager-docker-compose.sh index 531440d..0c18e6f 100755 --- a/ambari-infra-manager/docker/infra-manager-docker-compose.sh +++ b/ambari-infra-manager/docker/infra-manager-docker-compose.sh @@ -61,13 +61,13 @@ function check_env_file() { function setup_env() { pushd $sdir/../../ - local AMBARI_LOCATION=$(pwd) + local AMBARI_INFRA_LOCATION=$(pwd) popd local docker_ip=$(get_docker_ip) cat << EOF > $sdir/.env DOCKERIP=$docker_ip MAVEN_REPOSITORY_LOCATION=$HOME/.m2 -AMBARI_LOCATION=$AMBARI_LOCATION +AMBARI_INFRA_LOCATION=$AMBARI_INFRA_LOCATION ZOOKEEPER_VERSION=3.4.10 ZOOKEEPER_CONNECTION_STRING=zookeeper:2181 diff --git a/ambari-infra-manager/pom.xml b/ambari-infra-manager/pom.xml index d25440f..f28f59d 100644 --- a/ambari-infra-manager/pom.xml +++ b/ambari-infra-manager/pom.xml @@ -134,7 +134,6 @@ <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-all</artifactId> - <version>1.3</version> <scope>test</scope> </dependency> <!-- Spring dependencies --> @@ -444,11 +443,6 @@ <groupId>com.google.guava</groupId> <version>20.0</version> </dependency> - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> - <version>1.11.5</version> - </dependency> <dependency> <groupId>org.apache.commons</groupId> 
<artifactId>commons-csv</artifactId> @@ -460,6 +454,11 @@ <version>${spring-boot.version}</version> <scope>provided</scope> </dependency> + <dependency> + <groupId>io.minio</groupId> + <artifactId>minio</artifactId> + <version>5.0.1</version> + </dependency> </dependencies> </project> diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/DurationConverter.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/DurationConverter.java new file mode 100644 index 0000000..60915de --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/conf/DurationConverter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.conf; + +import static org.apache.ambari.infra.json.StringToDurationConverter.toDuration; + +import java.time.Duration; + +import javax.inject.Named; + +import org.springframework.boot.context.properties.ConfigurationPropertiesBinding; +import org.springframework.core.convert.converter.Converter; + +@Named +@ConfigurationPropertiesBinding +public class DurationConverter implements Converter<String, Duration> { + @Override + public Duration convert(String s) { + return toDuration(s); + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java index 02a6885..314e52e 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/AbstractJobsConfiguration.java @@ -18,6 +18,10 @@ */ package org.apache.ambari.infra.job; +import java.util.Map; + +import javax.annotation.PostConstruct; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.Job; @@ -27,18 +31,15 @@ import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; -import javax.annotation.PostConstruct; -import java.util.Map; - -public abstract class AbstractJobsConfiguration<T extends JobProperties<T>> { +public abstract class AbstractJobsConfiguration<TProperties extends JobProperties<TParameters>, TParameters extends Validatable> { private static final Logger LOG = LoggerFactory.getLogger(AbstractJobsConfiguration.class); - private final Map<String, T> propertyMap; + private final Map<String, TProperties> propertyMap; private final JobScheduler scheduler; private final JobBuilderFactory jobs; private final JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor; - protected 
AbstractJobsConfiguration(Map<String, T> propertyMap, JobScheduler scheduler, JobBuilderFactory jobs, JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) { + protected AbstractJobsConfiguration(Map<String, TProperties> propertyMap, JobScheduler scheduler, JobBuilderFactory jobs, JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor) { this.propertyMap = propertyMap; this.scheduler = scheduler; this.jobs = jobs; diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/InfraJobExecutionDao.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/InfraJobExecutionDao.java new file mode 100644 index 0000000..903639c --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/InfraJobExecutionDao.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job; + +import java.time.OffsetDateTime; +import java.util.Date; + +import javax.inject.Inject; + +import org.springframework.batch.core.repository.dao.AbstractJdbcBatchMetadataDao; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.support.TransactionTemplate; + +@Repository +public class InfraJobExecutionDao extends AbstractJdbcBatchMetadataDao { + + private final TransactionTemplate transactionTemplate; + + @Inject + public InfraJobExecutionDao(JdbcTemplate jdbcTemplate, TransactionTemplate transactionTemplate) { + setJdbcTemplate(jdbcTemplate); + this.transactionTemplate = transactionTemplate; + } + + public void deleteJobExecutions(OffsetDateTime olderThan) { + transactionTemplate.execute(transactionStatus -> { + Date olderThanDate = Date.from(olderThan.toInstant()); + deleteStepExecutionContexts(olderThanDate); + deleteStepExecutions(olderThanDate); + deleteJobExecutionParams(olderThanDate); + deleteJobExecutionContexts(olderThanDate); + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?"), olderThanDate); + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%JOB_INSTANCE WHERE JOB_INSTANCE_ID NOT IN (SELECT JOB_INSTANCE_ID FROM %PREFIX%JOB_EXECUTION)")); + return null; + }); + } + + private void deleteStepExecutionContexts(Date olderThan) { + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%STEP_EXECUTION_CONTEXT WHERE STEP_EXECUTION_ID IN (SELECT STEP_EXECUTION_ID FROM %PREFIX%STEP_EXECUTION WHERE JOB_EXECUTION_ID IN (SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?))"), + olderThan); + } + + private void deleteStepExecutions(Date olderThan) { + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%STEP_EXECUTION WHERE JOB_EXECUTION_ID IN (SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?)"), + olderThan); + } + + private void 
deleteJobExecutionParams(Date olderThan) { + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%JOB_EXECUTION_PARAMS WHERE JOB_EXECUTION_ID IN (SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?)"), + olderThan); + } + + private void deleteJobExecutionContexts(Date olderThan) { + getJdbcTemplate().update(getQuery("DELETE FROM %PREFIX%JOB_EXECUTION_CONTEXT WHERE JOB_EXECUTION_ID IN (SELECT JOB_EXECUTION_ID FROM %PREFIX%JOB_EXECUTION WHERE CREATE_TIME < ?)"), + olderThan); + } + +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java index 79406d0..7be152f 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobProperties.java @@ -18,23 +18,15 @@ */ package org.apache.ambari.infra.job; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.springframework.batch.core.JobParameters; - -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.Optional; -public abstract class JobProperties<T extends JobProperties<T>> { +import org.springframework.batch.core.JobParameters; + +public abstract class JobProperties<TParameters extends Validatable> { private SchedulingProperties scheduling; - private final Class<T> clazz; private boolean enabled; - protected JobProperties(Class<T> clazz) { - this.clazz = clazz; - } - public SchedulingProperties getScheduling() { return scheduling; } @@ -49,23 +41,11 @@ public void setScheduling(SchedulingProperties scheduling) { this.scheduling = scheduling; } - public T deepCopy() { - try { - ObjectMapper objectMapper = new ObjectMapper(); - String json = objectMapper.writeValueAsString(this); - return objectMapper.readValue(json, clazz); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - - public abstract void 
apply(JobParameters jobParameters); - - public abstract void validate(); + public abstract TParameters merge(JobParameters jobParameters); public void validate(String jobName) { try { - validate(); + merge(new JobParameters()).validate(); } catch (Exception ex) { throw new JobConfigurationException(String.format("Configuration of job %s is invalid: %s!", jobName, ex.getMessage()), ex); diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertiesHolder.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertiesHolder.java new file mode 100644 index 0000000..67cdafa --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobPropertiesHolder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job; + +import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY; + +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobExecutionListener; + +public class JobPropertiesHolder<T extends Validatable> + implements JobExecutionListener { + + private final JobProperties<T> defaultProperties; + + public JobPropertiesHolder(JobProperties<T> defaultProperties) { + this.defaultProperties = defaultProperties; + } + + @Override + public void beforeJob(JobExecution jobExecution) { + try { + T parameters = defaultProperties.merge(jobExecution.getJobParameters()); + parameters.validate(); + jobExecution.getExecutionContext().put(PARAMETERS_CONTEXT_KEY, parameters); + } + catch (UnsupportedOperationException | IllegalArgumentException ex) { + jobExecution.stop(); + jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), ex.getMessage())); + throw ex; + } + } + + @Override + public void afterJob(JobExecution jobExecution) { + + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java index 324c0b3..8729c4e 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobScheduler.java @@ -18,6 +18,11 @@ */ package org.apache.ambari.infra.job; +import java.util.Date; + +import javax.inject.Inject; +import javax.inject.Named; + import org.apache.ambari.infra.manager.Jobs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,14 +38,6 @@ import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.support.CronTrigger; -import javax.inject.Inject; -import javax.inject.Named; -import java.time.Duration; -import java.time.OffsetDateTime; - -import static 
org.apache.ambari.infra.job.archive.FileNameSuffixFormatter.SOLR_DATETIME_FORMATTER; -import static org.apache.commons.lang.StringUtils.isBlank; - @Named public class JobScheduler { private static final Logger LOG = LoggerFactory.getLogger(JobScheduler.class); @@ -61,7 +58,7 @@ public void schedule(String jobName, SchedulingProperties schedulingProperties) throw new RuntimeException(e); } - scheduler.schedule(() -> launchJob(jobName, schedulingProperties.getIntervalEndDelta()), new CronTrigger(schedulingProperties.getCron())); + scheduler.schedule(() -> launchJob(jobName), new CronTrigger(schedulingProperties.getCron())); LOG.info("Job {} scheduled for running. Cron: {}", jobName, schedulingProperties.getCron()); } @@ -75,12 +72,10 @@ private void restartIfFailed(JobExecution jobExecution) { } } - private void launchJob(String jobName, String endDelta) { + private void launchJob(String jobName) { try { JobParametersBuilder jobParametersBuilder = new JobParametersBuilder(); - if (!isBlank(endDelta)) - jobParametersBuilder.addString("end", SOLR_DATETIME_FORMATTER.format(OffsetDateTime.now().minus(Duration.parse(endDelta)))); - + jobParametersBuilder.addDate("scheduledLaunchAt", new Date()); jobs.launchJob(jobName, jobParametersBuilder.toJobParameters()); } catch (JobParametersInvalidException | NoSuchJobException | JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException e) { throw new RuntimeException(e); diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java index 094e797..0eb5908 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/JobsPropertyMap.java @@ -18,38 +18,29 @@ */ package org.apache.ambari.infra.job; -import org.springframework.batch.core.ExitStatus; +import java.util.Map; + 
import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionListener; -import java.util.Map; +public class JobsPropertyMap<TProperties extends JobProperties<TParameters>, TParameters extends Validatable> + implements JobExecutionListener { -public class JobsPropertyMap<T extends JobProperties<T>> implements JobExecutionListener { + public static final String PARAMETERS_CONTEXT_KEY = "jobParameters"; + private final Map<String, TProperties> propertyMap; - private final Map<String, T> propertyMap; - - public JobsPropertyMap(Map<String, T> propertyMap) { + public JobsPropertyMap(Map<String, TProperties> propertyMap) { this.propertyMap = propertyMap; } @Override public void beforeJob(JobExecution jobExecution) { - try { - String jobName = jobExecution.getJobInstance().getJobName(); - T defaultProperties = propertyMap.get(jobName); - if (defaultProperties == null) - throw new UnsupportedOperationException("Properties not found for job " + jobName); - - T properties = defaultProperties.deepCopy(); - properties.apply(jobExecution.getJobParameters()); - properties.validate(jobName); - jobExecution.getExecutionContext().put("jobProperties", properties); - } - catch (UnsupportedOperationException | IllegalArgumentException ex) { - jobExecution.stop(); - jobExecution.setExitStatus(new ExitStatus(ExitStatus.FAILED.getExitCode(), ex.getMessage())); - throw ex; - } + String jobName = jobExecution.getJobInstance().getJobName(); + TProperties defaultProperties = propertyMap.get(jobName); + if (defaultProperties == null) + throw new UnsupportedOperationException("Properties not found for job " + jobName); + + new JobPropertiesHolder<>(defaultProperties).beforeJob(jobExecution); } @Override diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java index af81b4f..2f18c55 100644 --- 
a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/SchedulingProperties.java @@ -21,7 +21,6 @@ public class SchedulingProperties { private boolean enabled = false; private String cron; - private String intervalEndDelta; public boolean isEnabled() { return enabled; @@ -38,12 +37,4 @@ public String getCron() { public void setCron(String cron) { this.cron = cron; } - - public String getIntervalEndDelta() { - return intervalEndDelta; - } - - public void setIntervalEndDelta(String intervalEndDelta) { - this.intervalEndDelta = intervalEndDelta; - } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/Validatable.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/Validatable.java new file mode 100644 index 0000000..5c04406 --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/Validatable.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job; + +public interface Validatable { + void validate(); +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java new file mode 100644 index 0000000..eea87d2 --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job.archive; + +import static java.util.Objects.requireNonNull; +import static org.apache.ambari.infra.job.archive.ExportDestination.HDFS; +import static org.apache.ambari.infra.job.archive.ExportDestination.LOCAL; +import static org.apache.ambari.infra.job.archive.ExportDestination.S3; +import static org.apache.commons.lang.StringUtils.isBlank; + +import java.time.Duration; +import java.util.Optional; + +import org.apache.ambari.infra.job.Validatable; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.apache.ambari.infra.json.StringToDurationConverter; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +public class ArchivingParameters implements Validatable { + private int readBlockSize; + private int writeBlockSize; + private ExportDestination destination; + private String localDestinationDirectory; + private String fileNameSuffixColumn; + private String fileNameSuffixDateFormat; + private SolrParameters solr; + private String s3AccessFile; + private String s3KeyPrefix; + private String s3BucketName; + private String s3Endpoint; + private String hdfsEndpoint; + private String hdfsDestinationDirectory; + private String start; + private String end; + @JsonSerialize(converter = DurationToStringConverter.class) + @JsonDeserialize(converter = StringToDurationConverter.class) + private Duration ttl; + + public int getReadBlockSize() { + return readBlockSize; + } + + public void setReadBlockSize(int readBlockSize) { + this.readBlockSize = readBlockSize; + } + + public int getWriteBlockSize() { + return writeBlockSize; + } + + public void setWriteBlockSize(int writeBlockSize) { + this.writeBlockSize = writeBlockSize; + } + + public ExportDestination getDestination() { + return destination; + } + + public void setDestination(ExportDestination destination) { + this.destination = destination; + } + + public String 
getLocalDestinationDirectory() { + return localDestinationDirectory; + } + + public void setLocalDestinationDirectory(String localDestinationDirectory) { + this.localDestinationDirectory = localDestinationDirectory; + } + + public String getFileNameSuffixColumn() { + return fileNameSuffixColumn; + } + + public void setFileNameSuffixColumn(String fileNameSuffixColumn) { + this.fileNameSuffixColumn = fileNameSuffixColumn; + } + + public String getFileNameSuffixDateFormat() { + return fileNameSuffixDateFormat; + } + + public void setFileNameSuffixDateFormat(String fileNameSuffixDateFormat) { + this.fileNameSuffixDateFormat = fileNameSuffixDateFormat; + } + + public SolrParameters getSolr() { + return solr; + } + + public void setSolr(SolrParameters solr) { + this.solr = solr; + } + + public String getS3AccessFile() { + return s3AccessFile; + } + + public void setS3AccessFile(String s3AccessFile) { + this.s3AccessFile = s3AccessFile; + } + + public String getS3KeyPrefix() { + return s3KeyPrefix; + } + + public void setS3KeyPrefix(String s3KeyPrefix) { + this.s3KeyPrefix = s3KeyPrefix; + } + + public String getS3BucketName() { + return s3BucketName; + } + + public void setS3BucketName(String s3BucketName) { + this.s3BucketName = s3BucketName; + } + + public String getS3Endpoint() { + return s3Endpoint; + } + + public void setS3Endpoint(String s3Endpoint) { + this.s3Endpoint = s3Endpoint; + } + + public String getHdfsEndpoint() { + return hdfsEndpoint; + } + + public void setHdfsEndpoint(String hdfsEndpoint) { + this.hdfsEndpoint = hdfsEndpoint; + } + + public String getHdfsDestinationDirectory() { + return hdfsDestinationDirectory; + } + + public void setHdfsDestinationDirectory(String hdfsDestinationDirectory) { + this.hdfsDestinationDirectory = hdfsDestinationDirectory; + } + + public Optional<S3Properties> s3Properties() { + if (isBlank(s3BucketName)) + return Optional.empty(); + + return Optional.of(new S3Properties( + s3AccessFile, + s3KeyPrefix, + s3BucketName, + 
s3Endpoint)); + } + + public String getStart() { + return start; + } + + public void setStart(String start) { + this.start = start; + } + + public String getEnd() { + return end; + } + + public void setEnd(String end) { + this.end = end; + } + + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; + } + + @Override + public void validate() { + if (readBlockSize <= 0) + throw new IllegalArgumentException("The property readBlockSize must be greater than 0!"); + + if (writeBlockSize <= 0) + throw new IllegalArgumentException("The property writeBlockSize must be greater than 0!"); + + if (isBlank(fileNameSuffixColumn)) { + throw new IllegalArgumentException("The property fileNameSuffixColumn can not be null or empty string!"); + } + + requireNonNull(destination, "The property destination can not be null!"); + switch (destination) { + case LOCAL: + if (isBlank(localDestinationDirectory)) + throw new IllegalArgumentException(String.format( + "The property localDestinationDirectory can not be null or empty string when destination is set to %s!", LOCAL.name())); + break; + + case S3: + s3Properties() + .orElseThrow(() -> new IllegalArgumentException("S3 related properties must be set if the destination is " + S3.name())) + .validate(); + break; + + case HDFS: + if (isBlank(hdfsEndpoint)) + throw new IllegalArgumentException(String.format( + "The property hdfsEndpoint can not be null or empty string when destination is set to %s!", HDFS.name())); + if (isBlank(hdfsDestinationDirectory)) + throw new IllegalArgumentException(String.format( + "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name())); + } + + requireNonNull(solr, "No solr query was specified for archiving job!"); + solr.validate(); + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java 
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java index 8358dd0..eac31af 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java @@ -18,6 +18,14 @@ */ package org.apache.ambari.infra.job.archive; +import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY; +import static org.apache.ambari.infra.job.archive.SolrQueryBuilder.computeEnd; +import static org.apache.commons.lang.StringUtils.isBlank; + +import java.io.File; + +import javax.inject.Inject; + import org.apache.ambari.infra.conf.InfraManagerDataConfig; import org.apache.ambari.infra.conf.security.PasswordStore; import org.apache.ambari.infra.job.AbstractJobsConfiguration; @@ -40,13 +48,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.inject.Inject; -import java.io.File; - -import static org.apache.commons.lang.StringUtils.isBlank; - @Configuration -public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<DocumentArchivingProperties> { +public class DocumentArchivingConfiguration extends AbstractJobsConfiguration<DocumentArchivingProperties, ArchivingParameters> { private static final Logger LOG = LoggerFactory.getLogger(DocumentArchivingConfiguration.class); private static final DocumentWiper NOT_DELETE = (firstDocument, lastDocument) -> { }; @@ -83,7 +86,7 @@ public Step exportStep(DocumentExporter documentExporter) { @StepScope public DocumentExporter documentExporter(DocumentItemReader documentItemReader, @Value("#{stepExecution.jobExecution.jobId}") String jobId, - @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties, + @Value("#{stepExecution.jobExecution.executionContext.get('" + 
PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters parameters, InfraManagerDataConfig infraManagerDataConfig, @Value("#{jobParameters[end]}") String intervalEnd, DocumentWiper documentWiper, @@ -92,28 +95,28 @@ public DocumentExporter documentExporter(DocumentItemReader documentItemReader, File baseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting"); CompositeFileAction fileAction = new CompositeFileAction(new TarGzCompressor()); - switch (properties.getDestination()) { + switch (parameters.getDestination()) { case S3: fileAction.add(new S3Uploader( - properties.s3Properties().orElseThrow(() -> new IllegalStateException("S3 properties are not provided!")), + parameters.s3Properties().orElseThrow(() -> new IllegalStateException("S3 properties are not provided!")), passwordStore)); break; case HDFS: org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - conf.set("fs.defaultFS", properties.getHdfsEndpoint()); - fileAction.add(new HdfsUploader(conf, new Path(properties.getHdfsDestinationDirectory()))); + conf.set("fs.defaultFS", parameters.getHdfsEndpoint()); + fileAction.add(new HdfsUploader(conf, new Path(parameters.getHdfsDestinationDirectory()))); break; case LOCAL: - baseDir = new File(properties.getLocalDestinationDirectory()); + baseDir = new File(parameters.getLocalDestinationDirectory()); break; } - FileNameSuffixFormatter fileNameSuffixFormatter = FileNameSuffixFormatter.from(properties); + FileNameSuffixFormatter fileNameSuffixFormatter = FileNameSuffixFormatter.from(parameters); LocalItemWriterListener itemWriterListener = new LocalItemWriterListener(fileAction, documentWiper); File destinationDirectory = new File( baseDir, String.format("%s_%s_%s", - properties.getSolr().getCollection(), + parameters.getSolr().getCollection(), jobId, isBlank(intervalEnd) ? 
"" : fileNameSuffixFormatter.format(intervalEnd))); LOG.info("Destination directory path={}", destinationDirectory); @@ -126,23 +129,23 @@ public DocumentExporter documentExporter(DocumentItemReader documentItemReader, return new DocumentExporter( documentItemReader, firstDocument -> new LocalDocumentItemWriter( - outFile(properties.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener), - properties.getWriteBlockSize(), jobContextRepository); + outFile(parameters.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener), + parameters.getWriteBlockSize(), jobContextRepository); } @Bean @StepScope - public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties, + public DocumentWiper documentWiper(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters parameters, SolrDAO solrDAO) { - if (isBlank(properties.getSolr().getDeleteQueryText())) + if (isBlank(parameters.getSolr().getDeleteQueryText())) return NOT_DELETE; return solrDAO; } @Bean @StepScope - public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties properties) { - return new SolrDAO(properties.getSolr()); + public SolrDAO solrDAO(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters parameters) { + return new SolrDAO(parameters.getSolr()); } private File outFile(String collection, File directoryPath, String suffix) { @@ -154,16 +157,15 @@ private File outFile(String collection, File directoryPath, String suffix) { @Bean @StepScope public DocumentItemReader reader(ObjectSource<Document> documentSource, - @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentArchivingProperties 
properties) { + @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters properties) { return new DocumentItemReader(documentSource, properties.getReadBlockSize()); } @Bean @StepScope - public ObjectSource<Document> logSource(@Value("#{jobParameters[start]}") String start, - @Value("#{jobParameters[end]}") String end, + public ObjectSource<Document> logSource(@Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingParameters parameters, SolrDAO solrDAO) { - return new SolrDocumentSource(solrDAO, start, end); + return new SolrDocumentSource(solrDAO, parameters.getStart(), computeEnd(parameters.getEnd(), parameters.getTtl())); } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java index b26da36..dea8acb 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java @@ -18,24 +18,24 @@ */ package org.apache.ambari.infra.job.archive; -import org.apache.ambari.infra.job.JobProperties; -import org.springframework.batch.core.JobParameters; +import static org.apache.ambari.infra.json.StringToDurationConverter.toDuration; +import static org.apache.commons.lang.StringUtils.isBlank; +import java.time.Duration; import java.util.Optional; -import static java.util.Objects.requireNonNull; -import static org.apache.ambari.infra.job.archive.ExportDestination.HDFS; -import static org.apache.ambari.infra.job.archive.ExportDestination.LOCAL; -import static org.apache.ambari.infra.job.archive.ExportDestination.S3; -import static org.apache.commons.lang.StringUtils.isBlank; +import org.apache.ambari.infra.job.JobProperties; +import 
org.apache.ambari.infra.json.DurationToStringConverter; +import org.springframework.batch.core.JobParameters; -public class DocumentArchivingProperties extends JobProperties<DocumentArchivingProperties> { +public class DocumentArchivingProperties extends JobProperties<ArchivingParameters> { private int readBlockSize; private int writeBlockSize; private ExportDestination destination; private String localDestinationDirectory; private String fileNameSuffixColumn; private String fileNameSuffixDateFormat; + private Duration ttl; private SolrProperties solr; private String s3AccessFile; private String s3KeyPrefix; @@ -45,10 +45,6 @@ private String hdfsEndpoint; private String hdfsDestinationDirectory; - public DocumentArchivingProperties() { - super(DocumentArchivingProperties.class); - } - public int getReadBlockSize() { return readBlockSize; } @@ -97,6 +93,14 @@ public void setFileNameSuffixDateFormat(String fileNameSuffixDateFormat) { this.fileNameSuffixDateFormat = fileNameSuffixDateFormat; } + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; + } + public SolrProperties getSolr() { return solr; } @@ -164,21 +168,6 @@ public void setHdfsDestinationDirectory(String hdfsDestinationDirectory) { this.hdfsDestinationDirectory = hdfsDestinationDirectory; } - @Override - public void apply(JobParameters jobParameters) { - readBlockSize = getIntJobParameter(jobParameters, "readBlockSize", readBlockSize); - writeBlockSize = getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize); - destination = ExportDestination.valueOf(jobParameters.getString("destination", destination.name())); - localDestinationDirectory = jobParameters.getString("localDestinationDirectory", localDestinationDirectory); - s3AccessFile = jobParameters.getString("s3AccessFile", s3AccessFile); - s3BucketName = jobParameters.getString("s3BucketName", s3BucketName); - s3KeyPrefix = jobParameters.getString("s3KeyPrefix", s3KeyPrefix); - s3Endpoint = 
jobParameters.getString("s3Endpoint", s3Endpoint); - hdfsEndpoint = jobParameters.getString("hdfsEndpoint", hdfsEndpoint); - hdfsDestinationDirectory = jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory); - solr.apply(jobParameters); - } - private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) { String valueText = jobParameters.getString(parameterName); if (isBlank(valueText)) @@ -187,41 +176,24 @@ private int getIntJobParameter(JobParameters jobParameters, String parameterName } @Override - public void validate() { - if (readBlockSize == 0) - throw new IllegalArgumentException("The property readBlockSize must be greater than 0!"); - - if (writeBlockSize == 0) - throw new IllegalArgumentException("The property writeBlockSize must be greater than 0!"); - - if (isBlank(fileNameSuffixColumn)) { - throw new IllegalArgumentException("The property fileNameSuffixColumn can not be null or empty string!"); - } - - requireNonNull(destination, "The property destination can not be null!"); - switch (destination) { - case LOCAL: - if (isBlank(localDestinationDirectory)) - throw new IllegalArgumentException(String.format( - "The property localDestinationDirectory can not be null or empty string when destination is set to %s!", LOCAL.name())); - break; - - case S3: - s3Properties() - .orElseThrow(() -> new IllegalArgumentException("S3 related properties must be set if the destination is " + S3.name())) - .validate(); - break; - - case HDFS: - if (isBlank(hdfsEndpoint)) - throw new IllegalArgumentException(String.format( - "The property hdfsEndpoint can not be null or empty string when destination is set to %s!", HDFS.name())); - if (isBlank(hdfsDestinationDirectory)) - throw new IllegalArgumentException(String.format( - "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name())); - } - - requireNonNull(solr, "No solr query was specified for archiving 
job!"); - solr.validate(); + public ArchivingParameters merge(JobParameters jobParameters) { + ArchivingParameters archivingParameters = new ArchivingParameters(); + archivingParameters.setReadBlockSize(getIntJobParameter(jobParameters, "readBlockSize", readBlockSize)); + archivingParameters.setWriteBlockSize(getIntJobParameter(jobParameters, "writeBlockSize", writeBlockSize)); + archivingParameters.setDestination(ExportDestination.valueOf(jobParameters.getString("destination", destination.name()))); + archivingParameters.setLocalDestinationDirectory(jobParameters.getString("localDestinationDirectory", localDestinationDirectory)); + archivingParameters.setFileNameSuffixColumn(jobParameters.getString("fileNameSuffixColumn", fileNameSuffixColumn)); + archivingParameters.setFileNameSuffixDateFormat(jobParameters.getString("fileNameSuffixDateFormat", fileNameSuffixDateFormat)); + archivingParameters.setS3AccessFile(jobParameters.getString("s3AccessFile", s3AccessFile)); + archivingParameters.setS3BucketName(jobParameters.getString("s3BucketName", s3BucketName)); + archivingParameters.setS3KeyPrefix(jobParameters.getString("s3KeyPrefix", s3KeyPrefix)); + archivingParameters.setS3Endpoint(jobParameters.getString("s3Endpoint", s3Endpoint)); + archivingParameters.setHdfsEndpoint(jobParameters.getString("hdfsEndpoint", hdfsEndpoint)); + archivingParameters.setHdfsDestinationDirectory(jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory)); + archivingParameters.setSolr(solr.merge(jobParameters)); + archivingParameters.setStart(jobParameters.getString("start")); + archivingParameters.setEnd(jobParameters.getString("end")); + archivingParameters.setTtl(toDuration(jobParameters.getString("ttl", DurationToStringConverter.toString(ttl)))); + return archivingParameters; } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java 
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java index f9016e6..0c879bd 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/FileNameSuffixFormatter.java @@ -18,17 +18,17 @@ */ package org.apache.ambari.infra.job.archive; -import java.time.OffsetDateTime; -import java.time.format.DateTimeFormatter; - import static java.util.Objects.requireNonNull; import static org.apache.ambari.infra.job.archive.SolrDocumentIterator.SOLR_DATE_FORMAT_TEXT; import static org.apache.commons.lang.StringUtils.isBlank; +import java.time.OffsetDateTime; +import java.time.format.DateTimeFormatter; + public class FileNameSuffixFormatter { public static final DateTimeFormatter SOLR_DATETIME_FORMATTER = DateTimeFormatter.ofPattern(SOLR_DATE_FORMAT_TEXT); - public static FileNameSuffixFormatter from(DocumentArchivingProperties properties) { + public static FileNameSuffixFormatter from(ArchivingParameters properties) { return new FileNameSuffixFormatter(properties.getFileNameSuffixColumn(), properties.getFileNameSuffixDateFormat()); } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java index 2536cb5..76aa734 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/S3Uploader.java @@ -1,16 +1,30 @@ package org.apache.ambari.infra.job.archive; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.services.s3.AmazonS3Client; +import static org.apache.commons.lang.StringUtils.isNotBlank; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.UncheckedIOException; +import 
java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; + import org.apache.ambari.infra.conf.security.CompositePasswordStore; import org.apache.ambari.infra.conf.security.PasswordStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.xmlpull.v1.XmlPullParserException; -import java.io.File; - -import static org.apache.commons.lang.StringUtils.isBlank; -import static org.apache.commons.lang.StringUtils.isNotBlank; +import io.minio.MinioClient; +import io.minio.errors.ErrorResponseException; +import io.minio.errors.InsufficientDataException; +import io.minio.errors.InternalException; +import io.minio.errors.InvalidArgumentException; +import io.minio.errors.InvalidBucketNameException; +import io.minio.errors.InvalidEndpointException; +import io.minio.errors.InvalidPortException; +import io.minio.errors.NoResponseException; +import io.minio.errors.RegionConflictException; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -34,7 +48,7 @@ private static final Logger LOG = LoggerFactory.getLogger(S3Uploader.class); - private final AmazonS3Client client; + private final MinioClient client; private final String keyPrefix; private final String bucketName; @@ -48,27 +62,39 @@ public S3Uploader(S3Properties s3Properties, PasswordStore passwordStore) { if (isNotBlank((s3Properties.getS3AccessFile()))) compositePasswordStore = new CompositePasswordStore(passwordStore, S3AccessCsv.file(s3Properties.getS3AccessFile())); - BasicAWSCredentials credentials = new BasicAWSCredentials( - compositePasswordStore.getPassword(S3AccessKeyNames.AccessKeyId.getEnvVariableName()) - .orElseThrow(() -> new IllegalArgumentException("Access key Id is not present!")), - compositePasswordStore.getPassword(S3AccessKeyNames.SecretAccessKey.getEnvVariableName()) - .orElseThrow(() -> new IllegalArgumentException("Secret Access Key is not present!"))); - client = new AmazonS3Client(credentials); - if (!isBlank(s3Properties.getS3EndPoint())) - 
client.setEndpoint(s3Properties.getS3EndPoint()); -// Note: without pathStyleAccess=true endpoint going to be <bucketName>.<host>:<port> -// client.setS3ClientOptions(S3ClientOptions.builder().setPathStyleAccess(true).build()); + try { + client = new MinioClient(s3Properties.getS3EndPoint(), compositePasswordStore.getPassword(S3AccessKeyNames.AccessKeyId.getEnvVariableName()) + .orElseThrow(() -> new IllegalArgumentException("Access key Id is not present!")), + compositePasswordStore.getPassword(S3AccessKeyNames.SecretAccessKey.getEnvVariableName()) + .orElseThrow(() -> new IllegalArgumentException("Secret Access Key is not present!"))); + + if (!client.bucketExists(bucketName)) + client.makeBucket(bucketName); + + } catch (RegionConflictException | XmlPullParserException | InvalidBucketNameException | NoSuchAlgorithmException | InsufficientDataException | ErrorResponseException | InvalidKeyException | NoResponseException | InvalidPortException | InvalidEndpointException | InternalException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } @Override public File onPerform(File inputFile) { String key = keyPrefix + inputFile.getName(); - if (client.doesObjectExist(bucketName, key)) { - throw new UnsupportedOperationException(String.format("Object '%s' already exists in bucket '%s'", key, bucketName)); - } + try { + if (client.listObjects(bucketName, key).iterator().hasNext()) { + throw new UnsupportedOperationException(String.format("Object '%s' already exists in bucket '%s'", key, bucketName)); + } - client.putObject(bucketName, key, inputFile); - return inputFile; + try (FileInputStream fileInputStream = new FileInputStream(inputFile)) { + client.putObject(bucketName, key, fileInputStream, inputFile.length(), "application/json"); + return inputFile; + } + } catch (InvalidKeyException | NoSuchAlgorithmException | NoResponseException | XmlPullParserException | InvalidArgumentException | 
InvalidBucketNameException | ErrorResponseException | InternalException | InsufficientDataException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java index fba08e7..7f8fd07 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrDAO.java @@ -18,6 +18,9 @@ */ package org.apache.ambari.infra.job.archive; +import java.io.IOException; +import java.io.UncheckedIOException; + import org.apache.ambari.infra.job.SolrDAOBase; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; @@ -26,15 +29,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.io.UncheckedIOException; - public class SolrDAO extends SolrDAOBase implements DocumentWiper { private static final Logger LOG = LoggerFactory.getLogger(SolrDAO.class); - private final SolrProperties queryProperties; + private final SolrParameters queryProperties; - public SolrDAO(SolrProperties queryProperties) { + public SolrDAO(SolrParameters queryProperties) { super(queryProperties.getZooKeeperConnectionString(), queryProperties.getCollection()); this.queryProperties = queryProperties; } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParameters.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParameters.java new file mode 100644 index 0000000..a793c9b --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrParameters.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.archive; + +import static org.apache.commons.lang.StringUtils.isBlank; + +public class SolrParameters { + private String zooKeeperConnectionString; + private String collection; + private String queryText; + private String filterQueryText; + private String[] sortColumn; + private String deleteQueryText; + + public String getZooKeeperConnectionString() { + return zooKeeperConnectionString; + } + + public void setZooKeeperConnectionString(String zooKeeperConnectionString) { + this.zooKeeperConnectionString = zooKeeperConnectionString; + } + + public String getCollection() { + return collection; + } + + public void setCollection(String collection) { + this.collection = collection; + } + + public String getQueryText() { + return queryText; + } + + public void setQueryText(String queryText) { + this.queryText = queryText; + } + + public String getFilterQueryText() { + return filterQueryText; + } + + public void setFilterQueryText(String filterQueryText) { + this.filterQueryText = filterQueryText; + } + + public String[] getSortColumn() { + return sortColumn; + } + + public void setSortColumn(String[] sortColumn) { + this.sortColumn = sortColumn; + } + + public String getDeleteQueryText() { + return deleteQueryText; + } + + public void 
setDeleteQueryText(String deleteQueryText) { + this.deleteQueryText = deleteQueryText; + } + + public SolrQueryBuilder toQueryBuilder() { + return new SolrQueryBuilder(). + setQueryText(queryText) + .setFilterQueryText(filterQueryText) + .addSort(sortColumn); + } + + public void validate() { + if (isBlank(zooKeeperConnectionString)) + throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!"); + + if (isBlank(collection)) + throw new IllegalArgumentException("The property collection can not be null or empty string!"); + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java index a2a78c2..1cb2d62 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrProperties.java @@ -18,12 +18,10 @@ */ package org.apache.ambari.infra.job.archive; -import org.springframework.batch.core.JobParameters; - import java.util.ArrayList; import java.util.List; -import static org.apache.commons.lang.StringUtils.isBlank; +import org.springframework.batch.core.JobParameters; public class SolrProperties { private String zooKeeperConnectionString; @@ -81,19 +79,13 @@ public void setDeleteQueryText(String deleteQueryText) { this.deleteQueryText = deleteQueryText; } - public SolrQueryBuilder toQueryBuilder() { - return new SolrQueryBuilder(). 
- setQueryText(queryText) - .setFilterQueryText(filterQueryText) - .addSort(sortColumn); - } - - public void apply(JobParameters jobParameters) { - zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString); - collection = jobParameters.getString("collection", collection); - queryText = jobParameters.getString("queryText", queryText); - filterQueryText = jobParameters.getString("filterQueryText", filterQueryText); - deleteQueryText = jobParameters.getString("deleteQueryText", deleteQueryText); + public SolrParameters merge(JobParameters jobParameters) { + SolrParameters solrParameters = new SolrParameters(); + solrParameters.setZooKeeperConnectionString(jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString)); + solrParameters.setCollection(jobParameters.getString("collection", collection)); + solrParameters.setQueryText(jobParameters.getString("queryText", queryText)); + solrParameters.setFilterQueryText(jobParameters.getString("filterQueryText", filterQueryText)); + solrParameters.setDeleteQueryText(jobParameters.getString("deleteQueryText", deleteQueryText)); String sortValue; List<String> sortColumns = new ArrayList<>(); @@ -102,16 +94,8 @@ public void apply(JobParameters jobParameters) { sortColumns.add(sortValue); ++i; } + solrParameters.setSortColumn(sortColumns.toArray(new String[0])); - if (sortColumns.size() > 0) - sortColumn = sortColumns.toArray(new String[sortColumns.size()]); - } - - public void validate() { - if (isBlank(zooKeeperConnectionString)) - throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!"); - - if (isBlank(collection)) - throw new IllegalArgumentException("The property collection can not be null or empty string!"); + return solrParameters; } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java 
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java index 0e41169..40771dc 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/SolrQueryBuilder.java @@ -18,16 +18,32 @@ */ package org.apache.ambari.infra.job.archive; -import org.apache.solr.client.solrj.SolrQuery; +import static org.apache.ambari.infra.job.archive.FileNameSuffixFormatter.SOLR_DATETIME_FORMATTER; +import static org.apache.commons.lang.StringUtils.isBlank; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.apache.solr.client.solrj.SolrQuery.ORDER.asc; +import java.time.Duration; +import java.time.OffsetDateTime; import java.util.HashMap; import java.util.Map; -import static org.apache.commons.lang.StringUtils.isBlank; -import static org.apache.solr.client.solrj.SolrQuery.ORDER.asc; +import org.apache.solr.client.solrj.SolrQuery; public class SolrQueryBuilder { + public static String computeEnd(String end, Duration ttl) { + return computeEnd(end, OffsetDateTime.now(), ttl); + } + + public static String computeEnd(String end, OffsetDateTime now, Duration ttl) { + if (isNotBlank(end)) + return end; + if (ttl != null) + return SOLR_DATETIME_FORMATTER.format(now.minus(ttl)); + return null; + } + private static final String INTERVAL_START = "start"; private static final String INTERVAL_END = "end"; private String queryText; diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpConfiguration.java new file mode 100644 index 0000000..27f61fa --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpConfiguration.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.cleanup; + +import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY; + +import javax.inject.Inject; + +import org.apache.ambari.infra.job.InfraJobExecutionDao; +import org.apache.ambari.infra.job.JobPropertiesHolder; +import org.apache.ambari.infra.job.JobScheduler; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.event.EventListener; + +@Configuration +public class CleanUpConfiguration { + + public static final String JOB_NAME = "clean_up"; + private final StepBuilderFactory steps; + private final JobBuilderFactory jobs; + private final JobScheduler scheduler; + private final 
CleanUpProperties cleanUpProperties; + + @Inject + public CleanUpConfiguration(StepBuilderFactory steps, JobBuilderFactory jobs, CleanUpProperties cleanUpProperties, JobScheduler scheduler) { + this.steps = steps; + this.jobs = jobs; + this.scheduler = scheduler; + this.cleanUpProperties = cleanUpProperties; + } + + @EventListener(ApplicationReadyEvent.class) + public void scheduleJob() { + cleanUpProperties.scheduling().ifPresent(schedulingProperties -> scheduler.schedule(JOB_NAME, schedulingProperties)); + } + + @Bean(name = "cleanUpJob") + public Job job(@Qualifier("cleanUpStep") Step cleanUpStep) { + return jobs.get(JOB_NAME).listener(new JobPropertiesHolder<>(cleanUpProperties)).start(cleanUpStep).build(); + } + + @Bean(name = "cleanUpStep") + protected Step cleanUpStep(TaskHistoryWiper taskHistoryWiper) { + return steps.get("cleanUpStep").tasklet(taskHistoryWiper).build(); + } + + @Bean + @StepScope + protected TaskHistoryWiper taskHistoryWiper( + InfraJobExecutionDao infraJobExecutionDao, + @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") CleanUpParameters cleanUpParameters) { + return new TaskHistoryWiper(infraJobExecutionDao, cleanUpParameters.getTtl()); + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpParameters.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpParameters.java new file mode 100644 index 0000000..a4f2141 --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpParameters.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.cleanup; + +import java.time.Duration; + +import org.apache.ambari.infra.job.Validatable; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.apache.ambari.infra.json.StringToDurationConverter; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +public class CleanUpParameters implements Validatable { + + @JsonSerialize(converter = DurationToStringConverter.class) + @JsonDeserialize(converter = StringToDurationConverter.class) + private Duration ttl; + + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; + } + + @Override + public void validate() { + + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpProperties.java new file mode 100644 index 0000000..7bf9808 --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/CleanUpProperties.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.cleanup; + +import static org.apache.ambari.infra.json.StringToDurationConverter.toDuration; + +import java.time.Duration; + +import org.apache.ambari.infra.job.JobProperties; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.springframework.batch.core.JobParameters; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +@Configuration +@ConfigurationProperties(prefix = "infra-manager.jobs.clean-up") +public class CleanUpProperties extends JobProperties<CleanUpParameters> { + + private Duration ttl; + + protected CleanUpProperties() { + setEnabled(true); + } + + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; + } + + @Override + public CleanUpParameters merge(JobParameters jobParameters) { + CleanUpParameters cleanUpParameters = new CleanUpParameters(); + cleanUpParameters.setTtl(toDuration(jobParameters.getString("ttl", DurationToStringConverter.toString(ttl)))); + return cleanUpParameters; + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/TaskHistoryWiper.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/TaskHistoryWiper.java new file mode 100644 index 0000000..594515e --- /dev/null +++ 
b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/cleanup/TaskHistoryWiper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.job.cleanup; + +import java.time.Duration; +import java.time.OffsetDateTime; + +import org.apache.ambari.infra.job.InfraJobExecutionDao; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; + +public class TaskHistoryWiper implements Tasklet { + + private static final Logger logger = LoggerFactory.getLogger(TaskHistoryWiper.class); + public static final Duration DEFAULT_TTL = Duration.ofHours(1); + + private final InfraJobExecutionDao infraJobExecutionDao; + private final Duration ttl; + + public TaskHistoryWiper(InfraJobExecutionDao infraJobExecutionDao, Duration ttl) { + this.infraJobExecutionDao = infraJobExecutionDao; + if (ttl == null || ttl.compareTo(DEFAULT_TTL) < 0) { + logger.info("The ttl value ({}) less than the minimum required. 
Using the default ({}) instead", ttl, DEFAULT_TTL); + this.ttl = DEFAULT_TTL; + } + else { + this.ttl = ttl; + } + } + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { + infraJobExecutionDao.deleteJobExecutions(OffsetDateTime.now().minus(ttl)); + return RepeatStatus.FINISHED; + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DeletingParameters.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DeletingParameters.java new file mode 100644 index 0000000..71d98e1 --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DeletingParameters.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.infra.job.deleting; + +import static org.apache.commons.lang.StringUtils.isBlank; + +import java.time.Duration; + +import org.apache.ambari.infra.job.Validatable; +import org.apache.ambari.infra.json.DurationToStringConverter; +import org.apache.ambari.infra.json.StringToDurationConverter; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +public class DeletingParameters implements Validatable { + private String zooKeeperConnectionString; + private String collection; + private String filterField; + private String start; + private String end; + @JsonSerialize(converter = DurationToStringConverter.class) + @JsonDeserialize(converter = StringToDurationConverter.class) + private Duration ttl; + + public String getZooKeeperConnectionString() { + return zooKeeperConnectionString; + } + + public void setZooKeeperConnectionString(String zooKeeperConnectionString) { + this.zooKeeperConnectionString = zooKeeperConnectionString; + } + + public String getCollection() { + return collection; + } + + public void setCollection(String collection) { + this.collection = collection; + } + + public String getFilterField() { + return filterField; + } + + public void setFilterField(String filterField) { + this.filterField = filterField; + } + + public String getStart() { + return start; + } + + public void setStart(String start) { + this.start = start; + } + + public String getEnd() { + return end; + } + + public void setEnd(String end) { + this.end = end; + } + + public Duration getTtl() { + return ttl; + } + + public void setTtl(Duration ttl) { + this.ttl = ttl; + } + + @Override + public void validate() { + if (isBlank(zooKeeperConnectionString)) + throw new IllegalArgumentException("The property zooKeeperConnectionString can not be null or empty string!"); + + if (isBlank(collection)) + throw new IllegalArgumentException("The property collection can not be null or 
empty string!"); + + if (isBlank(filterField)) + throw new IllegalArgumentException("The property filterField can not be null or empty string!"); + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java index 4a68c49..f9a782c 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingConfiguration.java @@ -18,6 +18,10 @@ */ package org.apache.ambari.infra.job.deleting; +import static org.apache.ambari.infra.job.JobsPropertyMap.PARAMETERS_CONTEXT_KEY; + +import javax.inject.Inject; + import org.apache.ambari.infra.job.AbstractJobsConfiguration; import org.apache.ambari.infra.job.JobScheduler; import org.springframework.batch.core.Job; @@ -33,10 +37,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import javax.inject.Inject; - @Configuration -public class DocumentDeletingConfiguration extends AbstractJobsConfiguration<DocumentDeletingProperties> { +public class DocumentDeletingConfiguration extends AbstractJobsConfiguration<DocumentDeletingProperties, DeletingParameters> { private final StepBuilderFactory steps; private final Step deleteStep; @@ -70,9 +72,7 @@ public Step deleteStep(DocumentWiperTasklet tasklet) { @Bean @StepScope public DocumentWiperTasklet documentWiperTasklet( - @Value("#{stepExecution.jobExecution.executionContext.get('jobProperties')}") DocumentDeletingProperties properties, - @Value("#{jobParameters[start]}") String start, - @Value("#{jobParameters[end]}") String end) { - return new DocumentWiperTasklet(properties, start, end); + @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") DeletingParameters 
parameters) { + return new DocumentWiperTasklet(parameters); } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java index 63b7dd2..e7ecc13 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentDeletingProperties.java @@ -18,19 +18,19 @@ */ package org.apache.ambari.infra.job.deleting; +import static org.apache.ambari.infra.json.StringToDurationConverter.toDuration; + +import java.time.Duration; + import org.apache.ambari.infra.job.JobProperties; +import org.apache.ambari.infra.json.DurationToStringConverter; import org.springframework.batch.core.JobParameters; -import static org.apache.commons.lang.StringUtils.isBlank; - -public class DocumentDeletingProperties extends JobProperties<DocumentDeletingProperties> { +public class DocumentDeletingProperties extends JobProperties<DeletingParameters> { private String zooKeeperConnectionString; private String collection; private String filterField; - - public DocumentDeletingProperties() { - super(DocumentDeletingProperties.class); - } + private Duration ttl; public String getZooKeeperConnectionString() { return zooKeeperConnectionString; @@ -56,22 +56,23 @@ public void setFilterField(String filterField) { this.filterField = filterField; } - @Override - public void apply(JobParameters jobParameters) { - zooKeeperConnectionString = jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString); - collection = jobParameters.getString("collection", collection); - filterField = jobParameters.getString("filterField", filterField); + public Duration getTtl() { + return ttl; } - @Override - public void validate() { - if (isBlank(zooKeeperConnectionString)) - throw new IllegalArgumentException("The 
property zooKeeperConnectionString can not be null or empty string!"); - - if (isBlank(collection)) - throw new IllegalArgumentException("The property collection can not be null or empty string!"); + public void setTtl(Duration ttl) { + this.ttl = ttl; + } - if (isBlank(filterField)) - throw new IllegalArgumentException("The property filterField can not be null or empty string!"); + @Override + public DeletingParameters merge(JobParameters jobParameters) { + DeletingParameters deletingParameters = new DeletingParameters(); + deletingParameters.setZooKeeperConnectionString(jobParameters.getString("zooKeeperConnectionString", zooKeeperConnectionString)); + deletingParameters.setCollection(jobParameters.getString("collection", collection)); + deletingParameters.setFilterField(jobParameters.getString("filterField", filterField)); + deletingParameters.setStart(jobParameters.getString("start", "*")); + deletingParameters.setEnd(jobParameters.getString("end", "*")); + deletingParameters.setTtl(toDuration(jobParameters.getString("ttl", DurationToStringConverter.toString(ttl)))); + return deletingParameters; } } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java index 463e6e0..69d8c62 100644 --- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/deleting/DocumentWiperTasklet.java @@ -18,6 +18,8 @@ */ package org.apache.ambari.infra.job.deleting; +import static org.apache.ambari.infra.job.archive.SolrQueryBuilder.computeEnd; + import org.apache.ambari.infra.job.SolrDAOBase; import org.apache.solr.client.solrj.util.ClientUtils; import org.springframework.batch.core.StepContribution; @@ -26,20 +28,19 @@ import org.springframework.batch.repeat.RepeatStatus; public class DocumentWiperTasklet extends 
SolrDAOBase implements Tasklet { - private final String filterField; - private final String start; - private final String end; + private final DeletingParameters parameters; - public DocumentWiperTasklet(DocumentDeletingProperties properties, String start, String end) { - super(properties.getZooKeeperConnectionString(), properties.getCollection()); - this.filterField = properties.getFilterField(); - this.start = start; - this.end = end; + public DocumentWiperTasklet(DeletingParameters deletingParameters) { + super(deletingParameters.getZooKeeperConnectionString(), deletingParameters.getCollection()); + parameters = deletingParameters; } @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { - delete(String.format("%s:[%s TO %s]", filterField, getValue(start), getValue(end))); + delete(String.format("%s:[%s TO %s]", + parameters.getFilterField(), + getValue(parameters.getStart()), + getValue(computeEnd(parameters.getEnd(), parameters.getTtl())))); return RepeatStatus.FINISHED; } diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/DurationToStringConverter.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/DurationToStringConverter.java new file mode 100644 index 0000000..0946dff --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/DurationToStringConverter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.json; + +import java.time.Duration; + +import com.fasterxml.jackson.databind.util.StdConverter; + +public class DurationToStringConverter extends StdConverter<Duration, String> { + @Override + public String convert(Duration value) { + return toString(value); + } + + public static String toString(Duration value) { + return value == null ? null : value.toString(); + } +} diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/StringToDurationConverter.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/StringToDurationConverter.java new file mode 100644 index 0000000..2a385cf --- /dev/null +++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/json/StringToDurationConverter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.json; + +import java.time.Duration; + +import com.fasterxml.jackson.databind.util.StdConverter; + +public class StringToDurationConverter extends StdConverter<String, Duration> { + @Override + public Duration convert(String value) { + return toDuration(value); + } + + public static Duration toDuration(String value) { + return value == null ? null : Duration.parse(value); + } +} diff --git a/ambari-infra-manager/src/main/resources/infra-manager.properties b/ambari-infra-manager/src/main/resources/infra-manager.properties index a0712ba..6830b81 100644 --- a/ambari-infra-manager/src/main/resources/infra-manager.properties +++ b/ambari-infra-manager/src/main/resources/infra-manager.properties @@ -28,15 +28,16 @@ infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.query_text=logt infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.filter_query_text=(logtime:${logtime} AND id:{${id} TO *]) OR logtime:{${logtime} TO ${end}] infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[0]=logtime infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.sort_column[1]=id +infra-manager.jobs.solr_data_archiving.archive_service_logs.solr.delete_query_text=logtime:[${start.logtime} TO ${end.logtime}} OR (logtime:${end.logtime} AND id:[* TO ${end.id}]) infra-manager.jobs.solr_data_archiving.archive_service_logs.read_block_size=100 infra-manager.jobs.solr_data_archiving.archive_service_logs.write_block_size=150 infra-manager.jobs.solr_data_archiving.archive_service_logs.destination=LOCAL infra-manager.jobs.solr_data_archiving.archive_service_logs.local_destination_directory=/tmp/ambariInfraManager infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_column=logtime 
infra-manager.jobs.solr_data_archiving.archive_service_logs.file_name_suffix_date_format=yyyy-MM-dd'T'HH-mm-ss.SSSX -infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.enabled=true +infra-manager.jobs.solr_data_archiving.archive_service_logs.ttl=PT24H +infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.enabled=false infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.cron=0 * * * * ? -infra-manager.jobs.solr_data_archiving.archive_service_logs.scheduling.intervalEndDelta=PT24H infra-manager.jobs.solr_data_archiving.archive_audit_logs.enabled=true infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_archiving.archive_audit_logs.solr.collection=audit_logs @@ -72,3 +73,6 @@ infra-manager.jobs.solr_data_deleting.delete_audit_logs.enabled=true infra-manager.jobs.solr_data_deleting.delete_audit_logs.zoo_keeper_connection_string=zookeeper:2181 infra-manager.jobs.solr_data_deleting.delete_audit_logs.collection=audit_logs infra-manager.jobs.solr_data_deleting.delete_audit_logs.filter_field=logtime +infra-manager.jobs.clean-up.ttl=PT24H +infra-manager.jobs.clean-up.scheduling.enabled=true +infra-manager.jobs.clean-up.scheduling.cron=0 * * * * ? diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/env/TestAppConfig.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/env/TestAppConfig.java new file mode 100644 index 0000000..6d07ecd --- /dev/null +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/env/TestAppConfig.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.infra.env; + +import javax.sql.DataSource; + +import org.springframework.batch.admin.service.JdbcSearchableJobExecutionDao; +import org.springframework.batch.admin.service.JdbcSearchableJobInstanceDao; +import org.springframework.batch.admin.service.SearchableJobExecutionDao; +import org.springframework.batch.admin.service.SearchableJobInstanceDao; +import org.springframework.batch.core.repository.ExecutionContextSerializer; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.dao.Jackson2ExecutionContextStringSerializer; +import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.Resource; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.DriverManagerDataSource; +import org.springframework.jdbc.datasource.init.DataSourceInitializer; +import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator; +import org.springframework.transaction.PlatformTransactionManager; +import 
org.springframework.transaction.support.TransactionTemplate; +import org.sqlite.SQLiteConfig; + +@Configuration +@ComponentScan(basePackages = {"org.apache.ambari.infra.env"}) +public class TestAppConfig { + + @Value("classpath:org/springframework/batch/core/schema-drop-sqlite.sql") + private Resource dropRepositoryTables; + + @Value("classpath:org/springframework/batch/core/schema-sqlite.sql") + private Resource dataRepositorySchema; + + @Bean + public DataSource dataSource() { + DriverManagerDataSource dataSource = new DriverManagerDataSource(); + dataSource.setDriverClassName("org.sqlite.JDBC"); + dataSource.setUrl("jdbc:sqlite:test.db"); + dataSource.setUsername("test"); + dataSource.setPassword("test"); + SQLiteConfig config = new SQLiteConfig(); + config.enforceForeignKeys(true); + dataSource.setConnectionProperties(config.toProperties()); + return dataSource; + } + + @Bean + public DataSourceInitializer dataSourceInitializer() { + ResourceDatabasePopulator databasePopulator = new ResourceDatabasePopulator(); + databasePopulator.addScript(dropRepositoryTables); + databasePopulator.setIgnoreFailedDrops(true); + databasePopulator.addScript(dataRepositorySchema); + databasePopulator.setContinueOnError(true); + + DataSourceInitializer initializer = new DataSourceInitializer(); + initializer.setDataSource(dataSource()); + initializer.setDatabasePopulator(databasePopulator); + + return initializer; + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + + @Bean + public SearchableJobInstanceDao searchableJobInstanceDao(JdbcTemplate jdbcTemplate) { + JdbcSearchableJobInstanceDao dao = new JdbcSearchableJobInstanceDao(); + dao.setJdbcTemplate(jdbcTemplate); + return dao; + } + + @Bean + public SearchableJobExecutionDao searchableJobExecutionDao(JdbcTemplate jdbcTemplate, DataSource dataSource) { + JdbcSearchableJobExecutionDao dao = new JdbcSearchableJobExecutionDao(); + dao.setJdbcTemplate(jdbcTemplate); 
+ dao.setDataSource(dataSource); + return dao; + } + + @Bean + public ExecutionContextSerializer executionContextSerializer() { + return new Jackson2ExecutionContextStringSerializer(); + } + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) { + return new TransactionTemplate(transactionManager); + } + + @Bean + public JobRepository jobRepository(ExecutionContextSerializer executionContextSerializer, PlatformTransactionManager transactionManager) throws Exception { + JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); + factory.setDataSource(dataSource()); + factory.setTransactionManager(transactionManager); + factory.setSerializer(executionContextSerializer); + factory.afterPropertiesSet(); + return factory.getObject(); + } + +} diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/InfraJobExecutionDAOIT.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/InfraJobExecutionDAOIT.java new file mode 100644 index 0000000..7128cbb --- /dev/null +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/InfraJobExecutionDAOIT.java @@ -0,0 +1,99 @@ +package org.apache.ambari.infra.job; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; + +import java.time.OffsetDateTime; +import java.util.Date; + +import javax.inject.Inject; + +import org.apache.ambari.infra.env.TestAppConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.admin.service.SearchableJobExecutionDao; +import org.springframework.batch.admin.service.SearchableJobInstanceDao; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException; +import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.repository.JobRestartException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.transaction.support.TransactionTemplate; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {TestAppConfig.class}) +public class InfraJobExecutionDAOIT { + + private static int jobCounter = 0; + + @Inject + private JdbcTemplate 
jdbcTemplate; + @Inject + private TransactionTemplate transactionTemplate; + @Inject + private JobRepository jobRepository; + @Inject + private SearchableJobExecutionDao searchableJobExecutionDao; + @Inject + private SearchableJobInstanceDao searchableJobInstanceDao; + private InfraJobExecutionDao infraJobExecutionDao; + + @Before + public void setUp() { + infraJobExecutionDao = new InfraJobExecutionDao(jdbcTemplate, transactionTemplate); + } + + @Test + public void testDeleteJobExecutions() throws Exception { + JobExecution yesterdayJob = newJobAt(OffsetDateTime.now().minusDays(1)); + JobExecution todayJob = newJobAt(OffsetDateTime.now()); + + infraJobExecutionDao.deleteJobExecutions(OffsetDateTime.now().minusHours(1)); + + assertThat(searchableJobExecutionDao.getJobExecution(todayJob.getId()), is(not(nullValue()))); + assertThat(searchableJobExecutionDao.getJobExecution(yesterdayJob.getId()), is(nullValue())); + + assertThat(searchableJobInstanceDao.getJobInstance(todayJob.getJobId()), is(not(nullValue()))); + assertThat(searchableJobInstanceDao.getJobInstance(yesterdayJob.getJobId()), is(nullValue())); + } + + private JobExecution newJobAt(OffsetDateTime createdAt) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException { + JobParameters jobParameters = new JobParametersBuilder().addString("test param", "test value").toJobParameters(); + JobExecution jobExecution = jobRepository.createJobExecution("test job" + jobCounter++ , jobParameters); + jobExecution.setCreateTime(Date.from(createdAt.toInstant())); + jobRepository.update(jobExecution); + + StepExecution stepExecution = new StepExecution("step1", jobExecution); + jobRepository.add(stepExecution); + + return jobExecution; + } +} \ No newline at end of file diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java deleted file mode 100644 
index 3b7caab..0000000 --- a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/JobPropertiesTest.java +++ /dev/null @@ -1,56 +0,0 @@ -package org.apache.ambari.infra.job; - -import org.apache.ambari.infra.job.archive.DocumentArchivingProperties; -import org.apache.ambari.infra.job.archive.SolrProperties; -import org.junit.Test; - -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.assertThat; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -public class JobPropertiesTest { - @Test - public void testDeepCopy() throws Exception { - DocumentArchivingProperties documentArchivingProperties = new DocumentArchivingProperties(); - documentArchivingProperties.setLocalDestinationDirectory("/tmp"); - documentArchivingProperties.setFileNameSuffixColumn(".json"); - documentArchivingProperties.setReadBlockSize(10); - documentArchivingProperties.setWriteBlockSize(20); - SolrProperties solr = new SolrProperties(); - solr.setZooKeeperConnectionString("localhost:2181"); - solr.setFilterQueryText("id:1167"); - solr.setQueryText("name:'Joe'"); - solr.setCollection("Users"); - solr.setSortColumn(new String[] {"name"}); - documentArchivingProperties.setSolr(solr); - - DocumentArchivingProperties parsed = documentArchivingProperties.deepCopy(); - - assertThat(parsed.getLocalDestinationDirectory(), is(documentArchivingProperties.getLocalDestinationDirectory())); - assertThat(parsed.getFileNameSuffixColumn(), is(documentArchivingProperties.getFileNameSuffixColumn())); - assertThat(parsed.getReadBlockSize(), is(documentArchivingProperties.getReadBlockSize())); - assertThat(parsed.getWriteBlockSize(), is(documentArchivingProperties.getWriteBlockSize())); - assertThat(parsed.getSolr().getZooKeeperConnectionString(), is(documentArchivingProperties.getSolr().getZooKeeperConnectionString())); - assertThat(parsed.getSolr().getQueryText(), is(solr.getQueryText())); - assertThat(parsed.getSolr().getFilterQueryText(), is(solr.getFilterQueryText())); - assertThat(parsed.getSolr().getCollection(), is(solr.getCollection())); - assertThat(parsed.getSolr().getSortColumn(), is(solr.getSortColumn())); - } -} \ No newline at end of file diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java index be8a226..b7bda57 100644 --- 
a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrPropertiesTest.java @@ -1,12 +1,12 @@ package org.apache.ambari.infra.job.archive; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + import org.junit.Test; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertThat; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -27,7 +27,7 @@ */ public class SolrPropertiesTest { @Test - public void testApplySortColumns() throws Exception { + public void testApplySortColumns() { JobParameters jobParameters = new JobParametersBuilder() .addString("sortColumn[0]", "logtime") .addString("sortColumn[1]", "id") @@ -35,20 +35,20 @@ public void testApplySortColumns() throws Exception { SolrProperties solrProperties = new SolrProperties(); solrProperties.setSortColumn(new String[] {"testColumn"}); - solrProperties.apply(jobParameters); - assertThat(solrProperties.getSortColumn().length, is(2)); - assertThat(solrProperties.getSortColumn()[0], is("logtime")); - assertThat(solrProperties.getSortColumn()[1], is("id")); + SolrParameters solrParameters = solrProperties.merge(jobParameters); + assertThat(solrParameters.getSortColumn().length, is(2)); + assertThat(solrParameters.getSortColumn()[0], is("logtime")); + assertThat(solrParameters.getSortColumn()[1], is("id")); } @Test - public void testApplyWhenNoSortIsDefined() throws Exception { + public void testApplyWhenNoSortIsDefined() { JobParameters jobParameters = new JobParametersBuilder() .toJobParameters(); SolrProperties solrProperties = new SolrProperties(); solrProperties.setSortColumn(new String[] {"testColumn"}); - 
solrProperties.apply(jobParameters); - assertThat(solrProperties.getSortColumn().length, is(1)); + SolrParameters solrParameters = solrProperties.merge(jobParameters); + assertThat(solrParameters.getSortColumn().length, is(0)); } } \ No newline at end of file diff --git a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrQueryBuilderTest.java b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrQueryBuilderTest.java index ee08279..0f7049b 100644 --- a/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrQueryBuilderTest.java +++ b/ambari-infra-manager/src/test/java/org/apache/ambari/infra/job/archive/SolrQueryBuilderTest.java @@ -18,15 +18,20 @@ */ package org.apache.ambari.infra.job.archive; -import org.apache.solr.client.solrj.SolrQuery; -import org.junit.Test; - -import java.util.HashMap; - +import static org.apache.ambari.infra.job.archive.SolrQueryBuilder.computeEnd; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; import static org.junit.Assert.assertThat; +import java.time.Duration; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.HashMap; + +import org.apache.solr.client.solrj.SolrQuery; +import org.hamcrest.core.Is; +import org.junit.Test; + public class SolrQueryBuilderTest { private static final Document DOCUMENT = new Document(new HashMap<String, String>() {{ put("logtime", "2017-10-02'T'10:00:11.634Z"); @@ -103,4 +108,27 @@ public void test_start_and_end_values_are_null() throws Exception { SolrQuery solrQuery = new SolrQueryBuilder().setQueryText("id:[${start} TO ${end}]").build(); assertThat(solrQuery.getQuery(), is("id:[* TO *]")); } + + @Test + public void testComputeEndReturnsNullIsNoEndAndNoTTLWasGiven() { + assertThat(computeEnd(null, OffsetDateTime.now(), null), Is.is(nullValue())); + } + + @Test + public void testComputeEndReturnsEndIfOnlyEndWasGiven() { + String end = "2018-10-09T10:11:12.000Z"; 
+ assertThat(computeEnd(end, OffsetDateTime.now(), null), Is.is(end)); + } + + @Test + public void testComputeEndReturnsNowMinusTtlIfOnlyTtlWasGiven() { + OffsetDateTime now = OffsetDateTime.of(2018, 10, 9, 10, 11, 12, 0, ZoneOffset.UTC); + assertThat(computeEnd(null, now, Duration.ofDays(5)), Is.is("2018-10-04T10:11:12.000Z")); + } + + @Test + public void testComputeEndReturnsEndIfBothWasGiven() { + String end = "2018-10-09T10:11:12.000Z"; + assertThat(computeEnd(end, OffsetDateTime.now(), Duration.ofDays(5)), Is.is(end)); + } } diff --git a/ambari-infra-solr-plugin/pom.xml b/ambari-infra-solr-plugin/pom.xml index a3619cd..9de8e72 100644 --- a/ambari-infra-solr-plugin/pom.xml +++ b/ambari-infra-solr-plugin/pom.xml @@ -44,6 +44,11 @@ <version>${solr.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> <resources> diff --git a/pom.xml b/pom.xml index 9402114..ab82c4b 100644 --- a/pom.xml +++ b/pom.xml @@ -305,7 +305,7 @@ <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> - <version>4.10</version> + <version>4.12</version> </dependency> <dependency> <groupId>commons-cli</groupId> @@ -363,6 +363,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + </dependency> </dependencies> </dependencyManagement> ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services