The magic of RxJS sharing operators and their differences

  • 2019-02-07 10:28 AM

Before diving into sharing operators, we first need to determine what kinds of observables exist in RxJS. There are two kinds: hot and cold. There is a great article, Hot vs Cold Observables, but in general the main difference is this:

In cold observables, the notification producer is created by the observable itself, and only when an observer subscribes to it.

For example, interval() creates a cold observable. The data is produced within the observable, and a new interval is created for each new subscription.

In hot observables, the notification producer is created outside the observable and exists regardless of whether there are any subscribers.

For example, fromEvent() creates a hot observable, because the notification producer lives in the DOM and exists regardless of the number of subscribers.
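The distinction can be sketched without RxJS at all. What follows is a simplified model, not the library implementation: MiniObservable, coldCounter, hotClicks and emitClick are hypothetical names made up for this illustration.

```typescript
// Simplified model (not RxJS): an observable is a function that takes a
// listener and returns an unsubscribe function.
type Listener<T> = (value: T) => void;
type MiniObservable<T> = (listener: Listener<T>) => () => void;

// Cold: the producer (a counter) is created inside the subscribe call,
// so each subscriber gets its own producer and its own sequence.
let producersCreated = 0;
const coldCounter: MiniObservable<number> = (listener) => {
  producersCreated++;
  let next = 0;
  listener(next++);
  listener(next++);
  return () => {};
};

// Hot: the producer lives outside the observable (like a DOM event
// target) and emits whether or not anyone is listening.
const clickListeners = new Set<Listener<string>>();
const emitClick = (value: string): void => {
  clickListeners.forEach((listener) => listener(value));
};
const hotClicks: MiniObservable<string> = (listener) => {
  clickListeners.add(listener);
  return () => {
    clickListeners.delete(listener);
  };
};

const coldA: number[] = [];
const coldB: number[] = [];
coldCounter((v) => coldA.push(v));
coldCounter((v) => coldB.push(v));
// Two subscriptions created two producers; each subscriber saw 0, 1.

emitClick('missed'); // nobody is subscribed yet, so this value is lost
const hotA: string[] = [];
const hotB: string[] = [];
hotClicks((v) => hotA.push(v));
hotClicks((v) => hotB.push(v));
emitClick('click'); // one producer, both subscribers receive the value
```

In this sketch each cold subscription restarts the sequence from 0, while the hot subscribers only see what is emitted after they subscribed.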

Sometimes we need to make a cold observable behave like a hot one, for example with HTTP requests. Consider the following HTTP request example in Angular:

ngOnInit() {
   this.user$ = this.http.get(`api/user/1`)

   this.name$ = this.user$.pipe(
      map(user => user.name)
   );

   this.age$ = this.user$.pipe(
      map(user => user.age)
   );

}

We display the user’s name and age in the template using the async pipe (assume they are in different places, so wrapping them in a single async pipe is not possible):

<div>{{name$ | async}}</div>
<div>{{age$ | async}}</div>

In the browser’s network tab we will see two requests. The reason is that Angular’s Http creates a cold observable, so each new subscriber triggers a new request. We definitely don’t want several requests. Let’s solve this issue first and then see how the solution works.

Solving this is actually really easy. All we need to do is add share(), or publish() followed by refCount(), like so:

this.user$ = this.http.get(`api/user/1`).pipe(
  share()
);

//or 

this.user$ = this.http.get(`api/user/1`).pipe(
 publish(),
 refCount()
);

Now the network tab shows a single request, because the data is shared among all subscribers. So how do share() and publish() magically solve this problem, and what is the difference between them if they both do the same thing?

TL;DR: share(), publish() and other multicasting operators make cold observables behave like hot ones.

To understand how sharing operators work, we need to understand what multicasting is.

Hot observables are multicast: all subscribers get data from the same producer. Cold observables are unicast: each subscriber gets data from a different producer.

multicast()

RxJS has a multicast() operator, which takes a subject or a subject factory and returns a ConnectableObservable. The subject passed as the argument acts as a middleman in a multicast observable: it simply relays data from the source observable to all subscribers. A ConnectableObservable is a regular observable, but it won’t subscribe to the source until its connect() method is called. Let’s change the example above to use multicast() to understand how it works.

this.user$ = this.http.get(`api/user/1`).pipe(
  multicast(new Subject())
);

This alone won’t work, because we need to call connect() manually:

this.name$ = this.user$.pipe(
      map(user => user.name)
   );

this.age$ = this.user$.pipe(
      map(user => user.age)
   );

this.user$.connect();

After this we will see the same behavior: there will be only one HTTP call instead of two. Connecting and disconnecting manually can be hard to get right, so there is a refCount() operator that automatically calls connect() on the first subscription, keeps count of subscriptions, and keeps the Subject connected to the Source as long as there is at least one subscriber. When the subscription count drops to zero, the Subject is disconnected from the Source.

Source, in our example, is the observable returned by this.http.get().
Subject is the internal subject passed as the argument to multicast().
Subscriptions, or observers, are this.name$, this.age$ and any other observers subscribed to the Subject.

In simple words, all our subscribers subscribe to subject X (the one passed to multicast()), and subject X itself subscribes to our HTTP call. When the observable returned by the HTTP call emits, subject X takes that value and shares it among all subscribers.

The general idea of multicasting is that the Subject subscribes to the Source and multiple Observers subscribe to the Subject.
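That idea fits in a few lines of plain TypeScript. This is a simplified model under stated assumptions, not how RxJS implements multicast(): MiniSubject, fetchUser and the sample user object are hypothetical stand-ins for Subject, the HTTP call and the server response.

```typescript
// Simplified model (not the RxJS source): just enough of a Subject to
// show one source subscription fanning out to many observers.
type ObserverFn<T> = (value: T) => void;
type User = { name: string; age: number };

class MiniSubject<T> {
  private observers: ObserverFn<T>[] = [];
  subscribe(observer: ObserverFn<T>): void {
    this.observers.push(observer);
  }
  next(value: T): void {
    this.observers.forEach((observer) => observer(value));
  }
}

// A cold source standing in for the HTTP call: every subscription
// triggers a new "request".
let requests = 0;
const fetchUser = (observer: ObserverFn<User>): void => {
  requests++;
  observer({ name: 'Ann', age: 30 }); // made-up response
};

// Observers subscribe to the subject; connect() subscribes the subject
// to the source exactly once.
const subject = new MiniSubject<User>();
const connect = (): void => fetchUser((user) => subject.next(user));

const names: string[] = [];
const ages: number[] = [];
subject.subscribe((user) => names.push(user.name));
subject.subscribe((user) => ages.push(user.age));
connect(); // nothing is requested until this call
```

Both observers receive the user, yet fetchUser ran only once, because only the subject subscribed to it.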

Now let’s modify our code to use refCount(), so we don’t need to manually connect.

this.user$ = this.http.get(`api/user/1`).pipe(
  multicast(new Subject()),
  refCount()
);

Now we don’t need to call connect() manually or worry about disconnecting. refCount() connects the Subject to the Source upon the first subscription and disconnects it when there are no observers left.
In fact, earlier we used the combination of publish() and refCount(), which is exactly the same as multicast(new Subject()) followed by refCount().
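The counting itself can be sketched as follows. This is a minimal model of the idea, assuming a hypothetical refCounted() helper that receives a connect function and returns a subscribe function; it is not the code RxJS uses.

```typescript
// Simplified model of reference counting (not the RxJS implementation).
// `connect` starts the shared connection and returns a way to stop it.
function refCounted(connect: () => () => void): { subscribe: () => () => void } {
  let count = 0;
  let disconnect: (() => void) | null = null;
  return {
    subscribe(): () => void {
      count++;
      if (count === 1) {
        disconnect = connect(); // first subscriber: connect to the source
      }
      let active = true;
      return () => {
        if (!active) return; // unsubscribing twice must not count twice
        active = false;
        count--;
        if (count === 0 && disconnect !== null) {
          disconnect(); // last subscriber left: disconnect from the source
          disconnect = null;
        }
      };
    },
  };
}

const connectionLog: string[] = [];
const counted = refCounted(() => {
  connectionLog.push('connect');
  return () => {
    connectionLog.push('disconnect');
  };
});

const unsubscribeA = counted.subscribe(); // count 1: connects
const unsubscribeB = counted.subscribe(); // count 2: reuses the connection
unsubscribeA(); // count 1: still connected
unsubscribeB(); // count 0: disconnects
counted.subscribe(); // count 1 again: connects again
```

After these calls connectionLog holds 'connect', 'disconnect', 'connect': two subscribers shared one connection, and a later subscriber caused a new one.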

publish()

RxJS has a publish() operator, and if we look at its source code we will see that it uses multicast() with a Subject:

publish() === multicast(new Subject())

publish() optionally accepts a selector function, which changes the behavior of the operator and deserves its own article. We will skip that part and only consider publish() without a selector function.

So when we use publish() we are actually using the good old multicast() with a Subject, which means we either need to take care of connecting and disconnecting manually, or use refCount() to automate that process.

Because we mostly use publish() together with refCount(), there is a very similar operator that uses refCount() internally and behaves similarly. That operator is share().

share()

share() === multicast(() => new Subject()).refCount()

In our first example we saw that share() does the same thing as publish() followed by refCount(), and in most cases they do behave the same. share() uses refCount() internally, so we don’t need to call it ourselves. Just like publish(), share() uses multicast(); the difference is in the argument passed to multicast().

publish() uses a Subject instance: multicast(new Subject())

share() uses a factory function that returns a Subject instance: multicast(() => new Subject()).refCount()

That is the only reason why we can’t say that share() is the same as publish() + refCount(). This difference causes different behavior for late subscribers once the Source has completed.

Differences between share() and publish() + refCount()

They both use refCount() for managing subscriptions; however:

publish() + refCount(): as long as there is at least one subscriber to the Subject, it will emit values. Once there are no subscribers, the Subject is disconnected from the Source. For new subscribers, if the Source has completed they receive only the ‘complete’ notification; if the Source hasn’t completed, the Subject re-subscribes to the Source.

share(): as long as there is at least one subscriber to the Subject, it will emit values. Once there are no subscribers, the Subject is disconnected from the Source. For any new subscriber, no matter whether the Source has completed or not, it subscribes to the Source again using a new Subject.

The difference is subtle but very important. Let’s modify our code to have a button that updates the user’s data: clicking it re-fetches the data from the server.
First let’s use share():

ngOnInit() {
   this.user$ = this.http.get(`api/user/1`).pipe(
      share()
   );

   this.name$ = this.user$.pipe(
      map(user => user.name)
   );

   this.age$ = this.user$.pipe(
      map(user => user.age)
   );
}

update() {
   // Re-assigning the streams makes the async pipes subscribe to user$ again.
   this.name$ = this.user$.pipe(
      map(user => user.name + 'updated')
   );
   this.age$ = this.user$.pipe(
      map(user => user.age + 'updated')
   );
}

When we initially load the data, refCount() counts all references, so we will have two references to the Subject. Once the server responds, the Subject gets the data from the Source and completes. Both our subscribers get the data from the Subject and complete too, which means the reference count in refCount() drops to 0. At that point the Subject is disconnected from the Source.
When we execute the update() method, a new Subject instance is created and subscribed to the Source. So each execution of update() actually sends a request to the server.

Now consider the same example with publish() and refCount():

this.user$ = this.http.get(`api/user/1`).pipe(
 publish(),
 refCount()
);

Again the refCount() counter will be 2, and once the Source emits and completes the counter will be 0. But when we execute the update() method nothing will happen: no request will be made to the server. As noted above, new subscribers only get the ‘complete’ notification if the Source has completed.

The reason for this behavior lies in multicast(). Because publish() uses a Subject instance, when the Source completes the Subject completes too, so any new subscriber to that Subject receives only the ‘complete’ notification.
share() uses a factory function that returns a Subject instance. When the Source completes, the Subject completes too, but for new subscribers a new Subject instance is created and subscribed to the Source.
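The instance-versus-factory difference can be reproduced with a small model. Everything here is an assumption made for illustration: MiniSubject, multicastModel, httpGet and respond are hypothetical names, the model ignores unsubscription (subscribers only leave by completion), and it is not the RxJS source.

```typescript
// Simplified model (not the RxJS source). Subscribers only ever leave by
// completion here, so unsubscribe-based counting is left out.
type Sink<T> = { next: (value: T) => void; complete: () => void };

class MiniSubject<T> {
  stopped = false;
  private sinks: Sink<T>[] = [];
  subscribe(sink: Sink<T>): void {
    if (this.stopped) {
      sink.complete(); // a completed subject only says 'complete'
      return;
    }
    this.sinks.push(sink);
  }
  next(value: T): void {
    this.sinks.forEach((sink) => sink.next(value));
  }
  complete(): void {
    this.stopped = true;
    this.sinks.forEach((sink) => sink.complete());
    this.sinks = [];
  }
}

// Stand-in for the HTTP call: counts requests and answers on respond().
let requests = 0;
let pending: Sink<string>[] = [];
const httpGet = (sink: Sink<string>): void => {
  requests++;
  pending.push(sink);
};
const respond = (): void => {
  const waiting = pending;
  pending = [];
  waiting.forEach((sink) => {
    sink.next('user');
    sink.complete();
  });
};

// Asks the factory for a subject whenever there is no live connection.
function multicastModel<T>(
  source: (sink: Sink<T>) => void,
  subjectFactory: () => MiniSubject<T>,
): (sink: Sink<T>) => void {
  let subject: MiniSubject<T> | null = null;
  return (sink) => {
    const isFirst = subject === null;
    const current: MiniSubject<T> = subject ?? subjectFactory();
    subject = current;
    current.subscribe(sink);
    if (isFirst && !current.stopped) {
      source({
        next: (value) => current.next(value),
        complete: () => {
          subject = null; // connection is gone; ask the factory next time
          current.complete();
        },
      });
    }
  };
}

const oneInstance = new MiniSubject<string>();
const published = multicastModel(httpGet, () => oneInstance); // like publish() + refCount()
const shared = multicastModel(httpGet, () => new MiniSubject<string>()); // like share()

const seen: string[] = [];
const sinkFor = (label: string): Sink<string> => ({
  next: (value) => { seen.push(`${label}:${value}`); },
  complete: () => { seen.push(`${label}:complete`); },
});

published(sinkFor('p1'));
published(sinkFor('p2'));
shared(sinkFor('s1'));
shared(sinkFor('s2'));
respond(); // one request per chain so far
published(sinkFor('p3')); // late: the same completed subject, no request
shared(sinkFor('s3')); // late: a fresh subject, so a new request
respond();
```

In this model the late subscriber p3 only ever sees 'complete', while s3 triggers a third request and receives the value.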

multicast() with different subject type

Until now we have discussed multicast() using a plain Subject. There are a few other types of Subject: ReplaySubject, BehaviorSubject and AsyncSubject. Passing a different subject to multicast() still returns a ConnectableObservable, but its behavior will differ.

First let’s look at ReplaySubject(n). It takes a number as its argument, which is the count of emitted values it keeps in its buffer. For any new subscriber it replays the last n values.

If we pass ReplaySubject(n) to multicast(), all new subscribers will get the n replayed values.
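The buffering can be pictured with a tiny stand-in class. MiniReplaySubject is a hypothetical name and a deliberately reduced model (no completion, no errors, no time window), not the real ReplaySubject.

```typescript
// Simplified model (not the RxJS class): keeps the last `bufferSize`
// values and replays them to every new subscriber.
class MiniReplaySubject<T> {
  private bufferSize: number;
  private buffer: T[] = [];
  private observers: Array<(value: T) => void> = [];

  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift(); // drop the oldest value
    }
    this.observers.forEach((observer) => observer(value));
  }

  subscribe(observer: (value: T) => void): void {
    this.buffer.forEach((value) => observer(value)); // replay first
    this.observers.push(observer);
  }
}

const replay = new MiniReplaySubject<number>(2);
const early: number[] = [];
replay.subscribe((v) => early.push(v));
replay.next(1);
replay.next(2);
replay.next(3);

const late: number[] = [];
replay.subscribe((v) => late.push(v)); // receives the buffered 2 and 3
replay.next(4);
// early is [1, 2, 3, 4]; late is [2, 3, 4]
```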

publishReplay()

publishReplay() === multicast(new ReplaySubject())

publishReplay() returns a ConnectableObservable, so we either need to call connect() or use refCount() to manage connections. Let’s modify our example so that every new subscriber just gets the buffered value. When we call update() we won’t fetch new data, but get the cached value instead.

this.user$ = this.http.get(`api/user/1`).pipe(
 publishReplay(1),
 refCount()
);

All subscribers that subscribe to the ReplaySubject before the Source completes get the emitted values (in our case just one value, as Http emits only once). For all new subscribers, the ReplaySubject replays the N buffered values.

Because we mostly use publishReplay() with refCount(), there is a very similar operator that uses a reference counting mechanism internally and behaves similarly. That operator is shareReplay().

shareReplay()

shareReplay() is a very interesting operator. It can behave in a similar way to publishReplay() + refCount(), but that depends on how we use it.

Before RxJS version 6.4.0 the reference counting mechanism in shareReplay() worked differently. Starting from 6.4.0 we can explicitly pass an argument to shareReplay() to use the ‘normal’ reference counting mechanism. Let’s look at this in more detail.

shareReplay({refCount: true}) (RxJS 6.4.0 or newer)

refCount: true tells shareReplay() to use a reference counting mechanism similar to refCount(). In this case shareReplay({refCount: true}) is almost the same as publishReplay() + refCount(). Let’s modify our example to use shareReplay().

this.user$ = this.http.get(`api/user/1`).pipe(
 shareReplay({refCount: true, bufferSize: 1})
);

As you can see, we don’t use refCount() anymore, because shareReplay({refCount: true}) uses its own reference counting mechanism.
The result will be the same: all subscribers to the ReplaySubject get values as long as it emits them, and all new subscribers get the N buffered values.
Before talking about other ways of using shareReplay(), let’s look at the differences between the two.

Differences between shareReplay({refCount: true}) and publishReplay() + refCount()

They both use a ReplaySubject, but shareReplay() is not implemented with multicast().

publishReplay(n) + refCount(): as long as there is at least one subscriber, the ReplaySubject emits values; once there are no subscribers, the ReplaySubject is disconnected from the Source. Any new subscriber gets the last N values from the ReplaySubject and, if the Source hasn’t completed yet, the same ReplaySubject re-subscribes to the Source.

shareReplay({refCount: true, bufferSize: n}): as long as there is at least one subscriber, the ReplaySubject emits values; once there are no subscribers, the ReplaySubject is disconnected from the Source. For new subscribers, if the Source has completed, the last N values are replayed from the ReplaySubject; if the Source hasn’t completed, nothing is replayed and a new ReplaySubject subscribes to the Source again.

To see the difference, let’s use interval(), so that the Source will not have completed when new subscribers arrive:

this.source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
}, 2000);

We have two subscriptions to the ReplaySubject, sub1 and sub2. After 2 seconds both of them unsubscribe from the Subject. Because we are using refCount(), when there are no more subscribers (i.e. the reference count drops to zero) it disconnects the ReplaySubject from the Source. So far the console shows:

  sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1

Now suppose we create a new subscriber to the ReplaySubject with the click of a button (after the reference count has dropped to zero):

newSub() {
 const sub3 = this.source.subscribe(x => console.log('sub 3', x));
}

When newSub() gets executed, sub3 gets the last buffered value from the ReplaySubject (which is 1), and what happens next depends on whether the Source has completed. If it has, sub3 receives the ‘complete’ notification and completes as well. However, because we are using interval(), the Source hasn’t completed, so the internal ReplaySubject re-subscribes to the Source. The result will be:

 sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1
/**** execution of newSub() ****/
 sub 3– 1 <- replayed value
 sub 3– 0 <- new subscription
 sub 3– 1
 sub 3– 2
...

The internal ReplaySubject replays buffered values to new observers and then either completes or re-subscribes to the Source, depending on whether the Source has completed.

Now the same interval() example using shareReplay({refCount: true}):

this.source = interval(1000).pipe(
  shareReplay({refCount: true, bufferSize: 1})
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
}, 2000);

//execute newSub() after sub1 and sub2 unsubscribe
newSub() {
 const sub3 = this.source.subscribe(x => console.log('sub 3', x));
}

shareReplay() is not implemented with multicast(), but it uses a factory function internally. If we use it with refCount: true and the reference count drops to zero, then for any new subscriber: if the Source has completed, it replays the buffered values and emits the ‘complete’ notification; if the Source has not completed, a new ReplaySubject is created and subscribed to the Source.
As a result, after running the code above and executing newSub() we will see:

 sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1
/**** execution of newSub() ****/
 sub 3– 0 <- new subscription
 sub 3– 1
 sub 3– 2
...

As you can see, no value was replayed for sub3. When sub1 and sub2 unsubscribed, the reference count dropped to zero. Had the Source completed, new subscribers like sub3 would get all buffered values and the ‘complete’ notification. But because we are using interval(), the Source hasn’t completed, so the ReplaySubject is destroyed, and a new subscriber like sub3 creates a new ReplaySubject instance and subscribes to the Source again.

shareReplay() without refCount

Until now we have used shareReplay() with refCount: true. We can also use shareReplay() with refCount set to false, or not set at all, and specify only the buffer size. For example, shareReplay({refCount: false, bufferSize: 3}) and shareReplay(3) are the same: the ReplaySubject replays the last 3 values and refCount is false. This doesn’t mean there is no reference counting mechanism; it just behaves differently.

By default refCount is set to false in shareReplay().

refCount: false means that upon the first subscription to the ReplaySubject, it subscribes to the Source, but it does not disconnect from the Source when there are no more subscribers to the ReplaySubject. Let’s modify our example again, using refCount: false:

this.source = interval(1000).pipe(
  shareReplay({refCount: false, bufferSize: 2})
  //or just shareReplay(2)
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
}, 2000);

setTimeout(() => {
 const sub3 = this.source.subscribe(x => console.log('sub 3', x));
}, 4000);

sub1 and sub2 subscribe to the ReplaySubject, and the ReplaySubject subscribes to the Source interval. After 2 seconds sub1 and sub2 unsubscribe, but in this case the ReplaySubject does NOT unsubscribe from the Source. The Source continues to emit values even though there are no subscribers to receive them.
After 4 seconds a new subscriber subscribes to the ReplaySubject, gets the last 2 buffered values, and continues getting values from the Source. The result will be:

sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1
/**** after 4 seconds  ****/
 sub 3– 2 <- replayed values
 sub 3– 3 <-
 sub 3– 4 <- continues receiving values
 sub 3– 5
...

sub1 and sub2 subscribed, printed values and unsubscribed after two seconds. But because the Source has not completed, the ReplaySubject keeps receiving data from it. So when sub3 subscribes after 4 seconds, the buffered values it gets are not 0 and 1 but 2 and 3, because in the meantime the ReplaySubject received new values from the Source and updated its buffer. The only case where the ReplaySubject unsubscribes from the Source is when the Source completes or errors. Any new subscriber in that case gets the replayed values and completes.

shareReplay(n): no matter whether there are active subscribers or not, the ReplaySubject keeps emitting values and stays connected to the Source until the Source completes or errors. Any new subscriber gets the last N values. If the Source hasn’t completed yet, new subscribers continue getting values from the Source.

publishBehavior()

publishBehavior() uses multicast() with another subject type, BehaviorSubject:

publishBehavior() === multicast(new BehaviorSubject())

publishBehavior() returns a ConnectableObservable, so we need to use refCount() or connect manually.

publishBehavior() accepts a default value as its parameter and emits that value to subscribers if the Source hasn’t emitted yet. Consider this example:

this.source = interval(1000).pipe(
 publishBehavior(47),
  refCount()
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
}, 2000);

setTimeout(() => {
  const sub3 = this.source.subscribe(x => console.log('sub 3', x));
}, 4000);

The result will be

 sub 1– 47 <- default values
 sub 2– 47 <-
 sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1
/**** after 4 seconds ****/
 sub 3– 1 <- last buffered value
 sub 3– 0 <- new subscription
 sub 3– 1
 sub 3– 2
...

Because interval() is asynchronous, the Source hasn’t emitted yet when sub1 and sub2 subscribe to the BehaviorSubject, so sub1 and sub2 get the default value from it. After two seconds sub1 and sub2 unsubscribe from the BehaviorSubject, and the BehaviorSubject itself unsubscribes from the Source. After 4 seconds sub3 subscribes to the BehaviorSubject and, because the Source has not completed, sub3 gets the last emitted value and the same BehaviorSubject re-subscribes to the Source.

publishBehavior(default_value): when a subscription to the BehaviorSubject is made before the Source has emitted a value, the BehaviorSubject passes the default value to that subscriber. As long as there is at least one subscriber, the BehaviorSubject emits values. Once there are no subscribers, the BehaviorSubject is disconnected from the Source. If the Source hasn’t completed yet, new subscribers get the last value from the BehaviorSubject, and the same BehaviorSubject re-subscribes to the Source. If the Source has completed, all new subscribers get only the ‘complete’ notification.
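The "default value first, latest value afterwards" rule is easy to see in isolation. MiniBehaviorSubject below is a hypothetical, reduced stand-in (no completion or error handling), not the RxJS BehaviorSubject, and it models only the subject, not the re-subscription to the Source.

```typescript
// Simplified model (not the RxJS class): always holds a current value
// and hands it to every new subscriber immediately.
class MiniBehaviorSubject<T> {
  private current: T;
  private observers: Array<(value: T) => void> = [];

  constructor(initial: T) {
    this.current = initial;
  }

  next(value: T): void {
    this.current = value;
    this.observers.forEach((observer) => observer(value));
  }

  subscribe(observer: (value: T) => void): void {
    observer(this.current); // default value, or the latest emitted one
    this.observers.push(observer);
  }
}

const behavior = new MiniBehaviorSubject<number>(47);
const firstSubscriber: number[] = [];
behavior.subscribe((v) => firstSubscriber.push(v)); // nothing emitted yet: gets 47
behavior.next(0);
behavior.next(1);

const laterSubscriber: number[] = [];
behavior.subscribe((v) => laterSubscriber.push(v)); // gets the latest value, 1
behavior.next(2);
// firstSubscriber is [47, 0, 1, 2]; laterSubscriber is [1, 2]
```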

publishLast()

publishLast() uses multicast() with an AsyncSubject:

publishLast() === multicast(new AsyncSubject())

As with all multicast operators, publishLast() is best used with refCount().
The AsyncSubject used in this operator is very interesting: it doesn’t emit any values to its subscribers until it completes, at which point it emits only the last value.

this.source = interval(1000).pipe(
 take(2),
 publishLast(),
 refCount()
);

const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  const sub3 = this.source.subscribe(x => console.log('sub 3', x));
}, 7000);

Because interval() is an infinite observable, we use take(2) so that it emits 2 values and completes. This is the result:

sub 1– 1 //completed
 sub 2– 1 //completed
/**** after 7 seconds ****/
 sub 3– 1 //completed

When sub1 and sub2 subscribe to the AsyncSubject, they do not receive any values until the Source completes. When the Source completes, the AsyncSubject passes the last value to all observers and completes too. After 7 seconds sub3 subscribes to the AsyncSubject and, because it has already completed, it passes the last value and the ‘complete’ notification to sub3 too.

publishLast(): no matter how many subscribers are connected to the AsyncSubject, it won’t emit any value until the Source completes; however, any side effects will still be executed. When the Source completes, the AsyncSubject completes too and emits the last value and the ‘complete’ notification to all subscribers, current and new.
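The hold-back behavior can be sketched with a small stand-in class. MiniAsyncSubject is a hypothetical, simplified model (no error handling), not the real AsyncSubject.

```typescript
// Simplified model (not the RxJS class): remembers only the last value
// and releases it, followed by 'complete', once the subject completes.
type Sink<T> = { next: (value: T) => void; complete: () => void };

class MiniAsyncSubject<T> {
  private sinks: Sink<T>[] = [];
  private latest: T[] = []; // empty, or exactly one element: the last value
  private done = false;

  next(value: T): void {
    if (!this.done) {
      this.latest = [value]; // remember it, but do not emit yet
    }
  }

  complete(): void {
    this.done = true;
    this.sinks.forEach((sink) => this.flush(sink));
    this.sinks = [];
  }

  subscribe(sink: Sink<T>): void {
    if (this.done) {
      this.flush(sink); // late subscribers still get the last value
    } else {
      this.sinks.push(sink);
    }
  }

  private flush(sink: Sink<T>): void {
    this.latest.forEach((value) => sink.next(value));
    sink.complete();
  }
}

const asyncSubject = new MiniAsyncSubject<number>();
const received: string[] = [];
const labelled = (label: string): Sink<number> => ({
  next: (value) => { received.push(`${label} ${value}`); },
  complete: () => { received.push(`${label} completed`); },
});

asyncSubject.subscribe(labelled('sub 1'));
asyncSubject.next(0);
asyncSubject.next(1);
const emittedBeforeComplete = received.length; // still 0: nothing was emitted
asyncSubject.complete();
asyncSubject.subscribe(labelled('sub 3')); // subscribes after completion
```

Both the early and the late subscriber end up with the last value, 1, followed by the completion.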

There is a lot going on in RxJS, and multicasting is one of the most important things in the library. I hope this article helped you understand how sharing operators work and how they differ. Thanks for reading.

Original source: https://itnext.io
