pull-stream
Streams are an asynchronous abstraction that allows dealing with data in small chunks, pushing bottlenecks into the IO layer. This usually leads to less memory cost and increased performance, which is a very good thing. In streams, data flows from a source, through a bunch of through streams, into a sink:
┌────────┐ ┌─────────┐ ┌────────┐
│ Source │──▶│ Through │──▶│ Sink │
└────────┘ └─────────┘ └────────┘
Expressed as a marble chart, this is:
source()
-------------------------
through()
-------------------------
sink()
-------------------------
The x-axis in a marble chart is the progression of time. In JavaScript each
dash is roughly equivalent to a process.nextTick()
call on the event loop.
In pull streams there are three types of streams: source, through and sink. In order to let data flow, a source and a sink must be connected. Through streams are combinations of sources and sinks, making every connection in the pipeline a source and a sink that talk to each other.
Under the hood a through stream is nothing but a sink coupled to a source. When the source gets called to provide more data, it calls the sink to request more data. Through this mechanism the full flow of data is corked until a sink is attached at the end that starts reading data.
Conceptually it looks like this:
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│Source│──▶│ Sink │ ┌▶│ Sink │ ┌▶│ Sink │
└──────┘ ├──────┤ │ ├──────┤ │ └──────┘
│Source│─┘ │Source│─┘
└──────┘ └──────┘
Through Through
source
// A stream of 100 random numbers.
// `i` is the number of values still to be emitted. It lives outside
// random(), so every stream created by random() draws on the same budget.
var i = 100
/**
 * Create a pull-stream source that emits random numbers.
 *
 * @returns {function(*, function): void} A `read(end, cb)` function. It calls
 *   `cb(null, value)` for each number and `cb(true)` once the budget of 100
 *   values is used up. A truthy `end` from the reader is echoed back via
 *   `cb(end)` without producing a value.
 */
var random = function () {
  return function (end, cb) {
    // The reader aborted or reported an error: acknowledge it and stop.
    if (end) return cb(end)
    // Only read 100 times. Checking before decrementing means exactly 100
    // values are produced and the counter never drops below zero.
    if (i <= 0) return cb(true)
    i--
    cb(null, Math.random())
  }
}
through
/**
 * Through stream that transforms every value read from upstream.
 *
 * @param {function} read - Upstream source, called as `read(end, cb)`.
 * @param {function} map - Applied to each value that is not null/undefined.
 * @returns {function} A source `(end, cb)` yielding the mapped values.
 */
var map = function sink (read, map) {
  // The returned function is itself readable: it pulls from `read` on demand.
  function source (end, cb) {
    function onUpstream (upstreamEnd, data) {
      // null/undefined data (e.g. on end or error) is passed on as null
      // and never handed to the mapper.
      if (data != null) {
        cb(upstreamEnd, map(data))
      } else {
        cb(upstreamEnd, null)
      }
    }
    read(end, onUpstream)
  }
  return source
}
/**
 * Build a through stream that answers every successful read with a client
 * error, while passing end and error signals from upstream straight through.
 *
 * @param {*} req - Not used by this function.
 * @param {*} res - Not used by this function.
 * @param {*} params - Not used by this function.
 * @returns {function} `writable(read)`, which returns a `readable(end, cb)`.
 */
function postTorrent (req, res, params) {
  function writable (read) {
    function readable (end, cb) {
      read(end, function onRead (upstreamEnd, data) {
        // A normal end (`true`) and an error are both forwarded unchanged.
        if (upstreamEnd) return cb(upstreamEnd)
        cb(error.client('nooop'))
      })
    }
    return readable
  }
  return writable
}
/**
 * Build a through stream that replaces every upstream value with 'hey!'.
 * End and error signals from upstream are passed along unchanged.
 *
 * @returns {function} A through `(read) => (end, cb) => ...`.
 */
function myCoolFunction () {
  return (read) => (end, cb) => {
    return read(end, (upstreamEnd, data) => {
      // `true` (normal end) and errors are both handed straight to the reader.
      if (upstreamEnd) return cb(upstreamEnd)
      cb(null, 'hey!')
    })
  }
}
## sink
```js
// Sink: pull every value out of a source and print it.
var logger = function () {
  return function (read) {
    function next (end, data) {
      if (end) {
        // `true` is a normal end; any other truthy value is an error.
        if (end !== true) throw end
        return
      }
      console.log(data)
      // Ask for the next value only after this one has been handled.
      read(null, next)
    }
    // Kick off the first read.
    read(null, next)
  }
}
```
Core methods
Core pull-stream methods. Marble charts adapted from xstream.
Source
count
Create a stream that outputs 0 ... max. By default, max = Infinity.
count(6)
0---1---2---3---4---5---6
// Connect a count(6) source directly to a logging sink.
pull(
  pull.count(6),
  pull.log()
)
empty
A stream with no contents (it just ends immediately)
empty()
------------------------
// Connect an empty source directly to a logging sink.
pull(
  pull.empty(),
  pull.log()
)
error
A stream that errors immediately.
error()
--e---------------------
// Give error() an actual Error to fail with; called with no argument the
// source has no error value to hand to the sink.
const source = pull.error(new Error('something went wrong'))
const sink = pull.log()
pull(source, sink)
infinite
Create an unending stream by repeatedly calling a generator function (by default, Math.random()).
infinite(() => i++)
--1--2--3--4--5--6--7--8--
// Start at 1 so the logged values (1, 2, 3, ...) match the marble chart:
// `i++` yields the current value before incrementing it.
var i = 1
const source = pull.infinite(() => i++)
const sink = pull.log()
pull(source, sink)
keys
once
values
Through
async-map
filter-not
filter
flatten
Turn a stream of arrays into a stream of their items.
---------------1----2---3--
-----1--2----3----4--------
flatten()
-----1--2----3-1--4-2---3--
// A stream whose items are themselves collections of values.
const source = pull.values([
  pull.values([ 1, 2, 3, 4 ]),
  pull.values([ 1, 2, 3 ])
])
// flatten() is the through being demonstrated; without it the sink would
// receive the two inner collections rather than their individual items.
const through = pull.flatten()
const sink = pull.log()
pull(source, through, sink)
map
Transforms each value from the source stream through a project function, to get a stream that emits the transformed data.
--1---3--5-----7------
map((i) => i * 10)
--10--30-50----70-----
// Source -> map through -> logging sink, wired up in one call.
pull(
  pull.values([ 1, 3, 5, 7]),
  pull.map((i) => i * 10),
  pull.log()
)
non-unique
take
If test is a function, read data from the source stream and forward it downstream until test(data) returns false. If opts.last is set to true, the data for which the test failed will be included in what is forwarded. If test is an integer n, take the first n items from the source.
--1---2--3----4---5--
take(3)
--1---2--3|
// Source -> take(3) through -> logging sink, wired up in one call.
pull(
  pull.count(),
  pull.take(3),
  pull.log()
)
through
unique
Sink
collect
concat
drain
find
log
on-end
reduce
Patterns
Eventual values
Return a stream from a function synchronously, emit data from it asynchronously.
const notify = require('pull-notify')
const pull = require('pull-stream')
const xhr = require('xhr')
pull(request('foobar.com'), pull.log())
/**
 * Request `url` and expose the response body as a stream.
 *
 * The stream is returned synchronously; the body (or the error) is pushed
 * into it later, when the xhr callback fires.
 *
 * @param {string} url - Address passed to `xhr`.
 * @returns {*} The value returned by `xhr$.listen()`.
 */
function request (url) {
  const xhr$ = notify()
  // Create the listener before starting the request so it already exists
  // by the time the callback pushes anything into the notifier.
  const stream = xhr$.listen()
  xhr(url, (err, res, body) => {
    // On failure, abort with the error instead of emitting a body.
    if (err) return xhr$.abort(err)
    xhr$(body)
    xhr$.end()
  })
  return stream
}