[Node.js]简化学习everyauth的异步流程控制

2012-10-22

看了everyauth源码,第一次看解决异步流程问题的实现方法,感到很惊讶,为了更好地学习,实现了这个流程的简化版。

例子

先看看这段代码,功能是从本地文件读到一个url—请求这个url—把结果写入另一个文件。

var fs = require('fs'),
http = require('http');

fs.readFile('./url.txt', 'utf8', function (err,data) {
  http.get(data, function(res){
    var body = '';
    res.on('data', function(c){
      body += c;
    }).on('end', function(){
      fs.writeFile('./fetchResult', data + body, function(e) {
        if (e) console.log('error', e);
        else console.log('done');
      });
    });
  }).on('error', function(e){
    console.log(e);
  });
});

这段代码包括了三个步骤三个功能,但耦合在一起,可读性差,难以修改,对任意一部分修改或增加都要看完整坨代码才能找到,即时把每个callback都抽成一个变量,这一整个流程也是无法分离的。

改进版

对这种情况,everyauth使用了一种方法,可以把整个流程的实现代码写成这样:

engine
  .do('fetchHtml')
    .step('readUrl')
      .accepts('')
      .promises('url')
    .step('getHtml')
      .accepts('url')
      .promises('html')
    .step('saveHtml')
      .accepts('url html')
      .promises(null)

  .readUrl(function(){
    //read url from file
    ...
  })
  .getHtml(function(url){
    //send http request
    ...
  })
  .saveHtml(function(url, html){
    //save to file
    ...
  })

  .start('fetchHtml')

do是一串流水方法的开始,step指定每一个步骤对应的方法名,promises表示此步骤返回的变量名,accepts表示此步骤接受的参数(由前面step的方法提供的变量)。接下来是链式地实现每一个step的方法。

整个过程很清晰,程序的自我描述很好,把一段异步的流程按同步的方式写出来了。若要修改其中某个步骤,直接定位到某个步骤对应的方法就行,无需把整个流程的代码通读。若要增加步骤,也只需要在那些step流程上插入新的step然后实现具体方法,可以获取前面step提供的任何参数。

how it works

实现它用到四个对象:promise/step/sequence/engine

promise是基础,相信很多人熟悉它的概念,简单来说就是把多个callback放到一个promise对象里,在适当的地方通过这个对象调用这些callback。在这里的作用是:当step执行结束时,通知队列执行下一个step。具体地说就是把下一个step的函数保存到前一个step的promise里,前一个step完成任务时,带着数据回调下一个step进入执行。

step负责执行一个步骤,传入对应参数,并把执行结果(return值)按指定的promises名保存起来,以供下一个step使用。

sequence管理step链,让注册的step可以一步步往下执行。

engine是提供对外接口的对象,管理保存每一个do请求里的step和sequence,通过configurable配置自身的可动态添加的方法。

具体看代码:https://gist.github.com/3930621

var fs = require('fs'),
  http = require('http');

var Promise = function(values) {
  this._callbacks = [];
  this._errbacks = [];
  if (arguments.length > 0) {
    this.fulfill.apply(this, values);
  }
}
Promise.prototype = {
    callback: function(fn, scope) {
    	//已有values表示promise已fulfill,立即执行
      if (this.values) {
        fn.apply(scope, this.values);
        return this;
      }
      this._callbacks.push([fn, scope]);
      return this;
    }
  , errback: function(fn, scope) {
      if (this.err) {
        fn.call(scope, this.err);
        return this;
      }
      this._errbacks.push([fn, scope]);
      return this;
    }
  , fulfill: function () {
      if (this.isFulfilled || this.err) return;
      this.isFulfilled = true;
      var callbacks = this._callbacks;
      this.values = arguments;
      for (var i = 0, l = callbacks.length; i < l; i++) {
        callbacks[i][0].apply(callbacks[i][1], arguments);
      }
      return this;
    }
  , fail: function (err) {
      var errbacks = this._errbacks;
      for (var i = 0, l = errbacks.length; i < l; i++) {
        errbacks[i][0].call(errbacks[i][1], err);
      }
      return this;
    }
}

var Step = function(name, _engine) {
  this.name = name;
  this.engine = _engine;
}

Step.prototype = {
    exec : function(seq) {
      var args = this._unwrapArgs(seq)
        ,  promises = this.promises

      var ret = this.engine[this.name]().apply(this.engine, args);

      //若函数返回不是Promise,即是函数里直接返回值无异步操作。为流程一致,包装一个立即执行的promise
      ret = (ret instanceof Promise)
          ?  ret
          : this.engine.Promise([ret]);

      ret.callback(function() {
        var returned = arguments
          , vals = seq.values;
        //step执行结束后把返回值写入seq.values供下一个step使用
        if (promises !== null) promises.forEach( function (valName, i) {
          vals[valName] = returned[i];
        });
      })

      //加上默认的错误回调方法
      ret.errback(this.engine.errback(), this.engine);
      return ret;
    }
  , _unwrapArgs: function (seq) {
      if (!this.accepts) return [];
      return this.accepts.reduce( function (args, accept) {
      	//根据accept名取出对应变量
        args.push(seq.values[accept]);
        return args;
      }, []);
    }
}

var Sequence = function(name, engine) {
  this.name = name;
  this.engine = engine;
  this.stepNames = [];
  this.values = {};
}

Sequence.prototype = {
    _bind : function(priorPromise, nextStep) {
      var nextPromise = new Promise()
        , seq = this;

      priorPromise.callback( function () {
        var resultPromise = nextStep.exec(seq);
        resultPromise.callback(function () {
          nextPromise.fulfill();
        });
      });
      return nextPromise;
    }

  , start : function() {
      var steps = this.steps;
      var priorStepPromise = steps[0].exec(this);

      for (var i = 1, l = steps.length; i < l; i++) {
      	//绑定step链
        priorStepPromise = this._bind(priorStepPromise, steps[i]);
      }
      return priorStepPromise;
    }
}

Object.defineProperty(Sequence.prototype, 'steps', {
  get: function () {
      var allSteps = this.engine._steps;
      return this.stepNames.map(function (stepName) {
        return allSteps[stepName];
      })
  }
});

var engine = {
    configurable : function(name) {
      this[name] = function(setTo) {
        var k = '_' + name;
        if (arguments.length) {
          this[k] = setTo;
          return this;
        }
        return this[k];
      }
      return this;
    }
  , step : function(name) {
      var steps = this._steps
        ,  sequence = this._currSeq;

      sequence.stepNames.push(name);
      this._currentStep =
        steps[name] || (steps[name] = new Step(name, this));

      this.configurable(name);
      return this;
    }
  , accepts : function(input) {
      this._currentStep.accepts = input && input.split(/\s+/)  || null;
      return this;
    }
  , promises : function(output) {
      this._currentStep.promises = output && output.split(/\s+/)  || null;
      return this;
    }
  , do : function(name) {
      this._currSeq =
        this._stepSequences[name] || (this._stepSequences[name] = new Sequence(name, this));
      return this;
    }
  , Promise : function(values) {
      return values ? new Promise(values) : new Promise();
    }
  , _stepSequences: {}
  , _steps: {}

  , start : function(seqName) {
      var seq = this._stepSequences[seqName];
      seq.start();
    }
}

engine
  .configurable('errback')
  .errback(function(err) {
    console.log('errback', err);
  });

engine
  .do('fetchHtml')
    .step('readUrl')
      .accepts('')
      .promises('url')
    .step('getHtml')
      .accepts('url')
      .promises('html')
    .step('saveHtml')
      .accepts('url html')
      .promises(null)

  .readUrl(function(){
    var p = this.Promise();
    //url.txt保存了一个网址
    fs.readFile('./url.txt', 'utf8', function (err,data) {
      if (err) p.fail(err);
      else p.fulfill(data);
    });
    return p;
  })

  .getHtml(function(url){
    var p = this.Promise();
    http.get(url, function(res){
      var body = '';
      res.on('data', function(c){
        body += c;
      }).on('end', function(){
        p.fulfill(body);
      });
    }).on('error', function(err){
      p.fail(err)
    });
    return p;
  })

  .saveHtml(function(url, html){
    fs.writeFile('./fetchResult', url + html, function(e) {
      if (e) console.log('error', e);
      else console.log('done');
    });
  })

  .start('fetchHtml')
评论

*

*

2012年10月23日 9:15

如果不是特别复杂,逻辑变动不是非常频繁的,我一般都自己写个小函数:


/**
* Create the "next" function
*
* @param {Array} tasks
* @param {Number} index
* @param {Number} last
*/
var next = function(tasks, index, last) {

if (index == last) {

return tasks[index + 1];
}
else {

return function(data) {

var nextIndex = index + 1;
tasks[nextIndex](next(tasks, nextIndex, last), data);
};
}
};

/**
* Invoke functions in line.
*/
module.exports = function() {

var tasks = arguments,
last = tasks.length - 2;

tasks[0](next(tasks, 0, last));
};

这样来解决“意大利面”代码:


line(function(next) {

db.find(function(err, items) {

if (err) {
return;
}

next(items);
});
}, function(next, items) {

// codes

next()
}, functino() {

// codes
});

2012年10月23日 9:16

哎呀,不支持 pre 和 code 啊,缩进都木有了。

2012年10月25日 12:22

呵~wordpress默认不支持,没料到会在评论写代码~

2012年11月29日 14:33

正好前段时间看了一篇循序渐进的介绍用js实现promise的方法 https://raw.github.com/kriskowal/q/master/design/README.js