
Node.js TypeScript #5. Writable streams, pipes, and the process streams


In this article, we continue covering streams, since they have a significant role in Node.js development. This time we focus on writable streams and pipes. To illustrate how a writable stream works, we implement a simple version of a stream that writes to a file. We also provide examples of streams appearing in the Node.js environment in the global process object: stdin, stdout, and stderr.

Node.js TypeScript Writable Streams

In previous examples, we used the fs.writeFile function to create and write to files:

import * as fs from 'fs';
import * as util from 'util';
 
const writeFile = util.promisify(fs.writeFile);
 
writeFile('./file.txt',
  'Hello world!',
  { encoding: 'utf8' }
)
  .then(() => {
    console.log('File created!');
  })
  .catch(error => console.log(error));

While this works, it is not a solution for every case. Performance when writing big amounts of data this way is not that good, because fs.writeFile needs the whole content in memory before writing it. Also, calling fs.writeFile multiple times on the same file requires waiting for the previous write to finish for it to be safe. In such scenarios, fs.createWriteStream is strongly encouraged. It creates a writable stream.

To write some data to it, we use the write method.

import * as fs from 'fs';
 
const stream = fs.createWriteStream('./file.txt');
 
stream.write('Hello world!', () => {
  console.log('File created!');
});

To indicate that no more data will be written to the stream, you can call the end method. You can also provide it with the last chunk of the data.
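For example, here is a minimal sketch of ending a stream with one final chunk; the file name and the callback are just an illustration:

import * as fs from 'fs';

const stream = fs.createWriteStream('./file.txt');

// end() accepts an optional final chunk and a callback that runs
// once the stream has finished
stream.end('Hello world!', () => {
  console.log('Stream closed!');
});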

Since every stream is an instance of EventEmitter, which we cover in the second part of the series, the writable stream also has a set of events. One of them is 'finish'. The stream emits it after you call the end function and all the data is transmitted.

import * as fs from 'fs';
 
const stream = fs.createWriteStream('./file.txt');
 
stream.on('finish', () => {
  console.log('All the data is transmitted');
});
 
stream.write('Hello ');
// calling end() is what eventually triggers the 'finish' event
stream.end('world!');

Since we now know both readable and writable streams, we can combine them. Let's transfer one big file into another.

import * as fs from 'fs';
 
const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');
 
readable.on('data', chunk => {
  writable.write(chunk);
});

Here we create a readable stream and switch it into the flowing mode by attaching the 'data' event listener. Every chunk we receive, we pass to the writable stream with the write function. While this looks quite convenient, we can do even better with pipes.

Pipes

The pipe function is available on readable streams. When provided with a writable stream, it attaches it to the readable stream and pushes the data to the writable stream.

import * as fs from 'fs';
 
const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');
 
readable.pipe(writable);

That simple!

By default, when all the data is transmitted and the readable stream emits the 'end' event, the writable stream is closed with a call to writable.end.

import * as fs from 'fs';
 
const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');
 
writable.on('finish', () => {
  console.log('The end!');
});
 
readable.pipe(writable);

The end!

This behavior can be changed with the { end: false } option.
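For example, here is a sketch of keeping the writable stream open so that something can be appended after the piping is done; the '\nCopied!' chunk is just an illustration:

import * as fs from 'fs';

const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');

readable.pipe(writable, { end: false });

// with { end: false }, closing the writable stream is up to us
readable.on('end', () => {
  writable.end('\nCopied!');
});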

One note here: if an error occurs during piping, the writable stream is not closed automatically, so it might be necessary to track errors and close it manually.
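A minimal sketch of one way to handle that:

import * as fs from 'fs';

const readable = fs.createReadStream('./file1.txt');
const writable = fs.createWriteStream('./file2.txt');

readable.on('error', (error) => {
  console.log(error);
  // pipe does not close the writable stream on error, so we do it ourselves
  writable.end();
});

readable.pipe(writable);

Newer versions of Node.js also provide the stream.pipeline function, which forwards errors and takes care of the cleanup for us.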

Writable stream under the hood

The fs.createWriteStream function is not the only way of making a writable stream. We can create our own writable stream to understand it better.

Every writable stream has to implement a _write method that we call indirectly when we write data to the stream.

import { Writable } from 'stream';
 
const writable = new Writable();
 
writable._write = function(chunk, encoding, next) {
  console.log(chunk.toString());
  next();
};
 
writable.write('Hello world!');
Hello world!

In our simple example, every time we write to the stream, the string is logged to the console. The encoding argument is a string that might contain the encoding of our data. Calling the next function indicates that the chunk of data is flushed, meaning we finished handling it.

The _write method can also be declared by passing it to the Writable constructor, or provided by extending the Writable class.
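For example, here is the same logging stream from above, sketched with the constructor option; note that the option is named write, without the underscore:

import { Writable } from 'stream';

const writable = new Writable({
  // passed as a constructor option, the method is called write, not _write
  write(chunk, encoding, next) {
    console.log(chunk.toString());
    next();
  },
});

writable.write('Hello world!');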

Having all this knowledge, let’s implement a simplified version of a stream that writes data to a file.

import * as fs from 'fs';
import * as util from 'util';
import { Writable } from 'stream';
 
const writeFile = util.promisify(fs.writeFile);
 
class WritableFileStream extends Writable {
  path: string;
 
  constructor(path: string) {
    super();
    this.path = path;
  }
 
  _write(chunk: any, encoding: string, next: (error?: Error) => void) {
    // the { flag: 'a' } option makes writeFile append instead of replacing the file
    writeFile(this.path, chunk, { flag: 'a' })
      .then(() => next())
      .catch((error) => next(error));
  }
}
 
const readable = fs.createReadStream('./file1.txt');
const writable = new WritableFileStream('./file2.txt');
 
readable.pipe(writable);

In the above example, every time we write to our WritableFileStream, we add the data at the end of the file.

Process streams

In the first part of the series, we mention the global process object. Aside from properties like process.argv and process.execPath, it contains streams that our application can use.

process.stdin

The process.stdin is a readable stream that gathers the data incoming to our process. Using it, we can listen for data in the terminal. As we mention in the previous part of the series, readable streams have modes, and the stdin stream is in the paused mode by default. To switch the stdin stream to flowing and make the application listen for input, we need to resume stdin. This happens under the hood when we attach a 'data' event listener.

let a: number | undefined;
let b: number | undefined;
 
process.stdin.on('data', (data) => {
  if (a === undefined) {
    a = Number(data.toString());
  } else if (b === undefined) {
    b = Number(data.toString());
    console.log(`${a} + ${b} = ${a + b}`);
  }
});

In the example above, we expect two numbers from the terminal and add them together.


If you run the code above and provide two numbers, you will notice that the process does not exit. This is because the process.stdin stream is still flowing. To fix it, we need to pause it.

let a: number | undefined;
let b: number | undefined;

process.stdin.on('data', (data) => {
  if (a === undefined) {
    a = Number(data.toString());
  } else if (b === undefined) {
    b = Number(data.toString());
    console.log(`${a} + ${b} = ${a + b}`);
    // pausing stdin lets the process exit once we have both numbers
    process.stdin.pause();
  }
});

process.stdout and process.stderr

The process.stdout and process.stderr are writable streams. They are used by console.log() and console.error(), and writing to them results in text appearing in the console. We can easily make use of that and, for example, log a file:

import * as fs from 'fs';
 
const readable = fs.createReadStream('./file1.txt');
 
readable.pipe(process.stdout);
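We can also write to these streams directly; console.log is, roughly speaking, a wrapper around process.stdout.write that adds formatting and a newline:

process.stdout.write('Hello world!\n');
process.stderr.write('Something went wrong!\n');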

These streams differ from other Node.js streams in terms of asynchronicity: depending on the operating system and whether the output is a file, a pipe, or a terminal, writes to them may be synchronous or asynchronous. For more details, check out the documentation.

Summary

In this article, we covered writable streams: how to handle files using them and how to combine them with readable streams thanks to pipes. We also implemented our own writable stream for handling files, which included writing the _write function. We also learned how to pass additional data to our process through the process.stdin stream and what the process.stdout and process.stderr streams are. This knowledge, combined with readable streams, gives quite a bit of insight into the topic of streams, but there are still some things to be explained in that matter. Stay tuned!

