Rxjs小试
发布于 4 年前 作者 xia83 1944 次浏览 来自 分享

Rxjs简单入门,带你熟悉一些重要概念,一些常用操作符,然后用Rxjs实现一个贪吃蛇的案例

背景

在浏览器页面上,我们获取任何资源都是非同步的,比如说获取一个html页面,要先发送一个请求,然后等页面返回,再执行对这个页面的操作,这是一个非同步的行为。JavaScript也提供了一些异步的写法,如Callback、Promise以及async/await。但是随着页面需求的复杂化,我们编写异步代码还是比较困难。

比如说竞态条件:当我们发了一个请求更新某个资源,然后又立即发送另一个请求获取这个资源,这时第一个请求和第二个请求的先后顺序就会影响到接受结果的不同;

再比如说内存泄露,当我们给dom添加监听事件的时候,我们很有可能会忘记移除监听事件,而且如果移除事件,还要手动去调用removeEventListener方法;比较麻烦。

说到这里,我今天要给大家介绍一个能够帮助我们解决上述问题,能够帮助我们更好的处理异步行为的一个库,Rxjs。

简介

Rxjs 是Reactive Extensions for JavaScript 的缩写,是Reactive Extension (简称RX)的实现。对于什么是Rx,在Rx的官网上有这样一段介绍的文字: An API for asynchronous programming with observable streams. 翻译过来就是Rx是一套通过可监听流来做异步编程的API。所以Rxjs可以理解成处理异步行为的 Lodash

下面开始介绍Rxjs中一些比较重要的概念

核心功能介绍与使用

Observable和Observer

要理解 Rxjs ,先要理解两最重要的概念:Observable和Observer,可以说Rxjs的运行就是Observable和Observer之间的互动游戏。

每个Observable对象,代表的就是在一段时间内发生的一系列事件。Rxjs结合了观察者模式和迭代者模式,其中的Observable可以用下面的这种公式表示: Observable=Publisher+Iterator

下面的代码创建和使用了一个简单的Observable对象


import {Observable} from "rxjs";

const onSubscribe = observer => {

    let number = 1;

    const handle = setInterval(() => {

        observer.next(number++);

        if (number > 3) {

            clearInterval(handle)

            observer.complete()

        }

    }, 1000);

}

const source$ = new Observable(onSubscribe);

const theObserver = {

    next: item => console.log(item),

    error: err => console.log(err),

    complete: () => console.log('No More Data')

}

source$.subscribe(theObserver);

这段代码依次输出最小的三个正整数,结果如下:


1

2

3

No More Data

上述代码中创建了一个Observable(可以理解成一个事件流),它依次往下游传输1,2,3三个数据,当调用 complete() 方法时,会将这个流终结。而这个事件流的订阅者对象中, next 是接收事件流传输下来的数据的回调, complete 是事件流结束的回调, error 是事件流发生异常的回调。可以在Observable中调用 observer.error() 方法来抛出一个异常,注意,当抛出异常后,该时间流会直接结束。它会调用订阅者的 error 回调,但是不会调用 complete 回调

退订Observable

现在已经了解Observable和Observer之间如何建立关系,两者之间除了要通过subscribe建立关系,有时候还需要断开两者的关系。这就涉及一个退订的概念

修改上述代码,来实现一个退订的动作


const onSubscribe = observer => {

    let number = 1;

    const handle = setInterval(() => {

        observer.next(number++);

        if (number > 3) {

            clearInterval(handle)

            observer.complete()

        }

    }, 1000);

    return {

        unsubscribe: () => {

            clearInterval(handle);

        }

    }

}

const subscription = source$.subscribe(theObserver);

setTimeout(() => {

    subscription.unsubscribe();

})

在上面的代码中,subscribe函数返回一个subscription。在完成subscribe的3.5秒之后,调用unsubscribe函数会实现一个退订的动作。退订后,订阅者就不会接收发布者的消息了

Hot Observable和Cold Observable

Observable对象就是一个数据流,可以在一个时间范围内吐出一系列数据,如果只是存在一个Observer,一切都很简单,但是对于存在多个Observer的场景,情况就变得很复杂了。

假设有这样一个场景,一个Observable对象有两个Observer对象订阅。第一个订阅者在源数据流未开始发射数据流就订阅了,第二个订阅者在源数据流发射几个数据后才开始订阅。那么问题来了,第二个订阅者应该接收到“错过”的那些数据呢?

选择A:错过就错过了,只需要接受从订阅那一刻开始产生的数据
选择B:不能错过,需要接受订阅之前产生的数据

Rxjs也考虑到这种场景,让Observable支持这两种不同的需求。对于选择A,这样的Observable为Hot Observable;对于选择B,称为Cold Observable。上文中通过 new Observable() 创建的Observable就是cold observable,hot observable的创建在下文中的subject中会介绍到

高阶Observable

高阶Observable中的高阶并不是高级的意思,高阶Observable依然是一个Observable,只是数据的形式比较特殊。所谓的高阶Observable,指的是产生的数据依然是Observable。高阶Observable的创建在下文中的操作符会介绍到

操作符

基础的Observable和Observer可以解决我们日常一些普通的异步场景,如事件回调。但是涉及到一些比较复杂的异步场景是就需要花费一些功夫了。Rxjs提供了大量的操作符帮助我们解决各种复杂的异步场景。操作符就是解决某个具体应用问题的模式

如果把Observable数据流理解成一条河流,那操作符可以理解成河道中的水利阀门,它可以对上游流下来的水进行过滤、拦截、合并等等一些列操作

根据功能,操作符可以分为以下类别

创建类
转化类
过滤类
合并类
多播类
错误处理类
辅助工具类
条件分支类
数据和合计类

下面介绍几个常用的操作符,更多操作符请参考 https://rxjs.dev/api

Of

of操作符可以轻松创建指定数据集合的Observable对象


import {of} from "rxjs";

const source$ = of(1, 2, 3);

source$.subscribe(

    console.log,

    null,

    () => console.log('complete')

)

上述代码中的数据源会依次往下游吐出1,2,3三个数据

interval

interval类似setInterval的功能


import {interval} from "rxjs";

const source$ = interval(1000);

上述代码中的数据源会每隔一秒依次往下游吐出从0开始递增的数据

timer

timer类似setTimeout的功能


import {timer} from "rxjs";

const source$ = timer(1000);

上述代码的数据源会在1秒后吐出0这个数据

fromPromise、fromEvent

fromPromise会将一个promise转成一个Observable对象

fromEvent会将一个dom事件(如点击事件,resize事件)转成一个Observable对象


const source1$ = fromPromise(Promise.resolve('good'));

const source2$ = fromEvent(document.querySelector('#clickMe'), 'click');

map

map是最简单的一个转化类操作符,它可以将上游的数据转成一个新的数据,然后传给下游


const source$ = of(1).pipe(

    map(res => res + 1)

)

上述代码中,map操作符将上游的数据进行了加一操作

switchMap

switchMap会返回一个高阶的observable,并且只会订阅最新的Observable


const source2$ = fromEvent(document.querySelector('#clickMe') as any, 'click').pipe(

    switchMap(() => ajax('requestUrl'))

);

上述代码会在点击事件触发后发送一个ajax请求,如果第二次点击时,上一次的请求还未返回,那么上一次的请求订阅便会退订,然后订阅第二次点击触发的请求

Subject

Subject是一个hot observable。同时,Subject有observer的 nexterrorcomplete 方法,所以Subject间距Observable和Observer的性质


import {Subject} from "rxjs";

const subject = new Subject();

const subscription1 = subject.subscribe(

    val => console.log('on observer 1 data:', val),

    err => console.log('on observer 1 error:', err),

    () => console.log('on observer 1 complete')

);

subject.next(1);

subject.subscribe(

    val => console.log('on observer 2 data:', val),

    err => console.log('on observer 2 error:', err),

    () => console.log('on observer 2 complete')

);

subject.next(2);

subscription1.unsubscribe();

subject.complete()

代码运行结果如下


on observer 1 data: 1

on observer 1 data: 2

on observer 2 data: 2

on observer 2 complete

实践

介绍了Rxjs的一些重要概念以及一些常用的操作符,下面我们用Rxjs来实现一个贪吃蛇Demo

效果图

1、创建游戏区域

app.component.html


<canvas #canvas [width]="CANVAS_WIDTH" [height]="CANVAS_HEIGHT"></canvas>

app.component.ts


@ViewChild('canvas')

canvas!: ElementRef<HTMLCanvasElement>;

private ctx!: CanvasRenderingContext2D;

CANVAS_WIDTH = CANVAS_WIDTH;

CANVAS_HEIGHT = CANVAS_HEIGHT;

ngAfterViewInit(): void {

    this.ctx = this.canvas.nativeElement.getContext('2d')!;

 }

2、确定Observable数据流

首先我们要确认那些在游戏期间会不断发生变化的点,首先能确认的点有玩家的键盘事件、蛇移动的方向、蛇的长度、蛇的位置、苹果的位置、还有分数。

蛇移动的方向是由玩家控制上下左右衍生而来的数据流,所以直接用一个方向流来表示

当苹果被吃时,蛇的长度需要增加相应的值。而蛇的长度数据流是蛇的初始长度衍生而来的数据源,蛇的长度会在吃了苹果后,在蛇的初始长度上增加相应的值。而在苹果被吃后,蛇的初始长度数据流需要往下吐出一个长度的值,然后蛇的长度接受到上游留下的值后增加对应的长度。这里两个地方都需要用到蛇的初始长度,所以在这里将蛇的初始长度单独提取成一个长度数据流

游戏场景每次都是一起绘制蛇,苹果,分数,所以将这三个数据流合并成一个场景数据流

蛇移动的方向
蛇的初始长度
蛇的长度
蛇的位置
苹果的位置
分数
场景

2.1、蛇移动的方向


private keydown$ = fromEvent<KeyboardEvent>(document, 'keydown');

const direction$ = this.keydown$.pipe(

      map((event: KeyboardEvent) => DIRECTIONS[event.keyCode]), // 将用户输入的案件keycode转成对应的坐标点方向

      filter((direction) => !!direction), // 由于用户输入的可能不是上下左右,所以需要过滤无效的按键输入

      startWith(INITIAL_DIRECTION), // 给一个初始的方向

      scan(nextDirection), // 过滤掉用户输入对立方向的操作 (如:本来的行走方向是右,然后用户输入的左方向)

      distinctUntilChanged() // 当数据(方向)改变时,数据才往下流

 );

2.2、蛇的初始长度


const length$ = new BehaviorSubject<number>(SNAKE_LENGTH);//记录长度

2.3、蛇的长度

蛇的长度是由蛇的初始长度为初始值,然后在没吃掉一个苹果后增加相应的长度衍生而来的数据流


// 蛇的长度

const snakeLength$ = length$.pipe(

   scan((step, snakeLength) => snakeLength + step),// scan操作符类似es6中的reduce方法,此处是累加初始长度和每次吃掉苹果增加的长度

   share() // 因为蛇的长度数据流有多个消费者,所以该操作符将该数据流变成一个hot observable

);

2.4、蛇的坐标

蛇由于是不断移动的,他需要没n秒移动一次,这里我们设为200ms。所以蛇的坐标的数据源是一个每200ms产生一个数据的数据源,然后它需要根据蛇的初始坐标,以及方向还有蛇的长度来产生最终蛇的坐标


//记录蛇的坐标

const snake$ = interval(200).pipe( // 每200ms产生一个不断递增的数字

  withLatestFrom<number, Observable<Point2D>, Observable<number>, [Point2D, number]>

  (direction$, snakeLength$, (_, direction, snakeLength) => [direction, snakeLength]),// 每当上游产生数据时,才接受方向数据流,蛇长度数据流的数据,然后返回[direction, snakeLength]格式的数据

  scan(move, generateSnake()), // 根据蛇的初始位置和上游的数据计算对应的坐标,然后流向下游

  share()// 将该数据流变成一个hot observable

);

2.5、分数

分数的值是在蛇的长度在减去初始长度的基础上,蛇每增加一个长度,然后分数就增加对应的值


//分数

const score$ = snakeLength$.pipe(

  startWith(0), // 初始值为0

  scan((score, _) => score + POINTS_PER_APPLE) // 蛇没增加一次长度,分数就累加一次

);

2.6、场景

蛇,苹果,分数这三个数据流,其他两个数据流都是跟着蛇的数据流的节奏来产生相应变化的,所以用combineLatest操作符将这三个数据流合并起来


// 场景,包括蛇,苹果,分数

const scene$ = combineLatest([snake$, apples$, score$]).pipe(

  map(([snake, apples, score]) => ({snake, apples, score}))

);

主要的数据流我们都已经确定了。这里我们为了提高性能,引入了一个类似tick$流,目的是为了控制游戏画布多久渲染一次,这里我们将它定为1000/60,也就是每秒刷新60次


interval(1000 / FPS, animationFrameScheduler) // animationFrame 是rxjs提供的调度器,此调度器在 window.requestAnimationFrame触发时执行任务

接下来我们就需要将tick$流和场景流结合起来了


private game$ = of('Start Game').pipe(

   map(() => interval(1000 / FPS, animationFrameScheduler)),

   switchMap(this.createGame),

   takeWhile(scene => !isGameOver(scene))

 )

上述代码中的createGame是创建上述确定的主要的数据流,然后返回tick$流结合场景数据流

这里的game$数据流是一个cold observable,也就是每订阅一次,它的所有数据都会重新再产生一次,也就是说createGame中所创建的数据流都会重新创建一遍。这里这么做,是因为游戏结束后,重新开始游戏后,所有的数据流都要回复初始状态,所以重新创建一遍

此外游戏结束后,我们需要在用户点击游戏区域重新开始游戏


private startGame() {

    this.game$.subscribe({

      next: (scene) => renderScene(this.ctx, scene),

      complete: () => {

        renderGameOver(this.ctx)

        this.click$.pipe( // 游戏区域的点击事件

          first()// 这里只取游戏结束后第一次点击吐出的数据

        ).subscribe(() => {

          this.startGame()

        })

      }

    })

  }

这样,我们就完成了整个贪吃蛇

下面是主要代码

主要代码


	@ViewChild('canvas')

  canvas!: ElementRef<HTMLCanvasElement>;

  private ctx!: CanvasRenderingContext2D;

  CANVAS_WIDTH = CANVAS_WIDTH;

  CANVAS_HEIGHT = CANVAS_HEIGHT;

  private keydown$ = fromEvent<KeyboardEvent>(document, 'keydown');

  private click$!: Observable<Event>;

  private createGame = (fps$: Observable<number>) => {//传进来一个定时的流,也就是多少秒canvas刷新一次

    const direction$ = this.keydown$.pipe(

      map((event: KeyboardEvent) => DIRECTIONS[event.keyCode]), // 将用户输入的案件keycode转成对应的坐标点方向

      filter((direction) => !!direction), // 由于用户输入的可能不是上下左右,所以需要过滤无效的按键输入

      startWith(INITIAL_DIRECTION), // 给一个初始的方向

      scan(nextDirection), // 过滤掉用户输入对立方向的操作 (如:本来的行走方向是右,然后用户输入的左方向)

      distinctUntilChanged() // 当数据(方向)改变时,数据才往下流

    );

    const length$ = new BehaviorSubject<number>(SNAKE_LENGTH);//记录长度

    // 蛇的长度

    const snakeLength$ = length$.pipe(

      scan((step, snakeLength) => snakeLength + step),// 吃一个苹果,蛇增加对应的长度

      share()

    );

    //分数

    const score$ = snakeLength$.pipe(

      startWith(0), // 分数初始值为0

      scan((score, _) => score + POINTS_PER_APPLE) // 蛇的长度增加,分数也相应增加

    );

    //记录蛇的坐标

    const snake$ = interval(200).pipe( // 蛇0.2s移动一次

      withLatestFrom<number, Observable<Point2D>, Observable<number>, [Point2D, number]> 

      (direction$, snakeLength$, (_, direction, snakeLength) => [direction, snakeLength,]), // 0.2s获取一次方向,蛇的长度的数据

      scan(move, generateSnake()), // 根据方向,改变蛇的坐标

      share()

    );

    // 记录苹果的坐标

    const apples$ = snake$.pipe(

      scan(eat, generateApples()),  // 吃掉苹果会立马生成新的苹果

      distinctUntilChanged(), // 去数据和上一个数据不同时,数据才往下流

      share()

    );

    // 苹果被吃掉的流

    apples$

      .pipe(

        skip(1), // 跳过第一个条数据,也就是蛇吃掉苹果,生成新的苹果,数据才往下流

        tap(() => length$.next(POINTS_PER_APPLE)) // 吃掉一个苹果,蛇要增加相应的长度

      )

      .subscribe();

    // 场景,包括蛇,苹果,分数

    const scene$ = combineLatest([snake$, apples$, score$]).pipe(

      map(([snake, apples, score]) => ({snake, apples, score}))

    );

    return fps$.pipe(

      withLatestFrom(scene$, (_, scene) => scene)

    );

  }

  private game$ = of('Start Game').pipe(

    map(() => interval(1000 / FPS, animationFrameScheduler)),

    switchMap(this.createGame),

    takeWhile(scene => !isGameOver(scene)) // 当蛇头没有碰到蛇身体时,数据才往下流

  )

  private startGame() {

    this.game$.subscribe({

      next: (scene) => renderScene(this.ctx, scene), // 渲染场景,包括渲染蛇、分数、苹果、背景色

      complete: () => {

        renderGameOver(this.ctx) // 渲染游戏结束的场景

        this.click$.pipe(

          first() // 获取游戏结束后第一次点击事件

        ).subscribe(() => {

          this.startGame()  // 开始游戏

        })

      }

    })

  }

  ngAfterViewInit(): void {

    this.click$ = fromEvent(this.canvas.nativeElement, 'click'); // 监听canvas的点击事件

    this.ctx = this.canvas.nativeElement.getContext('2d')!;

    this.startGame()

  }

完整代码见 https://github.com/VincentChris/rxjs-snake

总结

总的来说,Rxjs使用数据流的方式来处理那些错综复杂的数据,能避免我们创建很多的状态变量,让我们的代码更加精简、优雅。同时它还提供了大量的操作符帮助我们解决各种复杂的场景,极大的提高我们的效率。所以掌握这门技能对于我们的日常开发还是很有帮助的。

由于篇幅有限,所以本篇文章只是带大家粗略的了解下Rxjs的基本概念以及实现一个demo,感兴趣的可以一起深入学习讨论。

1 回复
回到顶部