Marcelo Carmona
Published on

Creating observables from scratch

Authors

One way to understand Rxjs is implementing something similar and simplest from scratch. I'm going to show how we can compose functions in a similar way that Rxjs does.

We are going to see different types of callbacks

const elem = document.querySelector('#someElem')

function consoleClick(event) {
  console.log(`clicked ${event.x}`)
}

elem.addEventListener('click', consoleClick)

In this case, we use an event listener to execute a callback in the DOM

const arr = [1, 2, 3, 4, 5]

arr.forEach(function callback(x) {
  console.log(x)
})

In this second example, we have a callback that is executed for each iteration of our array. NOTE: in this case the callback is synchronous, it is important to keep in mind that when we say callbacks we do not have to think that all callbacks are asynchronous.

const promise = fetch('https://jsonplaceholder.typicode.com/posts/1').then((res) => res.json())

function successCb(post) {
  console.log(`post1: ${post.title}`)
}

function failureCb(err) {
  console.error(err)
}

promise.then(successCb, failureCb)

In this example, we see how to execute callback in the event that a promise was resolved correctly or in the event of an error. To take into account this case is a little different from the previous one since I have no chance of making a mistake when I clicked.

fs = require('fs')

const readable = fs.createReadStream('intro03.js', { highWaterMark: 100 })

function nextDataCb(chunk) {
  console.log(`Received ${chunk.length} bytes of data.`)
}

function errorCb(err) {
  console.log(`Something was wrong :( ${err}`)
}

function doneCb() {
  console.log('There will be no more data.')
}

readable.on('data', nextDataCb)
readable.on('error', errorCb)
readable.on('end', doneCb)

In this example, in Node, we can see a case where we have 3 callback, one that is reading the data as it needs "data" another when it ends "end" and another when an error occurs.

Bearing this in mind, the idea is to think about a generic way of how to handle all the callbacks in javascript, we could think in the same way with three callbacks next, error, and complete

function nextCallback(data) {
  console.log(data) // To do something
}

function giveMeSomeData(nextCb, errorCb, completeCb) {
  // We use just the nextCb for this case
  document.addEventListener('click', nextCb)
}

giveMeSomeData(nextCallback, errorCallback, completeCallback)

Remembering the first example of the eventListener we might think so.

function nextCallback(data) {
  console.log(data)
}

function completeCallback() {
  console.log('done')
}

function giveMeSomeData(nextCb, errorCb, completeCb) {
  ;[1, 2, 3].forEach(nextCallback)
  completeCb()
}

giveMeSomeData(nextCallback, errorCallback, completeCallback)

Remembering the second example of the array we could think of something like that.

function nextCallback(data) {
  console.log(data)
}

function errorCallback(err) {
  console.error(err)
}

function completeCallback() {
  console.log('done')
}

function giveMeSomeData(nextCb, errorCb, completeCb) {
  fetch('https://jsonplaceholder.typicode.com/posts/1')
    .then((res) => {
      // We call next and complete
      nextCb(res)
      completeCb()
    })
    .catch(errorCb) // Error callback
}

giveMeSomeData(nextCallback, errorCallback, completeCallback)

Spoiler: is the same idea as fromPromise of Rxjs http://reactivex.io/rxjs/file/es6/observable/PromiseObservable.js.html#lineNumber58

const observable = {
  subscribe: function subscribe(ob) {
    ;[1, 2, 3].forEach(ob.next)
    ob.complete()
  },
}

const observer = {
  next: function nextCallback(data) {
    console.log(data)
  },
  error: function errorCallback(err) {
    console.error(err)
  },
  complete: function completeCallback() {
    console.log('done')
  },
}

observable.subscribe(observer)

Taking into account the example of the refactoring array we create an object observer and observable. GiveMeSomeData we rename it to subscribe

function map(transformCb) {
  const inputObservable = this
  const outputObservable = createObservable(function subcribe(outputObservable) {
    inputObservable.subscribe({
      next: function nextCallback(data) {
        const transformData = transformCb(data)
        outputObservable.next(transformData)
      },
      error: function errorCallback(err) {
        console.error(err)
      },
      complete: function completeCallback() {
        console.log('done')
      },
    })
  })
  return outputObservable
}

function filter(condicionalCb) {
  const inputObservable = this
  const outputObservable = createObservable(function subcribe(outputObservable) {
    inputObservable.subscribe({
      next: function nextCallback(data) {
        if (condicionalCb(data)) {
          outputObservable.next(data)
        }
      },
      error: function errorCallback(err) {
        console.error(err)
      },
      complete: function completeCallback() {
        console.log('done')
      },
    })
  })
  return outputObservable
}

function createObservable(subcribe) {
  return {
    subscribe: subcribe,
    map: map,
    filter: filter,
  }
}

const arrayObservable = createObservable((ob) => {
  ;[1, 2, 3].forEach(ob.next)
  ob.complete()
})

const observer = {
  next: function nextCallback(data) {
    console.log(data)
  },
  error: function errorCallback(err) {
    console.error(err)
  },
  complete: function completeCallback() {
    console.log('done')
  },
}

arrayObservable
  .map((x) => x * 10)
  .filter((x) => x !== 20)
  .subscribe(observer)

And then we can create the map and filter operators. The complete code also I leave it in a gist

https://gist.github.com/marcelocarmona/5aa60c8baff780a29673b7987b71a743

And finally to compare we can see an example already using an observable array with Rxjs.

const Rx = require('rxjs')

const arrayObservable = Rx.Observable.from([1, 2, 3])

arrayObservable
  .map((x) => x * 10)
  .filter((x) => x !== 20)
  .subscribe({
    next: (x) => console.log(x),
    error: (err) => console.error(err),
    complete: () => console.log('done'),
  })