A system that provides you data over time.
A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches. (Wikipedia)
"We should have some ways of connecting programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also." (Doug McIlroy, 1964)
Readable - reading from a file produces data.
Writable - writing to a file consumes data.
Duplex - bidirectional, like a socket connection. TCP connections are a good example.
Transform - similar to a Duplex stream: takes input, produces output.
gulp.src(paths.srcCSS + 'style.scss')
  .pipe(sass().on('error', sass.logError))
  .pipe(postcss(postcss))
  .pipe(minifyCss())
  .pipe(gulp.dest(paths.distCSS));
gulp-sass is an example of a transform stream. It has a readable side and a writable side.
Buffer - temporary storage for data.
High water mark - the maximum amount of data to be buffered at a time.
Sink - the destination for the data to go to.
Flush - the act of taking the buffered data and sending it to the sink.
As the Unix system grew more complex through the 1970s, connecting programs to one another called for a new way to handle I/O: the pipe.
git log | grep Ryan | wc -l
The | character is used to pipe information from the output of one program to the input of another. The ability to pipe data like this allows for great composition!
Each command in Unix starts with three standard data streams: stdin, stdout, and stderr.
In this example, the command git log sends its stdout through the pipe to grep's stdin, and so on.
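Node mirrors the Unix model: every process gets the same three streams, exposed on the process object. console.log is essentially a wrapper around process.stdout.write:

```javascript
// Every Node process starts with the same three standard streams
process.stdout.write('this goes to stdout\n');
process.stderr.write('this goes to stderr\n');
```

Add process.stdin.pipe(process.stdout) and you have a tiny Unix-style cat that slots into a pipeline like the one above.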
A great resource.
In Node, the most common place you might have worked with a stream before is the fs module.
const fs = require('fs');
fs.createReadStream('mobydick.txt')
  .pipe(process.stdout);
In Node we use .pipe() to connect our streams together - think of the garden hose idea!
If you have ever used the http module and createServer(), you have technically been working with streams!
const http = require('http');

http.createServer((req, res) => {
  // req is a readable stream
  // res is a writable stream
});
const fs = require('fs');
const ws = fs.createWriteStream('newclassic.txt');
ws.write("The air was chill that morning\n");
ws.write("you could feel it in the air");
ws.end();
fs.createReadStream('mobydick.txt')
  .pipe(upperCaser)
  .pipe(process.stdout);
const Transform = require('stream').Transform;

class UpperTransform extends Transform {
  _transform(chunk, enc, next) {
    this.push(chunk.toString().toUpperCase());
    next();
  }
}

const upperCaser = new UpperTransform();
A buffer representation of "Hey there!"
<Buffer 48 65 79 20 74 68 65 72 65 21>
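Each chunk coming off a binary stream in Node is a Buffer of raw bytes until you decode it:

```javascript
const buf = Buffer.from('Hey there!');

console.log(buf);            // <Buffer 48 65 79 20 74 68 65 72 65 21>
console.log(buf.toString()); // Hey there!
```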
class WhaleTransform extends Transform {
  _transform(chunk, enc, next) {
    const string = chunk.toString().replace(/whale/ig, "🐳");
    this.push(string);
    next();
  }
}

const whaler = new WhaleTransform();
fs.createReadStream('../mobydick.txt')
  .pipe(upperCaser)
  .pipe(whaler)
  .pipe(process.stdout);
Now we are getting something that looks like how Gulp works.
gulp.src(paths.srcCSS + 'style.scss')
  .pipe(sass().on('error', sass.logError))
  .pipe(postcss(postcss))
  .pipe(minifyCss())
  .pipe(gulp.dest(paths.distCSS));
fs.createReadStream('mobydick.txt')
  .pipe(upperCaser)
  .pipe(whaler)
  .pipe(process.stdout);
npm i whaleit
const fs = require('fs');
const Whaleit = require('whaleit');

const whaler = new Whaleit();

fs.createReadStream('mobydick.txt')
  .pipe(whaler)
  .pipe(process.stdout);
🙏 Substack 🙏
const net = require('net');

const server = net.createServer((stream) => {
  stream.pipe(stream);
});
server.listen(5000);
const net = require('net');
const Whaleit = require('whaleit');

const whaleit = new Whaleit();

const server = net.createServer((stream) => {
  stream
    .pipe(whaleit)
    .pipe(stream);
});
server.listen(5000);
https://streams.spec.whatwg.org/
The response from a fetch
request has the body
property on it. This is an implementation of a ReadableStream.
const response = await fetch('./mobydick.txt');
response.body.pipeTo(writableStream); // writableStream: a WritableStream instance
With ReadableStreams you can use pipeTo to pipe data along. There are two methods for piping: pipeTo and pipeThrough.
We can also create a reader from the body.
const reader = response.body.getReader();
reader.read().then(({value, done}) => {});
async function whaler() {
  const response = await fetch('./mobydick.txt');
  const reader = response.body.getReader();
  return pump();

  function pump() {
    return reader.read().then(({ value, done }) => {
      if (done) return;
      // Do something with that value
      return pump();
    });
  }
}
const decoder = new TextDecoder('utf-8');

let string = decoder.decode(value, { stream: true });
const textElement = document
  .createTextNode(string.replace(/whale/ig, '🐳'));
document.body.appendChild(textElement);
const write = new WritableStream({
  write(chunk, controller) {
    const decoder = new TextDecoder('utf-8');
    console.log(decoder.decode(chunk));
  }
});

const res = await fetch('mobydick.txt');
res.body
  .pipeTo(write)
  .then(() => {})
  .catch(console.error);
TransformStream - not yet implemented, but its shape can be sketched:
class TransformStream {
  constructor() {
    this.readable = new ReadableStream({
      start(controller) {},
      cancel(reason) {}
    });
    this.writable = new WritableStream({
      start(controller) {},
      write(chunk) {},
      close() {},
      abort(reason) {}
    });
  }
}
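When TransformStream lands, a transform will slot between a readable and a writable via pipeThrough. A sketch of the shape the spec proposes (assuming a runtime where WHATWG streams are available):

```javascript
// A transformer object with a transform() hook, per the spec
const upper = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(String(chunk).toUpperCase());
  }
});

// Usage shape: readable.pipeThrough(upper).pipeTo(writable);
```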
Jake & Surma
npm i boatit
const fs = require('fs');
const Whaleit = require('whaleit');
const Boatit = require('boatit');

const whaler = new Whaleit();
const boatit = new Boatit();

fs.createReadStream('../mobydick.txt')
  .pipe(whaler)
  .pipe(boatit)
  .pipe(process.stdout);