事件驱动下的状态模式

1 场景

一个业务如果包含多个阶段,而各个阶段之间又并非顺序执行,可能存在复杂的条件跳转。

此时,运用状态模式可以帮助我们理清业务逻辑,并发现一些逻辑缺陷。而基于事件驱动的模型,可以使得系统按照预先定义的方式,在状态机中扭转(进入某个状态时,触发什么事件)。

2 状态机

运用状态机可以描述许多系统的运作方式,因为许多系统的运作,无非是从一个状态由于某种事件的发生,在满足某种条件时迁移到另一种状态,同时做一些处理动作。

如上所述,一次完整的迁移包含:

其中,条件可以是单一的,也可以是组合条件;可以有多个分支,也可以认为条件是“非1即0”的;这些通通取决于应用场景。

3 状态机驱动器

定义好一个状态机之后,对于给定的一个事件,状态机就可以遍历所有的条件,当条件满足时执行动作,然后跳转到另一状态。然而,一个完整的业务处理过程包含多个状态迁移。这就需要一个驱动器来管理每一次的状态迁移。

由于状态的迁移是事件驱动的,于是很容易想到运用观察者模式来设计这样一个驱动器。于是,状态是Observable,由许多个Observer订阅,而每个Observer的任务是,当状态机因为某个事件而跳转到某个状态时,向状态机发送下一个事件的通知,直到状态机到达一个终止状态(在这一模型中,终止状态就是没有任何Observer订阅的状态)。

4 业务描述

有了驱动器以及在其驱动之下运行的状态机,就可以用状态机的语言来定义业务逻辑了。于是在系统的初始化阶段,需要初始化两个东西:状态机和驱动器。状态机中定义了一个个完整的状态迁移方式(源状态、事件、条件、动作、目标状态);驱动器定义各个状态之间的连接方式。

5 实现

状态机的测试代码:

/**
 * Created by chn on 16/5/11.
 */
public class TestFMSTable {

    @Test
    public void test () {

        // 初始化状态迁移表
        FSMTable fsmTable = new FSMTable(STATE.INIT);
        fsmTable.from(STATE.INIT)
        			.occur(EVENT.RECVED)
        			.when(null).action(()->System.out.println("resolving"))
        			.to(STATE.END);

        System.out.println("state = " + STATE.INIT);
        fsmTable.fire(EVENT.RECVED);
//        // resolve()
//        fsmTable.fire(EVENT.NOTIFY_RESOLVED);
//        // store()
//        fsmTable.fire(EVENT.NOTIFY_STORED);
//        // send()
//        fsmTable.fire(EVENT.NOTIFY_SENDED);
//        // log
//        fsmTable.fire(EVENT.NOTIFY_LOGED);
        // end

    }
}

状态机的定义:FSMTable.java

package fsm;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

/**
 * Created by chn on 16/5/11.
 */
public class FSMTable {

	 // 状态机当前所处的状态
    private STATE state;
    /**
     * 用Map-Map-List容器维护状态的迁移规则
     */
    private Map<STATE, Map<EVENT, List<Transaction>>> transactions = Maps.newHashMap();

    public FSMTable(STATE init) {
        state = init;
    }

    public STATE getState() {
        return state;
    }

	 /**
	  * 用于添加一条迁移规则的流式API
	  * from(XX).when(XX).occur(XXX).action(XX).to(XX);
	  */
    Transaction newTrans;
    public FSMTable from(STATE from) {
        newTrans = new Transaction();
        newTrans.curState = from;
        return this;
    }
    public FSMTable occur(EVENT event) {
        newTrans.event = event;
        return this;
    }
    public FSMTable when(Condition condition) {
        newTrans.condition = condition;
        return this;
    }
    public FSMTable whenNOT(Condition condition) {
        newTrans.condition = not(condition);
        return this;
    }
    /**
     * 求Condition的反
     */
    public Condition not(final Condition c) {
        return new Condition() {
            public boolean test() {
                if(c==null) {
                    return false;
                }
                return !c.test();
            }
        };
    }

    public FSMTable action(Action a) {
        newTrans.action = a;
        return this;
    }
    public void to(STATE to) {
        newTrans.newState = to;
        save();
    }
    private void save() {
        getTransactions(newTrans.curState, newTrans.event).add(newTrans);
    }
    private List<Transaction> getTransactions(STATE curState, EVENT event) {
        Map fromCurState = transactions.get(curState);
        if(null == fromCurState) {
            fromCurState = Maps.newHashMap();
            fromCurState.put(event, Lists.newLinkedList());
            transactions.put(curState, fromCurState);
        }
        if(null == fromCurState.get(event)) {
            fromCurState.put(event, Lists.newLinkedList());
        }
        return (List<Transaction>) fromCurState.get(event);
    }

    /**
     * do something when event occurs
     * @param event
     */
    public void fire(EVENT event) {
        try {

            // 在state下,发生event时, 在不需要条件或条件为真时, 执行action并跳到newState, 等待下一个事件
            Map<EVENT, List<Transaction>> curState = transactions.get(state);
            if(curState == null) {
                // error log
                System.out.println("STATE "+state+" is not defined");
                return;
            }
            List<Transaction> curEvent = curState.get(event);
            if (curEvent == null) {
                System.out.println("EVENT "+event+" is not defined in STATE "+state);
                return;
            }

            List<Transaction> ts = this.transactions.get(state).get(event);
            if(ts == null) {
                System.out.println("STATE "+state+" EVENT "+event+" has no transaction");
                return;
            }
            for(Transaction t: ts) {
                if(t.condition == null || t.condition.test()) {
                    if(t.action!=null) {
                        t.action.exec();
                    }
                    state = t.newState;
                    break;
                }
            }
            System.out.println("state = " + state);
        }catch (Exception e) {
            e.printStackTrace(); // Action的异常由FSM来处理.
        }
    }


    /**
     *  Transaction
     *  封装了一个完整的迁移
     */
    private class Transaction {
        private STATE curState;
        private EVENT event;
        private Condition condition;
        private Action action;
        private STATE newState;

        @Override
        public boolean equals(Object obj) {
            Transaction t = (Transaction) obj;
            return t.curState == curState
                    && t.event == event;
        }

        @Override
        public int hashCode() {
            return curState.hashCode() ^ event.hashCode();
        }
    }

}

驱动器的定义:

import com.google.common.collect.Maps;
import fsm.*;
import observer.Observer;
import observer.Subject;

import java.util.Map;

/**
 * Created by chn on 16/5/11.
 */
public class EventDriver {

    FSMTable fsmTable = new FSMTable(STATE.INIT);
	 // 每个状态都对应一个Subject(即Observable实例),被一些Observer订阅(或监听),Observer决定下一个状态如何扭转
    Map<STATE, Subject> observers = Maps.newHashMap();
    // 观察者autoRun负责触发一个无事件的迁移
    Observer autoRun = new Observer() {
        public void update() {
            fsmTable.fire(EVENT.AUTO);
            observers.get(fsmTable.getState()).notifyObservers();
        }
    };
    private Condition canLog = null; // 为null的Condition表示该迁移将无条件地跳转

	 //根据业务规则初始化状态机的迁移表和事件的衔接规则
    public EventDriver() {
        initializeFSM();
        initializeObservers();
    }

    private void initializeFSM() {
        //初始化fsmTable
        // 正常流程
        fsmTable.from(STATE.INIT).occur(EVENT.RECVED).when(null).action(doResolve).to(STATE.AFTER_RESOLVE);

        fsmTable.from(STATE.AFTER_RESOLVE).occur(EVENT.AUTO).when(canStore).action(doStore).to(STATE.AFTER_STORE);
        fsmTable.from(STATE.AFTER_RESOLVE).occur(EVENT.AUTO).whenNOT(canStore).action(null).to(STATE.SEND);

        fsmTable.from(STATE.AFTER_STORE).occur(EVENT.AUTO).when(null).action(null).to(STATE.SEND);

        fsmTable.from(STATE.SEND).occur(EVENT.AUTO).when(canSend).action(doSend).to(STATE.AFTER_SEND);
        fsmTable.from(STATE.SEND).occur(EVENT.AUTO).when(canLog).action(doLog).to(STATE.LOG);
        fsmTable.from(STATE.SEND).occur(EVENT.AUTO).whenNOT(canLog).action(null).to(STATE.LOG);

        fsmTable.from(STATE.AFTER_SEND).occur(EVENT.AUTO).when(canLog).action(doLog).to(STATE.LOG);
        fsmTable.from(STATE.AFTER_SEND).occur(EVENT.AUTO).whenNOT(canLog).action(null).to(STATE.LOG);

        // ...
    }

    private void initializeObservers() {
        // 收到消息时, 启动状态机, 按照观察者模式扭转
        observers.put(STATE.INIT,
                new Subject().registerObserver(new Observer() {
                    public void update() {
                        fsmTable.fire(EVENT.RECVED);
                        observers.get(STATE.AFTER_RESOLVE).notifyObservers();
                    }
                }));
        observers.put(STATE.AFTER_RESOLVE, new Subject().registerObserver(autoRun));
        observers.put(STATE.AFTER_STORE, new Subject().registerObserver(autoRun));
        observers.put(STATE.SEND, new Subject().registerObserver(autoRun));
        observers.put(STATE.AFTER_SEND, new Subject().registerObserver(autoRun));
        observers.put(STATE.LOG, new Subject());
    }

	//启动一次业务
    public void start() {
        System.out.println("state = " + STATE.INIT);
        observers.get(STATE.INIT).notifyObservers();
    }


    /**
     * 一些依赖于具体业务的职能方法, 
     * 后续应当移动到相应的职能类中
     */
    MessageProcessor messageProcessor = new MessageProcessor(); // new违反DIP原则, 换成Spring注入
    LogWriter logWriter = new LogWriter();

    Action doResolve = new Action() {
        public void exec() {
            messageProcessor.resolve();
        }
    };

    Condition canStore = new Conditions()
            .add(new Condition() {
                public boolean test() {
                    return !Configs.degradeStoreMessage;
                }
            })
            .add(new Condition() {
                public boolean test() {
                    return Message.isCompleted(Context.msg);
                }
            });

    Action doStore = new Action() {
        public void exec() {
            messageProcessor.store();
        }
    };

    Condition canSend = new Conditions()
            .add(new Condition() {
                public boolean test() {
                    return !Configs.degradeSendMessage;
                }
            })
            .add(new Condition() {
                public boolean test() {
                    return Message.isCompleted(Context.msg);
                }
            });

    Action doSend = new Action() {
        public void exec() {
            messageProcessor.send();
        }
    };

    Action doLog = new Action() {
        public void exec() {
            logWriter.write();
        }
    };

}

在驱动器中定义好所有的规则之后,系统的启动就变得简单了,不再需要像TestFSMTable一样显式地触发一个个状态迁移,而是:

import org.junit.Test;

/**
 * Created by chn on 16/5/11.
 */
public class TestEventDriver {

    @Test
    public void test () {

        EventDriver eventDriver = new EventDriver();
        eventDriver.start();
    }
}

总结