Renátó Bogár
Renato's blog

Renato's blog

RxJS Pt.1 - Terms

RxJS Pt.1 - Terms

Renátó Bogár's photo
Renátó Bogár
·May 21, 2022·

9 min read

Subscribe to my newsletter and never miss my upcoming articles

Table of contents

  • What is RxJS?
  • Summary

What is RxJS?

RxJS is a short of Reactive Extensions for JavaScript. Reactive extensions were originally developed by Microsoft as Rx.NET. Since that the reactive library has been implemented for several other languages like Java, Python, Ruby and JavaScript. You can use RxJS with frameworks such as Angular, React and Vue.

According to the documentation

RxJS is a library for composing asynchronous and event-based programs by using observable sequences.

RxJS was designed to work with asynchronous actions and events, though it works with synchronous items as well. From the point of view of a client-server application the HTTP request and response is asynchronous. So Angular uses RxJS to handle the asynchronous request and response. An Angular service issues an HTTP GET request to a back-end server somewhere, and the client code continues. The data is not immediately returned, but the application is not blocked and does not wait for the response. The back-end server receives the GET request locates the data from the database and returns with the response. The application gets the notification when the data is responded and begins to process it. So this is asynchronous. The request is issued and sometime later the response is returned. This let the browser work normally, while the response is being handled.

After these we can say that

RxJS is a library for observing and reacting to datas and events by using observable sequences.

RxJS helps us better manage and manipulate the data needed in our application, especially asynchronous and event-based data.

Other ways for asynchronous data

There are other ways to manage asynchronous data.

  • Callback -> can be difficult to manage when working with nested async operators
  • Promise -> it can only handle a single emission and is not cancellable
  • Async/await -> special syntax that allows writing asynchronous code that looks synchronous. This also can handle single emission and is not cancellable.

By the way, JavaScript is based on asynchronous behavior. This is how it was designed and it is not a good practice to make JavaScript more like a synchronous programming language, since until it is waiting at the await keyword, it can not do other work.

So why use RxJS?

Using RxJS has several advantages. RxJS provides a single technique to work with any type of data. We often work with multiple sources of data, events from the keyboard, mouse, or routes and data from arrays files databases or third party APIs. With RxJS we can use the same techniques and operators for these. RxJS is lazy: evaluation does not start until subscription, so we can create recipes that only execute when we need the result. RxJS has built-in error handling. Aaand RxJS operators are cancellable which means if a user clicks on product A and then quickly on B, we can cancel the request and only return product B.

RxJS terms

Observer

According to RxJS documentation az Observer is

a collection of callbacks that knows how to listen to values delivered by the Observable the documentation also mentions that

An observer is a consumer of values delivered by an Observable

So the Observable emits notifications and associated values, and it is the observer that responses to those notifications and consumes those values. Internally, in RxJS an observer is also defined as an interface with next(), error(), and complete() methods. That ensures that any class created as an observer, implements these methods. One such class that implements the observer interface is a subscriber. While observer is the class we used to observe emitted items, inside RxJS each observer is converted to a subscriber. A subscriber is basically an observer with additional features to unsubscribe from an observable.

Now lets see an Observer example in code

const observer = {
  next: car => console.log(`Car emitted ${car}`),
  error : err => console.log(`Car emitted ${err}`),
  complete: () => console.log("No more cars")
}

An observer is an object that handles three types of notifications specified as methods. We declare a variable called observer and assign it to an object literal, then define our three methods. next() specifies the method to execute each time an item is emitted to the stream. error() details the methods to execute if an error occurs complete() for any final processing or cleanup

Observable

Simply stated, an observable is a collection of events or values emitted over time. Events can come from user actions (keypress etc.), or from the application such as router or forms. Each time an event occurs or value is received, the observable emits a notification with the associated item. The Observers those are subscribed to the Observable gets notification and can react to the emission. And if we have multiple subscribers, each of them receives the notification and can react to that emission. An observable can emit any time of data. It can emit primitives, events, objects, arrays or http responses. We can even emit another observable. Observables can be synchronous, meaning the items are emitted immediately and in sequence, for example emitting items from an existing array. Or they can be asynchronous for example a http response. Observables can run finite number of times or an infinite number of times such as a count when a timer goes off every second forever. What does this observable look like in code ?

const car$ = new Observable ( carSubscriber => {
  carSubscriber.next('Car 1');
  carSubscriber.next('Car 2');
  carSubscriber.complete()
})

Its a convention that we add a dollar sign at the end of the observable meaning that this variable can be observed as it emits items. In the constructor we optionally provide a function that is called when the observable is subscribed. This function is given the subscriber. Within the function, we can use that subscriber to call next to emit values, error to raise an error, or complete to provide notification that there are no more items to emit.

Subscribing

With RxJS we call the subscribe method on the observable to start receiving notifications and pass in the observer so the observable knows notifications and pass in the observer so the observable knows where to send those notifications to. We must subscribe to an observable, otherwise we receive no emissions and have nothing to observe.

There are two types of Observables The ones does are providing the values even if there are no subscribers The ones those are providing only if they have at least one subscriber - for example arrays -> cold observables

So how we do subscribing ? Lets take the codes we saw already

const car$ = new Observable ( carSubscriber => {
  carSubscriber.next('Car 1');
  carSubscriber.next('Car 2');
  carSubscriber.complete()
})

const observer = {
  next: car => console.log(`Car emitted ${car}`),
  error : err => console.log(`Car emitted ${err}`),
  complete: () => console.log("No more cars")
}

and the way we subscribe is

const sub = car$.subscribe(observer)

As a part of the subscription process, we tell the observable how we will observe it. The subscribe methods returns the subscription, which represents the execution of the observable. Now that we subscribed the constructor of the car$ executes. It first calls next, and the observer gets the next notification, and displays the next method message in the console. It calls next again and displays the second message in the console. Lastly the code calls the complete method. The observer gets the complete notification and displays our complete message. When you hear the term observer, think set of callbacks to observe, handling next and complete.

Now normally this is not the way how we work with observables, but we pass the observer object with next, error, and complete methods directly in the subscribe methods like below

const sub = car$.subscribe({
    next: car => console.log(`Car emitted ${car}`),
    error : err => console.log(`Car emitted ${err}`),
    complete: () => console.log("No more cars")
})

If we only want to implement the next method, not the error or complete we can use the shortcut syntax. Instead of passing the observer object, we can pass the next method directly to the subscribe

const sub = car$.subscribe({
    car => console.log(`Car emitted ${car}`)
})

When you call complete() method it automatically cancels all subscriptions. But what if you would like to cancel a subscription directly?

Unsubscribing

With RxJS, to stop receiving notifications, we call unsubscribe on the subscription returned from the subscribe method. Every time we subscribe to start we should unsubscribe to stop (one exception is using reactive , and async pipe). This avoids potential memory leaks in our applications. There are actually several ways we can stop receiving notifications from an observable. calling complete() method on the subscriber automatically cancels all subscriptions

  • some creation functions and operators automatically complete after emitting all of their items. completion cancels all subscriptions
  • any uncaught error executes the observer's error method and cancels all subscriptions
  • or we can stop receiving notifications from an observable calling unsubscribe() method

Since sub method returns a subscription we can use the unsubscribe method.

sub.unsubscribe()

Properly unsubscribing from each Observable helps avoid memory leaks.

Creation functions

When using RxJS in an Angular application we often do not create our own observables. Rather, we work with the observables Angular creates for us. But there are times we want to create one ourselves. We could create an observable using the observable constructor

const car$ = new Observable ( carSubscriber => {
  carSubscriber.next('Car 1');
  carSubscriber.next('Car 2');
  carSubscriber.complete();
})

but the recommended technique is to use a built-in creation function. of is one such a function

const car$ = of('Car 1','Car 2');

In the example above, the observable emits two strings and completes. Or we can use the from creation function

const car$ = from(['Car 1', 'Car 2']);

from creates an observable from an array or other data structure, emitting each individual value from that structure and then completing.

Untitled Diagram.drawio (1).png

If you would like to have the same result you can use the javascript spread operator (...) of(...cars)

There are other creation functions in RxJS, these are just the simplest ones.

of(2,4,6,8).subscribe( (item) => console.log(item))

from([20,15,10,5]).subscribe({
  next: (item)=> console.log(`resulting item ..${item}`),
  error: (err)=> console.log(`error occured ..${err}`),
  complete: ()=> console.log('complete')
})

Summary

  • Observable : Collection of events or values emitted over time
  • Observer: Observes notifications from the Observable, three methods: next(), error(), complete()
  • Subscriber: An observer that can unsubscribe
  • Subscription: Tells the Observable that the subscriber is ready for notifications
  • Creation functions: of, from, fromEvent, interval ...
  • Observables from angular features: forms, routing, HTTP
  • Subscribe by using the subscribe() method

Since the reading time would be too long to talk about RxJS, i will split this article into for parts. Keep on reading 😋

Did you find this article valuable?

Support Renátó Bogár by becoming a sponsor. Any amount is appreciated!

See recent sponsors Learn more about Hashnode Sponsors
 
Share this