This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch TUBEMQ-336 in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/TUBEMQ-336 by this push: new 1c177a7 [TUBEMQ-364] uniform response format for exception state (#278) 1c177a7 is described below commit 1c177a7edee6d7506e961ab5b32064c595828c70 Author: Yuanbo Liu <yua...@apache.org> AuthorDate: Sun Sep 27 18:33:26 2020 +0800 [TUBEMQ-364] uniform response format for exception state (#278) --- .../org/apache/tubemq/manager/TubeMQManager.java | 44 ++++++---- .../tubemq/manager/backend/AbstractDaemon.java | 97 ---------------------- .../controller/ManagerControllerAdvice.java | 46 ++++++++++ .../{ => business}/BusinessController.java | 45 ++++++---- .../controller/{ => business}/BusinessResult.java | 6 +- .../apache/tubemq/manager/entry/BusinessEntry.java | 7 +- .../TubeMQManagerException.java} | 23 ++--- .../AsyncService.java} | 33 ++------ .../manager/controller/TestBusinessController.java | 17 +++- 9 files changed, 142 insertions(+), 176 deletions(-) diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java index 5df581d..a25897b 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/TubeMQManager.java @@ -16,30 +16,44 @@ */ package org.apache.tubemq.manager; -import org.apache.tubemq.manager.backend.AbstractDaemon; +import java.util.concurrent.Executor; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; import org.springframework.data.jpa.repository.config.EnableJpaAuditing; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @SpringBootApplication @EnableJpaAuditing -public class TubeMQManager extends AbstractDaemon { - public static void main(String[] args) throws Exception { - TubeMQManager manager = new TubeMQManager(); - manager.startThreads(); - SpringApplication.run(TubeMQManager.class); - // web application stopped, then stop working threads. - manager.stopThreads(); - manager.join(); - } +@EnableAsync +public class TubeMQManager { - @Override - public void startThreads() throws Exception { + @Value("${manager.async.core.pool.size:2}") + private int asyncCorePoolSize; - } + @Value("${manager.async.max.pool.size:20}") + private int asyncMaxPoolSize; + + @Value("${manager.async.queue.capacity:100}") + private int asyncQueueCapacity; - @Override - public void stopThreads() throws Exception { + @Value("${manager.async.thread.prefix:AsyncThread-}") + private String threadPrefix; + + public static void main(String[] args) throws Exception { + SpringApplication.run(TubeMQManager.class); + } + @Bean(name = "asyncExecutor") + public Executor asyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(asyncCorePoolSize); + executor.setMaxPoolSize(asyncMaxPoolSize); + executor.setQueueCapacity(asyncQueueCapacity); + executor.setThreadNamePrefix(threadPrefix); + executor.initialize(); + return executor; } } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java deleted file mode 100644 index 2db9318..0000000 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/AbstractDaemon.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tubemq.manager.backend; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Abstract daemon with a batch of working thread. - */ -public abstract class AbstractDaemon implements ThreadStartAndStop { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDaemon.class); - - // worker thread pool - private final ExecutorService workerServices; - private final List<CompletableFuture<?>> workerFutures; - private boolean runnable = true; - - public AbstractDaemon() { - this.workerServices = Executors - .newCachedThreadPool(new TubeMQManagerFactory(this.getClass().getSimpleName())); - this.workerFutures = new ArrayList<>(); - } - - /** - * Whether threads can in running state with while loop. - * - * @return - true if threads can run - */ - public boolean isRunnable() { - return runnable; - } - - /** - * Stop running threads. - */ - public void stopRunningThreads() { - runnable = false; - } - - /** - * Submit work thread to thread pool. - * - * @param worker - work thread - */ - public void submitWorker(Runnable worker) { - CompletableFuture<?> future = CompletableFuture.runAsync(worker, this.workerServices); - workerFutures.add(future); - LOGGER.info("{} running worker number is {}", this.getClass().getName(), - workerFutures.size()); - } - - /** - * Wait for threads finish. - */ - public void join() { - for (CompletableFuture<?> future : workerFutures) { - future.join(); - } - } - - /** - * Stop thread pool and running threads if they're in the running state. - * - * @param timeout - max wait time - * @param timeUnit - time unit - */ - public void waitForTerminate(long timeout, TimeUnit timeUnit) throws Exception { - // stopping working threads. - if (isRunnable()) { - stopRunningThreads(); - workerServices.shutdown(); - workerServices.awaitTermination(timeout, timeUnit); - } - } -} diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java new file mode 100644 index 0000000..33369ca --- /dev/null +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/ManagerControllerAdvice.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tubemq.manager.controller; +import javax.servlet.http.HttpServletRequest; +import org.apache.tubemq.manager.controller.business.BusinessResult; +import org.apache.tubemq.manager.exceptions.TubeMQManagerException; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +/** + * Controller advice for handling exceptions + */ +@RestControllerAdvice +public class ManagerControllerAdvice { + + /** + * handling business TubeMQManagerException, and return json format string. + * + * @param request - http request + * @param ex - exception + * @return entity + */ + @ExceptionHandler(TubeMQManagerException.class) + public BusinessResult handlingBusinessException(HttpServletRequest request, + TubeMQManagerException ex) { + BusinessResult result = new BusinessResult(); + result.setMessage(ex.getMessage()); + result.setCode(-1); + return result; + } +} diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java similarity index 68% rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java index 934c215..c8190a8 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessController.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessController.java @@ -14,14 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tubemq.manager.controller; +package org.apache.tubemq.manager.controller.business; import java.util.List; import java.util.Optional; +import lombok.extern.slf4j.Slf4j; import org.apache.tubemq.manager.entry.BusinessEntry; +import org.apache.tubemq.manager.exceptions.TubeMQManagerException; import org.apache.tubemq.manager.repository.BusinessRepository; +import org.apache.tubemq.manager.service.AsyncService; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -32,11 +34,15 @@ import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping(path = "/business") +@Slf4j public class BusinessController { @Autowired private BusinessRepository businessRepository; + @Autowired + private AsyncService asyncService; + /** * add new business. * @@ -44,9 +50,9 @@ public class BusinessController { * @throws Exception - exception */ @PostMapping("/add") - public ResponseEntity<?> addBusiness(@RequestBody BusinessEntry entry) throws Exception { - // businessRepository.saveAndFlush(entry); - return ResponseEntity.ok().build(); + public BusinessResult addBusiness(@RequestBody BusinessEntry entry) { + businessRepository.saveAndFlush(entry); + return new BusinessResult(); } /** @@ -56,8 +62,8 @@ public class BusinessController { * @throws Exception */ @PostMapping("/update") - public ResponseEntity<?> updateBusiness(@RequestBody BusinessEntry entry) throws Exception { - return ResponseEntity.ok().build(); + public BusinessResult updateBusiness(@RequestBody BusinessEntry entry) { + return new BusinessResult(); } /** @@ -67,10 +73,10 @@ public class BusinessController { * @throws Exception */ @GetMapping("/check") - public ResponseEntity<?> checkBusinessByName( - @RequestParam String businessName) throws Exception { + public BusinessResult checkBusinessByName( + @RequestParam String businessName) { List<BusinessEntry> result = businessRepository.findAllByBusinessName(businessName); - return ResponseEntity.ok().build(); + return new BusinessResult(); } /** @@ -81,13 +87,20 @@ public class BusinessController { * @throws Exception */ @GetMapping("/get/{id}") - public ResponseEntity<BusinessEntry> getBusinessByID( - @PathVariable Long id) throws Exception { + public BusinessResult getBusinessByID( + @PathVariable Long id) { Optional<BusinessEntry> businessEntry = businessRepository.findById(id); - if (businessEntry.isPresent()) { - return ResponseEntity.ok().build(); - } else { - return ResponseEntity.notFound().build(); + BusinessResult result = new BusinessResult(); + if (!businessEntry.isPresent()) { + result.setCode(-1); + result.setMessage("business not found"); } + return result; + } + + + @GetMapping("/throwException") + public BusinessResult throwException() { + throw new TubeMQManagerException("exception for test"); } } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java similarity index 89% rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java index 6e4a6f6..88c39ae 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/BusinessResult.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/controller/business/BusinessResult.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tubemq.manager.controller; +package org.apache.tubemq.manager.controller.business; import lombok.Data; @@ -23,6 +23,6 @@ import lombok.Data; */ @Data public class BusinessResult { - private int state; - private String msg; + private String message; + private int code = 0; } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java index d56b0f2..88e8e1e 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/entry/BusinessEntry.java @@ -120,9 +120,6 @@ public class BusinessEntry { @Size(max = 32) private String issueMethod; - private BusinessEntry() { - - } public BusinessEntry(String businessName, String schemaName, String username, String passwd, String topic, String encodingType) { @@ -133,4 +130,8 @@ public class BusinessEntry { this.topic = topic; this.encodingType = encodingType; } + + public BusinessEntry() { + + } } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java similarity index 67% rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java index d3cacef..46c888c 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/ThreadStartAndStop.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/exceptions/TubeMQManagerException.java @@ -14,24 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.tubemq.manager.backend; + +package org.apache.tubemq.manager.exceptions; /** - * Interface for starting and stopping backend threads. + * TubeMQ runtime exception. */ -public interface ThreadStartAndStop { - /** - * start all threads. - */ - void startThreads() throws Exception; - - /** - * stop all threads. - */ - void stopThreads() throws Exception; +public class TubeMQManagerException extends RuntimeException { - /** - * wait for all thread finishing. - */ - void join() throws Exception; + public TubeMQManagerException(final String message) { + super(message); + } } diff --git a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java similarity index 50% rename from tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java rename to tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java index ca72901..335f1d0 100644 --- a/tubemq-manager/src/main/java/org/apache/tubemq/manager/backend/TubeMQManagerFactory.java +++ b/tubemq-manager/src/main/java/org/apache/tubemq/manager/service/AsyncService.java @@ -15,32 +15,17 @@ * limitations under the License. */ -package org.apache.tubemq.manager.backend; +package org.apache.tubemq.manager.service; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; /** - * Thread factory for tubeMQ manager. + * Service for running async tasks. + * https://howtodoinjava.com/spring-boot2/rest/enableasync-async-controller/ */ -public class TubeMQManagerFactory implements ThreadFactory { +@Service +@Slf4j +public class AsyncService { - private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQManagerFactory.class); - - private final AtomicInteger mThreadNum = new AtomicInteger(1); - - private final String threadType; - - public TubeMQManagerFactory(String threadType) { - this.threadType = threadType; - } - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, threadType + "-running-thread-" + mThreadNum.getAndIncrement()); - LOGGER.info("{} created", t.getName()); - return t; - } -} \ No newline at end of file +} diff --git a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java index e934081..2ddfb67 100644 --- a/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java +++ b/tubemq-manager/src/test/java/org/apache/tubemq/manager/controller/TestBusinessController.java @@ -17,7 +17,10 @@ package org.apache.tubemq.manager.controller; import java.net.URI; +import java.util.Objects; import lombok.extern.slf4j.Slf4j; +import org.apache.tubemq.manager.controller.business.BusinessController; +import org.apache.tubemq.manager.controller.business.BusinessResult; import org.apache.tubemq.manager.entry.BusinessEntry; import org.junit.Before; import org.junit.Test; @@ -79,8 +82,18 @@ public class TestBusinessController { HttpHeaders headers = new HttpHeaders(); HttpEntity<BusinessEntry> request = new HttpEntity<>(entry, headers); - ResponseEntity<?> responseEntity = - client.postForEntity(uri, request, ResponseEntity.class); + ResponseEntity<BusinessResult> responseEntity = + client.postForEntity(uri, request, BusinessResult.class); assertThat(responseEntity.getStatusCode().is2xxSuccessful()).isEqualTo(true); } + + @Test + public void testControllerException() throws Exception { + final String baseUrl = "http://localhost:" + randomServerPort + "/business/throwException"; + URI uri = new URI(baseUrl); + ResponseEntity<BusinessResult> responseEntity = + client.getForEntity(uri, BusinessResult.class); + assertThat(Objects.requireNonNull(responseEntity.getBody()).getCode()).isEqualTo(-1); + assertThat(responseEntity.getBody().getMessage()).isEqualTo("exception for test"); + } }