事件驱动下的状态模式
copyright copyleft copywhatever
May 13, 2016
设计模式 • 敏捷开发
chng
事件驱动下的状态模式
1 场景
一个业务如果包含多个阶段,而各个阶段之间又并非顺序执行,可能存在复杂的条件跳转。
此时,运用状态模式可以帮助我们理清业务逻辑,并发现一些逻辑缺陷。而基于事件驱动的模型,可以使得系统按照预先定义的方式,在状态机中扭转(进入某个状态时,触发什么事件)。
2 状态机
运用状态机可以描述许多系统的运作方式,因为许多系统的运作,无非是从一个状态由于某种事件的发生,在满足某种条件时迁移到另一种状态,同时做一些处理动作。
如上所述,一次完整的迁移包含:
- 源状态 Source State
- 事件 Event
- 条件 Condition
- 执行动作 Action
- 目标状态 Destination State
其中,条件可以是单一的,也可以是组合条件;可以有多个分支,也可以认为条件是“非1即0”的;这些通通取决于应用场景。
3 状态机驱动器
定义好一个状态机之后,对于给定的一个事件,状态机就可以遍历所有的条件,当条件满足时执行动作,然后跳转到另一状态。然而,一个完整的业务处理过程包含多个状态迁移。这就需要一个驱动器来管理每一次的状态迁移。
由于状态的迁移是事件驱动的,于是很容易想到运用观察者模式来设计这样一个驱动器。于是,状态是Observable,由许多个Observer订阅,而每个Observer的任务是,当状态机因为某个事件而跳转到某个状态时,向状态机发送下一个事件的通知,直到状态机到达一个终止状态(在这一模型中,终止状态就是没有任何Observer订阅的状态)。
4 业务描述
有了驱动器以及在其驱动之下运行的状态机,就可以用状态机的语言来定义业务逻辑了。于是在系统的初始化阶段,需要初始化两个东西:状态机和驱动器。状态机中定义了一个个完整的状态迁移方式(源状态、事件、条件、动作、目标状态);驱动器定义各个状态之间的连接方式。
5 实现
状态机的测试代码:
/**
* Created by chn on 16/5/11.
*/
public class TestFMSTable {
@Test
public void test () {
// 初始化状态迁移表
FSMTable fsmTable = new FSMTable(STATE.INIT);
fsmTable.from(STATE.INIT)
.occur(EVENT.RECVED)
.when(null).action(()->System.out.println("resolving"))
.to(STATE.END);
System.out.println("state = " + STATE.INIT);
fsmTable.fire(EVENT.RECVED);
// // resolve()
// fsmTable.fire(EVENT.NOTIFY_RESOLVED);
// // store()
// fsmTable.fire(EVENT.NOTIFY_STORED);
// // send()
// fsmTable.fire(EVENT.NOTIFY_SENDED);
// // log
// fsmTable.fire(EVENT.NOTIFY_LOGED);
// end
}
}
状态机的定义:FSMTable.java
package fsm;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.List;
import java.util.Map;
/**
* Created by chn on 16/5/11.
*/
public class FSMTable {
// 状态机当前所处的状态
private STATE state;
/**
* 用Map-Map-List容器维护状态的迁移规则
*/
private Map<STATE, Map<EVENT, List<Transaction>>> transactions = Maps.newHashMap();
public FSMTable(STATE init) {
state = init;
}
public STATE getState() {
return state;
}
/**
* 用于添加一条迁移规则的流式API
* from(XX).when(XX).occur(XXX).action(XX).to(XX);
*/
Transaction newTrans;
public FSMTable from(STATE from) {
newTrans = new Transaction();
newTrans.curState = from;
return this;
}
public FSMTable occur(EVENT event) {
newTrans.event = event;
return this;
}
public FSMTable when(Condition condition) {
newTrans.condition = condition;
return this;
}
public FSMTable whenNOT(Condition condition) {
newTrans.condition = not(condition);
return this;
}
/**
* 求Condition的反
*/
public Condition not(final Condition c) {
return new Condition() {
public boolean test() {
if(c==null) {
return false;
}
return !c.test();
}
};
}
public FSMTable action(Action a) {
newTrans.action = a;
return this;
}
public void to(STATE to) {
newTrans.newState = to;
save();
}
private void save() {
getTransactions(newTrans.curState, newTrans.event).add(newTrans);
}
private List<Transaction> getTransactions(STATE curState, EVENT event) {
Map fromCurState = transactions.get(curState);
if(null == fromCurState) {
fromCurState = Maps.newHashMap();
fromCurState.put(event, Lists.newLinkedList());
transactions.put(curState, fromCurState);
}
if(null == fromCurState.get(event)) {
fromCurState.put(event, Lists.newLinkedList());
}
return (List<Transaction>) fromCurState.get(event);
}
/**
* do something when event occurs
* @param event
*/
public void fire(EVENT event) {
try {
// 在state下,发生event时, 在不需要条件或条件为真时, 执行action并跳到newState, 等待下一个事件
Map<EVENT, List<Transaction>> curState = transactions.get(state);
if(curState == null) {
// error log
System.out.println("STATE "+state+" is not defined");
return;
}
List<Transaction> curEvent = curState.get(event);
if (curEvent == null) {
System.out.println("EVENT "+event+" is not defined in STATE "+state);
return;
}
List<Transaction> ts = this.transactions.get(state).get(event);
if(ts == null) {
System.out.println("STATE "+state+" EVENT "+event+" has no transaction");
return;
}
for(Transaction t: ts) {
if(t.condition == null || t.condition.test()) {
if(t.action!=null) {
t.action.exec();
}
state = t.newState;
break;
}
}
System.out.println("state = " + state);
}catch (Exception e) {
e.printStackTrace(); // Action的异常由FSM来处理.
}
}
/**
* Transaction
* 封装了一个完整的迁移
*/
private class Transaction {
private STATE curState;
private EVENT event;
private Condition condition;
private Action action;
private STATE newState;
@Override
public boolean equals(Object obj) {
Transaction t = (Transaction) obj;
return t.curState == curState
&& t.event == event;
}
@Override
public int hashCode() {
return curState.hashCode() ^ event.hashCode();
}
}
}
驱动器的定义:
import com.google.common.collect.Maps;
import fsm.*;
import observer.Observer;
import observer.Subject;
import java.util.Map;
/**
* Created by chn on 16/5/11.
*/
public class EventDriver {
FSMTable fsmTable = new FSMTable(STATE.INIT);
// 每个状态都对应一个Subject(即Observable实例),被一些Observer订阅(或监听),Observer决定下一个状态如何扭转
Map<STATE, Subject> observers = Maps.newHashMap();
// 观察者autoRun负责触发一个无事件的迁移
Observer autoRun = new Observer() {
public void update() {
fsmTable.fire(EVENT.AUTO);
observers.get(fsmTable.getState()).notifyObservers();
}
};
private Condition canLog = null; // 为null的Condition表示该迁移将无条件地跳转
//根据业务规则初始化状态机的迁移表和事件的衔接规则
public EventDriver() {
initializeFSM();
initializeObservers();
}
private void initializeFSM() {
//初始化fsmTable
// 正常流程
fsmTable.from(STATE.INIT).occur(EVENT.RECVED).when(null).action(doResolve).to(STATE.AFTER_RESOLVE);
fsmTable.from(STATE.AFTER_RESOLVE).occur(EVENT.AUTO).when(canStore).action(doStore).to(STATE.AFTER_STORE);
fsmTable.from(STATE.AFTER_RESOLVE).occur(EVENT.AUTO).whenNOT(canStore).action(null).to(STATE.SEND);
fsmTable.from(STATE.AFTER_STORE).occur(EVENT.AUTO).when(null).action(null).to(STATE.SEND);
fsmTable.from(STATE.SEND).occur(EVENT.AUTO).when(canSend).action(doSend).to(STATE.AFTER_SEND);
fsmTable.from(STATE.SEND).occur(EVENT.AUTO).when(canLog).action(doLog).to(STATE.LOG);
fsmTable.from(STATE.SEND).occur(EVENT.AUTO).whenNOT(canLog).action(null).to(STATE.LOG);
fsmTable.from(STATE.AFTER_SEND).occur(EVENT.AUTO).when(canLog).action(doLog).to(STATE.LOG);
fsmTable.from(STATE.AFTER_SEND).occur(EVENT.AUTO).whenNOT(canLog).action(null).to(STATE.LOG);
// ...
}
private void initializeObservers() {
// 收到消息时, 启动状态机, 按照观察者模式扭转
observers.put(STATE.INIT,
new Subject().registerObserver(new Observer() {
public void update() {
fsmTable.fire(EVENT.RECVED);
observers.get(STATE.AFTER_RESOLVE).notifyObservers();
}
}));
observers.put(STATE.AFTER_RESOLVE, new Subject().registerObserver(autoRun));
observers.put(STATE.AFTER_STORE, new Subject().registerObserver(autoRun));
observers.put(STATE.SEND, new Subject().registerObserver(autoRun));
observers.put(STATE.AFTER_SEND, new Subject().registerObserver(autoRun));
observers.put(STATE.LOG, new Subject());
}
//启动一次业务
public void start() {
System.out.println("state = " + STATE.INIT);
observers.get(STATE.INIT).notifyObservers();
}
/**
* 一些依赖于具体业务的职能方法,
* 后续应当移动到相应的职能类中
*/
MessageProcessor messageProcessor = new MessageProcessor(); // new违反DIP原则, 换成Spring注入
LogWriter logWriter = new LogWriter();
Action doResolve = new Action() {
public void exec() {
messageProcessor.resolve();
}
};
Condition canStore = new Conditions()
.add(new Condition() {
public boolean test() {
return !Configs.degradeStoreMessage;
}
})
.add(new Condition() {
public boolean test() {
return Message.isCompleted(Context.msg);
}
});
Action doStore = new Action() {
public void exec() {
messageProcessor.store();
}
};
Condition canSend = new Conditions()
.add(new Condition() {
public boolean test() {
return !Configs.degradeSendMessage;
}
})
.add(new Condition() {
public boolean test() {
return Message.isCompleted(Context.msg);
}
});
Action doSend = new Action() {
public void exec() {
messageProcessor.send();
}
};
Action doLog = new Action() {
public void exec() {
logWriter.write();
}
};
}
在驱动器中定义好所有的规则之后,系统的启动就变得简单了,不再需要像TestFSMTable一样显式地触发一个个状态迁移,而是:
import org.junit.Test;
/**
* Created by chn on 16/5/11.
*/
public class TestEventDriver {
@Test
public void test () {
EventDriver eventDriver = new EventDriver();
eventDriver.start();
}
}
总结
- 针对一个复杂的业务流程,将其清晰地划分成多个子阶段,可以使得系统更为清晰易维护。
- 如果各子阶段是顺序执行的,那么简单的设计就可以满足需求。
- 如果各子阶段之间还有复杂的跳转规则,就需要更加灵活的框架提供支持。例如在上面这个消息中转的例子中,某些消息也许只存储不发送,有的只发送不存储;有时需要对存储降级,有时需要对发送降级,如果IO的负担较大,还可能要对Log降级。
- 如果系统是异步的,同时需要处理大量的业务,按照这一模型,则是每个进程包含一个状态机;如果整个业务流程包含两个异步的过程,例如消息是异步发送的,则需要考虑某些数据的线程封闭性。
- 这一模型除了解藕、便于系统的理解和加强灵活性优点之外,还带来了一些额外Bonus
- 职能模块抛出的一般性异常,都可以一直往上抛,在FSMTable中进行处理(只能做一般性处理,例如打日志)。当然,对于需要特殊处理的异常,还需职能模块自己处理(例如消息存储或发送失败后重试有限次)。
- 状态机统一管理着每一次状态迁移,因此系统的状态是可跟踪的,也便于业务监控和问题排查。