Have you ever wondered how you can open a 10GB file on a device with only 4GB of RAM? Or how the Netflix app can play a cloud-hosted 4K movie on your phone as soon as you press play? On a device with 4GB of RAM, a 10GB file is effectively infinite. How does your device manage to load something that is infinite?
It can't load it all at once. Instead, it loads the file into memory in small chunks, processes each chunk, and discards it before loading the next one. In other words, it streams the data and processes it piece by piece.
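A minimal sketch of that access pattern, assuming a generator as a stand-in for the big file: the hypothetical `fakeFile` produces bytes lazily, the hypothetical `readInChunks` groups them into fixed-size chunks, and the consumer handles one chunk before asking for the next. Memory use depends on the chunk size, not the file size. Real file APIs (such as Node's streams) look different; this only illustrates the idea.

```javascript
// Hypothetical "file": produces `totalBytes` byte values lazily,
// so the whole file never exists in memory at once.
function* fakeFile(totalBytes) {
  for (let i = 0; i < totalBytes; i++) {
    yield i % 256;
  }
}

// Groups values from any iterable into chunks of at most `chunkSize` items.
function* readInChunks(source, chunkSize) {
  let chunk = [];
  for (const byte of source) {
    chunk.push(byte);
    if (chunk.length === chunkSize) {
      yield chunk;
      // Start a fresh chunk; the previous one can be garbage-collected
      // once the consumer is done with it.
      chunk = [];
    }
  }
  if (chunk.length > 0) yield chunk; // final, possibly shorter, chunk
}

// Processes the "file" chunk by chunk, keeping only small running totals.
function processFile(totalBytes, chunkSize) {
  let bytesSeen = 0;
  let chunks = 0;
  let largestChunk = 0;
  for (const chunk of readInChunks(fakeFile(totalBytes), chunkSize)) {
    bytesSeen += chunk.length;
    chunks += 1;
    largestChunk = Math.max(largestChunk, chunk.length);
  }
  return { bytesSeen, chunks, largestChunk };
}

console.log(processFile(10000, 4096));
```

No matter how large `totalBytes` gets, at most one chunk of `chunkSize` values is being worked on at a time.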
What is a Stream?
A stream is a potentially infinite collection of data: a sequence of values arriving over time. You can think of it as items on a conveyor belt, processed one at a time.
Stream = Array + Infinity
Since the data is potentially infinite, our trusty loops won't work on it. You can't write a for loop from zero to infinity to process the whole stream.
Javascript
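// There is no last index to stop at, so this loop never ends on its own,
// and stream[i] may not even have arrived yet when the loop asks for it.
for (let i = 0; i < Infinity; i++) {
  const element = stream[i];
}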
The problem is: how do we know when to stop? This is where Observables come into the picture.
Observables
Observables are potentially infinite collections that deliver values one at a time, asynchronously; that is, some time may pass between one value and the next.
Observable = Array + Infinity + Asynchronous
OR
Observable = Promise + Returns many times
————— Value 1 ————— Value 2 ————— Value 3 ————— Value 4 —————|—>
In this way, they are very similar to Promises. A Promise delivers a single value after some time has passed; an Observable can deliver a potentially infinite number of values, with some time passing between each of them.
Javascript
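// Two states => resolve, reject
const promise = new Promise((resolve, reject) => {
  // Settles exactly once: resolve(value) on success, reject(error) on failure
  setTimeout(() => resolve("Hello world!"), 100);
});
promise
  .then((data) => console.log("Data came back: " + data)) // Success
  .catch((err) => console.error("No, Ew David", err)); // Error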
A Promise starts out pending and then settles exactly once, in one of two ways: it is resolved or rejected. In other words: complete or error.
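A quick way to see that a Promise delivers only one value: in this sketch the executor calls `resolve` three times and `reject` once, but only the first call has any effect, so the `then` callback runs exactly once, with the first value. The names `onlyOnce`, `received`, and `done` are just for illustration.

```javascript
// A Promise can settle only once: later resolve/reject calls are silently ignored.
const onlyOnce = new Promise((resolve, reject) => {
  resolve(1); // settles the Promise (fulfilled with 1)
  resolve(2); // ignored: already settled
  reject(new Error("too late")); // ignored as well
  resolve(3); // ignored
});

const received = [];
const done = onlyOnce.then((value) => {
  received.push(value);
  console.log("Promise delivered:", value);
  return received;
});
```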
Javascript
import { from } from "rxjs";

const observable = from([1, 2, 3, 4]);
// Three states => next, complete, error
observable.subscribe({
  next: (value) => console.log("Next value:", value),
  complete: () => console.log("Infinity is Done!!! ¯\\_(ツ)_/¯"), // "\\" keeps the backslash in the shrug
  error: (err) => console.log("No, Ew Binod", err),
});
Observables add an extra state, next, to the same concept: next, complete, error. One of the most popular Observable libraries in JavaScript is RxJS. What makes RxJS awesome is not just the concept of Observables but also its extensive set of Operators. Operators act on Observables and let you compose complex asynchronous code in a declarative way.
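To make the three states concrete, here is a hypothetical, hand-rolled `TinyObservable`. It is not how RxJS is implemented; it only sketches the contract: `next` may be called many times, then at most one `complete` or `error`, after which nothing more is delivered. The producer here emits three values over time using `setInterval`.

```javascript
// Hypothetical minimal Observable, for illustration only (not RxJS internals).
// `producer` receives an observer and may call next() any number of times,
// followed by at most one complete() or error().
class TinyObservable {
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(observer) {
    let closed = false; // becomes true after complete() or error()
    const safeObserver = {
      next: (value) => {
        if (!closed && observer.next) observer.next(value);
      },
      error: (err) => {
        if (closed) return;
        closed = true;
        if (observer.error) observer.error(err);
      },
      complete: () => {
        if (closed) return;
        closed = true;
        if (observer.complete) observer.complete();
      },
    };
    this.producer(safeObserver);
  }
}

// Emits 1, 2, 3 with a short delay between values, then completes.
const ticks = new TinyObservable((observer) => {
  let n = 0;
  const timer = setInterval(() => {
    n += 1;
    observer.next(n);
    if (n === 3) {
      observer.complete();
      observer.next(4); // ignored: the stream is already closed
      clearInterval(timer);
    }
  }, 10);
});

const log = [];
const finished = new Promise((resolve) => {
  ticks.subscribe({
    next: (value) => log.push(`next ${value}`),
    error: (err) => log.push(`error ${err}`),
    complete: () => {
      log.push("complete");
      resolve(log);
    },
  });
});
finished.then((entries) => console.log(entries));
```

Unlike a Promise, the observer's `next` runs several times, and the guard in `subscribe` is what makes "complete" final.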
RxJS Operators
In RxJS, (pipeable) Operators are functions that accept an Observable as input, transform it, and return a new, transformed Observable as output. These Operators are (mostly) pure, side-effect-free functions, meaning they don't change the incoming Observable in any way. This makes them chainable, or pipeable, so the asynchronous logic in your application can be broken down into small, manageable blocks.
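The idea that an Operator is just a function from one Observable to a new one fits in a few lines. Everything below (`fromArray`, `map`, `filter`, `pipe`) is a hypothetical, simplified stand-in written for illustration, not the RxJS API: the real operators also handle errors, unsubscription, and asynchronous sources.

```javascript
// Hypothetical sketch: an "Observable" is any object with subscribe(observer).
// Error handling is omitted for brevity.
const fromArray = (items) => ({
  subscribe(observer) {
    for (const item of items) observer.next(item);
    observer.complete();
  },
});

// map(fn) returns an Operator: a function Observable -> new Observable.
const map = (fn) => (source) => ({
  subscribe(observer) {
    source.subscribe({
      next: (value) => observer.next(fn(value)),
      complete: () => observer.complete(),
    });
  },
});

// filter(predicate) returns an Operator that forwards only matching values.
const filter = (predicate) => (source) => ({
  subscribe(observer) {
    source.subscribe({
      next: (value) => {
        if (predicate(value)) observer.next(value);
      },
      complete: () => observer.complete(),
    });
  },
});

// pipe applies Operators left to right; each one wraps the previous Observable.
const pipe = (source, ...operators) => operators.reduce((obs, op) => op(obs), source);

const numbers = fromArray([1, 2, 3, 4, 5]);
const result = [];
pipe(numbers, filter((n) => n % 2 === 1), map((n) => n * 10)).subscribe({
  next: (value) => result.push(value),
  complete: () => console.log("piped result:", result),
});
```

Because each Operator builds a new Observable instead of modifying its input, `numbers` can still be subscribed to afterwards and emits its original values.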
RxJS groups its Operators into several categories, but the most commonly used are Creation Operators, which build a new Observable from ordinary data, and Transformation Operators, which turn one Observable into another. In this guide, we will explore how to create an Observable, how to transform it, and how to consume the data it emits.
Creation Operator: of
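This is the simplest way to create an Observable from static data. It is an easy-to-use wrapper that accepts any number of values as arguments and returns a ready-to-use Observable that emits them one at a time, in order, and then completes. Each argument is emitted as-is, so of([10, 20, 30]) emits the whole array as a single value. This operator comes in handy for starting a brand-new Observable pipeline.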
Javascript
import { of } from "rxjs";

of(10, 20, 30).subscribe({
  next: (value) => console.log("next:", value),
  error: (err) => console.log("error:", err),
  complete: () => console.log("the end"),
});
Output:
next: 10
next: 20
next: 30
the end
Creation Operator: from
This Operator is similar to `of`, but it works on collections: it accepts an array (or any other iterable) and returns an Observable that emits each value of the collection one at a time. The real power of this Operator is that it accepts many other kinds of input as well, including generators, async iterables, Promises, and other Observables.
Javascript
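import { from } from "rxjs";

from([10, 20, 30]).subscribe({
  next: (value) => console.log("next:", value),
  error: (err) => console.log("error:", err),
  complete: () => console.log("the end"),
});
console.log("----------");

// Hypothetical stand-in for a real network call that returns a Promise
const fetchDataFromServer = () => Promise.resolve({ msg: "Hello world!" });

const promise = fetchDataFromServer();
from(promise).subscribe({
  next: (value) => console.log("next:", value),
  error: (err) => console.log("error:", err),
  complete: () => console.log("the end"),
});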
Output:
next: 10
next: 20
next: 30
the end
----------
next: {msg: "Hello world!"}
the end
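One way to picture what a `from`-like helper does is to normalize different inputs into the same push-based interface. The hypothetical `fromSketch` below is not RxJS's implementation and only handles two input kinds: Promises (one value, delivered asynchronously, then complete) and synchronous iterables such as arrays and generators (every element, in order, then complete).

```javascript
// Hypothetical sketch, for illustration only: turns a Promise or an iterable
// into an object with subscribe({ next, error, complete }).
function fromSketch(input) {
  return {
    subscribe({ next, error, complete }) {
      if (input && typeof input.then === "function") {
        // Promise: emit the resolved value once, then complete (or forward the rejection).
        input.then(
          (value) => {
            next(value);
            complete();
          },
          (err) => error(err)
        );
      } else if (input && typeof input[Symbol.iterator] === "function") {
        // Iterable: emit every element synchronously, in order, then complete.
        for (const item of input) next(item);
        complete();
      } else {
        error(new TypeError("fromSketch: unsupported input"));
      }
    },
  };
}

function* countdown(n) {
  while (n > 0) yield n--;
}

const events = [];
const observer = (label) => ({
  next: (v) => events.push(`${label} next: ${JSON.stringify(v)}`),
  error: (e) => events.push(`${label} error: ${e.message}`),
  complete: () => events.push(`${label} complete`),
});

fromSketch([10, 20, 30]).subscribe(observer("array"));
fromSketch(countdown(3)).subscribe(observer("generator"));
fromSketch(Promise.resolve({ msg: "Hello world!" })).subscribe(observer("promise"));

// The Promise branch is asynchronous, so its events show up after the others.
setTimeout(() => console.log(events.join("\n")), 0);
```

This ordering mirrors the example above: the array values are logged before the `----------` line, while the Promise's value only arrives afterwards.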
Transformation Operator: map
This operator is very similar to Array#map. It takes each new value emitted by the Observable, transforms it, and passes the result along to the next Operator in the pipe. This is where the ideas behind Streams and Observables start to shine.
Why go through the trouble of learning this whole new concept when Array#map can solve the same problem? Observables come in really handy when we simply can't load the whole dataset into an Array (the data is effectively infinite), or when the whole dataset isn't available upfront (it is asynchronous, and new values trickle in over the network). Often we have both problems at once: effectively infinite data arriving slowly over the network, a few values at a time.
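To see why Array#map can't help with effectively infinite data: it needs the complete array before it can start. A lazy, one-value-at-a-time map can work on an endless source. This sketch uses plain generators (pull-based) rather than Observables (push-based), and the helpers `naturals`, `lazyMap`, and `take` are hypothetical names chosen for illustration; it only demonstrates the per-item idea that RxJS's map also relies on.

```javascript
// An infinite source: 1, 2, 3, ... Array#map could never run on this,
// because the array would have to exist in full first.
function* naturals() {
  let n = 1;
  while (true) yield n++;
}

// Lazy map: transforms one value at a time, only when the consumer asks for it.
function* lazyMap(source, fn) {
  for (const value of source) yield fn(value);
}

// Stops after `count` values, so the infinite source can be consumed safely.
function* take(source, count) {
  if (count <= 0) return;
  let taken = 0;
  for (const value of source) {
    yield value;
    taken += 1;
    if (taken === count) return;
  }
}

const firstFive = [...take(lazyMap(naturals(), (x) => x * 10), 5)];
console.log(firstFive);
```

The transformation runs only as many times as values are actually consumed, which is what makes an unbounded source manageable.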
Javascript
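import { range, map } from "rxjs"; // RxJS 7.2+; older versions import map from "rxjs/operators"

// range is another RxJS creation operator:
// range(1, 1000) starts at 1 and emits 1000 consecutive integers (1 to 1000)
range(1, 1000)
  .pipe(map((x) => x * 10))
  .subscribe({
    next: (value) => console.log("next:", value),
    error: (err) => console.log("error:", err),
    complete: () => console.log("the end"),
  });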
Output:
next: 10
next: 20
next: 30
...
next: 10000
the end
Most RxJS operators are pure and side-effect free, and they work on one emitted value at a time. This makes effectively infinite datasets easy to deal with: because an operator like map has no side effects and keeps no state, it doesn't need to hold on to values it isn't currently processing, so only the item in flight has to be in memory. Operators that deliberately collect values, such as toArray, are the exception.
Transformation Operator: mergeMap
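This operator is very similar to map, but its transformation function returns asynchronous data (an Observable or a Promise). mergeMap subscribes to each of these inner sources and merges everything they emit into a single output Observable. This makes handling many async calls to servers or databases very easy, and it even lets us run those calls in parallel.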
Javascript
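import { range, map, mergeMap } from "rxjs";

// fetchBulkDataFromServer is assumed to exist: it takes { pageNum } and returns
// a Promise that resolves to { page, items } for that page
range(1, 1000)
  .pipe(
    mergeMap((pageNum) => fetchBulkDataFromServer({ pageNum })),
    map((bulkData) => `Page Num ${bulkData.page} returned ${bulkData.items.length} items`)
  )
  .subscribe({
    next: (value) => console.log("next:", value),
    error: (err) => console.log("error:", err),
    complete: () => console.log("the end"),
  });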
Output:
next: Page Num 1 returned 100 items
next: Page Num 2 returned 90 items
next: Page Num 3 returned 70 items
next: Page Num 4 returned 100 items
...
next: Page Num 1000 returned 30 items
the end
Because mergeMap works with async data (Observables and Promises), it can speed things up considerably by handling many inner Observables in parallel. It accepts an optional second argument, `concurrent`, that caps how many inner Observables are active at the same time. By default there is no cap, so in the example above all 1000 requests start at once, and the order of the output depends on which requests finish first. Implementing this kind of parallel async processing without Observables is not straightforward and can easily lead to hard-to-debug concurrency bugs.
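To get a feel for what the `concurrent` limit does, here is a hypothetical `mergeMapSketch` built on plain Promises. It is not RxJS's implementation: it starts at most `concurrent` tasks, reports each result as soon as it finishes, and starts the next input whenever a slot frees up. `fakeFetchPage` is an assumed stand-in for `fetchBulkDataFromServer` that also counts how many requests are in flight.

```javascript
// Hypothetical sketch of mergeMap's concurrency limit (not RxJS internals).
// Resolves once every input has been handled; rejects on the first failure.
function mergeMapSketch(inputs, project, concurrent, onValue) {
  return new Promise((resolve, reject) => {
    let nextIndex = 0; // index of the next input that has not been started
    let active = 0; // tasks currently in flight
    let finished = 0; // tasks that completed successfully
    let failed = false; // set once any task rejects

    const startMore = () => {
      // Fill free slots until the limit is reached or inputs run out.
      while (!failed && active < concurrent && nextIndex < inputs.length) {
        const input = inputs[nextIndex++];
        active += 1;
        // Wrapping in a Promise turns a synchronous throw in project() into a rejection.
        new Promise((res) => res(project(input))).then(
          (value) => {
            if (failed) return; // stream already ended with an error
            active -= 1;
            finished += 1;
            onValue(value); // completion order, not input order
            if (finished === inputs.length) resolve();
            else startMore(); // a slot freed up: start the next input
          },
          (err) => {
            if (failed) return;
            failed = true;
            reject(err); // the first error ends the whole stream
          }
        );
      }
    };

    if (inputs.length === 0) resolve();
    else startMore();
  });
}

// Hypothetical stand-in for an API call: resolves after a short random delay.
let inFlight = 0;
let maxInFlight = 0;
const fakeFetchPage = (pageNum) => {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  return new Promise((resolve) =>
    setTimeout(() => {
      inFlight -= 1;
      resolve({ page: pageNum, items: [] });
    }, Math.random() * 20)
  );
};

const pages = Array.from({ length: 40 }, (_, i) => i + 1);
const seen = [];
const run = mergeMapSketch(pages, fakeFetchPage, 5, (data) => seen.push(data.page)).then(() =>
  console.log("pages received:", seen.length, "max requests in flight:", maxInFlight)
);
```

With a limit of 5, every page is still fetched, but never more than five requests are outstanding at once.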
Javascript
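import { range, map, mergeMap } from "rxjs";

// At most 50 requests are in flight at any moment
const maxParallelApiCalls = 50;

// fetchBulkDataFromServer is the hypothetical helper used in the previous example
range(1, 1000)
  .pipe(
    mergeMap((pageNum) => fetchBulkDataFromServer({ pageNum }), maxParallelApiCalls),
    map((bulkData) => `Page Num ${bulkData.page} returned ${bulkData.items.length} items`)
  )
  .subscribe({
    next: (value) => console.log("next:", value),
    error: (err) => console.log("error:", err),
    complete: () => console.log("the end"),
  });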
Output:
next: Page Num 7 returned 10 items
next: Page Num 12 returned 8 items
next: Page Num 38 returned 12 items
next: Page Num 3 returned 70 items
...
next: Page Num 1000 returned 30 items
the end
In the above example, RxJS runs at most 50 inner Observables at the same time and emits their data in the order they finish, so whichever API call returns first has its data piped to the next Operator first. Whenever a request completes, mergeMap starts the next page, keeping up to 50 requests in flight until all 1000 pages are done. Here's a timeline visualization of how async data is parallelized by mergeMap.
<Start> — Value 1 —————————————————————————————————|—>
<Start> ————————————————————— Value 2 —————————————|—>
<Start> ——————————— Value 3 ———————————————————————|—>
<Start> —————————————————————————————— Value 4 ————|—>
<Start> ————————————————— Value 5 ——————————————————|—>
——————————————————————— Merge ————————————————————————
<Start> — Value 1 —— Value 3 —— Value 5 —— Value 2 —— Value 4 —|—>
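The merge in the timeline can be reproduced with timers. In this sketch, five hypothetical sources each produce one value after a made-up delay chosen to mirror the diagram; all of them start at once, and each value is recorded the moment it arrives, so the merged order follows finish time rather than start order. It uses plain Promises, not RxJS.

```javascript
// Five hypothetical async sources; the delays (ms) are invented to match the timeline:
// Value 1 finishes first, then Value 3, Value 5, Value 2, and finally Value 4.
const sources = [
  { value: "Value 1", delay: 10 },
  { value: "Value 2", delay: 70 },
  { value: "Value 3", delay: 30 },
  { value: "Value 4", delay: 100 },
  { value: "Value 5", delay: 50 },
];

// Resolves with `value` after `delay` milliseconds.
const delayed = ({ value, delay }) =>
  new Promise((resolve) => setTimeout(() => resolve(value), delay));

// Merge: start every source at once and record each value as soon as it arrives.
const merged = [];
const allDone = Promise.all(
  sources.map((source) => delayed(source).then((value) => merged.push(value)))
).then(() => {
  console.log(merged.join(" -> "));
  return merged;
});
```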
Conclusion: This beginner's guide covered only a few Operators, but RxJS has many more, suited to a wide variety of use cases. Check out the official documentation to explore them.
Reference: https://rxjs.dev/api