[Node.js]简化学习everyauth的异步流程控制
2012-10-22
看了everyauth源码,第一次看解决异步流程问题的实现方法,感到很惊讶,为了更好地学习,实现了这个流程的简化版。
例子
先看看这段代码,功能是从本地文件读到一个url—请求这个url—把结果写入另一个文件。
var fs = require('fs'), http = require('http'); fs.readFile('./url.txt', 'utf8', function (err,data) { http.get(data, function(res){ var body = ''; res.on('data', function(c){ body += c; }).on('end', function(){ fs.writeFile('./fetchResult', data + body, function(e) { if (e) console.log('error', e); else console.log('done'); }); }); }).on('error', function(e){ console.log(e); }); });
这段代码包括了三个步骤三个功能,但耦合在一起,可读性差,难以修改,对任意一部分修改或增加都要看完整坨代码才能找到,即时把每个callback都抽成一个变量,这一整个流程也是无法分离的。
改进版
对这种情况,everyauth使用了一种方法,可以把整个流程的实现代码写成这样:
engine .do('fetchHtml') .step('readUrl') .accepts('') .promises('url') .step('getHtml') .accepts('url') .promises('html') .step('saveHtml') .accepts('url html') .promises(null) .readUrl(function(){ //read url from file ... }) .getHtml(function(url){ //send http request ... }) .saveHtml(function(url, html){ //save to file ... }) .start('fetchHtml')
do是一串流水方法的开始,step指定每一个步骤对应的方法名,promises表示此步骤返回的变量名,accepts表示此步骤接受的参数(由前面step的方法提供的变量)。接下来是链式地实现每一个step的方法。
整个过程很清晰,程序的自我描述很好,把一段异步的流程按同步的方式写出来了。若要修改其中某个步骤,直接定位到某个步骤对应的方法就行,无需把整个流程的代码通读。若要增加步骤,也只需要在那些step流程上插入新的step然后实现具体方法,可以获取前面step提供的任何参数。
how it works
实现它用到四个对象:promise/step/sequence/engine
promise是基础,相信很多人熟悉它的概念,简单来说就是把多个callback放到一个promise对象里,在适当的地方通过这个对象调用这些callback。在这里的作用是:当step执行结束时,通知队列执行下一个step。具体地说就是把下一个step的函数保存到前一个step的promise里,前一个step完成任务时,带着数据回调下一个step进入执行。
step负责执行一个步骤,传入对应参数,并把执行结果(return值)按指定的promises名保存起来,以供下一个step使用。
sequence管理step链,让注册的step可以一步步往下执行。
engine是提供对外接口的对象,管理保存每一个do请求里的step和sequence,通过configurable配置自身的可动态添加的方法。
具体看代码:https://gist.github.com/3930621
var fs = require('fs'), http = require('http'); var Promise = function(values) { this._callbacks = []; this._errbacks = []; if (arguments.length > 0) { this.fulfill.apply(this, values); } } Promise.prototype = { callback: function(fn, scope) { //已有values表示promise已fulfill,立即执行 if (this.values) { fn.apply(scope, this.values); return this; } this._callbacks.push([fn, scope]); return this; } , errback: function(fn, scope) { if (this.err) { fn.call(scope, this.err); return this; } this._errbacks.push([fn, scope]); return this; } , fulfill: function () { if (this.isFulfilled || this.err) return; this.isFulfilled = true; var callbacks = this._callbacks; this.values = arguments; for (var i = 0, l = callbacks.length; i < l; i++) { callbacks[i][0].apply(callbacks[i][1], arguments); } return this; } , fail: function (err) { var errbacks = this._errbacks; for (var i = 0, l = errbacks.length; i < l; i++) { errbacks[i][0].call(errbacks[i][1], err); } return this; } } var Step = function(name, _engine) { this.name = name; this.engine = _engine; } Step.prototype = { exec : function(seq) { var args = this._unwrapArgs(seq) , promises = this.promises var ret = this.engine[this.name]().apply(this.engine, args); //若函数返回不是Promise,即是函数里直接返回值无异步操作。为流程一致,包装一个立即执行的promise ret = (ret instanceof Promise) ? ret : this.engine.Promise([ret]); ret.callback(function() { var returned = arguments , vals = seq.values; //step执行结束后把返回值写入seq.values供下一个step使用 if (promises !== null) promises.forEach( function (valName, i) { vals[valName] = returned[i]; }); }) //加上默认的错误回调方法 ret.errback(this.engine.errback(), this.engine); return ret; } , _unwrapArgs: function (seq) { if (!this.accepts) return []; return this.accepts.reduce( function (args, accept) { //根据accept名取出对应变量 args.push(seq.values[accept]); return args; }, []); } } var Sequence = function(name, engine) { this.name = name; this.engine = engine; this.stepNames = []; this.values = {}; } Sequence.prototype = { _bind : function(priorPromise, nextStep) { var nextPromise = new Promise() , seq = this; priorPromise.callback( function () { var resultPromise = nextStep.exec(seq); resultPromise.callback(function () { nextPromise.fulfill(); }); }); return nextPromise; } , start : function() { var steps = this.steps; var priorStepPromise = steps[0].exec(this); for (var i = 1, l = steps.length; i < l; i++) { //绑定step链 priorStepPromise = this._bind(priorStepPromise, steps[i]); } return priorStepPromise; } } Object.defineProperty(Sequence.prototype, 'steps', { get: function () { var allSteps = this.engine._steps; return this.stepNames.map(function (stepName) { return allSteps[stepName]; }) } }); var engine = { configurable : function(name) { this[name] = function(setTo) { var k = '_' + name; if (arguments.length) { this[k] = setTo; return this; } return this[k]; } return this; } , step : function(name) { var steps = this._steps , sequence = this._currSeq; sequence.stepNames.push(name); this._currentStep = steps[name] || (steps[name] = new Step(name, this)); this.configurable(name); return this; } , accepts : function(input) { this._currentStep.accepts = input && input.split(/\s+/) || null; return this; } , promises : function(output) { this._currentStep.promises = output && output.split(/\s+/) || null; return this; } , do : function(name) { this._currSeq = this._stepSequences[name] || (this._stepSequences[name] = new Sequence(name, this)); return this; } , Promise : function(values) { return values ? new Promise(values) : new Promise(); } , _stepSequences: {} , _steps: {} , start : function(seqName) { var seq = this._stepSequences[seqName]; seq.start(); } } engine .configurable('errback') .errback(function(err) { console.log('errback', err); }); engine .do('fetchHtml') .step('readUrl') .accepts('') .promises('url') .step('getHtml') .accepts('url') .promises('html') .step('saveHtml') .accepts('url html') .promises(null) .readUrl(function(){ var p = this.Promise(); //url.txt保存了一个网址 fs.readFile('./url.txt', 'utf8', function (err,data) { if (err) p.fail(err); else p.fulfill(data); }); return p; }) .getHtml(function(url){ var p = this.Promise(); http.get(url, function(res){ var body = ''; res.on('data', function(c){ body += c; }).on('end', function(){ p.fulfill(body); }); }).on('error', function(err){ p.fail(err) }); return p; }) .saveHtml(function(url, html){ fs.writeFile('./fetchResult', url + html, function(e) { if (e) console.log('error', e); else console.log('done'); }); }) .start('fetchHtml')
如果不是特别复杂,逻辑变动不是非常频繁的,我一般都自己写个小函数:
/**
* Create the "next" function
*
* @param {Array} tasks
* @param {Number} index
* @param {Number} last
*/
var next = function(tasks, index, last) {
if (index == last) {
return tasks[index + 1];
}
else {
return function(data) {
var nextIndex = index + 1;
tasks[nextIndex](next(tasks, nextIndex, last), data);
};
}
};
/**
* Invoke functions in line.
*/
module.exports = function() {
var tasks = arguments,
last = tasks.length - 2;
tasks[0](next(tasks, 0, last));
};
这样来解决“意大利面”代码:
line(function(next) {
db.find(function(err, items) {
if (err) {
return;
}
next(items);
});
}, function(next, items) {
// codes
next()
}, functino() {
// codes
});