Dubbo统一远程调用包装工具+自定义注解对RPC调用封装
这个项目是一个数字商品+区块链相关的一个项目,涉及到很高的并发量,几乎是接触到了电商中的各种高并发场景。项目追求远程调用的性能,用到了Dubbo,并且做了一些相关的封装如下。
远程调用的包装工具类
/**
* 远程方法调用的包装工具类
*/
public class RemoteCallWrapper {
private static Logger logger = LoggerFactory.getLogger(RemoteCallWrapper.class);
private static ImmutableSet<String> SUCCESS_CHECK_METHOD = ImmutableSet.of("isSuccess", "isSucceeded",
"getSuccess");
private static ImmutableSet<String> SUCCESS_CODE_METHOD = ImmutableSet.of("getResponseCode");
private static ImmutableSet<String> SUCCESS_CODE = ImmutableSet.of("SUCCESS", "DUPLICATE",
"DUPLICATED_REQUEST");
public static <T, R> R call(Function<T, R> function, T request, boolean checkResponse) {
return call(function, request, request.getClass().getSimpleName(), checkResponse, false);
}
public static <T, R> R call(Function<T, R> function, T request) {
return call(function, request, request.getClass().getSimpleName(), true, false);
}
public static <T, R> R call(Function<T, R> function, T request, String requestName) {
return call(function, request, requestName, true, false);
}
public static <T, R> R call(Function<T, R> function, T request, String requestName,boolean checkResponse) {
return call(function, request, requestName, checkResponse, false);
}
public static <T, R> R call(Function<T, R> function, T request, boolean checkResponse, boolean checkResponseCode) {
return call(function, request, request.getClass().getSimpleName(), checkResponse, checkResponseCode);
}
public static <T, R> R call(Function<T, R> function, T request, String requestName, boolean checkResponse,
boolean checkResponseCode) {
StopWatch stopWatch = new StopWatch();
R response = null;
try {
stopWatch.start();
response = function.apply(request);
stopWatch.stop();
if (checkResponse) {
Assert.notNull(response, REMOTE_CALL_RESPONSE_IS_NULL.name());
if (!isResponseValid(response)) {
logger.error("Response Invalid on Remote Call request {} , response {}",
JSON.toJSONString(request),
JSON.toJSONString(response));
throw new RemoteCallException(JSON.toJSONString(response), REMOTE_CALL_RESPONSE_IS_FAILED);
}
}
if (checkResponseCode) {
Assert.notNull(response, REMOTE_CALL_RESPONSE_IS_NULL.name());
if (!isResponseCodeValid(response)) {
logger.error("Response code Invalid on Remote Call request {} , response {}",
JSON.toJSONString(request),
JSON.toJSONString(response));
throw new RemoteCallException(JSON.toJSONString(response), REMOTE_CALL_RESPONSE_IS_FAILED);
}
}
} catch (IllegalAccessException | InvocationTargetException e) {
logger.error("Catch Exception on Remote Call :" + e.getMessage(), e);
throw new IllegalArgumentException("Catch Exception on Remote Call " + e.getMessage(), e);
} catch (Throwable e) {
logger.error("request exception {}", JSON.toJSONString(request));
logger.error("Catch Exception on Remote Call :" + e.getMessage(), e);
throw e;
} finally {
if (logger.isInfoEnabled()) {
logger.info("## Method={} ,## 耗时={}ms ,## [请求报文]:{},## [响应报文]:{}", requestName,
stopWatch.getTotalTimeMillis(),
JSON.toJSONString(request), JSON.toJSONString(response));
}
}
return response;
}
private static <R> boolean isResponseValid(R response)
throws IllegalAccessException, InvocationTargetException {
Method successMethod = null;
Method[] methods = response.getClass().getMethods();
for (Method method : methods) {
String methodName = method.getName();
if (SUCCESS_CHECK_METHOD.contains(methodName)) {
successMethod = method;
break;
}
}
if (successMethod == null) {
return true;
}
return (Boolean) successMethod.invoke(response);
}
private static <R> boolean isResponseCodeValid(R response)
throws IllegalAccessException, InvocationTargetException {
Method successMethod = null;
Method[] methods = response.getClass().getMethods();
for (Method method : methods) {
String methodName = method.getName();
if (SUCCESS_CODE_METHOD.contains(methodName)) {
successMethod = method;
break;
}
}
if (successMethod == null) {
return true;
}
return SUCCESS_CODE.contains(successMethod.invoke(response));
}
}
使用示例
/**
* 支付成功
* <pre>
* 正常支付成功:
* 1、查询订单状态
* 2、推进订单状态到支付成功
* 3、商品库存真正扣减
* 4、创建持有的商品
* 5、推进支付状态到支付成功
* 6、持有的商品上链
*
* 支付幂等成功:
* 1、查询订单状态
* 2、推进支付状态到支付成功
*
* 重复支付:
* 1、查询订单状态
* 2、创建退款单
* 3、重试退款直到成功
* </pre>
*/
@GlobalTransactional(rollbackFor = Exception.class)
public boolean paySuccess(PaySuccessEvent paySuccessEvent) {
PayOrder payOrder = payOrderService.queryByOrderId(paySuccessEvent.getPayOrderId());
if (payOrder.isPaid()) {
return true;
}
SingleResponse<TradeOrderVO> response = orderFacadeService.getTradeOrder(payOrder.getBizNo());
TradeOrderVO tradeOrderVO = response.getData();
OrderPayRequest orderPayRequest = getOrderPayRequest(paySuccessEvent, payOrder);
OrderResponse orderResponse = RemoteCallWrapper.call(req -> orderFacadeService.paySuccess(req), orderPayRequest, "orderFacadeService.pay", false);
//如果订单已经被其他支付推进到支付成功,或者已经关单,则启动退款流程
if (needChargeBack(orderResponse)) {
log.info("order already paid ,do chargeback ," + payOrder.getBizNo());
Boolean result = payOrderService.paySuccess(paySuccessEvent);
Assert.isTrue(result, () -> new BizException(PayErrorCode.PAY_SUCCESS_NOTICE_FAILED));
doChargeBack(paySuccessEvent, tradeOrderVO);
return true;
}
if (!orderResponse.getSuccess()) {
log.error("orderFacadeService.pay error, response = {}", JSON.toJSONString(orderResponse));
return false;
}
///confirmSale 被废弃,详Service.confirmSale
///GoodsSaleRequest goodsSaleRequest = getGoodsSaleRequest(tradeOrderVO);
///GoodsSaleResponse goodsSaleResponse = RemoteCallWrapper.call(req -> goodsFacadeService.confirmSale(req), goodsSaleRequest, "goodsFacadeService.confirmSale");
GoodsSaleRequest goodsSaleRequest = getGoodsSaleRequest(tradeOrderVO);
GoodsSaleResponse goodsSaleResponse = RemoteCallWrapper.call(req -> goodsFacadeService.paySuccess(req), goodsSaleRequest, "goodsFacadeService.confirmSale");
switch (tradeOrderVO.getGoodsType()) {
case COLLECTION:
//只有商品需要在支付成功后立即上链
TransactionHookManager.registerHook(new PaySuccessTransactionHook(goodsSaleResponse.getHeldCollectionId()));
break;
default:
//do nothing
}
Boolean result = payOrderService.paySuccess(paySuccessEvent);
Assert.isTrue(result, () -> new BizException(PayErrorCode.PAY_SUCCESS_NOTICE_FAILED));
return true;
}
@EventListener(value = BlindBoxOpenEvent.class)
@Async("blindBoxListenExecutor")
public void onApplicationEvent(BlindBoxOpenEvent event) {
Long blindBoxItemId = (Long) event.getSource();
//查询出更新后的最新值,避免后续 cas 操作失败
BlindBoxItem blindBoxItem = blindBoxItemService.getById(blindBoxItemId);
//创建heldCollection
HeldCollectionCreateRequest heldCollectionCreateRequest = getHeldCollectionCreateRequest(blindBoxItem);
var heldCollection = heldCollectionService.create(heldCollectionCreateRequest);
Assert.notNull(heldCollection, () -> new BlindBoxException(BLIND_BOX_OPEN_FAILED));
//上链
ChainProcessRequest chainProcessRequest = getChainProcessRequest(blindBoxItem, heldCollection);
//如果失败了,则依靠定时任务补偿
ChainProcessResponse<ChainOperationData> response = RemoteCallWrapper.call(req -> chainFacadeService.mint(req), chainProcessRequest, "mint");
//修改盲盒状态
if (response.getSuccess()) {
blindBoxItem.openSuccess();
var saveResult = blindBoxItemService.updateById(blindBoxItem);
Assert.isTrue(saveResult, () -> new BlindBoxException(BLIND_BOX_ITEM_SAVE_FAILED));
}
}
@Override
@Facade
public CollectionAirdropResponse airDrop(CollectionAirDropRequest request) {
//检查用户是否可被空投,这里比较简单,后续如果节点比较多,可以改成责任链
UserQueryRequest userQueryRequest = new UserQueryRequest(Long.valueOf(request.getRecipientUserId()));
UserQueryResponse<UserInfo> userQueryResponse = userFacadeService.query(userQueryRequest);
checkUser(userQueryResponse);
//检查商品是否可被空投,这里比较简单,后续如果节点比较多,可以改成责任链
Collection collection = collectionService.queryById(request.getCollectionId());
checkCollection(collection,request.getQuantity());
CollectionAirdropResponse response = collectionService.airDrop(request, collection);
//执行失败或幂等成功,则直接返回,不用调上链操作了
if (!response.getSuccess() || response.getResponseCode().equals(DUPLICATED.name())) {
return response;
}
for (HeldCollectionVO heldCollection : response.getHeldCollections()) {
ChainProcessRequest chainProcessRequest = new ChainProcessRequest();
chainProcessRequest.setRecipient(userQueryResponse.getData().getBlockChainUrl());
chainProcessRequest.setClassId(String.valueOf(heldCollection.getCollectionId()));
chainProcessRequest.setClassName(heldCollection.getName());
chainProcessRequest.setSerialNo(heldCollection.getSerialNo());
chainProcessRequest.setBizId(heldCollection.getId().toString());
chainProcessRequest.setBizType(ChainOperateBizTypeEnum.HELD_COLLECTION.name());
chainProcessRequest.setIdentifier(UUID.randomUUID().toString());
//如果失败了,则依靠定时任务补偿
ChainProcessResponse<ChainOperationData> chainProcessResponse = RemoteCallWrapper.call(req -> chainFacadeService.mint(req), chainProcessRequest, "mint");
}
response.setSuccess(response.getSuccess());
return response;
}
自定义注解
public @interface Facade {
}
切面
/**
* Facade的切面处理类,统一统计进行参数校验及异常捕获
*/
@Aspect
@Component
@Order(Integer.MIN_VALUE)
public class FacadeAspect {
private static final Logger LOGGER = LoggerFactory.getLogger(FacadeAspect.class);
@Around("@annotation(cn.mifu.turbo.rpc.facade.Facade)")
public Object facade(ProceedingJoinPoint pjp) throws Exception {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
Object[] args = pjp.getArgs();
Class returnType = ((MethodSignature) pjp.getSignature()).getMethod().getReturnType();
//循环遍历所有参数,进行参数校验
for (Object parameter : args) {
try {
BeanValidator.validateObject(parameter);
} catch (ValidationException e) {
printLog(stopWatch, method, args, "failed to validate", null, e);
return getFailedResponse(returnType, e);
}
}
try {
// 目标方法执行
Object response = pjp.proceed();
enrichObject(response);
printLog(stopWatch, method, args, "end to execute", response, null);
return response;
} catch (Throwable throwable) {
// 如果执行异常,则返回一个失败的response
printLog(stopWatch, method, args, "failed to execute", null, throwable);
return getFailedResponse(returnType, throwable);
}
}
/**
* 日志打印
*
* @param stopWatch
* @param method
* @param args
* @param action
* @param response
*/
private void printLog(StopWatch stopWatch, Method method, Object[] args, String action, Object response,
Throwable throwable) {
try {
//因为此处有JSON.toJSONString,可能会有异常,需要进行捕获,避免影响主干流程
LOGGER.info(getInfoMessage(action, stopWatch, method, args, response, throwable), throwable);
// 如果校验失败,则返回一个失败的response
} catch (Exception e1) {
LOGGER.error("log failed", e1);
}
}
/**
* 统一格式输出,方便做日志统计
* <p>
* *** 如果调整此处的格式,需要同步调整日志监控 ***
*
* @param action 行为
* @param stopWatch 耗时
* @param method 方法
* @param args 参数
* @param response 响应
* @return 拼接后的字符串
*/
private String getInfoMessage(String action, StopWatch stopWatch, Method method, Object[] args, Object response,
Throwable exception) {
StringBuilder stringBuilder = new StringBuilder(action);
stringBuilder.append(" ,method = ");
stringBuilder.append(method.getName());
stringBuilder.append(" ,cost = ");
stringBuilder.append(stopWatch.getTime()).append(" ms");
if (response instanceof BaseResponse) {
stringBuilder.append(" ,success = ");
stringBuilder.append(((BaseResponse) response).getSuccess());
}
if (exception != null) {
stringBuilder.append(" ,success = ");
stringBuilder.append(false);
}
stringBuilder.append(" ,args = ");
stringBuilder.append(JSON.toJSONString(Arrays.toString(args)));
if (response != null) {
stringBuilder.append(" ,resp = ");
stringBuilder.append(JSON.toJSONString(response));
}
if (exception != null) {
stringBuilder.append(" ,exception = ");
stringBuilder.append(exception.getMessage());
}
if (response instanceof BaseResponse) {
BaseResponse baseResponse = (BaseResponse) response;
if (!baseResponse.getSuccess()) {
stringBuilder.append(" , execute_failed");
}
}
return stringBuilder.toString();
}
/**
* 将response的信息补全,主要是code和message
*
* @param response
*/
private void enrichObject(Object response) {
if (response instanceof BaseResponse) {
if (((BaseResponse) response).getSuccess()) {
//如果状态是成功的,需要将未设置的responseCode设置成SUCCESS
if (StringUtils.isEmpty(((BaseResponse) response).getResponseCode())) {
((BaseResponse) response).setResponseCode(ResponseCode.SUCCESS.name());
}
} else {
//如果状态是成功的,需要将未设置的responseCode设置成BIZ_ERROR
if (StringUtils.isEmpty(((BaseResponse) response).getResponseCode())) {
((BaseResponse) response).setResponseCode(ResponseCode.BIZ_ERROR.name());
}
}
}
}
/**
* 定义并返回一个通用的失败响应
*/
private Object getFailedResponse(Class returnType, Throwable throwable)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException {
//如果返回值的类型为BaseResponse 的子类,则创建一个通用的失败响应
if (returnType.getDeclaredConstructor().newInstance() instanceof BaseResponse) {
BaseResponse response = (BaseResponse) returnType.getDeclaredConstructor().newInstance();
response.setSuccess(false);
if (throwable instanceof BizException bizException) {
response.setResponseMessage(bizException.getErrorCode().getMessage());
response.setResponseCode(bizException.getErrorCode().getCode());
} else if (throwable instanceof SystemException systemException) {
response.setResponseMessage(systemException.getErrorCode().getMessage());
response.setResponseCode(systemException.getErrorCode().getCode());
} else {
response.setResponseMessage(throwable.toString());
response.setResponseCode(ResponseCode.BIZ_ERROR.name());
}
return response;
}
LOGGER.error(
"failed to getFailedResponse , returnType (" + returnType + ") is not instanceof BaseResponse");
return null;
}
}
工具类
/**
* 参数校验工具
*
* @author Hollis
*/
public class BeanValidator {
private static Validator validator = Validation.byProvider(HibernateValidator.class).configure().failFast(true)
.buildValidatorFactory().getValidator();
/**
* @param object object
* @param groups groups
*/
public static void validateObject(Object object, Class<?>... groups) throws ValidationException {
Set<ConstraintViolation<Object>> constraintViolations = validator.validate(object, groups);
if (constraintViolations.stream().findFirst().isPresent()) {
throw new ValidationException(constraintViolations.stream().findFirst().get().getMessage());
}
}
}