RxJS - Share infinite stream produced by 'expand'

Time:01-27

I have a paginated third-party resource living in a web service. What I want to do is turn that paginated resource into a stream of values, and let the client decide how many elements to use. That is, the client should not know that the original resource is paginated.

So far I got the following code:

import { from, Observable, of } from 'rxjs';
import { expand, mergeMap } from 'rxjs/operators';

interface Result {
  page: number;
  items: number[];
}

// Assume that this does a real HTTP request
function fetchPage(page: number = 0): Observable<Result> {
  const items = [...Array(3).keys()].map((e) => e + 3 * page);
  console.log('Fetching page ' + page);
  return of({ page, items });
}

// Turn a paginated request into an infinite stream of 'number'
function mkInfiniteStream(): Observable<number> {
  return fetchPage().pipe(
    expand((res) => fetchPage(res.page + 1)),
    mergeMap((res) => from(res.items))
  );
}

const infinite$ = mkInfiniteStream();

This works really well: I get a lazy infinite stream of numbers, and the client can just do infinite$.pipe(take(n)) and take the first n elements, without knowing that the underlying resource is paginated.

Now, what I want to do is to share those values when dealing with multiple subscribers, that is:

infinite$.pipe(take(10)).subscribe((v) => console.log('[1] got ', v));

// Assume that later we have new subscribers
setTimeout(() => {
  infinite$.pipe(take(5)).subscribe((v) => console.log('[2] got ', v));
}, 1000);

setTimeout(() => {
  infinite$.pipe(take(4)).subscribe((v) => console.log('[3] got ', v));
}, 1500);

As you can see, we'll have multiple subscribers to the infinite stream, and I want to reuse the values already emitted in order to reduce the number of fetchPage calls. In this example, once a client has asked for 10 items (take(10)), any client that requests fewer than 10 items (e.g. 5 items) should result in no calls to fetchPage, since those items were already emitted.


I tried the following, but I could not get the desired behavior:

const infinite$ = mkInfiniteStream().pipe(share())

This does not work, since late subscribers still result in multiple calls to 'fetchPage'.

const infinite$ = mkInfiniteStream().pipe(shareReplay())

This forces all values in the stream, even though they are not needed (no client has asked for all items yet).


Any hint would be appreciated. In case anyone wants to try the code: https://stackblitz.com/edit/n4ywfw

CodePudding user response:

Since you want to cache the result of the HTTP calls, you have to move shareReplay inside your mkInfiniteStream method, at the end of the pipe that starts with fetchPage, to make it work:

return fetchPage().pipe(
 expand((res) => fetchPage(res.page + 1)),
 mergeMap((res) => from(res.items)),
 shareReplay()
);

https://stackblitz.com/edit/n4ywfw-xkajmp?file=index.ts

Here is another example that I have updated for you; notice that the call is made only once:

https://stackblitz.com/edit/angular-ivy-o9pjw6?file=src/app/app.component.ts

CodePudding user response:

It looks like we need to find a way to store some state, e.g. the last page number read and the items fetched, and make sure this state is replayed when new subscriptions arrive.

One possibility is to work with closures, as follows.

First, you create an infiniteStreamFactoryGenerator function that returns another function which, via a closure, holds some state: specifically lastPage and itemsRead. This state is updated inside the function returned by infiniteStreamFactoryGenerator.

import { concat, EMPTY, from } from 'rxjs';
import { expand, mergeMap, tap } from 'rxjs/operators';

function infiniteStreamFactoryGenerator() {
  let lastPage = 0;
  let itemsRead: number[] = [];
  return () => {
    // if there are items already read, we return them first and then
    // we start the infinite stream from the first page not yet read
    return concat(
      from(itemsRead),
      fetchPage(lastPage).pipe(
        expand((res) => {
          // stop after page 10 so that the example terminates
          return res.page < 10 ? fetchPage(res.page + 1) : EMPTY;
        }),
        tap((res) => {
          // here we update the state
          lastPage++;
          itemsRead = [...itemsRead, ...res.items];
        }),
        mergeMap((res) => from(res.items))
      )
    );
  };
}

Then we invoke infiniteStreamFactoryGenerator to create the real factory function, and use that factory function to instantiate the various streams, like this:

const infiniteStreamFactory$ = infiniteStreamFactoryGenerator();

infiniteStreamFactory$()
  .pipe(take(10))
  .subscribe((v) => console.log("[1] got ", v));

// Assume that later we have new subscribers
setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(5))
    .subscribe((v) => console.log("[2] got ", v));
}, 1000);

setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(4))
    .subscribe((v) => console.log("[3] got ", v));
}, 1500);

setTimeout(() => {
  infiniteStreamFactory$()
    .pipe(take(20))
    .subscribe((v) => console.log("[4] got ", v));
}, 2000);

As you can see, I have also added a fourth subscriber, which requires loading additional pages.

According to this stackblitz this solution seems to work.
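
The bookkeeping that this approach relies on (remember the items already read and the next page to fetch, then fetch only what is missing) can also be checked in isolation, without RxJS. What follows is only a minimal sketch under stated assumptions, not the code from this answer: fetchPageSync, makeCachedReader and fetchCount are hypothetical names, and the fetch is synchronous and simulated to keep the example short.

```typescript
interface Result {
  page: number;
  items: number[];
}

// Counts the simulated requests (hypothetical helper, for illustration only).
let fetchCount = 0;

// Stand-in for the real HTTP request: three consecutive numbers per page.
function fetchPageSync(page: number): Result {
  fetchCount++;
  const items = [0, 1, 2].map((e) => e + 3 * page);
  return { page, items };
}

// Returns a function that yields the first n items and fetches only the
// pages that have not been read yet. The state lives in the closure.
function makeCachedReader(): (n: number) => number[] {
  let nextPage = 0; // first page that has not been fetched yet
  let itemsRead: number[] = []; // every item fetched so far
  return (n: number): number[] => {
    while (itemsRead.length < n) {
      const res = fetchPageSync(nextPage);
      nextPage = res.page + 1;
      itemsRead = [...itemsRead, ...res.items];
    }
    return itemsRead.slice(0, n);
  };
}

const firstN = makeCachedReader();

const a = firstN(10); // fetches pages 0 to 3
const fetchesAfterFirst = fetchCount;

const b = firstN(5); // served entirely from the cached items
const fetchesAfterSecond = fetchCount;

const c = firstN(20); // fetches pages 4 to 6 only
```

In this sketch, calling firstN(10), firstN(5) and firstN(20) in sequence triggers 4, 0 and 3 simulated requests respectively, which is the behaviour asked for in the question. It says nothing about how concurrent subscribers or asynchronous requests should be handled.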

To be honest, the idea of having to build a factory generator function suggests to me that there may be a simpler way to solve this interesting problem, but I have not found it yet.
