In this tutorial, we'll explore ReactiveX using TypeScript with Node.js and demonstrate how to use several of its operators.
Introduction
According to reactivex.io/intro.html, ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern (en.wikipedia.org/wiki/Observer_pattern) to support sequences of data and/or events, and it adds operators that let you compose sequences declaratively while abstracting away concerns such as low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O. Observables fill the gap by providing a way to access asynchronous sequences of multiple items.
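To make the idea of "the observer pattern extended to sequences" concrete, here is a minimal hand-rolled sketch in plain TypeScript. It is not how RxJS is implemented, and the names SimpleObserver, SimpleSource, and fromArray are hypothetical, chosen only for illustration. The point is that a source pushes many values to an observer over time and then signals that it has finished.

```typescript
// Hand-rolled illustration only; this is NOT the RxJS implementation.
// An observer reacts to a sequence: many values, then one end signal.
interface SimpleObserver<T> {
  next: (value: T) => void;
  complete: () => void;
}

// A source is just a function that pushes values into an observer.
type SimpleSource<T> = (observer: SimpleObserver<T>) => void;

// Hypothetical helper: turns an array into a push-based source.
function fromArray<T>(items: T[]): SimpleSource<T> {
  return (observer) => {
    for (const item of items) {
      observer.next(item);
    }
    observer.complete();
  };
}

const received: string[] = [];

fromArray(["a", "b", "c"])({
  next: (value) => { received.push(value); },
  complete: () => { received.push("done"); },
});

console.log(received.join(","));
```

The real library adds an error channel, unsubscription, and operators on top of this basic push model.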
Prerequisites
- JavaScript and a basic knowledge of TypeScript
- ReactiveX (RxJS)
- Node.js
Let's start now.
In this section we will examine several of the operators available in the library by using them. RxJS uses observables to observe and emit data, and they can be created in several ways: with the Observable constructor, or with creation operators such as of, from, interval, and range.
As a side note, Observable.create is deprecated in version 6 and later, so we build the observable directly with new Observable:
import {Observable} from 'rxjs'
const observable = new Observable(subscriber => {
  subscriber.next("Hello,")
  subscriber.next("My name is...")
  subscriber.next("Spectre :)")
  subscriber.complete()
})
observable.subscribe({
  next: x => {console.log(x)},
  error: err => {console.log(`Error : ${err}`)},
  complete: () => {console.log("Done")}
})
We imported Observable, created an Observable object stored in observable, and passed it a function that receives the subscriber as its argument. The callbacks on the object passed to subscribe are then invoked as the observable emits. The callbacks are:
- next: called each time a value is emitted.
- error: called when an error occurs.
- complete: called when emission comes to an end. Unlike next, it does not carry a value.
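The example above only exercises next and complete. As a small sketch of the error path, the observable below (named failing here purely for illustration) emits one value and then reports an error. Once error has been called the stream is finished, so the later next and complete calls have no effect.

```typescript
import { Observable } from "rxjs";

const log: string[] = [];

// Illustrative observable that fails after its first value.
const failing = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.error(new Error("something broke"));
  subscriber.next(2);    // ignored: the stream already ended with an error
  subscriber.complete(); // ignored for the same reason
});

failing.subscribe({
  next: (value) => { log.push(`next ${value}`); },
  error: (err) => {
    const message = err instanceof Error ? err.message : String(err);
    log.push(`error ${message}`);
  },
  complete: () => { log.push("complete"); },
});

console.log(log); // [ 'next 1', 'error something broke' ]
```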
Next, let's import the interval operator and use it to create an observable.
import { interval } from 'rxjs';
//Emits a number after every second.
interval(1000).subscribe(val => console.log(val));
//Output : 0,1,2,3,4....
Great!
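The other creation operators mentioned earlier work in a similar way. As a brief sketch, of emits the arguments it is given, and range(start, count) emits count consecutive integers beginning at start. Both emit synchronously on subscription, so the arrays below (fromOf and fromRange, names chosen for this example) are already filled when they are logged.

```typescript
import { of, range } from "rxjs";

// of emits each of its arguments in order, then completes.
const fromOf: string[] = [];
of("a", "b", "c").subscribe((value) => { fromOf.push(value); });

// range(1, 5) emits five integers starting at 1, then completes.
const fromRange: number[] = [];
range(1, 5).subscribe((value) => { fromRange.push(value); });

console.log(fromOf);    // [ 'a', 'b', 'c' ]
console.log(fromRange); // [ 1, 2, 3, 4, 5 ]
```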
Now let's examine the merge operator. It combines several observables into a single observable: merge subscribes to all of them at once and forwards each value as soon as any source emits it.
import {interval, merge} from "rxjs";
import {mapTo} from "rxjs/operators";
//emits every second
const ones = interval(1000);
//emit every 2 seconds
const twos = interval(2000);
merge(
  ones.pipe(mapTo("Sweet")),
  twos.pipe(mapTo("sauce"))
).subscribe(val => console.log(val));
//Output : "Sweet", "sauce", "Sweet", "Sweet", "sauce", "Sweet", "Sweet"...
In the example above, we built two observables, ones and twos. ones emits every second and twos every two seconds. The two observables are then combined into one using merge, which takes the observables to be joined as its arguments. The code also introduces pipe.
The pipe function attaches operators to an observable, and RxJS provides many operators for transforming the data that observables emit. Here we used mapTo to replace each emitted number with a fixed string, "Sweet" for ones and "sauce" for twos.
pipe takes operators as its arguments and applies them to the observable in the order given; it accepts any number of operators. Let's look closely at our code for more detail.
In our code, the values are emitted as follows: at the 1st second, "Sweet" is emitted; at the 2nd second, "sauce" and "Sweet" are emitted; at the 3rd second, "Sweet" is emitted; at the 4th second, "sauce" and "Sweet" are emitted; and so the pattern continues.
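Because the example above depends on timers, it can be hard to see exactly what merge does. The sketch below replays the first three seconds by hand. It uses Subject, an RxJS class this article does not otherwise cover, only as a way to emit values manually; the variable names sweet, sauce, and seen are chosen for this example.

```typescript
import { Subject, merge } from "rxjs";

// Two manually driven sources standing in for the two intervals.
const sweet = new Subject<string>();
const sauce = new Subject<string>();

const seen: string[] = [];
merge(sweet, sauce).subscribe((value) => { seen.push(value); });

// 1st second: only the one-second source fires.
sweet.next("Sweet");
// 2nd second: both sources fire.
sauce.next("sauce");
sweet.next("Sweet");
// 3rd second: only the one-second source fires.
sweet.next("Sweet");

console.log(seen.join(", ")); // Sweet, sauce, Sweet, Sweet
```

merge does not reorder or pair values; it passes each one through in the order the sources emit them.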
Next is the map operator, which transforms each value emitted by an observable.
import {from} from 'rxjs'
import {map} from "rxjs/operators";
from([1,2,3,4,5]).pipe(map(val => Math.pow(val, 2)))
.subscribe(value => {console.log(value)})
//Output : 1,4,9,16,25
To construct the observable we used the from operator, which builds an observable from an array, promise, or iterable; given an array, it emits the values one after the other. We then attach operators to our observable using pipe, so map takes each emitted value, squares it, and emits the result.
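Since from also accepts iterables, the same pattern works with sources other than arrays. As a small sketch, the snippet below feeds a Set into from; a Set drops the duplicate 2, so only two values reach map. The variable name squares is chosen for this example.

```typescript
import { from } from "rxjs";
import { map } from "rxjs/operators";

const squares: number[] = [];

// A Set is an iterable; the duplicate 2 is removed before from sees it.
from(new Set([2, 2, 3]))
  .pipe(map((n) => n * n))
  .subscribe((value) => { squares.push(value); });

console.log(squares); // [ 4, 9 ]
```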
filter is the final operator we'll examine. It passes along only the values that satisfy a given condition; a value that does not match the condition is not emitted. Say we're looking for odd numbers in a list:
import {from} from "rxjs";
import {filter} from "rxjs/operators";
from([1, 2, 3, 4, 5]).pipe(filter(val => val % 2 !== 0))
.subscribe(value => {console.log(value)})
//Output : 1,3,5
It seems pretty simple, right? Each value is tested, and only the odd numbers are emitted. With a small adjustment, we can also filter objects based on one of their properties.
import {from} from "rxjs";
import {filter} from "rxjs/operators";
from([
  { name: 'Harvey', age: 20 },
  { name: 'Mike', age: 13 },
  { name: 'Donna', age: 17 },
  { name: 'Jessica', age: 25 }
]).pipe(
  filter(user => user.age >= 18)
).subscribe(value => { console.log(`${value.name} is old enough to drink`) })
/**
 * Output:
 * Harvey is old enough to drink
 * Jessica is old enough to drink
 **/
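Since pipe accepts several operators, filter and map can be chained in a single call. The sketch below reuses the same user data; the User interface and the adultNames array are names introduced only for this example. Operators run in the order listed, so map only sees the users that filter lets through.

```typescript
import { from } from "rxjs";
import { filter, map } from "rxjs/operators";

interface User {
  name: string;
  age: number;
}

const users: User[] = [
  { name: "Harvey", age: 20 },
  { name: "Mike", age: 13 },
  { name: "Donna", age: 17 },
  { name: "Jessica", age: 25 },
];

const adultNames: string[] = [];

from(users)
  .pipe(
    filter((user) => user.age >= 18),       // keep adults only
    map((user) => user.name.toUpperCase())  // then transform what remains
  )
  .subscribe((name) => { adultNames.push(name); });

console.log(adultNames); // [ 'HARVEY', 'JESSICA' ]
```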
Why use RxJS and ReactiveX
As we saw in this tutorial, ReactiveX, also known as Reactive Extensions, is a large collection of projects created by the ReactiveX community that enable asynchronous programming across various platforms and programming languages, including RxJava, RxJS, and RxSwift. Thanks to the observable pattern, they also help make code shorter and simpler to understand.
The library also has a well-structured error handling model: errors travel down the same stream as values and are handled in the error callback, which is often easier to manage in asynchronous code than the conventional try/catch approach.
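To illustrate the error handling point, here is a minimal sketch using catchError, an RxJS operator not covered earlier in this article. It assumes we want to replace a failure with a fallback value rather than let it reach the error callback; the input data and the results array are made up for the example. Note that once the error occurs the source is abandoned, so "4" is never processed.

```typescript
import { from, of } from "rxjs";
import { catchError, map } from "rxjs/operators";

const results: string[] = [];

from(["1", "2", "oops", "4"])
  .pipe(
    map((text) => {
      const n = Number(text);
      if (Number.isNaN(n)) {
        // Throwing inside an operator becomes an error notification.
        throw new Error(`not a number: ${text}`);
      }
      return `parsed ${n}`;
    }),
    // Swap the failed stream for a one-value fallback stream.
    catchError((err) => {
      const message = err instanceof Error ? err.message : String(err);
      return of(`recovered from: ${message}`);
    })
  )
  .subscribe({
    next: (value) => { results.push(value); },
    complete: () => { results.push("done"); },
  });

console.log(results);
// [ 'parsed 1', 'parsed 2', 'recovered from: not a number: oops', 'done' ]
```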
Conclusion
These operators make it simpler to write asynchronous code, which can in turn improve the performance of your code or of the entire project. Beyond the tasks covered in this article, they can also be combined to handle more complicated, related jobs.