从旧博客上转移文章:

目录:


使用RP思考,例子

让我们做些实际的事情,用一个真实世界的例子(而不是半成品的概念),一步一步指导你如何响应式的思考。在这个教程的最后,我们会构建一个真实可运行的代码,同时彻底了解这些代码背后的思想。

我选择了JavaScript和RxJS库来作为这个例子的语言和工具,原因是JS是我当前手上正在使用的语言,而Rx*家族的库在很多语言上都有(.net, Java, Scala, ...),无论你是用那个具体语言的库,你都能从这个教程里收益。

开发“Who to follow”推荐模块

在Twitter里你会看到如下的推荐关注人的小模块:

我们接下来将关注实现它的核心功能:

  • 初始化时,通过API读取账户数据,并显示3个作为推荐列表
  • 点“刷新”时,读取3个新的账户显示在推荐列表中
  • 当点击某个账户的右上角‘x’时,只清除这个账户,然后显示一个新的
  • 每行显示用户的头像和链接

我们可以忽略界面上的其他所有按钮和链接。并且,由于twitter最近关闭了他的用户数据API,我们的UI将基于Github的用户信息。接口定义在这里

教程的完整代码在这里,如果你想先偷看下的话。

请求和响应

你如何使用响应式编程的思想来解决这个需求呢?作为一个好的开始,你必须牢记响应式编程的口头禅:“(几乎)所有的东西都被当作流”。

让我们从最简单的需求开始:“初始化时,从API中读取3个用户数据”。这里没有什么特殊的东西,简单的三个步骤就可以完成:

  1. 发送请求
  2. 得到响应
  3. 渲染响应到页面上

所以,我们会把请求先作为一个基本的流。(也许你觉得把单独的请求作为流太过了,不过我们目标就是从基本的开始)

一开始,我们只需要一个请求,如果我们把它转化成一个数据流,那会是一个只有一个发射值的流(以后会有更多的请求,但目前我们只有一个),如下:

--a------|->

a代表了字符串 'https://api.github.com/users'

这是我们希望请求的URL组成的流,当一个请求发生的时候,它告诉我们两件事情:When和What。When:什么时候请求被执行?就是请求事件被发射的时候。What:怎么样的请求应该被发射?一个包含URL的字符串。

如何利用Rx*库创建这样一个单值的流简直是小菜一碟。以官方术语,流这个概念被定义为“可被观察的对象(Observable)” (Enix:因为实际就是个观察模式?),我发现这是个愚蠢的名字,我还会继续把他叫做流。

var requestStream = Rx.Observable.just('https://api.github.com/users');

当然,目前为止,这还只是一个包含字符串的流,没有绑定任何操作。所以,下一步就是想办法在某个值发射的时候,执行某些动作。这个可以通过订阅来完成。

requestStream.subscribe(function(requestUrl) {
  // execute the request
  jQuery.getJSON(requestUrl, function(responseData) {
    // ...
  });
}

注意到这段代码里面使用了jQuery的Ajax回调函数(假设你对此有一定的了解)来处理异步的请求。但是等一等,响应式编程不是就是处理异步数据流的么?对于请求对应的响应,难道不应该是一个在未来某个时间点出发的数据流么?好吧,概念上来说,我们可以这么写:

requestStream.subscribe(function(requestUrl) {
  // 执行请求
  var responseStream = Rx.Observable.create(function (observer) {
    jQuery.getJSON(requestUrl)
    .done(function(response) { observer.onNext(response); })
    .fail(function(jqXHR, status, error) { observer.onError(error); })
    .always(function() { observer.onCompleted(); });
  });

  responseStream.subscribe(function(response) {
    // 对响应做处理
  });
}

其中Rx.Observable.create()这个方法用来创建自定义流,并通知每个观察者(订阅者)数据事件(onNext())或者错误(onError())。我们所做的,只是封装了下jQuery里Ajax的Promise。等等,这是否意味着,Promise就是一个可被观察的对象(Observable)?

答对了。

Observable是符合Promise++规范的,在Rx里面有专门的方法把一个Promise转换成Observable:“var stream = Rx.Observable.fromPromise(promise)”。唯一的区别是Observable不兼容Promises/A+标准,但理论上是不会造成程序冲突的。Promise只是Observable只发射单值的特例而已。Observable或者说Rx的流,允许发射多值,返回多值。

所以,Observable至少和Promise一样强大,如果你喜欢Promise,那你一定更喜欢Observable能做的。

下面回到我们的例子,你可能已经发现,我们在一个订阅(subscribe() )里面调用了另外一个订阅。看上去和JS著名的回调地狱(callback hell)有点像。另外,你可能也注意到了responseStream的创建依赖于requestStream。而像你已经听说的一样,Rx的有非常简单的机制把一个流转换/创建出另外一个新的流。下面我们就来利用下Rx的这个机制。

这里我们会用到一个非常基础的函数map(f),它接受一个流A里面的每个值,调用函数f,然后产生一个新的流B。所以,对于我们的例子里request和response的流来说,我们可以把请求的URL和响应的Promise对应起来(Promise是Observable的特例)。

var responseMetastream = requestStream
  .map(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

我们创建了一个叫做“元流”的野兽:流的流。请不要恐慌,这只是以为着一个流的每一个发射的值都会变成另外一个新的流而已。和编程概念里面的指针概念类似,在我们的例子里,每个请求的URL会被指到一个包含相应结果的响应流上。

一个响应结果的元数据流看起来很让人疑惑。我们真实的需求是要一个包含所有响应结果的流,里面的每个值都是响应的JSON对象,而不是JSON对象的Promise。这时候我们就要用到一个新的方法Mr. Flatmapmap()的一个变种,通过把所有“分支”发射出的结果都发射到一个“主干”上,使结果“扁平化”。记住,这个扁平化的map并不是对map方法的一种修正,元数据并不是一种需要修正的错误,它只是Rx处理异步响应的一个工具而已。

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

很好,我们的响应流是由请求流定义的,以后如果我们有更多的事件,只需要在请求流中添加,对应的响应流就自动生成了。

requestStream:  --a-----b--c------------|->
responseStream: -----A--------B-----C---|->

(小写字母代表请求,大写字母是他们对应的响应)

现在,我们很简单就能根据响应流来渲染数据了

responseStream.subscribe(function(response) {
  // 渲染结果到页面
});

把上面所有的代码和到一起:

var requestStream = Rx.Observable.just('https://api.github.com/users');

var responseStream = requestStream
  .flatMap(function(requestUrl) {
    return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
  });

responseStream.subscribe(function(response) {
  // 渲染结果到页面
});

刷新按钮

我没有提到那个获取用户列表的API返回的JSON里面包含了100个用户的信息。API只允许我们传送位移信息,而不是页的大小。所以,我们一次拿了100个用户,但是只用到了3个,浪费了97个。我们暂时先不管这个问题,后面我们会看到如何缓存这些结果。

每次刷新的按钮被点击的时候,请求流应该会发射一个新的URL,这样我们就会获得一个新的响应。为了达成目标,我们需要2个东西:一个点击事件的流绑定在刷新按钮上(口头禅:任何事物都是流),我们的请求流必须变成依赖点击流。幸运的是,Rx提供了从页面事件监听器中创建流的方法。

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

由于点击事件中不包含任何请求URL的信息,我们要把每个点击映射到一个特定的URL。现在我们通过把点击流映射到一个有随机位移的请求URL来创建我们的请求流。

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

虽然我愚蠢的没有写任何自动测试,但是我还是发现我破坏了我在前一章里面创建的初始化页面的功能。3个用户信息只在我点击刷新按钮时出来,而不是刚打开页面就刷出来了。但是其实我需要的是两个表现的累加:发送请求当页面初始化或者当点击刷新按钮的时候。

当然,我们可以分别创建连个流来达成这个目标。

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

我们能不能把连个流合并成一个呢?归根结底他们做的是同样的事情啊。Rx提供了merge()方法。下面的示意图展示了这个方法的作用:

stream A: ---a--------e-----o----->
stream B: -----B---C-----D-------->
          vvvvvvvvv merge vvvvvvvvv
          ---a-B---C--e--D--o----->

后面的就简单了

var requestOnRefreshStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

var requestStream = Rx.Observable.merge(
  requestOnRefreshStream, startupRequestStream
);

还有更简洁的方式,不使用临时流

var requestStream = refreshClickStream
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  })
  .startWith('https://api.github.com/users');

这里面的 startWith() 方法做的事情和你猜的一样,不管你的流最后长成啥样,流的头上都包含了一个你给出的发射信号。但我还是违反了DRY原则,数据接口的API出现了两次。一个解决方案是把startWith()方法移动到前面,clickStream的后面,来触发一个初始的点击事件。

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

很好,如果回到“我愚蠢的没有写自动化测试”那段,你会发现这里唯一的增加代码就是startWith()方法。

建模推荐用户列表

直到现在为止,我们只接触到了通过subscribe()在UI上渲染结果流。现在随着刷新按钮的实现,我们遇到了新的问题:当你点击刷新按钮的时候,当前的3个建议用户没有被清除,而清除和新的用户信息渲染只会在HTTP响应之后才被执行。为了使页面看上去更顺畅,我们需要在点击刷新按钮时就马上清除当前的用户列表。

refreshClickStream.subscribe(function() {
  // clear the 3 suggestion DOM elements 
});

不,不,不要这么着急。这是个坏的做法,因为这样一来,我们就有两个订阅者同时对用户列表(DOM)做操作。而这明显违反了关注分离的设计原则。还记得响应式编程的口头禅么?

所以,让我把推荐用户列表本身也转换成一个流。流里面的每个发射值都是个包含用户信息的JSON对象。我们会为3个推荐用户分别创建3个相应的流。比如说第一个张这样:

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  });

其他的2个,“建议流2”和“建议流3”基本就是这个流的复制粘贴,这不符合DRY原则,但会使我们这个教程里的例子简单化,另外,这也是一个很好的练习机会让大家思考下如何在这个例子里避免重复。

前面的在响应流里面渲染结果的代码会被替换成在这个新的流里面订阅:

suggestion1Stream.subscribe(function(suggestion) {
  // render the 1st suggestion to the DOM
});

回到需求:“点击刷新时,清除当前的用户列表”。我们可以简单的把点击事件map到一个null的发射值,发射到上面的建议流里面。

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // 随机获得一个用户
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  );

渲染页面时,我们把null值理解为“没有数据,清除UI”。

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // 隐藏界面
  }
  else {
    // 显示第一个用户到界面
  }
});

从全景上来说,现在系统是这么工作的:

refreshClickStream: ----------o--------o---->
     requestStream: -r--------r--------r---->
    responseStream: ----R---------R------R-->   
 suggestion1Stream: ----s-----N---s----N-s-->
 suggestion2Stream: ----q-----N---q----N-q-->
 suggestion3Stream: ----t-----N---t----N-t-->

其中大写的N代表null。

作为这个模式的一个好处,我们可以在页面初始化时渲染null,通过调用startWith(null)。

var suggestion1Stream = responseStream
  .map(function(listUsers) {
    // get one random user from the list
    return listUsers[Math.floor(Math.random()*listUsers.length)];
  })
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

所以全局来看,就是这样的:

refreshClickStream: ----------o---------o---->
     requestStream: -r--------r---------r---->
    responseStream: ----R----------R------R-->   
 suggestion1Stream: -N--s-----N----s----N-s-->
 suggestion2Stream: -N--q-----N----q----N-q-->
 suggestion3Stream: -N--t-----N----t----N-t-->

关闭单个建议用户和缓存机制

我们还剩下最后一个功能要开发。每个推荐用户右上角的‘X’,点击它会关闭当前用户,刷新一个新的用户。第一反应,你可能会说,我们应该发送一个新的请求,当‘X’被点击时。

var close1Button = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
// 第二第三个用户的关闭按钮也如此

var requestStream = refreshClickStream.startWith('startup click')
  .merge(close1ClickStream) // 我们新增的代码
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

但这个代码并没有如我们预期的运行,点击关闭会刷新所有的用户,而不只是当前的那个。有好多种方式可以解决这个问题,为了使内容变得更有趣,我们选择重用已经获得的响应内容。如果你还记得的话,API每次会返回100个用户,我们只用了3个,还有97个可用的,完全没有必要去发新的请求。

再一次的,我们用流来思考问题。当“关闭1”点击事件被触发时,我们希望利用最近的被发出的在响应流里面的响应内容。从里面再随机获取一个。

    requestStream: --r--------------->
   responseStream: ------R----------->
close1ClickStream: ------------c----->
suggestion1Stream: ------s-----s----->

在Rx*里面,组合器函数combineLatest看起来符合我们的要求。它接受两个流A和B作为参数,当其中的任何一个流发射一个值的时候,这个函数组合两个流中最新的值a和b,发射值c = f(x,y),其中f是你给出的函数。看下图可能更容易理解:

stream A: --a-----------e--------i-------->
stream B: -----b----c--------d-------q---->
          vvvvvvvv combineLatest(f) vvvvvvv
          ----AB---AC--EC---ED--ID--IQ---->

其中f是我们给出的函数

我们可以把这个combineLatest函数应用到关闭按钮1的流和响应流上。任何时候,当关闭按钮被点击,我们得到了最近的用户数据响应然后在用户1的流上发布一个新的组合值。从另一方面来讲,combineLatest方法是对称的:当任何用户信息响应被更新时,他会把最近的关闭按钮1的事件一起作为一个组合值发布到用户1的流上。很有趣吧,这就允许我们把用户1的流简化如下:

var suggestion1Stream = close1ClickStream
  .combineLatest(responseStream,             
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

程序的拼图仍然缺少最后一块,combineLatest方法使用2个来源最近的值来做组合,但如何一个流里面空空如也,一个值都没有,这个方法就没办法发出任何新的值。如果你仔细观察前一个图示的话,当流A发出第一个值a的时候,是不会在新的流里产生值的。只有当流B也发出了第一个值b的时候,新的流才会产生值AB。

有很多方法能解决这个问题,我们用最简单的,在初始化的时候,模拟一个关闭事件。

var suggestion1Stream = close1ClickStream.startWith('startup click') // 新增代码
  .combineLatest(responseStream,             
    function(click, listUsers) {l
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);

 

结束语

收工,完整的代码如下

var refreshButton = document.querySelector('.refresh');
var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

var closeButton1 = document.querySelector('.close1');
var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
// and the same logic for close2 and close3

var requestStream = refreshClickStream.startWith('startup click')
  .map(function() {
    var randomOffset = Math.floor(Math.random()*500);
    return 'https://api.github.com/users?since=' + randomOffset;
  });

var responseStream = requestStream
  .flatMap(function (requestUrl) {
    return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
  });

var suggestion1Stream = close1ClickStream.startWith('startup click')
  .combineLatest(responseStream,             
    function(click, listUsers) {
      return listUsers[Math.floor(Math.random()*listUsers.length)];
    }
  )
  .merge(
    refreshClickStream.map(function(){ return null; })
  )
  .startWith(null);
// and the same logic for suggestion2Stream and suggestion3Stream

suggestion1Stream.subscribe(function(suggestion) {
  if (suggestion === null) {
    // hide the first suggestion DOM element
  }
  else {
    // show the first suggestion DOM element
    // and render the data
  }
});

你可以在如下地址找到可运行的代码:http://jsfiddle.net/staltz/8jFJH/48/

上面的代码虽然小,但是五脏俱全。它用关注分离的原则管理了多个事件的发生源,甚至建立了缓存机制。函数式编程的风格使代码看起来更“声明式”,而不是“命令式”。我们并没有给出一个执行命令的序列,而是通过不同流之间的对应关系来给出每个东西的定义。举例来说,通过Rx库,我们告诉计算机用户1的流是由关闭1的流与最近用户API响应里的一个用户组合而成的,并且当初始化或者点击刷新按钮的时候,这个流的值为null。

请注意到所有命令式的流程控制语句如:if,for,while完全没有被用到,同样没有用到的是典型的基于回调机制的JavaScript应用流程控制。你可以轻易的把上面代码中的if和else替换成Rx的方法filter(),如果你想的话。在响应式编程中,我们使用map,filter,scan,merge,combineLatest等一系列函数来在事件驱动的程序中控制流程。这个工具集能让你很方便的用更少的代码实现更强大的功能。