Dubbo统一远程调用包装工具+自定义注解对RPC调用封装

这个项目是一个数字商品+区块链相关的一个项目,涉及到很高的并发量,几乎是接触到了电商中的各种高并发场景。项目追求远程调用的性能,用到了Dubbo,并且做了一些相关的封装如下。

远程调用的包装工具类

/**
 * 远程方法调用的包装工具类
 */
public class RemoteCallWrapper {

    private static Logger logger = LoggerFactory.getLogger(RemoteCallWrapper.class);

    private static ImmutableSet<String> SUCCESS_CHECK_METHOD = ImmutableSet.of("isSuccess", "isSucceeded",
            "getSuccess");

    private static ImmutableSet<String> SUCCESS_CODE_METHOD = ImmutableSet.of("getResponseCode");

    private static ImmutableSet<String> SUCCESS_CODE = ImmutableSet.of("SUCCESS", "DUPLICATE",
            "DUPLICATED_REQUEST");

    public static <T, R> R call(Function<T, R> function, T request, boolean checkResponse) {
        return call(function, request, request.getClass().getSimpleName(), checkResponse, false);
    }

    public static <T, R> R call(Function<T, R> function, T request) {
        return call(function, request, request.getClass().getSimpleName(), true, false);
    }

    public static <T, R> R call(Function<T, R> function, T request, String requestName) {
        return call(function, request, requestName, true, false);
    }

    public static <T, R> R call(Function<T, R> function, T request, String requestName,boolean checkResponse) {
        return call(function, request, requestName, checkResponse, false);
    }

    public static <T, R> R call(Function<T, R> function, T request, boolean checkResponse, boolean checkResponseCode) {
        return call(function, request, request.getClass().getSimpleName(), checkResponse, checkResponseCode);
    }

    public static <T, R> R call(Function<T, R> function, T request, String requestName, boolean checkResponse,
                                boolean checkResponseCode) {
        StopWatch stopWatch = new StopWatch();
        R response = null;
        try {

            stopWatch.start();
            response = function.apply(request);
            stopWatch.stop();
            if (checkResponse) {

                Assert.notNull(response, REMOTE_CALL_RESPONSE_IS_NULL.name());

                if (!isResponseValid(response)) {
                    logger.error("Response Invalid on Remote Call request {} , response {}",
                            JSON.toJSONString(request),
                            JSON.toJSONString(response));

                    throw new RemoteCallException(JSON.toJSONString(response), REMOTE_CALL_RESPONSE_IS_FAILED);
                }
            }
            if (checkResponseCode) {

                Assert.notNull(response, REMOTE_CALL_RESPONSE_IS_NULL.name());

                if (!isResponseCodeValid(response)) {
                    logger.error("Response code Invalid on Remote Call request {} , response {}",
                            JSON.toJSONString(request),
                            JSON.toJSONString(response));

                    throw new RemoteCallException(JSON.toJSONString(response), REMOTE_CALL_RESPONSE_IS_FAILED);
                }
            }

        } catch (IllegalAccessException | InvocationTargetException e) {
            logger.error("Catch Exception on Remote Call :" + e.getMessage(), e);
            throw new IllegalArgumentException("Catch Exception on Remote Call " + e.getMessage(), e);
        } catch (Throwable e) {
            logger.error("request exception {}", JSON.toJSONString(request));
            logger.error("Catch Exception on Remote Call :" + e.getMessage(), e);
            throw e;
        } finally {
            if (logger.isInfoEnabled()) {

                logger.info("## Method={} ,## 耗时={}ms ,## [请求报文]:{},## [响应报文]:{}", requestName,
                        stopWatch.getTotalTimeMillis(),
                        JSON.toJSONString(request), JSON.toJSONString(response));
            }
        }

        return response;
    }

    private static <R> boolean isResponseValid(R response)
            throws IllegalAccessException, InvocationTargetException {
        Method successMethod = null;
        Method[] methods = response.getClass().getMethods();
        for (Method method : methods) {
            String methodName = method.getName();
            if (SUCCESS_CHECK_METHOD.contains(methodName)) {
                successMethod = method;
                break;
            }
        }
        if (successMethod == null) {
            return true;
        }

        return (Boolean) successMethod.invoke(response);
    }

    private static <R> boolean isResponseCodeValid(R response)
            throws IllegalAccessException, InvocationTargetException {
        Method successMethod = null;
        Method[] methods = response.getClass().getMethods();
        for (Method method : methods) {
            String methodName = method.getName();
            if (SUCCESS_CODE_METHOD.contains(methodName)) {
                successMethod = method;
                break;
            }
        }
        if (successMethod == null) {
            return true;
        }

        return SUCCESS_CODE.contains(successMethod.invoke(response));
    }
}

使用示例

/**
     * 支付成功
     * <pre>
     *     正常支付成功:
     *     1、查询订单状态
     *     2、推进订单状态到支付成功
     *     3、商品库存真正扣减
     *     4、创建持有的商品
     *     5、推进支付状态到支付成功
     *     6、持有的商品上链
     *
     *     支付幂等成功:
     *      1、查询订单状态
     *      2、推进支付状态到支付成功
     *
     *      重复支付:
     *      1、查询订单状态
     *      2、创建退款单
     *      3、重试退款直到成功
     * </pre>
     */
    @GlobalTransactional(rollbackFor = Exception.class)
    public boolean paySuccess(PaySuccessEvent paySuccessEvent) {

        PayOrder payOrder = payOrderService.queryByOrderId(paySuccessEvent.getPayOrderId());
        if (payOrder.isPaid()) {
            return true;
        }

        SingleResponse<TradeOrderVO> response = orderFacadeService.getTradeOrder(payOrder.getBizNo());
        TradeOrderVO tradeOrderVO = response.getData();

        OrderPayRequest orderPayRequest = getOrderPayRequest(paySuccessEvent, payOrder);
        OrderResponse orderResponse = RemoteCallWrapper.call(req -> orderFacadeService.paySuccess(req), orderPayRequest, "orderFacadeService.pay", false);

        //如果订单已经被其他支付推进到支付成功,或者已经关单,则启动退款流程
        if (needChargeBack(orderResponse)) {
            log.info("order already paid ,do chargeback ," + payOrder.getBizNo());

            Boolean result = payOrderService.paySuccess(paySuccessEvent);
            Assert.isTrue(result, () -> new BizException(PayErrorCode.PAY_SUCCESS_NOTICE_FAILED));
            doChargeBack(paySuccessEvent, tradeOrderVO);

            return true;
        }

        if (!orderResponse.getSuccess()) {
            log.error("orderFacadeService.pay error, response = {}", JSON.toJSONString(orderResponse));
            return false;
        }

        ///confirmSale 被废弃,详Service.confirmSale
        ///GoodsSaleRequest goodsSaleRequest = getGoodsSaleRequest(tradeOrderVO);
        ///GoodsSaleResponse goodsSaleResponse = RemoteCallWrapper.call(req -> goodsFacadeService.confirmSale(req), goodsSaleRequest, "goodsFacadeService.confirmSale");

        GoodsSaleRequest goodsSaleRequest = getGoodsSaleRequest(tradeOrderVO);
        GoodsSaleResponse goodsSaleResponse = RemoteCallWrapper.call(req -> goodsFacadeService.paySuccess(req), goodsSaleRequest, "goodsFacadeService.confirmSale");

        switch (tradeOrderVO.getGoodsType()) {
            case COLLECTION:
                //只有商品需要在支付成功后立即上链
                TransactionHookManager.registerHook(new PaySuccessTransactionHook(goodsSaleResponse.getHeldCollectionId()));
                break;
            default:
                //do nothing
        }

        Boolean result = payOrderService.paySuccess(paySuccessEvent);
        Assert.isTrue(result, () -> new BizException(PayErrorCode.PAY_SUCCESS_NOTICE_FAILED));

        return true;
    }
    @EventListener(value = BlindBoxOpenEvent.class)
    @Async("blindBoxListenExecutor")
    public void onApplicationEvent(BlindBoxOpenEvent event) {
        Long blindBoxItemId = (Long) event.getSource();

        //查询出更新后的最新值,避免后续 cas 操作失败
        BlindBoxItem blindBoxItem = blindBoxItemService.getById(blindBoxItemId);

        //创建heldCollection
        HeldCollectionCreateRequest heldCollectionCreateRequest = getHeldCollectionCreateRequest(blindBoxItem);
        var heldCollection = heldCollectionService.create(heldCollectionCreateRequest);
        Assert.notNull(heldCollection, () -> new BlindBoxException(BLIND_BOX_OPEN_FAILED));

        //上链
        ChainProcessRequest chainProcessRequest = getChainProcessRequest(blindBoxItem, heldCollection);
        //如果失败了,则依靠定时任务补偿
        ChainProcessResponse<ChainOperationData> response = RemoteCallWrapper.call(req -> chainFacadeService.mint(req), chainProcessRequest, "mint");

        //修改盲盒状态
        if (response.getSuccess()) {
            blindBoxItem.openSuccess();
            var saveResult = blindBoxItemService.updateById(blindBoxItem);
            Assert.isTrue(saveResult, () -> new BlindBoxException(BLIND_BOX_ITEM_SAVE_FAILED));
        }
    }

@Override
    @Facade
    public CollectionAirdropResponse airDrop(CollectionAirDropRequest request) {
        //检查用户是否可被空投,这里比较简单,后续如果节点比较多,可以改成责任链
        UserQueryRequest userQueryRequest = new UserQueryRequest(Long.valueOf(request.getRecipientUserId()));
        UserQueryResponse<UserInfo> userQueryResponse = userFacadeService.query(userQueryRequest);
        checkUser(userQueryResponse);
        //检查商品是否可被空投,这里比较简单,后续如果节点比较多,可以改成责任链
        Collection collection = collectionService.queryById(request.getCollectionId());
        checkCollection(collection,request.getQuantity());

        CollectionAirdropResponse response = collectionService.airDrop(request, collection);

        //执行失败或幂等成功,则直接返回,不用调上链操作了
        if (!response.getSuccess() || response.getResponseCode().equals(DUPLICATED.name())) {
            return response;
        }

        for (HeldCollectionVO heldCollection : response.getHeldCollections()) {
            ChainProcessRequest chainProcessRequest = new ChainProcessRequest();
            chainProcessRequest.setRecipient(userQueryResponse.getData().getBlockChainUrl());
            chainProcessRequest.setClassId(String.valueOf(heldCollection.getCollectionId()));
            chainProcessRequest.setClassName(heldCollection.getName());
            chainProcessRequest.setSerialNo(heldCollection.getSerialNo());
            chainProcessRequest.setBizId(heldCollection.getId().toString());
            chainProcessRequest.setBizType(ChainOperateBizTypeEnum.HELD_COLLECTION.name());
            chainProcessRequest.setIdentifier(UUID.randomUUID().toString());
            //如果失败了,则依靠定时任务补偿
            ChainProcessResponse<ChainOperationData> chainProcessResponse = RemoteCallWrapper.call(req -> chainFacadeService.mint(req), chainProcessRequest, "mint");
        }
        response.setSuccess(response.getSuccess());
        return response;
    }

自定义注解

public @interface Facade {
}

切面

/**
 * Facade的切面处理类,统一统计进行参数校验及异常捕获
 */
@Aspect
@Component
@Order(Integer.MIN_VALUE)
public class FacadeAspect {

    private static final Logger LOGGER = LoggerFactory.getLogger(FacadeAspect.class);

    @Around("@annotation(cn.mifu.turbo.rpc.facade.Facade)")
    public Object facade(ProceedingJoinPoint pjp) throws Exception {

        StopWatch stopWatch = new StopWatch();
        stopWatch.start();

        Method method = ((MethodSignature) pjp.getSignature()).getMethod();
        Object[] args = pjp.getArgs();

        Class returnType = ((MethodSignature) pjp.getSignature()).getMethod().getReturnType();

        //循环遍历所有参数,进行参数校验
        for (Object parameter : args) {
            try {
                BeanValidator.validateObject(parameter);
            } catch (ValidationException e) {
                printLog(stopWatch, method, args, "failed to validate", null, e);
                return getFailedResponse(returnType, e);
            }
        }

        try {
            // 目标方法执行
            Object response = pjp.proceed();
            enrichObject(response);
            printLog(stopWatch, method, args, "end to execute", response, null);
            return response;
        } catch (Throwable throwable) {
            // 如果执行异常,则返回一个失败的response
            printLog(stopWatch, method, args, "failed to execute", null, throwable);
            return getFailedResponse(returnType, throwable);
        }
    }

    /**
     * 日志打印
     *
     * @param stopWatch
     * @param method
     * @param args
     * @param action
     * @param response
     */
    private void printLog(StopWatch stopWatch, Method method, Object[] args, String action, Object response,
                          Throwable throwable) {
        try {
            //因为此处有JSON.toJSONString,可能会有异常,需要进行捕获,避免影响主干流程
            LOGGER.info(getInfoMessage(action, stopWatch, method, args, response, throwable), throwable);
            // 如果校验失败,则返回一个失败的response
        } catch (Exception e1) {
            LOGGER.error("log failed", e1);
        }
    }

    /**
     * 统一格式输出,方便做日志统计
     * <p>
     * *** 如果调整此处的格式,需要同步调整日志监控 ***
     *
     * @param action    行为
     * @param stopWatch 耗时
     * @param method    方法
     * @param args      参数
     * @param response  响应
     * @return 拼接后的字符串
     */
    private String getInfoMessage(String action, StopWatch stopWatch, Method method, Object[] args, Object response,
                                  Throwable exception) {

        StringBuilder stringBuilder = new StringBuilder(action);
        stringBuilder.append(" ,method = ");
        stringBuilder.append(method.getName());
        stringBuilder.append(" ,cost = ");
        stringBuilder.append(stopWatch.getTime()).append(" ms");
        if (response instanceof BaseResponse) {
            stringBuilder.append(" ,success = ");
            stringBuilder.append(((BaseResponse) response).getSuccess());
        }
        if (exception != null) {
            stringBuilder.append(" ,success = ");
            stringBuilder.append(false);
        }
        stringBuilder.append(" ,args = ");
        stringBuilder.append(JSON.toJSONString(Arrays.toString(args)));

        if (response != null) {
            stringBuilder.append(" ,resp = ");
            stringBuilder.append(JSON.toJSONString(response));
        }

        if (exception != null) {
            stringBuilder.append(" ,exception = ");
            stringBuilder.append(exception.getMessage());
        }

        if (response instanceof BaseResponse) {
            BaseResponse baseResponse = (BaseResponse) response;
            if (!baseResponse.getSuccess()) {
                stringBuilder.append(" , execute_failed");
            }
        }

        return stringBuilder.toString();
    }

    /**
     * 将response的信息补全,主要是code和message
     *
     * @param response
     */
    private void enrichObject(Object response) {
        if (response instanceof BaseResponse) {
            if (((BaseResponse) response).getSuccess()) {
                //如果状态是成功的,需要将未设置的responseCode设置成SUCCESS
                if (StringUtils.isEmpty(((BaseResponse) response).getResponseCode())) {
                    ((BaseResponse) response).setResponseCode(ResponseCode.SUCCESS.name());
                }
            } else {
                //如果状态是成功的,需要将未设置的responseCode设置成BIZ_ERROR
                if (StringUtils.isEmpty(((BaseResponse) response).getResponseCode())) {
                    ((BaseResponse) response).setResponseCode(ResponseCode.BIZ_ERROR.name());
                }
            }
        }
    }

    /**
     * 定义并返回一个通用的失败响应
     */
    private Object getFailedResponse(Class returnType, Throwable throwable)
            throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {

        //如果返回值的类型为BaseResponse 的子类,则创建一个通用的失败响应
        if (returnType.getDeclaredConstructor().newInstance() instanceof BaseResponse) {
            BaseResponse response = (BaseResponse) returnType.getDeclaredConstructor().newInstance();
            response.setSuccess(false);
            if (throwable instanceof BizException bizException) {
                response.setResponseMessage(bizException.getErrorCode().getMessage());
                response.setResponseCode(bizException.getErrorCode().getCode());
            } else if (throwable instanceof SystemException systemException) {
                response.setResponseMessage(systemException.getErrorCode().getMessage());
                response.setResponseCode(systemException.getErrorCode().getCode());
            } else {
                response.setResponseMessage(throwable.toString());
                response.setResponseCode(ResponseCode.BIZ_ERROR.name());
            }

            return response;
        }

        LOGGER.error(
                "failed to getFailedResponse , returnType (" + returnType + ") is not instanceof BaseResponse");
        return null;
    }
}

工具类

/**
 * 参数校验工具
 *
 * @author Hollis
 */
public class BeanValidator {

    private static Validator validator = Validation.byProvider(HibernateValidator.class).configure().failFast(true)
            .buildValidatorFactory().getValidator();

    /**
     * @param object object
     * @param groups groups
     */
    public static void validateObject(Object object, Class<?>... groups) throws ValidationException {
        Set<ConstraintViolation<Object>> constraintViolations = validator.validate(object, groups);
        if (constraintViolations.stream().findFirst().isPresent()) {
            throw new ValidationException(constraintViolations.stream().findFirst().get().getMessage());
        }
    }
}