系统设计(2)

深渊向深渊呼唤

基于之前写的文章:https://my.oschina.net/u/257801/blog/3080141;在公司业务迭代过程中,该系统也重构了多个版本;就此,对系统的演变进行记录;

之前的流程很简单:

通过入参路由到具体的flow(有多个flow,每个flow由不同的handler组成)
               |
               |
               |
执行flow中的execute方法(flow负责调度每个handler,其实就是循环调用每个handler的process方法)
               |
               |
               |
执行完所有handler方法后,将结果以mq的形式调用上游系统;如果是同步返回的话,如同责任链模式,直接返回就行了;

但随着业务的发展,对handler有不同维度的新增

public enum HandlerType {
    /**
     * 当handler节点是异步请求外部系统,需接收mq返回的结果,根据结果判断接入到流程的节点被认为是接入节点
     */
    ACCESS("接入节点", 1),
    /**
     * 在每个flow初始化时定义的handler节点
     */
    PROCESS("流程节点", 2);
    private String desc;
    private int code;

    HandlerType(String desc, int code) {
        this.desc = desc;
        this.code = code;
    }
}
public enum FeatureType {
    NORMAL("普通节点", 1), // 根据初始化的顺序已经指定上游节点和下游节点
    OVER("结束节点", 2), // 指定节点是结束节点,如需异步请求外部系统节点,等待mq消息返回;或者最后一个节点
    CONDITION("条件节点", 3); // 需根据该功能节点产生的数据动态判断下个节点,或者是不是结束节点

    private String desc;
    private int code;

    FeatureType(String desc, int code) {
        this.desc = desc;
        this.code = code;
    }
}

我们为了不影响现有的功能handler节点同时,对功能节点handler进行封装在DefaultHandlerContext内;通过委托方式进行调用handler方法;

同时新增handler的静态特性和动态特性;

静态特性:当前handler是属于(普通节点/结束节点/条件节点)、(接入节点/流程节点); 动态特性:当前节点是条件节点时,执行条件语句,获取到下个handler节点的beanName;或者执行条件语句后,当前节点变成结束节点;

之前设计的flow是通过while循环调度每个handler执行的顺序,这种设计比较单一,扩展短板比较明显;我们对每个DefaultHandlerContext新增了两个节点,分别是prev和next节点;

普通节点:在初始化时即可指定上游节点和下游节点 结束节点:下游节点为DefaultHandlerContext.NULL_OBJECT 条件节点:通过执行条件语句判断下一个节点beanName

通过以上的方式构造了有状态的DefaultHandlerContext,而且影响内部每个handler的单一功能指责;

DefaultHandlerContext:

public class DefaultHandlerContext<T> extends AbstractHandlerContext {
    public static final DefaultHandlerContext NULL_OBJECT = new DefaultHandlerContext();

    private ProcessHandler<T> handler; // 功能handler
    private String beanName; // beanName
    protected FeatureType featureType = FeatureType.NORMAL; // 默认普通节点
    protected HandlerType handlerType = HandlerType.PROCESS; // 默认流程内节点
    protected DefaultHandlerContext prev; // 上一个节点
    protected DefaultHandlerContext next; // 下一个节点
    protected Function<T, String> conditionFun; // 条件节点判断方法

    ...
}

每个flow初始化时包含属于自身的handlerPipeLine,并将属于该flow中的DefaultHandlerContext按链路顺序压入;

// 默认初始化pipeLine,每个flow都有相同的start、terminate节点       
HandlerContextPipeLine pipeLine = new HandlerContextPipeLine(new DefaultHandlerContext(start), new DefaultHandlerContext(terminate, FeatureType.OVER));

// 条件节点,通过handler结果获取下个节点的beanName
private Function<Boolean, String> inTransitFun = (existOrder) -> existOrder ?
            "preMqSend" : "fraudQuery";

        pipeLine
                .add(new DefaultHandlerContext<>(HandlerUtil.getHandler("inTransitOrder"), inTransitFun)) // 条件节点, 条件语句inTransitFun
                .add(new DefaultHandlerContext<>(HandlerUtil.getHandler("fraudQuery"), FeatureType.OVER)) // 结束节点 —— 调用外部系统接口
                .add(new DefaultHandlerContext<>(HandlerUtil.getHandler("fraudReceiver"), fraudReceiverFun)) // 条件节点
                .add(new DefaultHandlerContext<>(HandlerUtil.getHandler("prePaymentQuery"), prePaymentFun))
                .add(new DefaultHandlerContext(HandlerUtil.getHandler("quotaMatch"))) // 普通节点
                .add(new DefaultHandlerContext<>(HandlerUtil.getHandler("discriOver"), discrFun))
                .add(new DefaultHandlerContext(HandlerUtil.getHandler("guaranteeQuery"), FeatureType.OVER))
                .add(new DefaultHandlerContext<>(HandlerUtil.getHandler("guaranteeReceive"), guaFun))
                .add(new DefaultHandlerContext(HandlerUtil.getHandler("preMqSend")));

HandlerPipeLine:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class HandlerContextPipeLine {
    // 当前flow流程包含的所有DefaultHandlerContext
    private Map<String, DefaultHandlerContext> contextMap = new HashMap<String, DefaultHandlerContext>() {{
        // 包含NULL_OBJECT
        put(null, DefaultHandlerContext.NULL_OBJECT);
    }};

    // 根据beanName查找DefaultHandlerContext
    private Function<String, DefaultHandlerContext> handlerFun =
            (beanName) -> contextMap.getOrDefault(beanName, DefaultHandlerContext.NULL_OBJECT);

    // 流程中的开始节点
    private DefaultHandlerContext head;
    // 流程中的结束节点
    private DefaultHandlerContext end;

    /**
     * HandlerContextPipeLine 构造函数
     * @param head
     * @param end
     */
    public HandlerContextPipeLine(DefaultHandlerContext head, DefaultHandlerContext end) {
        this.head = head;
        this.end = end;

        contextMap.put(head.getBeanName(), head);
        contextMap.put(end.getBeanName(), end);
        head.setNext(end);
        end.setPrev(head);
    }

    /**
     * 新增节点时: 当前节点是普通节点时,设置当前节点的next节点为end
     * ----------上游节点是普通节点时,设置当前节点为上游节点的next节点
     *
     * @param handlerContext
     * @return
     */
    public HandlerContextPipeLine add(DefaultHandlerContext handlerContext) {
        DefaultHandlerContext preContext = end.getPrev();
        end.setPrev(handlerContext);
        if (handlerContext.getFeatureType() == FeatureType.NORMAL) {
            handlerContext.setNext(end);
        }

        handlerContext.setPrev(preContext);
        if (preContext.getFeatureType() == FeatureType.NORMAL) {
            preContext.setNext(handlerContext);
        }

        contextMap.put(handlerContext.getBeanName(), handlerContext);
        return this;
    }

    /**
     * 流程的开始节点
     *
     * @param reqParam
     * @param dataUtil
     */
    public void start(ReqParam reqParam, DataUtil dataUtil) {
        head.execute(reqParam, dataUtil, handlerFun);
    }

    /**
     * 根据beanName查询获得DefaultHandlerContext
     *
     * @param beanName
     * @param reqParam
     * @param dataUtil
     */
    public void access(String beanName, ReqParam reqParam, DataUtil dataUtil) {
        DefaultHandlerContext accessHandler = handlerFun.apply(beanName);
        accessHandler.execute(reqParam, dataUtil, handlerFun);
    }

    /**
     * mq接收数据接入流程
     *
     * @param beanName access name
     * @param func     要执行的func
     * @param reqParam
     * @param dataUtil
     */
    public void access(String beanName, Function<ReqParam, String> func, ReqParam reqParam, DataUtil dataUtil) {
        DefaultHandlerContext<ReqParam> handlerContext = new DefaultHandlerContext<>(func, beanName);
        handlerContext.execute(reqParam, dataUtil, handlerFun);
    }
}

AbstractHandlerContext类:链路调用,优化之前的while循环调用

@Data
@Slf4j
public abstract class AbstractHandlerContext<T> implements ProcessHandler<T> {
  ...
    /**
     * 执行DefaultHandlerContext
     * 1.当前DefaultHandlerContext是条件节点时,执行功能handler节点;以返回结果作为入参执行条件语句获得next handler的beanName;通过beanName获得next DefaultHandlerContext
     * 2.当前不是条件节点时,直接执行功能节点的方法即可
     * 3.如果下一个节点是NULL_OBJECT时,则当前节点为结束节点,返回即可
     * 4.如果next节点不是结束节点,则执行next节点的execute方法
     * @param reqParam
     * @param dataUtil
     * @param handlerFun
     */
    void execute(ReqParam reqParam, DataUtil dataUtil, Function<String, DefaultHandlerContext> handlerFun) {
        if (FeatureType.CONDITION == featureType) {
            next = handlerFun.apply(conditionFun.apply(this.process(reqParam)));
        } else {
            this.process(reqParam);
        }
        ...
        ...
        // 判断该handler处理完是否终结
        if (next == NULL_OBJECT) {
            return;
        }

        this.next.execute(reqParam, dataUtil, handlerFun);
    }

   ...
}

以上解释了现有系统架构的链路,首先初始化每个flow的所有DefaultContextHandler;并为每个功能DefaultContextHandler增添了静态/动态特性,改变了之前的调度模式,提高扩展能力;

接入节点:

接入节点换一种表达方式,可以称为虚拟节点,它承接着mq异步消息的接收,并对结果判断接入到流程中的具体节点,同时根据不同的flow,处理的逻辑也不尽相同;由此知道,接入的逻辑判断和具体的flow也有关联;每个flow对接入有自身的判断逻辑;但可把默认的接入写在基类中,个性化的,子类自己复写就好;

    Map<String, Map<String, Function>> accessMap = new HashMap<String, Map<String, Function>>() {{
        // 反欺诈
        put("fraudAccess",
                new HashMap<String, Function>() {{
                    put("pass", (obj) -> "fraudReceiver");
                }});

        // 担保策略
        put("guaPloyAccess",
                new HashMap<String, Function>() {{
                    put("pass", (obj) -> "guaranteeReceive");
                }});

        // 资方策略
        put("tenantAccess",
                new HashMap<String, Function>() {{
                    put("pass", (obj) -> "guaranteeQuery");
                }});

        // 额度匹配(决策、电审、提额、担保)
        put("matchAccess",
                new HashMap<String, Function>() {{
                    put("pass", (obj) -> "quotaMatch");
                    put("reject", (obj) -> "allSceneFinished");
                    put("cancel", (obj) -> "allSceneFinished");
                }});
    }};

    /**
     * 从mq接入流程内
     *
     * @param access
     * @param reqParam
     * @param type
     */
    @Override
    public void access(FlowAccess access, String type, ReqParam reqParam) {
        String funcKey = access.getBeanName();
        Function func = null;
        if (accessMap.containsKey(funcKey)) {
            func = accessMap.get(funcKey).get(type);
        } else if (access instanceof MatchQuotaAccess) {
            func = accessMap.get("matchAccess").get(type);
        }

        Assert.notNull(func, access.getBeanName() + "func should not null");
        pipeLine.access(funcKey, func, reqParam, getDataUtil());
    }

接入的类调用自身所属于的flow流程内的access方法,根据type类型pass/reject/cancel获取不同的条件语句;

调用pipeline中的access方法;组装接入节点DefaultHandlerContext;调用DefaultHandlerContext的execute方法;

因为接入节点内的handler是null,所以只需要执行接入节点中的条件语句获取到next节点的beanName,并通过beanName获取到该flow中的DefaultHandlerContext;由此接入流程内;

以上是系统重构后的状态,基本能满足当前业务,并支持一定程度上的扩展;

-------------------------- 问题点 ---------------------------

在重构的过程中遇到的问题,因为每个功能handler bean内都有个属性beanName;是通过实现BeanPostProcess方式进行创建代理注入的;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        if (bean.getClass().isAnnotationPresent(Handler.class)) {
            Object proxy = new Proxy(bean, beanName).proxy();
            HandlerUtil.processHandlerMap.put(beanName, (ProcessHandler) proxy);
            return proxy;
        }

        if (bean.getClass().isAnnotationPresent(Access.class)) {
            Object proxy = new Proxy(bean, beanName).proxy();
            HandlerUtil.accessMap.put(beanName, (FlowAccess) proxy);
            return proxy;
        }
        return bean;
    }

但在baseFlow实现InitializingBean接口中初始化HandlerPipeLine时,需通过beanName获取相应的handler代理类进行组装DefaultHandlerContext;所以在此之前需完成所有实现ProcessHandler接口的功能类实例化,并包含相应的(beanName, handlerProxy)集合;

之前通过在BaseFlow中@Resource注入了head和terminated功能节点实例化HandlerPipeLine;所以会优先初始化head和terminated,创建代理类;但并不能保证全量ProcessHandler接口实现类都已经初始化;这样会导致在组装DefaultHandlerContext时,handler是null;整个链条是无效的;

现在只是偷巧的解决;通过@Autowired方式去注入head和terminated;因为用的是类型注入,所以上下文会保证执行baseflow中的afterPropertiesSet方法时,将所有的processHandler接口实现类都初始化,以此解决上述问题;但并没有从spring加载的链路上去做根本解决;

2.在DefaultHandlerContext定义的泛型是和所包含的handler一致的;导致在扩展的时候出现一定的局限性,后续会想比较好的方式消除;

栏目