Dubbo: 升级到2.7.2后,怎么没有RpcResult类了?用什么代替呢?

Created on 12 Jun 2019  ·  12Comments  ·  Source: apache/dubbo

2.7.1版本还可以使用RpcResult类,2.7.2就没有这个类了,用什么代替?

Most helpful comment

本人亲测有效:具体代码如下
CompletableFuture b = new CompletableFuture ();
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(b,invocation);
asyncRpcResult.setValue(baseObject);
return asyncRpcResult;
可以返回成功,实现dubbo接口拦截,并自由组装返回值

All 12 comments

目前我用 AppResponse 这个类代替了。

2.7.1版本还可以使用RpcResult类,2.7.2就没有这个类了,用什么代替?

为什么会用到RpcResult类那?是类似Filter扩展中的强转吗

2.7.1版本还可以使用RpcResult类,2.7.2就没有这个类了,用什么代替?

为什么会用到RpcResult类那?是类似Filter扩展中的强转吗

是的。

package com.umltech.dubbo.provider.filter;

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUnit;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.setting.dialect.Props;
import com.alibaba.fastjson.JSON;
import com.umltech.dubbo.api.pojo.params.DidList;
import com.umltech.dubbo.api.pojo.params.Token;
import com.umltech.dubbo.api.pojo.params.VinList;
import com.umltech.dubbo.api.pojo.result.BaseResult;
import com.umltech.dubbo.provider.pojo.ErrorCode;
import com.umltech.dubbo.provider.pojo.InvokeInfo;
import com.umltech.dubbo.provider.pojo.RedisKey;
import com.umltech.dubbo.provider.util.CommonUtil;
import org.apache.dubbo.config.spring.extension.SpringExtensionFactory;
import org.apache.dubbo.rpc.*;
import org.springframework.context.ApplicationContext;
import sunyu.toolkit.redis.RedisClusterTool;

import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 调用拦截扩展
 * 用于在调用方法前,拦截,进行权限判断、并发判断、调用次数判断,以及保存一些信息
 *
 * @author 孙宇
 */
public class MyMethodFilter implements Filter {
    Props prop = new Props("application.properties");
    int searchBatch = prop.getInt("search.batch");
    List<String> needAuth = Arrays.asList(prop.getStr("need.auth").split(","));

    ApplicationContext context = null;
    CommonUtil commonUtil;
    RedisClusterTool redisClusterTool;

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        init();//初始化

        String concurrentKey = null;//接口并发量控制的 redis key
        InvokeInfo invokeInfo = new InvokeInfo();//调用信息
        Integer errCode = null;//错误编码
        String errMsg = null;//错误描述
        RpcContext context = RpcContext.getContext();//rpc上下文
        invokeInfo.setRemoteHost(context.getRemoteHost());//调用者的ip
        invokeInfo.setService(invoker.getUrl().getPath());//调用的接口
        invokeInfo.setMethodName(invocation.getMethodName());//调用的方法
        Object[] args = invocation.getArguments();//传入的参数
        Token token = null;//当前调用传入的token
        String interfaceMethod = invokeInfo.getService() + "#" + invokeInfo.getMethodName();//接口名称#方法名称
        invokeInfo.setArgs(args);
        Class<?>[] argTypes = invocation.getParameterTypes();//传入参数的类型
        invokeInfo.setArgTypes(argTypes);

        if (needAuth.contains(invokeInfo.getService())) {
            if (args.length > 0) {
                for (int i = 0; i < args.length; i++) {
                    if (argTypes[i].getTypeName().equals("com.umltech.dubbo.api.pojo.params.Token")) {
                        token = (Token) args[i];//要求token必须是第一个参数才行

                        //判断token是否被禁用
                        //interface:disabled hash类型,field:tokenId,value:禁用时间
                        if (redisClusterTool.hexists("interface:disabled", token.getTokenId())) {
                            errCode = ErrorCode.TOKEN_DISABLED.errorCode;
                            errMsg = ErrorCode.TOKEN_DISABLED.errorMessage;
                            break;
                        }

                        //判断token是否存在
                        if (MapUtil.isEmpty(commonUtil.getTokenDidVin(token))) {
                            errCode = ErrorCode.TOKEN_ID_NOT_EXISTS.errorCode;
                            errMsg = ErrorCode.TOKEN_ID_NOT_EXISTS.errorMessage;
                            break;
                        }
                    } else if (argTypes[i].getTypeName().equals("com.umltech.dubbo.api.pojo.params.VinList")) {
                        VinList paramsVinList = (VinList) args[i];
                        if (paramsVinList.size() > searchBatch) {
                            errCode = ErrorCode.SEARCH_BATCH_QUOTA.errorCode;
                            errMsg = String.format(ErrorCode.SEARCH_BATCH_QUOTA.errorMessage, searchBatch);
                        } else {
                            Map<String, String> tokenDidVinMap = commonUtil.getDidVinMap(token, paramsVinList);
                            for (Object vin : paramsVinList) {
                                if (!tokenDidVinMap.containsValue(vin)) {//如果token的对应关系里面,没有当前vin
                                    errCode = ErrorCode.NO_PERMISSION_QUERY.errorCode;
                                    errMsg = String.format(ErrorCode.NO_PERMISSION_QUERY.errorMessage, vin);
                                    break;
                                }
                            }
                        }
                    } else if (argTypes[i].getTypeName().equals("com.umltech.dubbo.api.pojo.params.DidList")) {
                        DidList paramsDidList = (DidList) args[i];
                        if (paramsDidList.size() > searchBatch) {
                            errCode = ErrorCode.SEARCH_BATCH_QUOTA.errorCode;
                            errMsg = String.format(ErrorCode.SEARCH_BATCH_QUOTA.errorMessage, searchBatch);
                        } else {
                            Map<String, String> tokenDidVinMap = commonUtil.getDidVinMap(token, paramsDidList);
                            for (Object did : paramsDidList) {
                                if (!tokenDidVinMap.containsKey(did)) {//如果token的对应关系里,没有当前did
                                    errCode = ErrorCode.NO_PERMISSION_QUERY.errorCode;
                                    errMsg = String.format(ErrorCode.NO_PERMISSION_QUERY.errorMessage, did);
                                    break;
                                }
                            }
                        }
                    }
                }
            }

            //判断配额以及并发量,更新调用次数
            if (errCode == null && token != null) {
                Integer[] quota = commonUtil.getQuota(token, interfaceMethod);//当前token,当前接口方法的配额[调用次数限制,并发量限制]

                if (quota[1].intValue() < 1) {
                    errCode = ErrorCode.INTERFACE_CONCURRENT_EXCESS.errorCode;
                    errMsg = ErrorCode.INTERFACE_CONCURRENT_EXCESS.errorMessage;
                }
                if (errCode == null) {//并发量配额大于0
                    //interface:concurrent:tokenId,key是hash类型,field为接口名#方法名,value为并发量incr
                    concurrentKey = RedisKey.interfaceConcurrentPre + token.getTokenId();
                    Integer concurrent = (Integer) redisClusterTool.hincrby(concurrentKey, interfaceMethod, 1);//并发量+1
                    if (concurrent > quota[1].intValue()) {//如果并发量大于配额限制
                        errCode = ErrorCode.INTERFACE_CONCURRENT_EXCESS.errorCode;
                        errMsg = ErrorCode.INTERFACE_CONCURRENT_EXCESS.errorMessage;
                    }
                }

                if (quota[0].intValue() < 1) {
                    errCode = ErrorCode.INTERFACE_QUOTA_INSUFFICIENT.errorCode;
                    errMsg = ErrorCode.INTERFACE_QUOTA_INSUFFICIENT.errorMessage;
                }
                if (errCode == null) {//调用次数配额大于0
                    //interface:call:yyyyMMdd:tokenId:appId,key是hash类型,field为接口名#方法名,value为调用次数incr,一天超时
                    String callKey = RedisKey.interfaceCallPre + new DateTime().toString("yyyyMMdd") + ":" + token.getTokenId();
                    if (StrUtil.isNotBlank(token.getAppId())) {
                        callKey += ":" + token.getAppId();
                    }
                    Integer calls = null;//调用次数
                    if (redisClusterTool.exists(callKey)) {
                        calls = (Integer) redisClusterTool.hincrby(callKey, interfaceMethod, 1);//调用次数+1
                    } else {
                        calls = (Integer) redisClusterTool.hincrby(callKey, interfaceMethod, 1, 1, TimeUnit.DAYS);//调用次数+1
                    }
                    if (calls > quota[0].intValue()) {//如果调用次数大于配额限制
                        errCode = ErrorCode.INTERFACE_QUOTA_INSUFFICIENT.errorCode;
                        errMsg = ErrorCode.INTERFACE_QUOTA_INSUFFICIENT.errorMessage;
                    }
                }
            }
        }


        Date startTime = new Date();//调用开始时间
        Result result = null;//返回数据
        if (errCode == null) {//如果没有发现错误
            try {
                result = invoker.invoke(invocation);//执行接口实现
            } catch (RpcException e) {
                Result rpcResult = new AppResponse();
                BaseResult r = new BaseResult();
                r.setCode(e.getCode());
                r.setMessage(e.getMessage());
                rpcResult.setValue(r);
                result = rpcResult;
            } finally {
                if (result == null) {
                    Result rpcResult = new AppResponse();
                    BaseResult r = new BaseResult();
                    r.setCode(ErrorCode.INVOKE_ERROR.errorCode);
                    r.setMessage(ErrorCode.INVOKE_ERROR.errorMessage);
                    rpcResult.setValue(r);
                    result = rpcResult;
                }
            }
        } else {//出现错误
            Result rpcResult = new AppResponse();
            BaseResult r = new BaseResult();
            r.setCode(errCode);
            r.setMessage(errMsg);
            rpcResult.setValue(r);
            result = rpcResult;
        }
        Date endTime = new Date();//调用结束时间
        if (StrUtil.isNotBlank(concurrentKey)) {
            Integer concurrent = (Integer) redisClusterTool.hincrby(concurrentKey, interfaceMethod, -1);//并发量-1
            invokeInfo.setConcurrent(concurrent);
        }
        BaseResult r = (BaseResult) result.getValue();
        r.setStartTime(startTime);
        r.setEndTime(endTime);
        r.setExecuteTime(DateUtil.between(startTime, endTime, DateUnit.MS));//执行了多少毫秒
        invokeInfo.setResult(r);

        //保存日志
        saveLog(invokeInfo);

        return result;
    }

    /**
     * 保存接口调用日志信息
     *
     * @param invokeInfo
     */
    private void saveLog(InvokeInfo invokeInfo) {
        //StaticLog.info("{}", JSON.toJSONString(invokeInfo, true));
        //list类型,interface:log接口日志
        redisClusterTool.rpush(RedisKey.interfaceLog, JSON.toJSONString(invokeInfo));
    }

    /**
     * dubbo的类中,不可以直接注入,需要使用ServiceBean方式进行获得才行
     */
    private void init() {
        if (context == null) {
            for (ApplicationContext c : SpringExtensionFactory.getContexts()) {
                if (c != null) {
                    context = c;
                    break;
                }
            }
        }
        if (commonUtil == null) {
            commonUtil = context.getBean(CommonUtil.class);
        }
        if (redisClusterTool == null) {
            redisClusterTool = context.getBean(RedisClusterTool.class);
        }
    }

}

2.7.2 中 extends ListenableFilter 来实现自定义 Filter 好一些

2.7.2 中 extends ListenableFilter 来实现自定义 Filter 好一些

啊,还有这种操作?有例子么?或者文档位置?

2.7.2 中 extends ListenableFilter 来实现自定义 Filter 好一些

啊,还有这种操作?有例子么?或者文档位置?

看看源码里ExceptionFilter

@89333367 在2.7.2版本中,为了更好的支持异步回调。RpcResult被替换成了AppResponse,而Filter链路上传递的对象变成了AsyncRpcResult,这个修改其实是需要用户明确理解的(主要是对扩展Filter的用户),所以选择删除RpcResult其中一个重要的目的也是为了让升级者明确的感知到以上变化的存在,防止误用。

换用 AppResponse 以后报这样的错

java.lang.UnsupportedOperationException: AppResponse represents an concrete business 
response, there will be no status changes, you should get internal values directly.

代码是这样的:

@Activate(group = CommonConstants.PROVIDER, value = "base_exception")
@Slf4j
public class CustomExceptionFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Result result = invoker.invoke(invocation);
        if (invoker.getInterface() == GenericService.class) {
            return result;
        }
        if (result.hasException()) {
            return this.handleException(result);
        }
        return result;
    }

    private Result handleException(Result result) {
        Throwable e = result.getException();
        if (e instanceof ValidationException) {
            ValidationException validationException = (ValidationException) e;
            PortalResponse response = PortalResponseHelper.fromBindingErrors(validationException.getResult());
            return new AppResponse(response);
        }
        return result;
    }

}

换用 AppResponse 以后报这样的错

java.lang.UnsupportedOperationException: AppResponse represents an concrete business 
response, there will be no status changes, you should get internal values directly.

@jswxwxf 前面说了 Filter链路上传递的对象变成了AsyncRpcResult, 所以修改返回值现在只需要result.setValue(response) 应该就好了

代码是这样的:

@Activate(group = CommonConstants.PROVIDER, value = "base_exception")
@Slf4j
public class CustomExceptionFilter implements Filter {

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        Result result = invoker.invoke(invocation);
        if (invoker.getInterface() == GenericService.class) {
            return result;
        }
        if (result.hasException()) {
            return this.handleException(result);
        }
        return result;
    }

    private Result handleException(Result result) {
        Throwable e = result.getException();
        if (e instanceof ValidationException) {
            ValidationException validationException = (ValidationException) e;
            PortalResponse response = PortalResponseHelper.fromBindingErrors(validationException.getResult());
            return new AppResponse(response);
        }
        return result;
    }

}

现在 2.7.X 之后有没有什么办法禁用原有 ExceptionFilter 只让 自定义的 CustomExceptionFilter 生效?

本人亲测有效:具体代码如下
CompletableFuture b = new CompletableFuture ();
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(b,invocation);
asyncRpcResult.setValue(baseObject);
return asyncRpcResult;
可以返回成功,实现dubbo接口拦截,并自由组装返回值

Was this page helpful?
0 / 5 - 0 ratings