- Published on
Creating observables from scratch
- Authors

- Name
- Marcelo Carmona
- @carmonamarcelo

Video overview
Creating observables from scratch
I'll walk you through the process of building your own small version of an RxJS-like library from scratch.
One way to understand RxJS is by implementing something similar and simpler from scratch. I'm going to show how we can compose functions in a way similar to how RxJS does it.
We are going to see different types of callbacks.
const elem = document.querySelector('#someElem')
function consoleClick(event) {
console.log(`clicked ${event.x}`)
}
elem.addEventListener('click', consoleClick)
In this case, we use an event listener to execute a callback in the DOM.
const arr = [1, 2, 3, 4, 5]
arr.forEach(function callback(x) {
console.log(x)
})
In this second example, we have a callback that is executed for each iteration of our array. NOTE: In this case, the callback is synchronous. It is important to keep in mind that when we say callbacks, we do not have to think that all callbacks are asynchronous.
const promise = fetch('https://jsonplaceholder.typicode.com/posts/1').then((res) => res.json())
function successCb(post) {
console.log(`post1: ${post.title}`)
}
function failureCb(err) {
console.error(err)
}
promise.then(successCb, failureCb)
In this example, we see how to execute callbacks when a promise resolves successfully or when an error occurs. Keep in mind that this case is a little different from the previous one, since there is no chance of making a mistake when I click.
fs = require('fs')
const readable = fs.createReadStream('intro03.js', { highWaterMark: 100 })
function nextDataCb(chunk) {
console.log(`Received ${chunk.length} bytes of data.`)
}
function errorCb(err) {
console.log(`Something was wrong :( ${err}`)
}
function doneCb() {
console.log('There will be no more data.')
}
readable.on('data', nextDataCb)
readable.on('error', errorCb)
readable.on('end', doneCb)
In this example, in Node, we can see a case where we have 3 callbacks: one that reads data as it arrives ("data"), another that runs when it ends ("end"), and another that runs when an error occurs.
Bearing this in mind, the idea is to think about a generic way to handle all callbacks in JavaScript. We could think about it the same way, with three callbacks: next, error, and complete.
function nextCallback(data) {
console.log(data) // To do something
}
function giveMeSomeData(nextCb, errorCb, completeCb) {
// We use just the nextCb for this case
document.addEventListener('click', nextCb)
}
giveMeSomeData(nextCallback, errorCallback, completeCallback)
Remembering the first example of the event listener, we might think about it this way.
function nextCallback(data) {
console.log(data)
}
function completeCallback() {
console.log('done')
}
function giveMeSomeData(nextCb, errorCb, completeCb) {
;[1, 2, 3].forEach(nextCallback)
completeCb()
}
giveMeSomeData(nextCallback, errorCallback, completeCallback)
Remembering the second example of the array, we could think of something like this.
function nextCallback(data) {
console.log(data)
}
function errorCallback(err) {
console.error(err)
}
function completeCallback() {
console.log('done')
}
function giveMeSomeData(nextCb, errorCb, completeCb) {
fetch('https://jsonplaceholder.typicode.com/posts/1')
.then((res) => {
// We call next and complete
nextCb(res)
completeCb()
})
.catch(errorCb) // Error callback
}
giveMeSomeData(nextCallback, errorCallback, completeCallback)
Spoiler: it is the same idea as RxJS's fromPromise: http://reactivex.io/rxjs/file/es6/observable/PromiseObservable.js.html#lineNumber58
const observable = {
subscribe: function subscribe(ob) {
;[1, 2, 3].forEach(ob.next)
ob.complete()
},
}
const observer = {
next: function nextCallback(data) {
console.log(data)
},
error: function errorCallback(err) {
console.error(err)
},
complete: function completeCallback() {
console.log('done')
},
}
observable.subscribe(observer)
Taking the refactored array example into account, we create observer and observable objects.
We rename GiveMeSomeData to subscribe.
function map(transformCb) {
const inputObservable = this
const outputObservable = createObservable(function subcribe(outputObservable) {
inputObservable.subscribe({
next: function nextCallback(data) {
const transformData = transformCb(data)
outputObservable.next(transformData)
},
error: function errorCallback(err) {
console.error(err)
},
complete: function completeCallback() {
console.log('done')
},
})
})
return outputObservable
}
function filter(condicionalCb) {
const inputObservable = this
const outputObservable = createObservable(function subcribe(outputObservable) {
inputObservable.subscribe({
next: function nextCallback(data) {
if (condicionalCb(data)) {
outputObservable.next(data)
}
},
error: function errorCallback(err) {
console.error(err)
},
complete: function completeCallback() {
console.log('done')
},
})
})
return outputObservable
}
function createObservable(subcribe) {
return {
subscribe: subcribe,
map: map,
filter: filter,
}
}
const arrayObservable = createObservable((ob) => {
;[1, 2, 3].forEach(ob.next)
ob.complete()
})
const observer = {
next: function nextCallback(data) {
console.log(data)
},
error: function errorCallback(err) {
console.error(err)
},
complete: function completeCallback() {
console.log('done')
},
}
arrayObservable
.map((x) => x * 10)
.filter((x) => x !== 20)
.subscribe(observer)
Then we can create the map and filter operators.
I also left the complete code in a gist:
https://gist.github.com/marcelocarmona/5aa60c8baff780a29673b7987b71a743
Finally, for comparison, we can see an example that uses an observable array with RxJS.
const Rx = require('rxjs')
const arrayObservable = Rx.Observable.from([1, 2, 3])
arrayObservable
.map((x) => x * 10)
.filter((x) => x !== 20)
.subscribe({
next: (x) => console.log(x),
error: (err) => console.error(err),
complete: () => console.log('done'),
})