Full Stream Ahead! 🐳

About Me

What is a stream?

A system that provides you data over time.

A stream can be thought of as items on a conveyor belt being processed one at a time rather than in large batches.
Wikipedia

History

Doug McIlroy, in a Bell Labs memo from 1964

We should have some ways of coupling programs like garden hose--screw in another segment when it becomes necessary to massage data in another way. This is the way of IO also.

Types of streams

  • Readable
  • Writable
  • Duplex
  • Transform

Readable

Reading from a file, produces data

Writable

Writing to a file, consumes data

Duplex

A socket connection: bidirectional, so you can both read from it and write to it.
TCP connections are a good example.

Transform

Similar to a Duplex stream, but the output is computed from the input: it takes data in, changes it, and passes it on.

Where have you maybe seen this?


  gulp.src(paths.srcCSS + 'style.scss')
    .pipe(sass().on('error', sass.logError))
    .pipe(postcss(postcss))
    .pipe(minifyCss())
    .pipe(gulp.dest(paths.distCSS));

gulp-sass is an example of a transform stream. It has a writable side that takes the Sass in, and a readable side that hands the CSS out.

Terminology

  • Buffer
  • High water mark
  • Sink
  • Flush

Buffer

Temporary Storage for data.

High Water Mark

The amount of data a stream will buffer before it signals that no more should be sent for now.

Sink

The destination where the data ends up.

Flush

The act of taking the buffer of data and sending it to the sink.
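To make these four terms concrete, here is a minimal Node sketch. The `sink` stream, its 4-byte `highWaterMark`, and the `sunk` array are illustrative assumptions, not something from the talk's own examples.

```javascript
const { Writable } = require('stream');

// Hypothetical sink: an array that collects whatever reaches it.
const sunk = [];

const sink = new Writable({
  // High water mark: once this many bytes are buffered,
  // write() returns false to ask the producer to pause.
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    sunk.push(chunk.toString());
    // Finish asynchronously so data really does wait in the buffer.
    setImmediate(callback);
  }
});

const first = sink.write('ab');    // 2 bytes pending, under the mark
const second = sink.write('cdef'); // 6 bytes pending, over the mark

console.log(first, second);

// 'drain' fires once the buffered data has been flushed to the sink.
sink.on('drain', () => console.log('flushed:', sunk.join('')));
```

The return value of `write()` is how a writable stream tells you its buffer has passed the high water mark.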

Streams in Unix

A Stream Input-Output System

Written by Dennis Ritchie in 1984. Unix was starting to get complex, and the connections to various protocols needed a new system to handle I/O.


  git log | grep Ryan | wc -l

The | character is used to pipe information from the output of one program to the input of another.

The ability to pipe data like this allows for great composition!

Standard Input / Standard Output

Each command in Unix starts with three data streams attached to it: stdin, stdout, and stderr.

In this example the command git log sends its stdout through the pipe to grep's stdin, grep sends its stdout to wc's stdin, and so on.
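The same idea can be sketched in Node, where stdin is just another readable stream. The `countLines` helper below is a hypothetical stand-in for `wc -l`; it is fed an in-memory stream here so the example runs on its own.

```javascript
const { Readable } = require('stream');

// Counts newline characters in any readable stream, like `wc -l`.
function countLines(readable) {
  return new Promise((resolve, reject) => {
    let lines = 0;
    readable.on('data', (chunk) => {
      // Each "\n" in the chunk is one more line.
      lines += chunk.toString().split('\n').length - 1;
    });
    readable.on('end', () => resolve(lines));
    readable.on('error', reject);
  });
}

// In-memory stand-in for another program's stdout.
const fakeStdout = Readable.from(['commit 1\ncommit 2\n', 'commit 3\n']);

countLines(fakeStdout).then((lines) => console.log(lines));
```

Saved as a hypothetical `count-lines.js` that calls `countLines(process.stdin)` instead, it could sit at the end of a pipe: `git log | grep Ryan | node count-lines.js`.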


  git log | grep Ryan | wc -l

Streams in Node

Stream Handbook - substack

A great resource.

In Node, the place you have most likely worked with a stream before is the fs module.

Readable stream in node


  const fs = require('fs');

  fs.createReadStream('mobydick.txt')
    .pipe(process.stdout);

In Node we use .pipe() to connect our streams together. Think of the garden hose idea!
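One thing .pipe() does not do is forward errors from one stream to the next. Node's stream module also has a `pipeline` function that connects streams and reports any failure to a single callback. A minimal sketch, where the in-memory `source` and `collector` streams are assumptions made so it runs without a file:

```javascript
const { pipeline, Readable, Writable } = require('stream');

const received = [];

// Stand-in for fs.createReadStream('mobydick.txt').
const source = Readable.from(['Call me ', 'Ishmael.']);

// Stand-in for process.stdout: collects what it is given.
const collector = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  }
});

// pipeline() connects the streams like .pipe() does, and calls back
// once with an error if any stream fails, or with nothing on success.
const done = new Promise((resolve, reject) => {
  pipeline(source, collector, (err) => {
    if (err) reject(err);
    else resolve(received.join(''));
  });
});

done.then((text) => console.log(text));
```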

If you have ever used the http module and createServer(), you have technically been working with streams!


  const http = require('http');

  http.createServer((req, res) => {
    // req is a readable stream
    // res is a writable stream
  });

Writable streams in node


  const fs = require('fs');

  const ws = fs.createWriteStream('newclassic.txt');

  ws.write("The air was chill that morning\n");
  ws.write("you could feel it in the air");
  ws.end();

Transform Streams in node


  fs.createReadStream('mobydick.txt')
    .pipe(upperCaser)
    .pipe(process.stdout)


  const Transform = require('stream').Transform;

  class UpperTransform extends Transform {
    _transform(chunk,enc,next) {
      this.push(chunk.toString().toUpperCase());
      next();
    }
  }

  const upperCaser = new UpperTransform();
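Subclassing is one way to build a transform. Node's `Transform` constructor also accepts a `transform` function as an option, which is handy for small one-off streams. A sketch under that approach; `createUpperCaser` and `shout` are hypothetical helper names, and the input is an in-memory stream rather than a file:

```javascript
const { Transform, Readable } = require('stream');

// Same behaviour as UpperTransform, without declaring a class.
function createUpperCaser() {
  return new Transform({
    transform(chunk, encoding, callback) {
      // Passing data to callback() is shorthand for push() then next().
      callback(null, chunk.toString().toUpperCase());
    }
  });
}

// Runs some chunks through the transform and gathers the output.
async function shout(chunks) {
  let output = '';
  const stream = Readable.from(chunks).pipe(createUpperCaser());
  for await (const chunk of stream) {
    output += chunk.toString();
  }
  return output;
}

shout(['call me ', 'ishmael']).then((text) => console.log(text));
```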

A buffer representation of "Hey there!"


  <Buffer 48 65 79 20 74 68 65 72 65 21>


  class WhaleTransform extends Transform {
    _transform(chunk, enc, next) {
      const string = chunk.toString().replace(/whale/ig,"🐳");
      this.push(string);
      next();
    }
  }
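One caveat with the WhaleTransform above: a stream hands you data in chunks, and a chunk can end in the middle of a word, so "wh" and "ale" may arrive separately and slip past the regex. The sketch below is one possible way around that, holding back any trailing text that could be the start of "whale". The class name and the `whaleify` helper are hypothetical, and it assumes chunks never split a multi-byte character.

```javascript
const { Transform, Readable } = require('stream');

const WORD = 'whale';

// Length of the longest ending of `text` that could be the start of WORD.
function partialMatchLength(text) {
  const max = Math.min(WORD.length - 1, text.length);
  for (let n = max; n > 0; n--) {
    if (text.slice(-n).toLowerCase() === WORD.slice(0, n)) return n;
  }
  return 0;
}

class BoundarySafeWhaleTransform extends Transform {
  constructor(options) {
    super(options);
    this.tail = ''; // text held back from the previous chunk
  }

  _transform(chunk, enc, next) {
    const text = (this.tail + chunk.toString()).replace(/whale/ig, '🐳');
    // Keep a possible half-word back until the next chunk arrives.
    const held = partialMatchLength(text);
    this.tail = text.slice(text.length - held);
    const ready = text.slice(0, text.length - held);
    if (ready) this.push(ready);
    next();
  }

  // Called when the input ends: release whatever is still held back.
  _flush(done) {
    if (this.tail) this.push(this.tail);
    done();
  }
}

async function whaleify(chunks) {
  let output = '';
  const stream = Readable.from(chunks).pipe(new BoundarySafeWhaleTransform());
  for await (const chunk of stream) output += chunk.toString();
  return output;
}

whaleify(['a wh', 'ale and a WHA', 'LE']).then((text) => console.log(text));
```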


  const whaler = new WhaleTransform();

  fs.createReadStream('../mobydick.txt')
    .pipe(upperCaser)
    .pipe(whaler)
    .pipe(process.stdout);

Now we are getting something that looks like how Gulp works.

Gulp


  gulp.src(paths.srcCSS + 'style.scss')
    .pipe(sass().on('error', sass.logError))
    .pipe(postcss(postcss))
    .pipe(minifyCss())
    .pipe(gulp.dest(paths.distCSS));

Our super sweet thing...


  fs.createReadStream('mobydick.txt')
    .pipe(upperCaser)
    .pipe(whaler)
    .pipe(process.stdout)

Whaleit 🐳


  npm i whaleit


  const fs = require('fs')
  const Whaleit = require('whaleit');
  const whaler = new Whaleit();

  fs.createReadStream('mobydick.txt')
    .pipe(whaler)
    .pipe(process.stdout)

Duplex Stream

🙏 Substack 🙏


  const net = require('net');

  const server = net.createServer((stream)=>{ 
    stream.pipe(stream);
  });

  server.listen(5000);


  const net = require('net');
  const Whaleit = require('whaleit');

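  const server = net.createServer((stream) => {
    // A transform stream ends when its input ends, so give each
    // connection its own instance rather than sharing one.
    const whaleit = new Whaleit();

    stream
      .pipe(whaleit)
      .pipe(stream);
  });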

  server.listen(5000);

Performance

Browser Streams

Documentation

https://streams.spec.whatwg.org/

ReadableStream

The response from a fetch request has the body property on it. This is an implementation of a ReadableStream.


  const response = await fetch('./mobydick.txt');

  // writableStream is any WritableStream instance
  response.body.pipeTo(writableStream)

With a ReadableStream you can use pipeTo to pipe data along to a WritableStream.

Piping

  • pipeTo
  • pipeThrough

We can also create a reader from the body.


  const reader = response.body.getReader();

  reader.read().then(({value, done}) => {});
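Each call to read() resolves with one chunk, so reading a whole body means calling it until done is true. A minimal sketch using async/await; `readAllText` is a hypothetical helper, and an in-memory ReadableStream stands in for `response.body` so it runs without a network request (this assumes a runtime with global ReadableStream, TextEncoder, and TextDecoder).

```javascript
// Reads a stream of bytes to the end and returns it as one string.
async function readAllText(stream) {
  const reader = stream.getReader();
  const decoder = new TextDecoder('utf-8');
  let text = '';

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    // stream: true keeps characters that span two chunks intact.
    text += decoder.decode(value, { stream: true });
  }

  // A final call with no input flushes anything still buffered.
  text += decoder.decode();
  return text;
}

// Stand-in for response.body.
const encoder = new TextEncoder();
const demoBody = new ReadableStream({
  start(controller) {
    controller.enqueue(encoder.encode('Call me '));
    controller.enqueue(encoder.encode('Ishmael.'));
    controller.close();
  }
});

readAllText(demoBody).then((text) => console.log(text));
```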

XML to JSON

XML to JSON Parser...


  async function whaler() {
    const response = await fetch('./mobydick.txt');
    const reader = response.body.getReader();

    return pump();
    function pump() {
      return reader.read().then(({value, done}) => {
        if (done) return;
        //Do something with that value
        return pump();
      });
    }
  }


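  // Created once, outside pump(), so it can decode across chunks
  const decoder = new TextDecoder('utf-8');

  const string = decoder.decode(value, {stream: true});
  const textElement = document
     .createTextNode(string.replace(/whale/ig,'🐳'))
  document.body.appendChild(textElement);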

Writable Streams


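  // One decoder for the whole stream, so characters that are
  // split across two chunks still decode correctly.
  const decoder = new TextDecoder('utf-8');

  const write = new WritableStream({
    write(chunk, controller) {
      console.log(decoder.decode(chunk, {stream: true}))
    }
  });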


  const res = await fetch('mobydick.txt');

  res.body
    .pipeTo(write)
    .then(() => {})
    .catch(console.error);

Transform streams

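Not yet implemented in browsers at the time of this talk. The shape is a readable stream and a writable stream paired together: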


  class TransformStream {
    constructor() {
      this.readable = new ReadableStream({
        start(controller) {},
        cancel(reason) {}
      });
      this.writable = new WritableStream({
        start(controller) {},
        write(chunk) {},
        close() {},
        abort(reason) {}
      });
    }
  }
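Runtimes that have since shipped a native TransformStream let you skip the hand-rolled class and use pipeThrough directly. A hedged sketch, assuming a runtime with global TransformStream, ReadableStream, TextEncoder, and TextDecoder; `createWhaleStream` and `whaleText` are hypothetical names, and the input is an in-memory stream rather than a fetch response.

```javascript
// A transform that swaps "whale" for the emoji, chunk by chunk.
// Like the Node version, it can miss a word split across two chunks.
function createWhaleStream() {
  const decoder = new TextDecoder('utf-8');
  const encoder = new TextEncoder();
  return new TransformStream({
    transform(chunk, controller) {
      const text = decoder.decode(chunk, { stream: true });
      controller.enqueue(encoder.encode(text.replace(/whale/ig, '🐳')));
    }
  });
}

// Pipes a string through the transform and collects the result.
async function whaleText(input) {
  const source = new ReadableStream({
    start(controller) {
      controller.enqueue(new TextEncoder().encode(input));
      controller.close();
    }
  });

  const reader = source.pipeThrough(createWhaleStream()).getReader();
  const decoder = new TextDecoder('utf-8');
  let output = '';
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    output += decoder.decode(value, { stream: true });
  }
  return output + decoder.decode();
}

whaleText('The whale and the WHALE').then((text) => console.log(text));
```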

Streams in Service workers

Jake & Surma

Faster Responses

Resources

  • https://www.bell-labs.com/usr/dmr/www/mdmpipe.html
  • https://en.wikipedia.org/wiki/Stream_(computing)
  • https://cseweb.ucsd.edu/classes/fa01/cse221/papers/ritchie-stream-io-belllabs84.pdf
  • https://dtai.cs.kuleuven.be/projects/ALP/newsletter/archive_93_96/comment/streams.html
  • https://github.com/substack/stream-handbook
  • https://jakearchibald.com/2016/streams-ftw/
  • http://www.linfo.org/standard_input.html
  • http://www.linfo.org/standard_output.html

Thanks! 👋

@rchristiani

Boatit ⛵️


  npm i boatit


  const fs = require('fs')
  const Whaleit = require('whaleit');
  const Boatit = require('boatit');

  const whaler = new Whaleit();
  const boatit = new Boatit();

  fs.createReadStream('../mobydick.txt')
    .pipe(whaler)
    .pipe(boatit)
    .pipe(process.stdout)

👌