stream

What is a Stream?

A stream is like a pipe that can carry data. It's a way to send data from one place to another, one chunk at a time.

Readable Streams

Readable streams can output data that you can read. Like a water pipe that sends water to your faucet.

Writable Streams

Writable streams can take data that you write to them. Like a drain that takes water away from your sink.

Duplex Streams

Duplex streams can both read and write data. Like a two-way pipe.

Why Use Streams?

Streams are useful because they:

  • Handle large amounts of data efficiently: They don't store all the data in memory at once.

  • Support asynchronous operations: They allow you to perform operations on the data as it's being streamed.

  • Are flexible: They can be used in various ways, such as reading files, sending network data, and more.

Node.js Stream Module

The Node.js stream module provides a set of classes and functions for creating and working with streams.

Creating a Readable Stream

const { Readable } = require("stream");

const myStream = new Readable({
  read() {
    this.push("Some data");
    this.push(null); // Signal end of stream
  },
});

myStream.on("data", (chunk) => {
  console.log(`Received data: ${chunk.toString()}`);
});

myStream.on("end", () => {
  console.log("End of stream reached");
});

Creating a Writable Stream

const { Writable } = require("stream");

const myStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(`Received data: ${chunk.toString()}`);
    callback();
  },
});

myStream.write("Some data");
myStream.end();

Creating a Duplex Stream

const { Duplex } = require("stream");

const myStream = new Duplex({
  write(chunk, encoding, callback) {
    this.push(chunk);
    callback();
  },

  read() {
    this.push("Some data");
    this.push(null); // Signal end of stream
  },
});

myStream.on("data", (chunk) => {
  console.log(`Received data: ${chunk.toString()}`);
});

myStream.on("end", () => {
  console.log("End of stream reached");
});

myStream.write("Some data");
myStream.end();

Potential Applications

Streams are useful in many applications, including:

  • File processing: Reading and writing large files efficiently.

  • Network communication: Sending and receiving data over the network.

  • Data processing: Filtering, sorting, and transforming data as it's streamed.

  • Realtime applications: Handling data as it's generated in real-time.


Section 1: Using Existing Streams

Imagine you have a water pipe. You can attach it to a faucet to get water, or to a drain to get rid of water. Streams are like virtual pipes that you can use to handle data.

In Node.js, there are already many built-in streams you can use, like:

  • Readable streams: These are like faucets that constantly provide data. For example, you could use a readable stream to read data from a file or a network connection.

  • Writable streams: These are like drains that take data and do something with it. For example, you could use a writable stream to write data to a file or send data over a network connection.

  • Duplex streams: These are like pipes that can both read and write data. For example, you could use a duplex stream to communicate with a server.

  • Transform streams: These are like filters that can modify data as it flows through them. For example, you could use a transform stream to convert data from one format to another.

To use an existing stream, you can create one with a helper such as fs.createReadStream() or fs.createWriteStream() and then start reading from or writing to it:

const fs = require("fs");

// Create a readable stream to read data from a file
const readStream = fs.createReadStream("myfile.txt");

// Create a writable stream to write data to a file
const writeStream = fs.createWriteStream("outfile.txt");

// Pipe the data from the read stream to the write stream
readStream.pipe(writeStream);

This code will copy the contents of myfile.txt to outfile.txt.

Section 2: Creating New Types of Streams

Sometimes, the built-in streams don't meet your needs. That's when you can create your own custom streams.

To create a custom stream, you can extend the stream.Readable, stream.Writable, or stream.Duplex classes. For example, you could create a custom stream that reads data from a database:

const { Readable } = require("stream");

class DatabaseStream extends Readable {
  constructor() {
    super();
    // Connect to the database and start reading data
    // (simulated here with an in-memory list of rows)
    this.rows = ["row1", "row2", "row3"];
  }

  _read() {
    // Read a chunk of data from the database and push it to the stream
    const row = this.rows.shift();
    if (row === undefined) {
      this.push(null); // no more rows: signal the end of the stream
    } else {
      this.push(row);
    }
  }
}

const databaseStream = new DatabaseStream();
databaseStream.on("data", (data) => {
  // Do something with the data
});

This code creates a stream that reads data from the database (simulated here with an in-memory list of rows) and provides it to any listeners.

Applications in the Real World

Streams are used in a wide variety of applications, including:

  • Logging: Streams can be used to log data to a file or a remote server.

  • Data processing: Streams can be used to process large amounts of data, such as filtering or converting data.

  • Communication: Streams can be used to communicate with other processes or services over a network.

  • Web applications: Streams can be used to send and receive data to and from a web server.


Types of Streams

A stream is a sequence of data that is flowing through a pipeline. In Node.js, there are four main types of streams:

1. Writable Streams

Simplified Explanation: These streams allow you to write data to them, like writing to a file or sending data over a network.

Example:

const fs = require('fs');

const writableStream = fs.createWriteStream('output.txt');

writableStream.write('Hello, world!');

2. Readable Streams

Simplified Explanation: These streams allow you to read data from them, like reading from a file or receiving data over a network.

Example:

const fs = require('fs');

const readableStream = fs.createReadStream('input.txt');

readableStream.on('data', (data) => {
  console.log(data.toString());
});

3. Duplex Streams

Simplified Explanation: These streams combine writable and readable streams, allowing you to both write and read data. An example is a network socket.

Example:

const net = require('net');

const socket = net.createConnection({ port: 3000 });

socket.write('Hello, server!');

socket.on('data', (data) => {
  console.log(data.toString());
});

4. Transform Streams

Simplified Explanation: These streams are like duplex streams with an added superpower - they can transform the data flowing through them as it is written and read.

Example:

const zlib = require('zlib');

const transformStream = zlib.createDeflate();

transformStream.write('Hello, world!');
transformStream.end(); // flush the compressed output

transformStream.on('data', (data) => {
  console.log(`${data.length} compressed bytes`);
});

Utility Functions

In addition to the four stream types, Node.js also provides several utility functions for working with streams:

  • stream.pipeline(): Creates a pipeline between multiple streams, allowing you to connect them for sequential data processing.

  • stream.finished(): Notifies you (via a callback, or a promise when imported from stream/promises) when a stream is no longer readable or writable.

  • stream.Readable.from(): Creates a readable stream from an array or iterator.

  • stream.addAbortSignal(): Attaches an AbortSignal to a readable or writable stream, so the stream is destroyed if the signal is aborted.
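As a brief sketch, Readable.from() and the promise-based pipeline() can be combined like this (the file name lines.txt and the AbortController wiring are just illustrative assumptions):

const { Readable } = require("stream");
const { pipeline } = require("stream/promises");
const fs = require("fs");

async function run() {
  // Build a readable stream from an in-memory array
  const source = Readable.from(["line 1\n", "line 2\n", "line 3\n"]);

  // The pipeline can be aborted by calling controller.abort()
  const controller = new AbortController();

  await pipeline(source, fs.createWriteStream("lines.txt"), {
    signal: controller.signal,
  });
  console.log("Pipeline finished");
}

run().catch(console.error);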

Potential Applications

Streams are used in various applications, including:

  • File operations: Reading and writing files asynchronously.

  • Network communication: Sending and receiving data over sockets.

  • Data processing: Transforming and filtering data as it flows through a pipeline.

  • Event handling: Subscribing to events and processing data as it is received.


Introduction

The stream/promises API is a set of functions that make it easier to work with streams using promises instead of callbacks.

Simplified Explanation

Imagine you have a water pipe (stream) and you want to let water flow through it. You can use the traditional way of connecting the pipe and waiting for water to come out (callbacks), or you can use the stream/promises API to ask the pipe to promise that it will let water flow and then wait for that promise to be fulfilled (promises).

Topics in Detail

1. Streams

Streams are like pipes that can carry data. They can be used to read or write data, one chunk at a time.

2. Promises

Promises are objects that represent the eventual completion or failure of an asynchronous operation.

3. Callback Functions

Callback functions are functions that are passed as arguments to other functions. They are called when the asynchronous operation completes.

4. stream/promises API

The stream/promises API provides promise-returning versions of the stream utility functions, most notably pipeline() and finished().

Code Snippets

const fs = require("fs");
const { pipeline, finished } = require("stream/promises");

// Copy a file by connecting a readable and a writable stream;
// pipeline() returns a promise that settles when the copy is done
async function copyFile() {
  await pipeline(
    fs.createReadStream("file.txt"),
    fs.createWriteStream("copy.txt")
  );
  console.log("Data written to file");
}

// Wait for a readable stream to finish using finished()
async function readFile() {
  const readableStream = fs.createReadStream("file.txt");
  readableStream.on("data", (data) => console.log(data.toString()));
  await finished(readableStream);
  console.log("Finished reading the file");
}

copyFile().then(readFile);

Real-World Applications

The stream/promises API can be used in any application that uses streams, such as:

  • File I/O: Reading and writing files

  • Networking: Sending and receiving data over a network

  • Pipelines: Connecting multiple streams together

Potential Applications

  • Server-side programming: Writing web servers that can handle multiple concurrent requests

  • Data processing: Processing large amounts of data in a pipeline

  • File management: Reading and writing files in a more efficient and convenient way


What is stream.pipeline() in Node.js?

Imagine a water pipe with a source (faucet) and a destination (sink). You can attach different filters or devices to the pipe to transform the water in various ways.

stream.pipeline() in Node.js allows you to do the same with data streams. It's a function that connects a series of data sources (streams) and transformations into a single pipeline.

How it works:

  1. Source: Start with a source stream that produces data (like a faucet).

  2. Transforms: Add optional streams that modify or process the data (like filters).

  3. Destination: Specify a destination stream where the final transformed data will be written (like a sink).

Code Example:

const { pipeline } = require('stream');
const fs = require('fs');

const source = fs.createReadStream('input.txt');
const destination = fs.createWriteStream('output.txt');

pipeline(source, destination, (err) => {
  if (err) {
    console.error('Error:', err);
  } else {
    console.log('File copied successfully.');
  }
});

In this example, we create a pipeline to copy a file: the source reads data from the input file and the destination writes it to the output file. A plain copy needs no transform, but any number of transform streams could be inserted between the two.

Potential Applications:

  • Data Processing: Filter, transform, or analyze data from various sources.

  • File Operations: Copy, compress, or archive files.

  • Data Streaming: Send data from one service to another in real-time.

  • Web Applications: Process user input, format responses, and send data to the client.


stream.pipeline(streams[, options])

The stream.pipeline function allows you to easily connect a series of streams in a row, so that the output of one stream becomes the input of the next. This can be useful for performing complex data processing tasks, such as filtering, transforming, and merging data.

The stream.pipeline function takes the following arguments:

  • The streams to connect, passed either as a single array or as separate arguments. The first stream is the source of the data, the last stream is the destination, and any streams in between transform the data as it passes through.

  • An optional options object. Its signal option accepts an AbortSignal that can be used to abort the pipeline.

Here is a simple example of how to use stream.pipeline to copy a file from one location to another:

const fs = require("fs");
const { pipeline } = require("stream/promises");

async function copyFile() {
  await pipeline(
    fs.createReadStream("input.txt"),
    fs.createWriteStream("output.txt")
  );

  console.log("File copied successfully.");
}

copyFile();

In this example, the fs.createReadStream function creates a readable stream that reads data from the input.txt file. The fs.createWriteStream function creates a writable stream that writes data to the output.txt file. The pipeline function connects the two streams, so that the data from the input.txt file is copied to the output.txt file.

You can also use stream.pipeline to connect more than two streams. For example, the following code uses a zlib.createGzip stream to compress the data before writing it to the output.txt file:

const fs = require("fs");
const zlib = require("zlib");
const { pipeline } = require("stream/promises");

async function compressFile() {
  await pipeline(
    fs.createReadStream("input.txt"),
    zlib.createGzip(),
    fs.createWriteStream("output.txt.gz")
  );

  console.log("File compressed successfully.");
}

compressFile();

In this example, the zlib.createGzip stream is inserted between the readable stream and the writable stream. The zlib.createGzip stream compresses the data as it passes through, so that the data written to the output.txt.gz file is compressed.

The stream.pipeline function can be a useful tool for performing complex data processing tasks. By chaining together multiple streams, you can create pipelines that can filter, transform, and merge data in a variety of ways.

Here are some real-world applications of stream.pipeline:

  • Data filtering: You can use stream.pipeline to filter data from a source stream based on a certain criteria. For example, you could use a stream.pipeline to filter out all of the lines in a text file that contain a certain word.

  • Data transformation: You can use stream.pipeline to transform data from a source stream into a different format. For example, you could use a stream.pipeline to convert a CSV file into a JSON file.

  • Data merging: You can use stream.pipeline to merge data from multiple source streams into a single destination stream. For example, you could use a stream.pipeline to merge data from two different databases into a single report.

The stream.pipeline function is a powerful tool that can be used to perform a variety of data processing tasks. By chaining together multiple streams, you can create pipelines that can meet your specific needs.


stream.finished(stream[, options])

The finished function (used here from the stream/promises module) returns a promise that fulfils when the stream is no longer readable or writable.

Simplified Explanation:

Imagine a stream like a water pipe. The finished method checks if the water has stopped flowing in both directions.

Detailed Explanation:

A stream has two ends: a readable end and a writable end. Data flows from the writable end to the readable end. The finished method waits until there is no more data flowing in either direction.

Syntax:

stream.finished(stream[, options])

Parameters:

  • stream: The stream to check.

  • options: An optional object with the following properties:

    • error: If set to false, an emitted 'error' is not treated as finishing the stream.

    • readable: If set to false, the promise fulfils when the stream ends even though it might still be readable.

    • writable: If set to false, the promise fulfils when the stream ends even though it might still be writable.

Return Value:

The method returns a promise that fulfils when the stream is finished.

Usage:

const { finished } = require('stream/promises');
const fs = require('fs');

const rs = fs.createReadStream('file.txt');

finished(rs)
  .then(() => {
    console.log('The file has been fully read.');
  })
  .catch(err => {
    console.error('An error occurred:', err);
  });

Example:

This example checks if a file has been fully read from a readable stream.
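The options can also limit which side of a duplex stream is waited on. A sketch (assuming a server is listening on port 3000) that waits only for the writable side of a socket to finish:

const { finished } = require('stream/promises');
const net = require('net');

const socket = net.createConnection({ port: 3000 });

// readable: false means the promise fulfils once the writable side
// has finished, even if the readable side is still open
finished(socket, { readable: false }).then(() => {
  console.log('The writable side of the socket has finished.');
});

socket.end('goodbye');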

Potential Applications:

  • File processing: Check if a file has been fully processed before performing further operations.

  • Data streaming: Ensure that all data has been received before processing it.

  • Error handling: Handle errors that occur during stream operations.


Object Mode in Node.js Streams

What is Object Mode?

By default, Node.js streams work with strings and buffers. However, some streams need to work with other types of data, like objects. This is where "object mode" comes in.

How to Use Object Mode:

To switch a stream into object mode, use the objectMode option when creating the stream:

const { PassThrough } = require('stream');

const stream = new PassThrough({ objectMode: true });

How it Works:

In object mode, instead of receiving strings or buffers, the stream receives JavaScript objects. The stream can then process and emit those objects.

stream.write({ name: 'Alice', age: 25 });
stream.write({ name: 'Bob', age: 30 });

Example:

Let's create a stream that filters out objects based on their age:

const { Transform } = require('stream');

const filterStream = new Transform({
  objectMode: true,

  transform(data, encoding, callback) {
    if (data.age > 25) {
      this.push(data);
    }
    callback();
  }
});

stream.pipe(filterStream).on("data", (obj) => console.log(obj));

This pipeline logs only the objects whose age is greater than 25. (Piping object-mode data straight into process.stdout would fail, because stdout only accepts strings and buffers.)

Real-World Applications:

Object mode streams are useful in many situations:

  • Data processing: Filtering, sorting, or transforming objects.

  • Data exchange: Communicating between different components or applications.

  • Object serialization: Converting objects to and from a stream-compatible format.

Conclusion:

Object mode allows Node.js streams to work with JavaScript objects, opening up possibilities for advanced data handling and processing.


Buffering in Node.js Streams

What is Buffering?

When you transfer data from one place to another, it's like filling a bucket. If the bucket is too small, you'll have to stop and empty it before you can continue. This is called "buffering."

In Node.js, streams are used to transfer data. Streams have a "buffer," which is like a bucket that stores data until it can be transferred.

Readable Streams

Readable streams receive data from a source, like a file or a network connection. They have a "read buffer" that stores data until it can be read by the user.

When the read buffer reaches a certain size, called the "highWaterMark," the stream temporarily stops reading from the underlying source. This prevents too much data from piling up in memory at once.

Writable Streams

Writable streams send data to a destination, like a file or a network connection. They have a "write buffer" that stores data until it can be sent.

When the write buffer reaches the highWaterMark, write() returns false to tell the producer to pause until the buffer drains. This prevents the destination from being overwhelmed with too much data.

Duplex and Transform Streams

Duplex streams are both readable and writable, so they have two buffers: a read buffer and a write buffer. This allows them to transfer data in both directions independently.

Transform streams are a type of duplex stream that can modify data as it passes through.

HighWaterMark

The highWaterMark is a threshold that tells a stream when to start signalling backpressure: a writable stream's write() starts returning false, and a readable stream stops reading from its underlying source. It's set when the stream is created and can be any number of bytes (or objects, in object mode).
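Here is a small sketch of the highWaterMark in action (the 16-byte limit and the artificially slow destination are just for illustration):

const { Writable } = require("stream");

// A writable stream that signals backpressure once ~16 bytes are buffered
const slowStream = new Writable({
  highWaterMark: 16,
  write(chunk, encoding, callback) {
    // Simulate a slow destination by completing the write later
    setTimeout(callback, 100);
  },
});

// write() returns false once the buffered data reaches the highWaterMark
console.log(slowStream.write("a".repeat(32))); // false

slowStream.once("drain", () => {
  console.log("Buffer drained, safe to write again");
});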

Real-World Implementations

Buffering is used in many real-world applications, such as:

  • Streaming video: The video player buffers the video data to ensure smooth playback.

  • File downloads: The download manager buffers the file data to prevent interruptions due to network fluctuations.

  • Chat applications: The chat application buffers messages to prevent them from being lost if the network connection drops.

Example Code

Here's an example of a readable stream that reads data from a file:

const fs = require("fs");

const readableStream = fs.createReadStream("test.txt");

readableStream.on("data", (chunk) => {
  // Process the data chunk
});

And here's an example of a writable stream that writes data to a file:

const fs = require("fs");

const writableStream = fs.createWriteStream("test.txt");

writableStream.write("Hello, world!");

Conclusion

Buffering is an important part of Node.js streams. It allows streams to transfer data smoothly and efficiently by preventing either the source or destination from being overwhelmed.


Node.js Streams Module

What are streams?

Streams are like pipes that let data flow through them. They can be used to read data from a file, from a network socket, or from any other source that produces data over time. Streams can also be used to write data to a file, to a network socket, or to any other destination that can accept data.

Readable streams

Readable streams emit 'data' events when there is new data available to be read. You can listen for these events and then read the data from the stream. Here is an example of how to use a readable stream to read data from a file:

const fs = require("fs");

const readableStream = fs.createReadStream("input.txt");

readableStream.on("data", (chunk) => {
  console.log(chunk.toString());
});

Writable streams

Writable streams emit 'drain' events when they are ready to receive more data. You can listen for these events and then write data to the stream. Here is an example of how to use a writable stream to write data to a file:

const fs = require("fs");

const writableStream = fs.createWriteStream("output.txt");

writableStream.on("drain", () => {
  writableStream.write("Hello world!\n");
});

Duplex streams

Duplex streams are both readable and writable. This means that you can both read data from them and write data to them. A network socket is a duplex stream. Here is an example of a simple echo server; each incoming connection provides a socket, which is a duplex stream:

const net = require("net");

const server = net.createServer((socket) => {
  // `socket` is a duplex stream: read incoming data and write it back
  socket.on("data", (data) => {
    socket.write(data);
  });
});

server.listen(3000);

Transform streams

Transform streams are a type of duplex stream that can modify the data that flows through them. Here is an example of how to use a transform stream to convert a stream of uppercase letters to a stream of lowercase letters:

const Transform = require("stream").Transform;

const transformStream = new Transform();

transformStream._transform = (chunk, encoding, callback) => {
  callback(null, chunk.toString().toLowerCase());
};
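For example (a sketch), the transform can sit in a pipe between standard input and standard output:

// Everything typed on stdin is echoed back in lowercase
process.stdin.pipe(transformStream).pipe(process.stdout);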

Real-world applications

Streams are used in a wide variety of real-world applications, including:

  • Reading and writing files

  • Communicating over networks (HTTP, FTP, etc.)

  • Parsing and generating data (JSON, XML, etc.)

  • Compressing and decompressing data (gzip, bzip2, etc.)

Potential applications

The potential applications for streams are endless. They can be used to create any type of application that needs to read or write data over time.


Writable Streams in Node.js

What are Writable Streams?

Imagine you're writing a letter. The blank paper you're writing on is like a Writable Stream. It's where you send the data you want to write.

Examples of Writable Streams:

  • Sending data to a website through an HTTP request.

  • Writing data to a file on your computer.

  • Adding data to a zipped file.

How to Use Writable Streams:

You can write data to a stream like this:

const myStream = getWritableStreamSomehow();
myStream.write('some data');

When you're done writing, you can signal the end of the stream like this:

myStream.end('done writing data');

Real-World Examples:

1. Writing to a File:

const fs = require('fs');

const myStream = fs.createWriteStream('my-file.txt');
myStream.write('Hello world!');
myStream.end();

2. Sending Data to a Website:

const https = require("https");

const request = https.request(
  "https://example.com",
  { method: "POST" },
  (response) => {
    response.on("data", (data) => {
      console.log(data.toString());
    });
  }
);

// The request object is a writable stream
request.write("My data");
request.end();

Potential Applications:

  • Saving user input to a database.

  • Sending logs to a remote server.

  • Creating backups of important files.


Writable Stream

In streams, data flows in one direction. A writable stream is a stream that you can write data to. For example, you could use a writable stream to write data to a file or to a network socket.

Creating a Writable Stream

To create a writable stream, you use the fs.createWriteStream() function. This function takes the path to the file that you want to write to as its first argument. The second argument is an object that contains options for the stream.

For example, the following code creates a writable stream to the file myfile.txt:

const fs = require('fs');

const writableStream = fs.createWriteStream('myfile.txt');

Writing Data to a Writable Stream

To write data to a writable stream, you use the write() function. The write() function takes the data that you want to write as its first argument. You can also pass an optional callback, which is called once the chunk has been processed (or an error has occurred).

For example, the following code writes the string "Hello world!" to the writable stream:

writableStream.write('Hello world!', (err) => {
  if (err) {
    console.error(err);
  }
});

Ending a Writable Stream

When you are finished writing data to a writable stream, you should call the end() function. The end() function tells the stream that there is no more data to write.

For example, the following code ends the writable stream:

writableStream.end();

Listening for Events on a Writable Stream

You can listen for events on a writable stream using the on() method. The on() method takes the name of the event that you want to listen for as its first argument. The second argument is a callback function that is called when the event occurs.

For example, the following code listens for the finish event on the writable stream:

writableStream.on('finish', () => {
  console.log('All data has been written to the file.');
});

Real-World Applications of Writable Streams

Writable streams are used in a variety of real-world applications, including:

  • Logging: Writable streams can be used to write log messages to a file or to a remote server.

  • Data storage: Writable streams can be used to write data to a database or to a file system.

  • Data processing: Writable streams can be used to process data from a variety of sources, such as sensors or web services.


Event: 'close'

Simplified explanation:

When a stream and all the things it needs to work (like a file or a connection) are no longer being used, the 'close' event tells you that the stream is finished and done. It's like when you turn off a faucet, the water stops flowing and the faucet is empty.

Detailed explanation:

  • Stream: Think of a stream like a pipe that carries data.

  • Underlying resources: These are the things that make the stream work, like a file or a network connection.

  • Closed: When a stream is closed, it means that the pipe and everything else connected to it is no longer being used.

  • No more events: Once a stream is closed, there won't be any more events for the stream.

  • Writable stream: A stream that sends data to a destination (like a file or a server).

  • emitClose option: When you create a Writable stream, you can choose to have it emit the 'close' event.

Real-world example:

A simple Writable stream that logs data to a file:

const fs = require("fs");
const stream = fs.createWriteStream("log.txt");

stream.write("This is a log message.\n");

stream.on("close", () => {
  console.log("The log file has been closed.");
});

stream.end();

Potential applications:

  • Logging: Writing messages to a file or a database.

  • Data transfer: Sending data from one location to another.

  • File processing: Reading and writing files.


Simplified Explanation

When you write data to a stream, the stream has a limited buffer capacity. If you write more than it can hold, write() returns false to signal that you should pause. Once the buffer has been flushed, the stream emits a 'drain' event, which means it is ready to receive more data.

Example

The following code snippet shows how to write data to a stream and listen for the 'drain' event:

const fs = require("fs");

const writeStream = fs.createWriteStream("my-file.txt");

writeStream.on("drain", () => {
  console.log("The stream is ready to receive more data.");
});

// Write some data to the stream
writeStream.write("Hello, world!\n");

// write() returns false once the internal buffer is full;
// the caller should then wait for 'drain' before writing more
const ok = writeStream.write("This is a lot of data.\n");

if (!ok) {
  writeStream.once("drain", () => {
    writeStream.write("More data, written after the buffer has drained.");
  });
}

Real-World Applications

The 'drain' event is useful in any situation where you need to control the flow of data to a stream. For example, you could use the 'drain' event to:

  • Prevent a stream from getting overwhelmed with data

  • Throttle the flow of data to a stream

  • Pause and resume a stream based on certain conditions

Improved Code Example

The following code snippet shows an improved version of the previous example:

const fs = require("fs");

const writeStream = fs.createWriteStream("my-file.txt", {
  highWaterMark: 1024, // 1KB buffer
});

writeStream.on("drain", () => {
  console.log("The stream is ready to receive more data.");
});

// Write some data to the stream
writeStream.write("Hello, world!\n");

// write() returns false once about 1 KB of data has been buffered
writeStream.write("This is a lot of data.\n");

// end() writes a final chunk and then closes the stream
writeStream.end("The stream is ready to receive more data.");

In this example, we have specified a highWaterMark option for the stream. This option sets the buffer size at which write() starts returning false. Here the highWaterMark is 1 KB, so once roughly 1 KB of data is buffered, write() returns false, and 'drain' is emitted after the buffer has been flushed.

By using the highWaterMark option, we can control the flow of data to the stream and prevent it from getting overwhelmed.


Simplified Explanation of the 'error' Event in Node.js Streams

Event: 'error'

  • What it does: When there's a problem writing to or piping data through the stream, this event is triggered.

  • Arguments: The listener function receives a single argument:

    • Error: This is the error object that describes what happened.

Stream Behavior After 'error'

  • Auto-Destroy: By default, when the 'error' event is emitted, the stream is closed automatically. However, you can prevent this by setting the autoDestroy option to false when creating the stream. If you choose not to auto-destroy, you'll need to manually close the stream after handling the error.

  • No Further Events: After the 'error' event, no other events (except for 'close') should be emitted. This means the stream is considered broken.

Real-World Example

Consider a stream that writes data to a file:

const fs = require("fs");
const stream = fs.createWriteStream("my-file.txt");

stream.on("error", (error) => {
  // Handle the error (e.g., log it or show a message to the user)
  console.error("Error writing to file:", error.message);
  // With the default autoDestroy: true, the stream is destroyed automatically.
  // If autoDestroy were set to false, it would need to be destroyed manually:
  // stream.destroy();
});

Applications in the Real World

  • Error handling in file operations

  • Error reporting in network communication

  • Stream processing errors

  • Handling exceptions in data pipelines


Simplified Explanation of 'finish' Event

The 'finish' event in Node.js's streams module is triggered when the stream.end() method has been called and all the data in the stream has been successfully written to the underlying system. This event indicates that no more data will be written to the stream.

Example

Consider a simple program that writes a series of strings to a writable stream:

// Create a writable stream (with a minimal write() implementation)
const { Writable } = require("stream");

const stream = new Writable({
  write(chunk, encoding, callback) {
    // Consume the chunk; a real stream would persist it somewhere
    callback();
  },
});

// Write some data to the stream
stream.write("Hello, world!");
stream.write("Goodbye, world!");

// End the stream (triggering the 'finish' event)
stream.end();

// Listen for the 'finish' event
stream.on("finish", () => {
  console.log("All data has been written to the system.");
});

When you run this program, the following output will be displayed:

All data has been written to the system.

Real-World Applications

The 'finish' event is useful in various scenarios:

  • Ensuring data completeness: By listening to the 'finish' event, you can be sure that all data has been successfully written to the destination before performing subsequent tasks.

  • Graceful shutdown: You can use the 'finish' event to gracefully shut down a stream-based process or application. For example, if you're writing data to a file, you can listen for the 'finish' event to close the file handle and release system resources.

Improved Example

Here's a more practical example:

// Create a writable stream to write to a file
const fs = require("fs");
const stream = fs.createWriteStream("data.txt");

// Write some data to the stream asynchronously
stream.write("Hello, world!", (err) => {
  if (err) {
    console.error("Error writing to file:", err);
    return;
  }

  // Trigger the 'finish' event by manually ending the stream
  stream.end();
});

// Handle the 'finish' event to close the file
stream.on("finish", () => {
  console.log("All data has been written to the file.");
});

In this example:

  • We create a writable stream using fs.createWriteStream().

  • We write some data to the stream asynchronously using the stream.write() method.

  • We manually end the stream using stream.end() to trigger the 'finish' event.

  • We listen for the 'finish' event to close the file handle, ensuring that all data has been written to the file before continuing.


Event: 'pipe'

Simplified Explanation:

When you connect a readable stream (like a file reader) to a writable stream (like a file writer) using the stream.pipe() method, the 'pipe' event is emitted on the writable stream. This event indicates that the readable stream is now sending data to the writable stream.

In-Depth Explanation:

  • Readable Stream: A stream that reads data from a source, like a file or network connection.

  • Writable Stream: A stream that writes data to a destination, like a file or network connection.

When you use stream.pipe(), you're essentially creating a pipeline between two streams. The readable stream sends data to the writable stream, which writes it to its destination.

Code Snippet:

const fs = require("fs");

// Create a readable stream
const reader = fs.createReadStream("file.txt");

// Create a writable stream
const writer = fs.createWriteStream("copy.txt");

// Register the 'pipe' listener before calling pipe(),
// because the event is emitted synchronously when pipe() is called
writer.on("pipe", (src) => {
  console.log("Readable stream is now piping data.");
  console.log(src === reader); // true: `src` is the source readable stream
});

// Connect the streams using `pipe()`
reader.pipe(writer);

Real-World Applications:

  • Copying files

  • Compressing data

  • Streaming video and audio

  • Transforming data (e.g., converting JSON to CSV)


Event: 'unpipe'

Explanation:

Imagine a water pipe connecting two tanks, one with water (the readable stream) and the other empty (the writable stream). When you open the valve, water flows from the readable tank to the writable tank. Now, picture someone closing the valve (calling stream.unpipe()), stopping the flow of water. The 'unpipe' event is like an alarm that sounds when the valve is closed.

Simplified Explanation:

The 'unpipe' event is triggered when something stops sending data to a writable stream. It's like when you disconnect a USB cable from your computer or turn off a Wi-Fi router.

Real-World Example:

  • When you load a webpage in your browser, the browser sends a request to a server. The server responds by sending data back to the browser. The browser's Readable stream receives and processes the data, while the browser's Writable stream displays it on the web page. When you close the webpage, the 'unpipe' event is triggered, indicating that the data flow has stopped.

Code Example:

const fs = require("fs");

const readableStream = fs.createReadStream("input.txt");
const writableStream = fs.createWriteStream("output.txt");

readableStream.pipe(writableStream); // Connect the streams

// Register the 'unpipe' listener before stopping the flow,
// because the event is emitted synchronously when unpipe() is called
writableStream.on("unpipe", (src) => {
  console.log("Data flow stopped from the readable stream.");
});

// Stop the flow of data
readableStream.unpipe(writableStream);

Potential Applications:

  • Monitoring data flow in real-time systems

  • Detecting when data sources or destinations become unavailable

  • Debugging data pipelines


writable.cork()

The writable.cork() method in Node.js is used to control how data is sent to the underlying destination. By default, data is sent out immediately, but cork() can be used to buffer multiple writes and send them out all at once. This can help to improve performance when writing large amounts of data, as it reduces the number of system calls that need to be made.

How does writable.cork() work?

When writable.cork() is called, all writes that are made to the stream will be buffered in memory. This means that no data will be sent out to the underlying destination until writable.uncork() is called.

Once writable.uncork() is called, all of the buffered data will be sent out to the underlying destination at once. This can be done using a single system call, which is much more efficient than making multiple system calls for each write.

When should I use writable.cork()?

writable.cork() should be used when you are writing large amounts of data to a stream and you want to improve performance. It is especially useful when you are writing to a destination that is slow to respond, such as a network socket.

Here is a simple example of how to use writable.cork():

const fs = require("fs");

const writableStream = fs.createWriteStream("output.txt");

writableStream.cork();
for (let i = 0; i < 100000; i++) {
  writableStream.write("Hello world!\n");
}
writableStream.uncork();

Potential applications in real world

  • Writing logs to disk: Many small log lines can be batched by corking the stream, writing the lines, and then uncorking so they are flushed together in fewer disk writes.

  • Sending data to a network socket: Many small writes can be corked and then uncorked so the data is sent in fewer, larger packets instead of one packet per write.


Writable.destroy() Method in Node.js Streams

writable.destroy([error]) is a method used to destroy a writable stream in Node.js.

Explanation:

Imagine a stream as a pipe carrying data from one place to another. A writable stream is a pipe that can be written to. By calling writable.destroy(), you're effectively breaking the pipe and preventing any further data from being written to it.

If you provide an error argument, it will be emitted as an 'error' event on the stream. Additionally, a 'close' event will be emitted, unless you set the emitClose property to false.

After calling writable.destroy(), the stream is considered "destroyed". Any subsequent attempts to write to it will result in an ERR_STREAM_DESTROYED error.

Real-World Example:

Consider the following code:

const { Writable } = require("stream");

const myStream = new Writable();

myStream.on("pipe", (source) => {
  // Do something when data is piped into the stream
});

myStream.on("unpipe", (source) => {
  // Do something when data is unpiped from the stream
});

myStream.on("error", (error) => {
  // Handle errors that occur during writing
});

myStream.destroy();

In this example, a writable stream is created and various event listeners are added to handle different scenarios. When the stream is destroyed, a 'close' event is emitted (unless emitClose is false), and after that no further 'error', 'pipe', or 'unpipe' events will be emitted.

Applications in Real World:

The writable.destroy() method is useful in various scenarios:

  • Error handling: If an error occurs while writing data to a stream, you can destroy the stream to prevent further data loss.

  • Resource cleanup: When you're finished with a stream and want to release any associated resources, you can destroy it.

  • Data validation: If you're validating data as it's being written to a stream, you can destroy the stream if the data is invalid.

  • Premature termination: If you need to stop writing data to a stream before it's complete, you can destroy it to terminate the process.

Simplified in Plain English:

  • Imagine a water pipe that you can put water into.

  • writable.destroy() is like smashing that pipe into pieces, so no more water can go through it.

  • If you want, you can have the pipe tell you it's broken ("error") and that it's closing ("close").

  • Once the pipe is broken, you can't put any more water into it.


writable.closed:

Imagine a pipe that you use to transport water. When you're done using the pipe, you close it so that water doesn't keep flowing unnecessarily. In the case of streams in Node.js, writable.closed tells you whether the "pipe" is closed or not.

Simplified Explanation:

Just like how closing a pipe stops water from flowing, closing a writable stream stops data from being written to it. When writable.closed is true, it means the stream has been closed.

Real-World Implementation:

Let's say you're writing a program that needs to log messages to a file. You could use a writable stream to write the messages:

const fs = require("fs");
const writableStream = fs.createWriteStream("logfile.txt");

writableStream.write("Hello, world!");
writableStream.end(); // Closes the stream

console.log(writableStream.closed); // Prints 'true'

Potential Applications:

  • Controlling the flow of data in streams

  • Ensuring that data is written correctly before closing a stream

  • Monitoring the status of writable streams


writable.destroyed

Simplified Explanation:

After you call writable.destroy(), it will set writable.destroyed to true to signify that the stream is no longer usable.

Detailed Explanation:

The writable.destroyed property is a boolean value that indicates whether the writable stream has been destroyed. When you call writable.destroy(), the stream is marked as destroyed and writable.destroyed is set to true. This means that the stream can no longer be used for writing data.

Real-World Example:

Let's say you have a writable stream that you use to write data to a file. You decide that you no longer need to write to the file, so you call writable.destroy() to close the stream and release any resources that it was using. Once you call writable.destroy(), writable.destroyed will be set to true and the stream can no longer be used.

Potential Applications:

Here are some potential applications of the writable.destroyed property:

  • Checking if a writable stream has been destroyed before trying to write data to it.

  • Closing a writable stream after it has been used to write all of the desired data.

  • Releasing resources that were being used by a writable stream.

Improved Code Snippet:

Here is an improved version of the code snippet that you provided:

const { Writable } = require("stream");

const myStream = new Writable();

console.log(`Is my stream destroyed? ${myStream.destroyed}`); // false
myStream.destroy();
console.log(`Is my stream destroyed? ${myStream.destroyed}`); // true

This code snippet first creates a writable stream and then logs whether or not the stream is destroyed. Then, it calls writable.destroy() and logs again whether or not the stream is destroyed. As you can see, the output shows that the stream is not destroyed before calling writable.destroy() and is destroyed after calling writable.destroy().


writable.end([chunk[, encoding]][, callback])

  • Purpose: Signals the end of the data writing process for a writable stream.

  • Arguments:

    • chunk: (Optional) A final chunk of data to write before closing the stream.

    • encoding: (Optional) The encoding of the chunk if it's a string.

    • callback: (Optional) A function to be called when the stream is fully closed.

  • Return Value: The writable stream itself.

Simplified Explanation

When writing data to a writable stream, you can indicate that you're done by calling the end() method. This tells the stream that it should stop accepting new data and begin the process of closing itself.

Real-World Example

Let's say you're writing data to a file using a file write stream. The following code creates a file stream and writes the text "Hello, world!" to it:

const fs = require("fs");

const fileStream = fs.createWriteStream("example.txt");
fileStream.write("Hello, world!");

// Close the stream once the data is written
fileStream.end();

When the end() method is called, the stream will close itself and flush any remaining data to the file.

Potential Applications

The end() method is used in various real-world applications, including:

  • File writing: Closing a file after writing data to it.

  • Data processing: Ending a stream of data after filtering or transforming it.

  • Network communication: Closing a socket or HTTP connection after sending data.


writable.setDefaultEncoding(encoding)

  • encoding {string} The new default encoding

The writable.setDefaultEncoding() method sets the default character encoding used when strings are written to the stream without an explicit encoding. It returns a reference to the stream, so calls can be chained.

Example

const { Writable } = require("stream");
const writable = new Writable();

// set the default encoding to utf8
writable.setDefaultEncoding("utf8");

// write some data to the stream
writable.write("Hello, world!");

Potential Applications

  • Setting the default encoding for a stream can be useful in a variety of situations. For example, you might want to set the default encoding to utf8 if you are writing a stream to a file that will be read by a web browser. This will ensure that the data in the file is properly encoded for the web browser to display.


writable.uncork()

Unveiling the Buffering Mechanism

Imagine a stream as a pipe carrying water. writable.cork() is like placing a cork in the pipe, preventing water from flowing out. writable.uncork() removes the cork, allowing the water to resume flowing.

How it Works

  • When you call writable.cork(), the stream stops sending data to its destination.

  • Data written to the stream while it's corked is stored in a buffer.

  • When you call writable.uncork(), all the buffered data is flushed out and sent to the destination.

  • You must call writable.uncork() the same number of times as you called writable.cork().

Example

Pretend you're writing a word processor that sends text data to a file.

const fs = require('fs');

// The "document" the word processor writes to
const stream = fs.createWriteStream('document.txt');

// Cork the stream to buffer data while typing
stream.cork();

// Write some text
stream.write('Hello, world!');

// Uncork the stream to send the buffered data to the file
process.nextTick(() => stream.uncork());

Real-World Applications

  • Batching data: Collect data in a buffer and send it in larger chunks to improve efficiency.

  • Preventing data loss: If the destination is slow or unavailable, corking the stream prevents data from being lost.

  • Optimizing performance: By controlling when data is sent, you can ensure that the destination is ready to handle the incoming data.


Writable Stream

A writable stream is a data stream that allows you to write data to it.

writable.writable

The writable.writable property is a boolean that indicates whether the stream is ready to receive data. It is true if the stream is in a state where you can call writable.write() to write data to it, and false otherwise.

When is writable.writable false?

writable.writable becomes false in the following situations:

  • The stream has been destroyed.

  • The stream has ended.

  • The stream has encountered an error.

Real-world example

In the following example, we create a writable stream and write some data to it. We then check the writable.writable property to see if the stream is still writable.

const fs = require("fs");

// Create a writable stream
const writableStream = fs.createWriteStream("output.txt");

// Write data to the stream
writableStream.write("Hello, world!");

// Check if the stream is still writable
console.log(writableStream.writable); // true

Potential applications

Writable streams can be used in a variety of applications, including:

  • Logging data to a file

  • Writing data to a database

  • Sending data over a network


Topic: writable.writableAborted

Explanation:

Imagine you have a water hose that you're using to fill a bucket. If you suddenly turn off the hose or the hose breaks, the bucket might not be completely filled.

In the same way, when you're writing data to a stream in Node.js, the stream might be stopped before all the data is written. The writable.writableAborted property tells you if this has happened.

Simplified Example:

const fs = require("fs");

const writeStream = fs.createWriteStream("file.txt");

// Write some data to the stream
writeStream.write("Hello World!");

// Check if the stream was aborted
if (writeStream.writableAborted) {
  console.log("The stream was aborted before it could finish writing.");
}

Real-World Applications:

The writable.writableAborted property can be useful in error handling situations. For example, you could use it to log an error message if the stream was aborted. You could also use it to clean up any resources that were allocated for the stream.

Improved Code Example:

const fs = require("fs");

const writeStream = fs.createWriteStream("file.txt");

try {
  // Write some data to the stream
  writeStream.write("Hello World!");
} catch (err) {
  // Check if the stream was aborted
  if (writeStream.writableAborted) {
    console.log("The stream was aborted before it could finish writing: ", err);
  } else {
    console.log("An error occurred while writing to the stream: ", err);
  }
} finally {
  // Close the stream to release resources
  writeStream.close();
}

Writable streams: writableEnded

  • writableEnded is a property of writable streams.

  • It is a boolean value that indicates whether the writable.end() method has been called.

  • This property does not indicate whether the data has been flushed; for that, use writable.writableFinished instead.

Example:

const fs = require('fs');
const writableStream = fs.createWriteStream('file.txt');

writableStream.on('finish', () => {
  console.log('All data has been flushed to disk');
  console.log(writableStream.writableEnded); // true
});

console.log(writableStream.writableEnded); // false: end() has not been called yet

writableStream.end();

console.log(writableStream.writableEnded); // true: end() has been called

Output:

false
true
All data has been flushed to disk
true

Understanding writable.writableCorked Property

What is writableCorked?

When writing data to a writable stream, sometimes it's useful to temporarily pause the stream's internal buffering. This is achieved by "corking" the stream. The writableCorked property represents the count of times the stream has been corked.

How Corking Works

Imagine a water pipe where the flow of water is regulated by a water faucet.

  • Uncorking the stream is like opening the faucet, allowing water to flow freely.

  • Corking the stream is like closing the faucet, holding the water back until it is opened again.

Practical Applications

  • Data Buffering: Corking a stream can be useful when you want to buffer a large amount of data before sending it out. This can improve efficiency and reduce performance overhead.

  • Flow Control: You can cork a stream to prevent overwhelming downstream consumers that may not be able to keep up with the flow of data.

  • Error Handling: In case of an error, you can cork the stream to pause the write operation while you handle the issue.

Code Example

Consider a simple stream where writes are corked (buffered) and later uncorked:

const { Writable } = require("stream");

const stream = new Writable({
  write(chunk, encoding, callback) {
    console.log("writing:", chunk.toString());
    callback();
  },
});

console.log(stream.writableCorked); // 0

stream.cork();
stream.cork();
console.log(stream.writableCorked); // 2: corked twice

stream.write("Hello");
stream.write("World"); // both writes are buffered while the stream is corked

stream.uncork();
console.log(stream.writableCorked); // 1: still corked once

stream.uncork(); // the count reaches 0 and the buffered writes are flushed
console.log(stream.writableCorked); // 0

In this example, writableCorked reports how many times cork() has been called without a matching uncork(). While the count is greater than zero, writes are buffered in memory; once it drops back to zero, the buffered writes are flushed to the underlying write() implementation.


Simplified and Detailed Explanation:

The writable.errored property in Node.js's Stream module provides access to the error object if the stream has been destroyed due to an error.

Property Details:

  • Type: Error

  • Usage: Read-only

Real-World Implementation:

const fs = require("fs");

const writableStream = fs.createWriteStream("output.txt");

writableStream.on("error", (err) => {
  console.log("Error in writable stream:", err);
});

writableStream.write("Hello, world!");
writableStream.write("This is a test");
writableStream.end();

Potential Applications:

  • Error Handling: The writable.errored property can be used to handle errors that occur during the writing process. For example, it can be used to log the error, send a notification, or trigger a recovery mechanism.

  • Debugging: The property provides insight into the cause of stream destruction, making it valuable for debugging purposes.


writable.writableFinished

  • Type: boolean

  • Description:

    This property is set to true just before the 'finish' event is emitted. It indicates that the writable stream has finished writing all of its data.

  • Example:

const fs = require("fs");

const writableStream = fs.createWriteStream("output.txt");

writableStream.on("finish", () => {
  console.log("All data has been written to the file.");
});

writableStream.end();
  • Real-World Applications:

    The writableFinished property is useful for determining when all of the data has been written to a file or other destination. This can be important for ensuring that all of the data has been successfully saved before closing the stream or performing other operations.


writable.writableHighWaterMark

  • Definition: The high-water mark value passed when creating this Writable stream.

  • Simplified Explanation:

    A "writable stream" is like a pipe that can receive data and write it to a destination (e.g., a file, console, etc.).

    When creating a writable stream, you can specify a "high-water mark". This value tells the stream how much data it can buffer in memory before write() starts returning false to signal backpressure.

    The writableHighWaterMark property simply returns the high-water mark value that was set when the stream was created.

  • Real-World Example:

    Suppose you're writing a lot of data to a file and want to avoid buffering too much of it in memory. You could set a high-water mark of 1 MB so that, once roughly 1 MB of data is queued, write() returns false and the producer knows to pause until the buffer drains.

  • Code Example:

    const { Writable } = require("stream");
    
    // Create a writable stream with a high-water mark of 1 MB
    const writableStream = new Writable({
      highWaterMark: 1024 * 1024,
    });
    
    // Listen for the 'drain' event, which is emitted once the internal
    // buffer has been emptied after write() returned false
    writableStream.on("drain", () => {
      console.log("The buffer has drained; it is safe to write more data.");
    });
  • Potential Applications:

    • Controlling the flow of data in a stream to prevent buffer overflows or underflows.

    • Optimizing performance by writing data to disk or other destinations in chunks rather than all at once.


Writable stream: A stream which is used to write data to a destination.

writableLength: Indicates the number of bytes or objects in the queue that are ready to be written.

highWaterMark: A threshold used for backpressure. When the writableLength reaches the highWaterMark, write() returns false, and the producer should wait for the 'drain' event before continuing to write (a short sketch follows the list below).

Real-world implementations and potential applications:

  1. Writing to a file:

    • Create a writable stream using fs.createWriteStream().

    • Use stream.write() to write data to the stream.

    • The writableLength property can be used to monitor the progress of the write operation and adjust the write rate accordingly.

    • The 'drain' event can be used to pause the write operation when the stream's buffer is full and resume it when the buffer is partially drained.

  2. Sending data over a network:

    • Create a writable stream using net.createConnection() or http.request().

    • Use stream.write() to send data over the network.

    • The writableLength property can be used to prevent the stream from overloading the network and causing packet loss.

    • The 'drain' event can be used to throttle the data transmission rate and prevent the network from becoming congested.
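
To make this concrete, here's a minimal sketch of the file-writing case (assuming a hypothetical output file named output.txt). It checks the return value of write(), reports writableLength, and waits for 'drain' before writing more:

const fs = require("fs");

const writableStream = fs.createWriteStream("output.txt");

function writeMany(remaining) {
  while (remaining > 0) {
    remaining--;
    const ok = writableStream.write(`line ${remaining}\n`);

    // writableLength reports how many bytes are queued in the internal buffer
    console.log(`Queued bytes: ${writableStream.writableLength}`);

    if (!ok) {
      // The buffer has exceeded highWaterMark; wait for 'drain' before continuing
      writableStream.once("drain", () => writeMany(remaining));
      return;
    }
  }
  writableStream.end();
}

writeMany(10000);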


writable.writableNeedDrain

This property is a boolean that indicates whether the stream's buffer is full and the stream will emit a 'drain' event.

Simplified Explanation:

Imagine a stream as a pipe. When you write data to the stream, it goes into a buffer, like a tank. If the tank gets too full, the stream will stop writing data and emit a 'drain' event. This signals that the buffer has been emptied and the stream is ready to receive more data.

Code Example:

const fs = require("fs");

const writable = fs.createWriteStream("output.txt");

writable.on("drain", () => {
  // The buffer has been emptied and the stream is ready to receive more data.
});

writable.write("Hello, world!");

Real-World Applications:

  • File writing: When writing a large file to disk, the stream's buffer can fill up quickly. The 'drain' event can be used to pause writing until the buffer is emptied, preventing data loss.

  • Network communication: When sending data over a network, the network buffer can become congested. The 'drain' event can be used to pause sending data until the network is ready to receive more.


Writable stream

A writable stream is a destination for data that can be written to. It's like a pipe that you can send data through. Here's a simplified explanation:

  • Imagine you have a water hose and a garden. The hose is the writable stream, and the garden is the destination where the water (data) goes.

  • When you turn on the hose, water starts flowing. In the same way, when you write data to a writable stream, it starts flowing to the destination.

  • The destination can be anything, such as a file, another stream, or a device. It's like the garden receiving the water from the hose.

Writable object mode

In Node.js, streams have a property called objectMode. This property determines whether the stream can handle objects as data or only strings.

  • Object mode: If objectMode is true, the stream can handle objects as data. This means you can write objects to the stream, and they will be passed to the destination.

  • Non-object mode: If objectMode is false (the default), the stream only accepts strings, Buffers, and Uint8Arrays as data. If you try to write any other JavaScript value to a stream in non-object mode, it will throw an error.

Potential applications in real world

Writable streams are used in many applications, such as:

  • Logging: Writing log messages to a file or console.

  • Data processing: Transforming data as it flows through a stream.

  • Data storage: Writing data to a database or file system.

  • Network communication: Sending data over a network connection.

Real world example

Here's an example of using a writable stream to log messages to a file:

const fs = require("fs");
const { Writable } = require("stream");

// Create a writable stream to the file 'log.txt'
const writableStream = fs.createWriteStream("log.txt");

// Write a message to the stream
writableStream.write("Hello, world!");

Simplified code snippet

Here's a snippet that demonstrates how to check if a writable stream is in object mode:

const { Writable } = require("stream");

// Create a writable stream in object mode
const writableStream = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    // Objects written to the stream arrive here
    callback();
  },
});

// Check if the stream is in object mode
if (writableStream.writableObjectMode) {
  // The stream is in object mode
}

// Write an object to the stream
writableStream.write({ foo: "bar" });

Simplified explanation of writable.write() method in Node.js

The writable.write() method is used to write data to a stream. It takes three parameters:

  • chunk: The data to be written. This can be a string, a Buffer, a Uint8Array, or any other JavaScript value in object mode streams.

  • encoding (optional): The encoding of the data, if chunk is a string. The default encoding is 'utf8'.

  • callback (optional): A callback function that will be called once the data has been written.

The writable.write() method returns a boolean value:

  • true if the internal buffer is still below the high-water mark after queuing the chunk, so you can keep writing.

  • false if the internal buffer has reached the high-water mark. The chunk is still queued and will be written, but you should stop writing until the 'drain' event is emitted, which signals that the stream is ready to accept more data.

Here is an example of how to use the writable.write() method:

const fs = require("fs");

const writableStream = fs.createWriteStream("output.txt");

writableStream.write("Hello, world!");

writableStream.on("drain", () => {
  console.log("Data has been written to the file.");
});

This code creates a writable stream and writes the string 'Hello, world!' to it. The 'drain' event is emitted when the data has been written to the file.

Applications in the real world

The writable.write() method is used in a variety of applications, including:

  • Logging: Data can be written to a log file using the writable.write() method.

  • File I/O: Data can be written to a file using the writable.write() method.

  • Network programming: Data can be written to a network socket using the writable.write() method.


Readable Streams in Node.js

Imagine a water pipe that brings water to your house. In Node.js, that pipe is called a "Readable Stream." It represents a way to get data from a source.

Types of Readable Streams

  • HTTP Responses: When you download a website, the response from the server comes in a Readable Stream.

  • HTTP Requests: On a web server, the request from the user is also a Readable Stream.

  • File Reading: If you're reading a file from your computer, the stream of data comes into a Readable Stream.

  • Other: There are also streams for compressing, encrypting, and communicating with other computers.

How Readable Streams Work

Readable Streams have a simple interface:

  • read(): Gets the next chunk of data if available.

  • pause(): Stops the flow of data.

  • resume(): Resumes the flow of data.

Real-World Examples

  • Downloading a file:

const fs = require("fs"); // Node's File System module

const fileStream = fs.createReadStream("my-file.txt");

fileStream.on("data", (chunk) => {
  // Do something with the chunk of data
});
  • Reading an HTTP response:

const http = require("http"); // Node's HTTP module

const request = http.request("http://example.com", (response) => {
  // The response is a Readable Stream
  response.on("data", (chunk) => {
    // Do something with the chunk of data
  });
});

request.end(); // Send the request

Applications of Readable Streams

  • File uploads/downloads: Streaming large files avoids loading the entire file into memory and lets processing start before the whole file has arrived.

  • Data processing: Readable Streams allow you to process data in chunks, which can be more efficient.

  • Real-time communication: Streams are used in applications like chat and video conferencing to send data continuously.


Flowing Mode vs Paused Mode in Readable Streams

What is a Readable Stream?

Imagine you have a water pipe. The water flowing through the pipe is like data in a Readable Stream. You can read the data from the stream using a 'read' function, just like you can turn on the faucet to get water.

Flowing Mode:

In flowing mode, the data flows from the pipe (Readable Stream) to your bucket (data consumer) automatically. Imagine you have a bucket under the faucet, and the water flows into the bucket without you having to turn on the faucet every time. This is achieved by adding a 'data' event listener.

readableStream.on("data", (chunk) => {
  // Process the data chunk
});

Paused Mode:

In paused mode, you have to tell the stream to give you data. You do this by calling the 'read' function. Imagine you have to turn on the faucet every time you want to fill your bucket.

const chunk = readableStream.read();
// Process the data chunk

How to Switch Modes:

  • Switch from Paused to Flowing:

    • Add a 'data' event listener

    • Call the 'resume' function

    • Pipe the stream to another stream (called Writable Stream)

  • Switch from Flowing to Paused:

    • Call the 'pause' function (when there are no pipe destinations)

    • Remove all pipe destinations (for example, with the 'unpipe' function)

    • Note: removing 'data' event listeners alone does not switch the stream back to paused mode

Why is it Important?

It's important to manage the flow of data to avoid overloading your system or losing data. If you don't have a way to process or consume the data, you should pause the stream to prevent data loss.

Real-World Example:

  • Log Files: Imagine you have a program that generates log files continuously. You might want to read these logs in flowing mode and display them on a real-time dashboard.

  • File Download: If you're downloading a large file, you might want to pause the download if your internet connection becomes weak.

Code Example:

Here's a complete example that demonstrates flowing and paused modes:

const fs = require("fs");
const readableStream = fs.createReadStream("myfile.txt");

// Flowing mode: add a 'data' event listener
readableStream.on("data", (chunk) => {
  console.log(`Received data: ${chunk.toString()}`);
});

// After a while, pause the stream
setTimeout(() => {
  readableStream.pause();
}, 5000);

// Resume the stream after another while
setTimeout(() => {
  readableStream.resume();
}, 10000);

Internal State of a Readable Stream

Imagine a water pipe that streams water. Inside the pipe, there's a valve that controls the flow of water. Similarly, a readable stream in Node.js has an internal state that controls the flow of data.

Three States of a Readable Stream

The valve of the readable stream can be in one of three positions:

  1. Valve Closed (readableFlowing === null): No one is asking for water, so the valve is closed. No data will flow.

  2. Valve Partially Open (readableFlowing === false): Someone asked for water, but then closed the tap. The valve is partially open, so some data might still be flowing into a buffer, but no events will be emitted.

  3. Valve Open (readableFlowing === true): The valve is fully open, allowing data to flow freely. Events will be emitted to alert consumers that data is available.

Controlling the Valve

  • Opening the Valve: To start reading data from a stream, you need to open the valve. This can be done by:

    • Calling stream.resume(): Opens the valve if it's closed.

    • Attaching a 'data' event listener: Opens the valve if it's closed.

    • Calling stream.pipe(): Opens the valve and connects the stream to a writable stream.

  • Closing the Valve: To stop reading data from a stream, you can close the valve by:

    • Calling stream.pause(): Closes the valve if it's open.

    • Calling stream.unpipe(): Closes the valve and disconnects the stream from a writable stream.

    • Receiving backpressure: When the downstream stream cannot handle more data, it will send backpressure to the readable stream, which will close the valve.

Example

Let's create a readable stream and control its flow:

const { Readable } = require("stream");

// Create a readable stream with a no-op read() implementation
const stream = new Readable({
  read() {},
});

// Open the valve
stream.resume();

// Attach a 'data' event listener
stream.on("data", (chunk) => {
  console.log(chunk.toString());
});

// Write some data
stream.push("Hello");
stream.push("World");

// Close the valve
stream.pause();

// Write more data (will be buffered)
stream.push("!");
stream.push("!");

// Open the valve again
stream.resume();

// The buffered data will now be emitted
console.log(stream.readableFlowing); // true

Application

Readable streams are used in various applications:

  • File I/O: Reading from a file.

  • Network I/O: Receiving data over a network.

  • Data pipelines: Breaking down large data processing tasks into smaller chunks.

  • Event handling: Emitting events when data is available.


Readable Streams

Streams are like pipes that carry data from one place to another. In Node.js, the Readable stream represents a stream that you can read data from.

API Styles

When working with Readable streams, there are multiple ways to read the data:

  • on('data'): This method listens for any data that the stream receives.

  • on('readable'): This method listens for when there is readable data available in the stream.

  • pipe(): This method connects one stream to another, allowing data to flow directly between them.

  • Async iterators: This feature allows you to use for await...of loops to consume stream data.

Choosing One Method

It's important to choose only one method to consume data from a stream. Using multiple methods can lead to unexpected behaviors and errors.

Real-World Examples

1. Using on('data')

const stream = require("stream");
const readableStream = new stream.Readable({ read() {} });

readableStream.on("data", (chunk) => {
  // Do something with the data chunk
});

This example sets up a Readable stream and listens for data events. When data is available, the chunk will be passed to the callback function.

2. Using on('readable')

const stream = require("stream");
const readableStream = new stream.Readable({ read() {} });

readableStream.on("readable", () => {
  // Do something with the readable data
});

This example listens for readable events on the stream. When data is readable, the callback function will be executed.

3. Using pipe()

const stream = require("stream");
const readableStream = new stream.Readable({ read() {} });
const writableStream = new stream.Writable({
  write(chunk, encoding, callback) {
    callback();
  },
});

readableStream.pipe(writableStream);

This example connects the readable stream to the writable stream using the pipe() method. Data from the readable stream will flow directly to the writable stream.
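
4. Using async iterators

Readable streams are also async iterable, so you can consume them with a for await...of loop. A minimal sketch, assuming a hypothetical file named data.txt:

const fs = require("fs");

async function readFile() {
  const readableStream = fs.createReadStream("data.txt");

  // Each chunk is delivered as it becomes available
  for await (const chunk of readableStream) {
    console.log(`Received ${chunk.length} bytes`);
  }

  console.log("Done reading");
}

readFile();

This avoids manually wiring up 'data' and 'end' listeners, and errors can be handled with a regular try...catch around the loop.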

Potential Applications

Readable streams are used in a variety of applications, including:

  • Reading data from files

  • Communicating with databases

  • Processing data in real time

  • Creating custom data pipelines


Class: stream.Readable

A readable stream is a stream that emits data in the form of chunks. A chunk is a buffer of data, and can be of any size. Readable streams are used to read data from a source, such as a file or a network connection.

Methods

readable.read([size])

  • Read data from a stream.

  • size (optional): The number of bytes to read. If not specified, all available data will be read.

  • Returns: A Buffer (or a string if an encoding has been set) containing the data that was read, or null if no data is available.

readable.setEncoding(encoding)

  • Sets the encoding for the data that is emitted from the stream.

  • encoding (string): The encoding to use. Can be one of: 'ascii', 'utf8', 'utf16le', 'ucs2', 'base64', or 'hex'.

  • Returns: this.

readable.pause()

  • Pauses the stream. No more data will be emitted until the stream is resumed.

readable.resume()

  • Resumes the stream. Data will continue to be emitted from the stream.

readable.pipe(destination)

  • Pipes the output of this stream to another stream.

  • destination (stream): Destination to pipe
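
As a quick sketch of how several of these methods fit together (assuming a hypothetical file named example.txt), the following reads the file as UTF-8 text in paused mode:

const fs = require("fs");

const readableStream = fs.createReadStream("example.txt");

// Emit strings instead of Buffers
readableStream.setEncoding("utf8");

readableStream.on("readable", () => {
  let chunk;
  // read() returns null once the internal buffer is empty
  while ((chunk = readableStream.read()) !== null) {
    console.log(`Read ${chunk.length} characters`);
  }
});

readableStream.on("end", () => {
  console.log("No more data.");
});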

Events

'data' event

  • Emitted when data is available to be read.

  • Listener: function(chunk):

    • chunk: A Buffer containing the data that was read.

'end' event

  • Emitted when all data has been read from the stream.

  • Listener: function()

'error' event

  • Emitted when an error occurs.

  • Listener: function(err):

    • err: An Error object describing the error.

Real-World Examples

  • Reading a file from disk:

const fs = require("fs");

const readableStream = fs.createReadStream("file.txt");

readableStream.on("data", (chunk) => {
  console.log(chunk.toString());
});

readableStream.on("end", () => {
  console.log("All data has been read");
});

readableStream.on("error", (err) => {
  console.log(err);
});
  • Consuming data from a network connection:

const net = require("net");

const server = net.createServer();

server.on("connection", (socket) => {
  socket.on("data", (data) => {
    console.log(data.toString());
  });
});

server.listen(8080);

Event: 'close'

What is it?

The 'close' event is triggered when a stream and all its resources (like files, sockets, etc.) have been closed. This means that there won't be any more events or data coming from the stream, and it's finished doing its job.

Why is it used?

We use the 'close' event to know when a stream is completely done, and we can safely clean up any resources that were used by the stream.

How does it work?

The stream will emit the 'close' event after all the data has been processed and all resources have been released. You can listen for this event using the .on() method:

readableStream.on('close', () => {
  // Do stuff when the stream is closed
});

Real-world example:

Imagine you have a stream that reads data from a file. When the stream is finished and its underlying file descriptor has been released, it will emit the 'close' event. You can listen for this event to run any cleanup of your own:

const fs = require('fs');

const readableStream = fs.createReadStream('file.txt');

readableStream.on('close', () => {
  console.log('The file has been closed and its resources released.');
});

Potential applications:

  • Releasing resources when a stream is finished to avoid memory leaks.

  • Detecting when a network connection has been closed to clean up associated resources.

  • Notifying other parts of your program that a stream has completed its operation.


Event: 'data'

Explanation in simplified English:

Imagine a stream as a garden hose. When water flows through the hose, we can attach a nozzle to it to catch and use the water in a specific way. Similarly, when data flows through a stream, we can attach 'data' event listeners to catch and process the data in a specific way.

Chunk:

Each time the stream sends data, it comes in small pieces called 'chunks'. These chunks can be like slices of bread, and the stream sends them out one slice at a time.

Encoding:

Just like there are different ways to slice bread, there are different ways to handle the data in the chunks. For example, we can convert the chunks into readable text, like "Hello, world!" or keep them as raw binary data. This process is called 'encoding'.

Callback function:

When we attach a 'data' event listener, we provide a callback function that will be called each time a chunk of data arrives. In the callback function, we can decide what to do with the data, like display it, store it, or pass it on to another part of our program.

Flowing mode:

Streams can be in 'flowing' mode or 'paused' mode. When a stream is in flowing mode, the data chunks keep flowing out automatically. When it's paused, we need to manually call readable.read() to retrieve the data. Attaching a 'data' event listener automatically switches the stream into flowing mode.

Real-world applications:

  • Streaming video: A video player can use the 'data' event to receive and display chunks of video data as they arrive.

  • File uploading: A web application can use the 'data' event to receive and save chunks of a file being uploaded by a user.

  • Data streaming: A data processing pipeline can use 'data' events to receive and process chunks of data in real-time.

Example:

// Create a readable stream that emits the characters "Hello, world!"
const readable = require("stream").Readable({
  read() {
    // This function will be repeatedly called by Node.js until we stop emitting data.
    // We stop emitting data by calling readable.push(null).

    // Emit the first character of "Hello, world!"
    this.push("H");

    // Emit the rest of the characters one by one.
    this.push("e");
    this.push("l");
    this.push("l");
    this.push("o");
    this.push(", ");
    this.push("w");
    this.push("o");
    this.push("r");
    this.push("l");
    this.push("d");
    this.push("!");

    // When there is no more data to emit, call readable.push(null) to indicate the
    // end of the stream.
    this.push(null);
  },
});

// Attach a 'data' event listener to the readable stream and print the received data
readable.on("data", (chunk) => {
  console.log(`Received a chunk of data: ${chunk.toString()}`);
});

// Attaching the 'data' listener above switches the stream into flowing mode,
// so the data starts flowing automatically; an explicit read() call is not required.

Output:

Received a chunk of data: H
Received a chunk of data: e
Received a chunk of data: l
Received a chunk of data: l
Received a chunk of data: o
Received a chunk of data: ,
Received a chunk of data: w
Received a chunk of data: o
Received a chunk of data: r
Received a chunk of data: l
Received a chunk of data: d
Received a chunk of data: !

Event: 'end'

The 'end' event is triggered when there is no more data to be received from the stream.

Imagine a water pipe: When you turn on the tap, water flows out. The 'data' event is like receiving water droplets coming out of the tap.

Once the tap is turned off, there will be no more water coming out. The 'end' event is like that moment when the last drop of water has dripped out and there's nothing left.

How to consume all data

To receive the 'end' event, you need to make sure you've consumed all the data from the stream. You can do this by:

  • Setting the stream to "flowing" mode: This means the stream will automatically read and process data as it becomes available.

  • Manually calling stream.read() repeatedly: You can keep calling stream.read() until it returns null, indicating there's no more data.

Code example

const stream = getReadableStreamSomehow();

// Flowing mode
stream.on("data", (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});

// Manual reading (paused mode)
stream.on("readable", () => {
  let chunk;
  while ((chunk = stream.read()) !== null) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
});

stream.on("end", () => {
  console.log("There will be no more data.");
});

Real-world usage

The 'end' event is useful for marking the completion of data transmission. For example:

  • File uploads: When a file is fully uploaded, the stream will emit the 'end' event, signaling that the upload is complete.

  • Data parsing: After a stream of data has been fully parsed, the 'end' event can be used to trigger further processing or analysis of the parsed data.


Event: 'error'

Explanation

The 'error' event is emitted by a Readable stream when it encounters an error that prevents it from continuing to read. This could be due to an internal failure in the stream implementation, or because the stream is trying to push invalid data.

Callback Function

The listener callback for the 'error' event takes one argument:

  • error: An Error object describing the error that occurred.

Real-World Example

Here is an example of how to handle the 'error' event on a Readable stream:

const stream = require("stream");

const readableStream = new stream.Readable();

readableStream.on("error", (err) => {
  console.error(`Error occurred: ${err.message}`);
});

readableStream.push("Hello, world!");
readableStream.push(null); // Signal the end of the stream.

In this example, we create a Readable stream and add a listener for the 'error' event. If an error occurs while the stream is reading, the listener callback will be called with the error object.

Potential Applications

The 'error' event can be used to handle errors that occur while reading from a stream. This could be useful in applications where it is important to continue processing data even if errors occur. For example, a data processing application could use the 'error' event to log errors and continue processing data from other sources.


Event: 'pause'

The 'pause' event is emitted when stream.pause() is called, indicating that no more data should be read from the stream until stream.resume() is called.

Example

const fs = require("fs");

const readableStream = fs.createReadStream("file.txt");

readableStream.on("pause", () => {
  console.log("Stream is paused.");
});

When to use

The 'pause' event is useful for controlling the flow of data through a stream. For example, you could use it to pause a stream while you process some of the data, or to pause it until some other condition is met.

Real-world applications

  • Pausing a stream while processing data: If you are processing data from a stream and need to pause the stream to perform some other task, you can use the 'pause' event to do so. This will prevent any more data from being read from the stream until you call stream.resume().

  • Pausing a stream until a condition is met: If you need to pause a stream until a certain condition is met, you can use the 'pause' event to do so. For example, you could pause a stream until a certain number of bytes have been read, or until a certain event has occurred.


Simplified Explanation of 'readable' Event:

Imagine a water pipe that can fill a bucket.

  • Event Trigger: When there's water in the pipe (data available).

  • What Happens: You can start collecting water (reading data) from the pipe.

  • If No Data: If there's no water, you won't get any (end of stream reached).

Handling 'readable' Event:

// Create a water pipe (readable stream)
const pipe = getReadableStreamSomehow();

// Listen for when there's water (readable event)
pipe.on("readable", () => {
  // Start collecting water (reading data)
  let water;
  while ((water = pipe.read()) !== null) {
    console.log(`Collected ${water}`);
  }
});

Alternative and Easier Options:

  • 'data' Event: Use the 'data' event instead. It automatically calls pipe.read() for you.

  • .pipe() Method: Connect the pipe to another stream or destination using .pipe(). It handles data flow automatically.

Real-World Applications:

  • Data Processing: Read data from a file or network and process it chunk by chunk.

  • Data Streaming: Send data from one source to another in real-time.

  • HTTP Requests: Receive data from a server in a web browser.

Potential Code Implementations:

File Reading:

const fs = require("fs");

// Create a readable stream for a file
const fileStream = fs.createReadStream("data.txt");

// Listen for readable events
fileStream.on("readable", () => {
  let dataChunk;
  while ((dataChunk = fileStream.read()) !== null) {
    // Process the data chunk
    console.log(`Read ${dataChunk.length} bytes of data`);
  }
});

HTTP Request Response:

// Create a server
const http = require("http");

http
  .createServer((req, res) => {
    // When the request is readable, read the data and echo it back
    req.on("readable", () => {
      let dataChunk;
      while ((dataChunk = req.read()) !== null) {
        res.write(dataChunk);
      }
    });

    // End the response once the request body has been fully read
    req.on("end", () => {
      res.end();
    });
  })
  .listen(8080);

Event: 'resume'

When you create a readable stream, it starts in a paused state. When you call the resume() method on the stream, it starts emitting data. The 'resume' event is emitted when the resume() method is called and the stream is not already flowing (i.e., readableFlowing is not true).

Real-world example

Imagine you have a file containing a list of names, and you want to print them to the console. You can use a readable stream to process the file. The following code shows how to use the 'resume' event to start emitting data from the stream:

const fs = require("fs");
const { Transform } = require("stream");

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

const fileStream = fs.createReadStream("./names.txt");
fileStream
  .pipe(transformStream)
  .on("resume", () => {
    console.log("Starting to print names:");
  })
  .on("data", (data) => {
    console.log(data);
  });

In this example, piping the fileStream into the transformStream and attaching a 'data' listener switch the streams into flowing mode. When the transform stream starts flowing, it emits the 'resume' event and the console.log('Starting to print names:') message is printed. The fileStream then emits data, which is transformed by the transformStream and printed to the console.

Potential applications

The 'resume' event can be used to control when data is emitted from a readable stream. This can be useful in cases where you want to pause and resume the stream for performance reasons or to control the flow of data.


readable.destroy([error])

  • error (Optional): An error object to be emitted as a payload in the 'error' event.

  • Returns: this (the stream itself)

Purpose

The destroy() method allows you to terminate a readable stream and release any internal resources it may be holding. Once a stream is destroyed, any subsequent calls to push() will be ignored.

How it Works

  • Optionally, the method emits an 'error' event with the provided error object as the payload.

  • It then emits a 'close' event, unless you set the emitClose option to false.

  • The stream releases all its internal resources.

When to Use

You should use destroy() to properly clean up a readable stream when you no longer need to read data from it. This helps prevent memory leaks and ensures that the stream's resources are released.

Real-World Code Example

// Create a readable stream
const fs = require("fs");
const readableStream = fs.createReadStream("file.txt");

// Read data from the stream
readableStream.on("data", (chunk) => {
  console.log(chunk.toString());
});

// Destroy the stream when you are done reading
readableStream.on("end", () => {
  readableStream.destroy();
});

Potential Applications

  • Closing a file after reading its contents

  • Cleaning up a data pipeline after processing data

  • Handling errors and terminating a stream gracefully

  • Releasing resources after a stream is no longer needed

  • Implementing cleanup logic in server-side applications

  • Ensuring proper resource management in web applications


readable.closed

  • Type: boolean

  • Description: A boolean value that is true after the 'close' event has been emitted.

Real-world example

The following code sample shows you how to use the readable.closed property:

const { Readable } = require("stream");

const readable = new Readable();

readable.on("close", () => {
  // The stream is now closed.
});

readable.pause();
readable.destroy();

Applications in the real world

The readable.closed property can be used to determine whether a readable stream is closed. This can be useful in a variety of scenarios, such as:

  • Logging: Logging the fact that a stream has been closed can be helpful for debugging purposes.

  • Error handling: If a stream is closed unexpectedly, the readable.closed property can be used to determine whether the error was caused by the stream being closed.

  • Resource management: If a stream is no longer needed, the readable.closed property can be used to determine whether it can be safely closed to free up resources.


readable.destroyed

  • Type: Boolean

  • Description: Indicates whether the stream has been destroyed.

Simplified Explanation

Imagine you have a water pipe. When you turn off the water supply and close the valve, the pipe is no longer providing water. In the same way, when you call readable.destroy(), you are effectively turning off the water supply and closing the valve. The readable.destroyed property lets you know that the stream has been destroyed.

Real-World Example

A common use case for destroying a stream is when you want to stop reading from it. For example, if you are using a stream to read data from a file, you can destroy the stream once you have finished reading the file. This will free up resources and prevent the stream from continuing to read data unnecessarily.

Here is a simple example:

const fs = require('fs');

const readableStream = fs.createReadStream('test.txt');

// Read data from the stream
readableStream.on('data', (data) => {
  console.log(data.toString());
});

// Destroy the stream after reading all data
readableStream.on('end', () => {
  console.log('Finished reading file');
  readableStream.destroy();
});

In this example, we create a readable stream from a file named test.txt. We then listen for the data event to read data from the stream. Once the stream reaches the end of the file, it emits the end event. In the end event handler, we call readableStream.destroy() to destroy the stream. This will free up resources and prevent the stream from continuing to read data.


readable.isPaused() method in stream module returns a boolean indicating whether or not the readable stream is currently paused.

How does it work? When a readable stream is created, it starts in paused mode. Attaching a 'data' event listener or calling readable.resume() switches it into flowing mode, meaning data is emitted as soon as it is available. You can pause the stream again using the readable.pause() method, which stops the emission of data, and then resume it with readable.resume().

Why would you use this method? You might want to pause a readable stream if you need to do some processing on the data before it is emitted, or if you want to control the rate at which data is emitted.

Real world example: Let's say you have a readable stream that is emitting data from a file. You want to pause the stream so that you can process the data before it is displayed on the screen. You can do this by using the following code:

const fs = require('fs');
const stream = fs.createReadStream('file.txt');

stream.on('data', (chunk) => {
  // Process the data here
});

console.log(stream.isPaused()); // false: the 'data' listener put the stream in flowing mode

stream.pause();
console.log(stream.isPaused()); // true

// Do some processing on the data

stream.resume();
console.log(stream.isPaused()); // false

Applications in the real world:

1. Data processing: You can use the readable.isPaused() method to pause a readable stream while you process the data. This can be useful if you need to do some complex processing on the data before it is emitted, or if you want to control the rate at which data is emitted.

2. Error handling: You can use the readable.isPaused() method to pause a readable stream if an error occurs. This can prevent the stream from emitting data that could be corrupted or invalid.

3. Flow control: You can use the readable.isPaused() method to control the flow of data in a readable stream. For example, you can pause the stream if you need to wait for some other operation to complete before continuing to emit data.


readable.pause()

  • What it does: Pauses the stream from emitting data events.

  • How it works: When you call readable.pause(), the stream stops emitting 'data' events. Any data that becomes available after pausing will remain in the internal buffer.

  • Example: Let's say you want to pause a stream that is reading a large file. Instead of reading the entire file into memory, you can pause the stream after reading a certain amount of data, process that data, and then resume the stream to read the next chunk of data.

const fs = require("fs");
const readableStream = fs.createReadStream("large-file.txt");

// Pause the stream after reading 1MB of data
readableStream.on("data", (chunk) => {
  if (chunk.length >= 1024 * 1024) {
    readableStream.pause();
  }
});

// Resume the stream after 1 second
setTimeout(() => {
  readableStream.resume();
}, 1000);
  • Potential applications:

    • Controlling the flow of data in a stream

    • Pausing a stream to process data in chunks

    • Limiting memory usage by pausing a stream before the buffer fills up


Readable.pipe(Destination, [Options]) Method

Explanation:

Imagine you have data flowing from a Readable stream, like a water pipe. The pipe() method allows you to connect this pipe to a Writable stream, like a sink, where the water (data) will flow into.

Parameters:

  • Destination: The Writable stream where the data will flow into.

  • Options: (Optional) An object that can specify additional options:

    • end: If true, the destination stream will be closed when the readable stream finishes sending data. Default: true.

Return Value:

The method returns the Destination stream, allowing you to chain multiple pipes together.

How it Works:

When you call pipe(), the readable stream automatically starts sending data to the destination. The flow of data is controlled so that the destination stream doesn't get overwhelmed.

Code Snippet:

// Read data from a file and write it to the console
const fs = require("fs");
const readableStream = fs.createReadStream("file.txt");
const writableStream = process.stdout;

readableStream.pipe(writableStream);
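
If you need the destination to stay open after the source has finished, you can pass { end: false } in the options. A small sketch, assuming hypothetical files part1.txt and combined.txt:

const fs = require("fs");

const source = fs.createReadStream("part1.txt");
const destination = fs.createWriteStream("combined.txt");

// Keep the destination open so more data can be written after the source ends
source.pipe(destination, { end: false });

source.on("end", () => {
  destination.end("-- end of part 1 --\n");
});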

Real-World Applications:

  • Piping data from a file to be compressed and stored in a database.

  • Piping data from a sensor to a dashboard for real-time monitoring.

  • Piping data from a user's input field to a server for validation.

Example Code:

Pipe data to compress and store:

const fs = require("fs");
const zlib = require("zlib");

const readableStream = fs.createReadStream("large_file.txt");
const writableStream = fs.createWriteStream("large_file.gz");

// Compress the data while piping
const gzipStream = zlib.createGzip();

readableStream.pipe(gzipStream).pipe(writableStream);

Pipe data to a UI:

const io = require("socket.io")(3000);
const fs = require("fs");

io.of("/sensor").on("connection", (socket) => {
  // A Socket.IO socket is not a Writable stream, so forward chunks manually
  const sensorStream = fs.createReadStream("sensor-data.txt");

  sensorStream.on("data", (chunk) => {
    socket.emit("sensor-data", chunk.toString());
  });
});

Simplified Explanation:

A readable stream allows you to access data in chunks. Like when you watch a video, you get chunks of the video at a time.

readable.read([size]) method:

  • The read method lets you take chunks of data from the stream.

  • The size parameter (optional) tells the stream how much data you want to read (in bytes). If you don't specify a size, it returns all of the data currently in the internal buffer.

  • The method returns the data as a string, Buffer (like a chunk of bytes), or null if there's no data to read.

How to use:

You call the read method when you want to process the data. This can be useful for:

  • Reading files chunk by chunk (e.g., large log files)

  • Reading data from an HTTP response chunk by chunk

Example:

// Import the 'fs' module
const fs = require("fs");

// Create a readable stream from a file
const readableStream = fs.createReadStream("my_file.txt");

// Listen for the 'readable' event
readableStream.on("readable", () => {
  // Keep reading chunks of data until there's no more to read
  let data;
  while ((data = readableStream.read()) !== null) {
    // Process the data here (e.g., display it on a web page)
  }
});

Potential Applications:

  • Streaming videos, audios

  • Downloading files in chunks

  • Real-time data processing (e.g., stock market updates)


readable.readable

  • Type: boolean

  • Description: Indicates whether the stream is in a state where calling readable.read() is safe. This means that the stream has not been destroyed, and has not emitted 'error' or 'end'.

Simplified Explanation

Imagine a stream as a pipe carrying water. The readable.readable property tells you if the pipe is open and flowing. If it's true, you can keep reading water from the pipe. If it's false, the pipe is either closed or has a problem.

Real-World Example

const fs = require("fs");

const readableStream = fs.createReadStream("./file.txt");

// Check if the stream is readable before reading from it
if (readableStream.readable) {
  // Read and process the data from the stream
  readableStream.on("data", (chunk) => {
    // Do something with the data chunk
  });
} else {
  // Handle the error or end event
}

In this example, we check if the readableStream is in a readable state before attempting to read data from it. This ensures that we do not try to read from a closed or errored stream.

Potential Applications

  • File reading: Checking if a file stream is readable before attempting to read data ensures that we do not encounter errors or unexpected end-of-file situations.

  • Network connections: Checking if a network connection is readable before attempting to send or receive data ensures that the connection is active and ready for communication.

  • Data processing: Streams are commonly used for data processing tasks, such as filtering or transforming data. Checking the readability of a stream before processing data ensures that the stream is still active and has not encountered any problems.


readable.readableAborted

This property tells you if the stream was interrupted or encountered an error before it could finish emitting all of its data.

Simplified explanation: A stream that has been aborted is like a water pipe that has been cut off before all the water could flow out.

Code snippet:

const fs = require("fs");

const readableStream = fs.createReadStream("./my-file.txt");

readableStream.on("readable", () => {
  // Read chunks of data from the stream
});

readableStream.on("end", () => {
  // The stream has finished emitting all of its data
});

readableStream.on("error", () => {
  // An error occurred while reading the stream
});

readableStream.on("close", () => {
  // The stream has been closed
});

// Check if the stream was aborted
if (readableStream.readableAborted) {
  console.log(
    "The stream was aborted before it could finish emitting all of its data."
  );
}

Real-world application: This property can be useful for handling errors or interruptions in a stream. For example, if you want to log an error when a file cannot be read, you could use the following code:

const fs = require("fs");

const readableStream = fs.createReadStream("./my-file.txt");

readableStream.on("error", (err) => {
  console.log(`Error reading file: ${err.message}`);

  // Check if the error was caused by the stream being aborted
  if (readableStream.readableAborted) {
    console.log(
      "The stream was aborted before it could finish emitting all of its data."
    );
  }
});

readable.readableDidRead

Simplified Explanation:

readable.readableDidRead is like a flag that tells us if the stream has already sent any 'data' events. It returns true if at least one 'data' event has been emitted; otherwise, it returns false.

Detailed Explanation:

  • Readable streams are streams that emit data in chunks.

  • readable.readableDidRead is a property of readable streams that indicates whether the stream has emitted any 'data' events yet.

  • This property is useful for checking if the stream has started flowing data. For example, you can use it to trigger an action when the first chunk of data is received.

Real-World Example

const fs = require('fs');

const readableStream = fs.createReadStream('file.txt');

console.log(readableStream.readableDidRead); // false: no 'data' event has been emitted yet

readableStream.on('data', (chunk) => {
  // Process the data
});

readableStream.on('end', () => {
  console.log(readableStream.readableDidRead); // true: at least one 'data' event was emitted
});

In this example, we create a readable stream using fs.createReadStream(). Before any data flows, readableDidRead is false.

Once the stream starts emitting 'data' events, the property becomes true, so by the time the 'end' event fires it reports that data has been read.

Potential Applications

  • Triggering actions when the first chunk of data is received.

  • Checking if a stream has stopped flowing data.

  • Debugging stream behavior.


readable.readableEncoding

  • Type: null or string

Description

The readable.readableEncoding property returns the character encoding that has been set for the stream with readable.setEncoding(), or null if no encoding is set. It's important for text-based streams to set an encoding so that incoming data can be correctly interpreted and displayed.

Usage

const { Readable } = require("stream");

// Create a readable stream
const readableStream = new Readable();

// Set the encoding to 'utf-8'
readableStream.setEncoding("utf-8");

// Listen for data events
readableStream.on("data", (chunk) => {
  console.log(chunk.toString()); // Output: Hello World
});

// Push data to the stream
readableStream.push("Hello World");
readableStream.push(null); // End the stream

Real-World Applications

The readable.readableEncoding property is used in many real-world applications, including:

  • Web Servers: HTTP servers need to set the encoding property to properly interpret incoming request data and generate appropriate responses.

  • Log Files: Log files often use specific character encodings to store data in a structured format.

  • Data Processing: Data processing pipelines may need to convert data between different character encodings for compatibility or interoperability.

Potential Improvements

One potential improvement to the readable.readableEncoding property would be to make it automatically detect the encoding of incoming data. Currently, developers must manually set the encoding, which can be error-prone.


readable.readableEnded

What is it? The readableEnded property indicates whether the readable stream (e.g., a file, a network connection, etc.) has reached the end of its data and emitted the 'end' event.

How to use it: You can check the value of the readableEnded property to determine if the stream has ended.

Example:

const fs = require("fs");

const readableStream = fs.createReadStream("file.txt");

readableStream.on("end", () => {
  console.log("The stream has ended.");
});

if (readableStream.readableEnded) {
  console.log("The stream has already ended.");
}

In this example, we are creating a readable stream from a file. We then add an event listener for the 'end' event, which will be triggered when the stream reaches the end of its data. We also check the value of the readableEnded property to see if the stream has already ended.

Real-world applications: The readableEnded property can be used to determine when a stream has finished processing data. This can be useful for:

  • Checking if a file has been completely read

  • Waiting for a network connection to close

  • Terminating a process when all data has been processed

Simplified explanation: Imagine you are reading a book. The readableEnded property is like a flag that tells you whether you have reached the last page of the book. You can check this flag to see if you are finished reading or if you need to keep going.


readable.errored

  • What it is:

    readable.errored is a property that can return an error. It is used to check if a stream has been destroyed with an error.

  • How to use it:

    const { Readable } = require('stream');
    
    const readableStream = new Readable();
    
    readableStream.on('error', (err) => {
      // Handle error here
    });
    
    readableStream.on('close', () => {
      if (readableStream.errored) {
        // Stream was destroyed with an error
      }
    });
  • Real-world example:

    In a web application, you can use readable.errored to handle errors that occur when reading from a file. For example, if you are reading from a file that does not exist, you can use readable.errored to catch the error and display a message to the user.

  • Potential applications:

    readable.errored can be used in any application that needs to handle errors that occur when reading from a stream.


readable.readableFlowing

Simplified Explanation:

Imagine a water pipe. The readable.readableFlowing property tells us whether the water is flowing through the pipe or not.

Detailed Explanation:

A Readable stream represents a source of data that can be read as a flow of individual chunks. The readable.readableFlowing property reflects the current state of the stream in terms of its flow control:

  • true (flowing): Data is actively being emitted from the stream.

  • false (paused): The stream has been explicitly paused, preventing any further data emission.

  • null: No mechanism for consuming the stream's data has been set up yet.

Real-World Example:

Consider a scenario where you're downloading a large file from the internet. The Readable stream represents the data coming in from the network.

Initially, the stream is flowing (true), and data is downloaded at the network speed. However, if your internet connection becomes slow or unstable, the stream might get paused (false), temporarily stopping the data flow.

Potential Applications:

  • Flow Control: Adjust the flow of data into a stream to prevent buffer overflow.

  • Error Handling: Detect when the flow is paused due to an error on either the source or the consumer end.

  • Data Regulation: Manage the rate at which data is consumed, smoothing out fluctuations in data availability.

Improved Example:

const { Readable } = require("stream");

const myReadableStream = new Readable({
  read() {}, // Implementation of the read() method
});

myReadableStream.on("data", (chunk) => {
  // Data is actively being emitted, so we can process it here
});

myReadableStream.on("pause", () => {
  // The stream has been paused, so we can stop processing data for now
});

myReadableStream.on("resume", () => {
  // The stream has been resumed, so we can resume processing data
});

readable.readableHighWaterMark

  • number: The high water mark for the readable stream. This is the maximum number of bytes the stream will buffer internally before it stops reading from the underlying resource.

Real-World Example

The following code creates a readable stream with a high water mark of 100 bytes:

const { Readable } = require("stream");

// create a readable stream
const rs = new Readable({
  highWaterMark: 100,
});

// add some data to the stream
rs.push("hello world");

// pause the stream
rs.pause();

// get the high water mark
const highWaterMark = rs.readableHighWaterMark; // 100

// resume the stream
rs.resume();

Potential Applications

  • Controlling the flow of data in a stream.

  • Preventing a stream from buffering too much data.

  • Improving the performance of a stream.


readable.readableLength

Simplified Explanation:

The readable.readableLength property tells you how much data is waiting to be read from the stream. It's like a queue of data that's ready for your program to process.

Detailed Explanation:

When you create a readable stream, data is sent into the stream in chunks. Each chunk is a small piece of the data, like a paragraph in a book. As chunks arrive, they get added to a queue, which is a waiting line for data. The readable.readableLength property tells you how many bytes (or, in object mode, how many objects) are currently waiting in that queue.

The readable.readableLength property is useful for managing the flow of data in your stream. For example, if the readableLength is getting too high, you might want to pause the stream to prevent your program from getting overwhelmed with data.

Code Example:

const fs = require("fs");

const readableStream = fs.createReadStream("large-file.txt");

readableStream.on("data", (chunk) => {
  // Process the data chunk here
});

// Check the readableLength periodically to manage flow
setInterval(() => {
  const readableLength = readableStream.readableLength;

  if (readableLength > 1000) {
    // Higher than threshold
    // Pause the stream to avoid overloading
    readableStream.pause();
  }
}, 200);

// Resume the stream when it's ready to process more
readableStream.on("drain", () => {
  readableStream.resume();
});

Real-World Applications:

The readable.readableLength property is used in various applications, including:

  • File reading: To prevent loading too much data into a program's memory.

  • Streaming media: To control the rate at which audio or video is played.

  • Network communication: To ensure data is transmitted at a manageable pace.


readable.readableObjectMode

Getter:

Returns true if the stream is in objectMode. Otherwise, returns false.

Usage:

const readableStream = require("stream").Readable({
  objectMode: true,
});

console.log(readableStream.readableObjectMode); // true

Explanation:

  • A stream can operate in two modes: buffer mode (the default) or object mode.

  • In buffer mode, the stream emits raw data chunks as Buffer objects (or strings, if an encoding is set).

  • In object mode, the stream emits arbitrary JavaScript objects instead of raw data chunks.

  • Setting objectMode to true allows you to process data as JavaScript objects, making it easier to work with complex data structures.

Real-World Application:

Suppose you have a stream of JSON data. You can use a Readable stream in objectMode to parse the incoming data as JavaScript objects:

const readableStream = require("stream").Readable({
  objectMode: true,
});

readableStream.on("data", (object) => {
  // Process the object
});

readableStream.push({ name: "John", age: 30 });

In this example, the readableStream will emit JavaScript objects, allowing you to work with the data in a more structured manner.


readable.resume()

In plain english:

readable.resume() is a method that makes a paused stream start flowing again.

Detailed explanation:

A stream is like a pipe that carries data. A readable stream is a stream that you can read data from.

When you pause a readable stream, it stops flowing data. This can be useful if you want to wait for something to happen before you start reading data again.

To resume a paused readable stream, you can call the readable.resume() method. This will cause the stream to start flowing data again.

Code snippet:

const fs = require("fs");

const readableStream = fs.createReadStream("file.txt");

// Pause the stream
readableStream.pause();

// Do something else...

// Resume the stream
readableStream.resume();

Real world example:

One potential application for readable.resume() is to limit the amount of data that you read from a stream at a time. You can do this by pausing the stream after you have read a certain amount of data, and then resuming the stream once you have processed that data.

Improved code example:

const fs = require("fs");

const readableStream = fs.createReadStream("file.txt");
let data = "";

readableStream.on("data", (chunk) => {
  data += chunk;

  // Pause the stream after we have read 100 bytes of data
  if (data.length >= 100) {
    readableStream.pause();
  }
});

readableStream.on("pause", () => {
  // Process the data that we have read so far
  console.log(data);

  // Resume the stream after we have processed the data
  readableStream.resume();
});

readable.setEncoding(encoding)

Explanation

The readable.setEncoding() method in Node.js allows you to specify the character encoding used to decode data from a readable stream.

Imagine you're receiving messages from a friend through a letter that's written in code. You need to know what kind of code (e.g., Morse code, ASCII, etc.) your friend is using to decipher the message correctly.

By default, Node.js treats stream data as raw bytes (like the letter you receive). But if you know the encoding (like the code your friend is using), you can use setEncoding() to convert these raw bytes into readable characters (decipher the message).

Usage

const stream = fs.createReadStream("myFile.txt");

// Decode data as UTF-8 characters
stream.setEncoding("utf8");

stream.on("data", (chunk) => {
  console.log(chunk); // Outputs a string with data decoded in UTF-8
});

Code Implementation

Consider a server that broadcasts messages to clients in a specific encoding.

// Server code
const net = require("net");

const server = net.createServer((socket) => {
  // Set encoding for message data
  socket.setEncoding("utf8");

  // Receive and process message data from client
  socket.on("data", (data) => {
    console.log(`Received message: ${data}`);
  });
});

server.listen(3000);

Both the server and the clients that connect to it decode incoming data as UTF-8 text.

// Client code
const net = require("net");

const client = net.createConnection({ port: 3000 });

// Set encoding for message data
client.setEncoding("utf8");

client.on("connect", () => {
  client.write("Hello from client!");
});

Real-World Applications

  • Web servers: Decode incoming HTTP request bodies as text before parsing formats such as JSON or form data.

  • Data processing: Convert raw byte streams from sensors, databases, or files into human-readable text.

  • Network clients and servers: Read text-based protocol messages from sockets as strings instead of raw Buffers.


readable.unpipe([destination])

The readable.unpipe() method is used to detach a Writable stream from a Readable stream.

Usage

readable.unpipe([destination]);
  • destination: The Writable stream to detach, or null to detach all Writable streams.

Behavior

  • If no destination is provided, all Writable streams that are currently piped from the Readable stream will be detached.

  • If a destination is provided, only the Writable stream that is currently piped to that destination will be detached.

  • If no Writable stream is currently piped to the destination, the method does nothing.

Example

const fs = require('fs');

const readable = fs.createReadStream('input.txt');
const writable1 = fs.createWriteStream('output1.txt');
const writable2 = fs.createWriteStream('output2.txt');

// Pipe the Readable stream to the two Writable streams.
readable.pipe(writable1);
readable.pipe(writable2);

// Detach the Readable stream from the first Writable stream.
readable.unpipe(writable1);

// The Readable stream will now only be piped to the second Writable stream.

Real-World Applications

  • Unpiping is useful when you need to stop sending data from a Readable stream to a particular Writable stream.

  • For example, you might unpipe once you have finished processing the data you need, or when the Writable destination should no longer receive data (see the sketch below).
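As a minimal sketch of that idea (the file names and the one-second delay are just placeholders), you might stop piping after a fixed amount of time:

const fs = require("fs");

const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");

readable.pipe(writable);

// Stop sending data to the writable stream after one second
setTimeout(() => {
  readable.unpipe(writable);
  writable.end();
}, 1000);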


readable.unshift(chunk[, encoding])

The readable.unshift() method pushes data back into the internal buffer, allowing previously consumed data to be un-consumed.

Usage

readable.unshift(chunk[, encoding])
  • chunk: Data to push back into the buffer. For streams not in object mode, chunk must be a string, Buffer, Uint8Array, or null. For object mode streams, chunk can be any JavaScript value.

  • encoding: Encoding of string chunks (only used for strings).

Example

const { Readable } = require('stream');

// A Readable with a no-op read() so we can push data into it manually
const readable = new Readable({ read() {} });

readable.push('Hello World');
readable.push(null);

readable.once('readable', () => {
  // Consume the buffered chunk
  const chunk = readable.read();
  console.log(`Read: ${chunk.toString()}`);

  // Un-consume the chunk by pushing it back into the internal buffer
  readable.unshift(chunk);

  // The same data can now be read again
  console.log(`Read again: ${readable.read().toString()}`);
});

This example reads a chunk from the stream, un-consumes it with unshift(), and then reads the same data again from the buffer.

Real-World Applications

  • Streaming data from a database: A database stream may emit rows of data as it becomes available. If the consumer needs to process the rows in a specific order or needs to re-process a row, it can use readable.unshift() to push the row back into the buffer.

  • Handling backpressure: When a stream consumer is unable to keep up with the producer, the producer may need to slow down or stop sending data. readable.unshift() can be used to temporarily store data that cannot be processed right away and release it when the consumer is ready.

  • Creating a custom stream: When creating a custom stream, readable.unshift() can be used to implement buffering or other data management operations.


Readable Stream Wrapping

What is a readable stream?

A readable stream is a stream that can be read from. It emits data events when new data is available.

What is an old-style readable stream?

Before Node.js 0.10, streams didn't implement the full stream module API. They only emitted 'data' events and had an advisory-only pause() method.

What does readable.wrap() do?

The readable.wrap() method allows you to create a Readable stream that uses an old-style readable stream as its data source.

Why would you use readable.wrap()?

You might need to use readable.wrap() if you're using an older Node.js library that uses old-style readable streams. This allows you to interact with the old stream using the new stream API.

How do you use readable.wrap()?

To use readable.wrap(), you create a new Readable and pass the old-style stream to its wrap() method. For example:

const { OldReader } = require("./old-api-module.js");
const { Readable } = require("stream");

const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on("readable", () => {
  myReader.read(); // etc.
});

Real-world example

Let's say you have an older Node.js library that uses old-style readable streams to read data from a file. You can use readable.wrap() to create a Readable stream that wraps the old stream and allows you to use the new stream API. This would allow you to use the new stream API's features, such as the ability to pause and resume the stream, or to pipe the stream to another stream.

const fs = require("fs");
const { Readable } = require("stream");

// fs.createReadStream() already implements the modern stream API;
// it simply stands in here for an old-style stream from a legacy library.
const oldStream = fs.createReadStream("myfile.txt");
const newStream = new Readable().wrap(oldStream);

newStream.on("readable", () => {
  const data = newStream.read();
  // Do something with the data
});

This example shows how to use readable.wrap() to create a Readable stream that wraps an old-style readable stream. The new stream can then be used with the new stream API.


readable[Symbol.asyncIterator]()

Simplified Explanation:

It's like a magic trick that lets you go through a stream of data, one chunk at a time, and you can stop anytime.

Detailed Explanation:

Streams are like pipes that let data flow through them. readable[Symbol.asyncIterator]() gives you a special power to move through the stream one chunk at a time. You can use it to read data from a stream asynchronously, meaning you can do other things while waiting for data to come in.

Example:

// Imagine you have a stream that keeps receiving numbers: 1, 2, 3, 4, 5
const stream = createNumberStream(); // This is just an imaginary stream

// Now you can use the magic trick to loop through the numbers
for await (const number of stream) {
  console.log(`The magic trick just gave me ${number}`);
}
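
A minimal runnable sketch of the same idea, using Readable.from() to build the stream of numbers:

const { Readable } = require("stream");

async function main() {
  const stream = Readable.from([1, 2, 3, 4, 5]);

  // for await...of uses readable[Symbol.asyncIterator]() under the hood
  for await (const number of stream) {
    console.log(`The magic trick just gave me ${number}`);
  }
}

main();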

Real-World Applications:

  • File Reading: You can use it to read a large file chunk by chunk, which is faster and more efficient than reading the whole file at once.

  • Streaming Video: Websites use it to stream videos to your browser, allowing you to watch them as they're downloaded.

Tips:

  • If you want to stop the magic trick early, you can use break, return, or throw.

  • Exiting the loop early this way destroys the stream; it will not keep reading the remaining data.


readable[Symbol.asyncDispose]() Method

This method destroys the readable stream (with an AbortError) and returns a promise that fulfills once the stream has finished. It is intended for use with explicit resource management, so the stream is always cleaned up when you are done with it, even if you stop reading early.

Syntax

readable[Symbol.asyncDispose]()

Return Value

A promise that resolves when the stream is finished.

Example

const fs = require('fs');

const readableStream = fs.createReadStream('file.txt');

readableStream[Symbol.asyncDispose]().then(() => {
  console.log('Stream finished');
});

Real-World Applications

This method can be used wherever you want a readable stream to be reliably cleaned up when you are done with it, even if you stop reading early. For example, you could use it to:

  • Release the file descriptor of a file stream you no longer need to read.

  • Tear down a database cursor or query stream once you have the rows you need.

  • Close a network stream when the connection is no longer required.

Potential Applications:

  • Reading files from disk

  • Processing data from a database

  • Receiving data from a network


readable.compose(stream[, options])

The readable.compose() method combines the readable with another stream, generator function, or iterable, returning a new stream in which the readable's data flows through stream.

Syntax

readable.compose(stream[, options])

The following code sample shows you how to use the readable.compose() method:

import { Readable } from 'node:stream';

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ');

    for (const word of words) {
      yield word;
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(
  splitToWords
);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']

readable.iterator([options])

This method creates an async iterator that can be used to consume a stream.

  • options is an optional object that can contain the following properties:

    • destroyOnReturn: If set to false, calling return on the async iterator, or exiting a for await...of iteration using a break, return, or throw will not destroy the stream. Default: true.

  • Returns: an async iterator that can be used to consume the stream.

The async iterator created by this method can be used to iterate over the chunks of data emitted by the stream.

  • The iterator applies backpressure automatically: the stream is only read as fast as the loop consumes it.

  • If the stream emits an error, the iterator rethrows it inside the for await...of loop.

Here is an example of how to use the readable.iterator() method with the destroyOnReturn option:

const { Readable } = require("stream");

async function printIterator(readable) {
  // With destroyOnReturn: false, breaking out of the loop does not destroy the stream
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // false

  // A second iterator picks up where the first one left off
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // 2, then 3
  }
}

printIterator(Readable.from([1, 2, 3]));

In this example, readable.iterator() is called with destroyOnReturn: false.

  • The first for await...of loop reads a single chunk and then exits with break; because of the option, the stream is not destroyed.

  • A second call to readable.iterator() then consumes the remaining chunks.

The readable.iterator() method can be useful in situations where you need to consume a stream in a controlled manner.

  • For example, you could use the readable.iterator() method to implement a custom stream transformer that filters or modifies the data emitted by a stream.

Potential Real-World Applications

The readable.iterator() method can be used in a variety of real-world applications, including:

  • Data processing: consume a stream at your own pace, filtering or modifying chunks as they arrive.

  • Data analysis: aggregate or summarize chunks (counts, sums, averages) while iterating.

  • Data visualization: feed chunks into a renderer incrementally instead of waiting for the whole stream.


readable.map(fn, options)

What is it?

The map() method allows you to transform each chunk of data flowing through a readable stream into a new value. It's like using a magic wand to change the data into something else!

How it works

You provide the map() method with a function that does the transformation. This function takes the original chunk of data as input and returns the new transformed value.

For example, let's say you have a readable stream of numbers, and you want to double each number. You would write code like this:

const { Readable } = require("stream");

const readableStream = Readable.from([1, 2, 3, 4]);

const doubledStream = readableStream.map((number) => number * 2);

The doubledStream will now contain the doubled values: [2, 4, 6, 8].

Options

The map() method also has some options you can use to control how it works:

  • concurrency: This option specifies how many chunks of data can be transformed at the same time. By default, it's set to 1, meaning that the transformation is done one chunk at a time. You can increase this value to improve performance if your transformation function is fast.

  • highWaterMark: This option specifies how many transformed chunks can be buffered before the stream is paused. By default, it's set to concurrency * 2 - 1. This means that if you have a concurrency of 2, the stream will pause when there are 3 transformed chunks waiting to be consumed.

  • signal: This option allows you to abort the stream if the signal is aborted. This can be useful if you want to stop the transformation process early (see the sketch after this list).
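
A rough sketch of the signal option, assuming that aborting the controller destroys the mapped stream with an AbortError:

const { Readable } = require("stream");

const controller = new AbortController();

const mapped = Readable.from([1, 2, 3, 4]).map(
  async (n) => {
    // Simulate slow asynchronous work
    await new Promise((resolve) => setTimeout(resolve, 100));
    return n * 2;
  },
  { signal: controller.signal }
);

mapped.on("data", (n) => console.log(n));
mapped.on("error", (err) => console.error(err.name)); // AbortError after the abort

// Abort the whole transformation after 150 ms
setTimeout(() => controller.abort(), 150);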

Real-world applications

The map() method has many real-world applications, such as:

  • Data filtering: You can use the map() method to filter out unwanted data from a stream. For example, you could remove any lines in a text file that contain a certain word.

  • Data transformation: The map() method can be used to transform data into a different format. For example, you could convert a stream of JSON objects into a stream of CSV lines.

  • Data aggregation: The map() method can be used to aggregate data into a new form. For example, you could count the number of occurrences of a certain word in a stream of text.

Potential applications

Here are some potential applications of the map() method:

  • Streaming data analysis: You can use the map() method to analyze streaming data in real-time. For example, you could use the map() method to identify trends or patterns in a stream of financial data.

  • Data visualization: You can use the map() method to prepare data for visualization. For example, you could use the map() method to convert a stream of JSON objects into a stream of SVG elements.

  • Machine learning: You can use the map() method to transform data into a format that is suitable for machine learning algorithms. For example, you could use the map() method to convert a stream of images into a stream of feature vectors.

Improved example

Here is an improved example of using the map() method:

const { Readable } = require("stream");
const { Resolver } = require("dns").promises;

const resolver = new Resolver();

// Example domain names (replace with your own list, or read them from a file)
const domainsStream = Readable.from(["nodejs.org", "openjsf.org", "iana.org"]);

// Resolve up to 2 domains at a time
const dnsResultsStream = domainsStream.map(
  (domain) => resolver.resolve4(domain),
  { concurrency: 2 }
);

dnsResultsStream.on("data", (result) => {
  console.log(result);
});

This example resolves a stream of domain names to IP addresses using the promise-based DNS resolver. The map() method transforms the stream of domains into a stream of resolution results, and the concurrency option allows up to 2 DNS lookups to run at the same time.


readable.filter(fn[, options])

The filter method helps you select specific data from a readable stream. It passes each chunk of data from the stream through a filter function fn that you provide. If the function returns true, the chunk is included in the filtered stream. If the function returns false, the chunk is discarded.

How it works:

  1. Function fn: You can provide a synchronous or asynchronous function fn to the filter method.

    • If the function is synchronous, it returns true or false immediately.

    • If the function is asynchronous, it returns a promise that resolves to true or false.

  2. Options: You can also provide options to the filter method:

    • concurrency: Limits the number of concurrent function calls to improve performance.

    • highWaterMark: Specifies the maximum number of filtered chunks to buffer before waiting for consumption.

    • signal: Allows you to abort the filtering process if needed.

Real-world example:

Let's say you have a stream that contains numbers. You can use the filter method to create a new stream that only includes numbers greater than 5.

import { Readable } from "node:stream";

// Input stream with numbers
const input = Readable.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

// Filter function to select numbers greater than 5
const filterFn = (chunk) => chunk > 5;

// Create a filtered stream
const filtered = input.filter(filterFn);

// Read the filtered stream
for await (const chunk of filtered) {
  console.log(chunk); // Output: 6, 7, 8, 9, 10
}

Potential applications:

  • Filtering data based on specific criteria (e.g., filtering out empty or invalid data).

  • Selecting specific events or messages from a stream.

  • Transforming data by modifying or extracting only the relevant parts.


readable.forEach(fn[, options])

This method allows you to iterate over the chunks of data in a readable stream. It calls the fn function for each chunk of data.

Parameters:

  • fn {Function|AsyncFunction} a function to call on each chunk of the stream.

    • data {any} a chunk of data from the stream.

    • options {Object}

      • signal {AbortSignal} aborted if the stream is destroyed allowing to abort the fn call early.

  • options {Object}

    • concurrency {number} the maximum concurrent invocation of fn to call on the stream at once. Default: 1.

    • signal {AbortSignal} allows destroying the stream if the signal is aborted.

  • Returns: {Promise} a promise for when the stream has finished.

Example:

The following code snippet shows you how to use the forEach() method to iterate over the chunks of data in a readable stream:

const { Readable } = require("stream");

const readableStream = Readable.from([1, 2, 3, 4]);

readableStream.forEach((data) => {
  console.log(data);
});

This code will log the following output:

1
2
3
4

Real-world applications:

The forEach() method can be used to process the chunks of data in a readable stream in a variety of ways. For example, you could use it to:

  • Filter the chunks of data.

  • Transform the chunks of data.

  • Aggregate the chunks of data.

  • Write the chunks of data to a file.

  • Send the chunks of data to a network socket.

Potential applications:

The forEach() method can be used in a variety of applications, including:

  • Data processing

  • Data transformation

  • Data aggregation

  • Data logging

  • Data transmission


Simplified Explanation of readable.toArray()

What is readable.toArray()?

readable.toArray() is a method that allows you to easily get all the data from a readable stream and convert it into an array.

How does it work?

readable.toArray() reads all the data from the stream into memory and then resolves with an array containing that data, so it should not be used on very large streams.

Why would you use it?

You might use readable.toArray() if you want to:

  • Collect all the data from a stream into a single array

  • Process the data in the array using other JavaScript methods

  • Save the data to a file or database

Example:

const { Readable } = require("stream");

const readableStream = Readable.from([1, 2, 3, 4]);

readableStream.toArray().then((result) => {
  console.log(result); // [1, 2, 3, 4]
});

Real-World Applications:

  • Data Collection: Collect data from a stream and store it in an array for further processing or analysis.

  • Data Transformation: Convert data from a stream into an array and then apply transformations using array methods.

  • Data Storage: Save data from a stream to a file or database by converting it into an array.

Potential Applications:

  • Web Scraping: Collect data from web pages by reading HTML documents from a stream and converting them into arrays.

  • Log Analysis: Process log files by reading them from a stream and converting them into arrays for analysis.

  • Data Aggregation: Combine data from multiple streams into a single array for reporting or visualization.


readable.some(fn[, options])

Simplified Explanation

The readable.some() method checks if any of the chunks in a readable stream meet a certain condition. It takes a function that checks each chunk and returns true or false. If the function returns true for any chunk, the stream is stopped and the promise resolves to true. If the function never returns true, the promise resolves to false.

Detailed Explanation

Parameters

  • fn - A function that takes a chunk of data from the stream as its first argument and an options object as its second argument. The function should return true if the chunk meets the condition and false otherwise.

  • options - An optional object that can contain the following properties:

    • concurrency - The maximum number of chunks that can be processed by the function at the same time. Defaults to 1.

    • signal - An AbortSignal that can be used to abort the stream.

Return Value

The method returns a promise that resolves to true if any of the chunks in the stream meet the condition, and false otherwise.

Code Snippet with Improved Example

const { Readable } = require("stream");
const { stat } = require("fs/promises");

const stream = Readable.from(["file1", "file2", "file3"]);

stream
  .some(
    async (fileName) => {
      const stats = await stat(fileName);
      return stats.size > 1024 * 1024;
    },
    { concurrency: 2 }
  )
  .then((result) => {
    console.log(result); // `true` if any file is larger than 1MB
  });

Real World Application

The some() method can be used to check if a stream contains any data that meets a certain condition. For example, it can be used to check if a file contains any lines that match a certain pattern, or if a stream of data contains any errors.

Here are some specific examples of how the some() method can be used in the real world:

  • Checking if a file contains text that matches a certain pattern:

const { createReadStream } = require("fs");

const stream = createReadStream("file.txt");

stream
  .some(async (chunk) => chunk.toString().includes("error"))
  .then((result) => {
    // If `result` is true, the file contains an error message.
  });
  • Checking if a stream of data contains any errors:

const { Readable } = require("stream");

const stream = Readable.from(["data1", "data2", "error data"]);

stream
  .some((chunk) => chunk.toString().includes("error"))
  .then((result) => {
    // If `result` is true, the stream contains an error message.
  });

Simplified Explanation of readable.find() Method

What it does:

The readable.find() method searches for the first chunk of data in a stream that meets a given condition. It works similarly to the Array.prototype.find() method.

How it works:

The method takes a function as an argument. This function is called on each chunk of data that passes through the stream.

If the function returns true for a chunk, the stream is stopped, and the promise returned by readable.find() is resolved with that chunk.

If the function returns false for all chunks, the promise is resolved with undefined.

How to use it:

readable
  .find((chunk) => {
    // Your custom logic to check the chunk
  })
  .then((result) => {
    // Do something with the result (which will be the first chunk that meets the condition)
  });

Code Snippets:

Example 1: Finding the first number greater than 2

const { Readable } = require("stream");

const readable = Readable.from([1, 2, 3, 4]);

readable
  .find((chunk) => chunk > 2)
  .then((result) => {
    console.log(result); // Output: 3
  });

Example 2: Finding the first file with a size greater than 1MB

const { Readable } = require("stream");
const { stat } = require("fs/promises");

const readable = Readable.from(["file1", "file2", "file3"]);

readable
  .find(async (chunk) => {
    const stats = await stat(chunk);
    return stats.size > 1024 * 1024;
  })
  .then((result) => {
    console.log(result); // Output: Filename of the first file with size greater than 1MB or undefined if none found
  });

Potential Applications:

  • Checking for specific keywords in a log file.

  • Identifying the first email in a mailbox that meets certain criteria.

  • Parsing a large CSV file and finding the row that contains a particular value.


readable.every(fn[, options])

In Plain English

Imagine you have a stream of data like a conveyor belt of boxes. You want to check every box on the belt and see if they all meet a certain condition.

The every method in Node.js's stream module lets you do just that. It runs a function (fn) on each box (chunk of data) as it comes down the belt. If the function returns true for every box, the method returns true. If it returns false for any box, the method returns false.

Parameters

  • fn: This is the function you want to run on each box. It takes the box as an argument and returns true or false.

  • options: This is an optional object that can contain the following properties:

    • concurrency: This specifies how many boxes can be checked at the same time. The default is 1.

    • signal: This is an AbortSignal that you can use to stop the stream if you want to.

Return Value

The method returns a promise that resolves to true if all the boxes meet the condition, and false if any box fails.

Example

Here's an example of how you can use the every method:

const { Readable } = require("stream");

const stream = Readable.from([1, 2, 3, 4, 5]);

stream
  .every(async (box) => box > 2)
  .then((result) => {
    console.log(`All boxes meet the condition: ${result}`);
  })
  .catch((error) => {
    console.error(`An error occurred: ${error}`);
  });

In this example, we have a stream of numbers and use the every method to check whether all of them are greater than 2. Because 1 and 2 fail the check, the promise resolves to false and we log that result. The promise only rejects (and hits the catch handler) if the stream or the callback function throws an error.

Real-World Applications

The every method can be used in a variety of real-world applications, such as:

  • Validating data before processing it

  • Checking for errors in a stream of data

  • Determining if a stream of data meets a certain criteria

  • Filtering out unwanted data from a stream


readable.flatMap(fn[, options])

The flatMap() method in Node.js streams allows you to transform each chunk of data in a stream into a new stream or iterable, and then merge (flatten) all the results into a single stream.

How it Works:

Imagine you have a stream of numbers like [1, 2, 3, 4]. You can use flatMap() to apply a function to each number that returns several values. For example, a function that returns [n, n * 2] for each number n produces the values [1, 2], [2, 4], [3, 6], and [4, 8].

After applying the function to each number, flatMap() flattens all the results into a single stream. In this case, the output stream would be [1, 2, 2, 4, 3, 6, 4, 8] (see the sketch below).
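
A minimal runnable sketch of that example (toArray() is used here only to collect the output):

const { Readable } = require("stream");

Readable.from([1, 2, 3, 4])
  // For each number n, return an iterable of [n, n * 2];
  // flatMap() flattens these iterables into one stream of values.
  .flatMap((n) => [n, n * 2])
  .toArray()
  .then((result) => {
    console.log(result); // [1, 2, 2, 4, 3, 6, 4, 8]
  });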

Parameters:

  • fn: A function that takes each chunk of data as an argument and returns a stream or iterable.

  • options: (Optional) An object with the following properties:

    • concurrency: The maximum number of concurrent function invocations.

    • signal: An AbortSignal that can be used to abort the stream.

Real-World Example:

Suppose you have a file with a list of URLs and you want to download the content of each URL and save it to a file. You can use flatMap() to create a stream that combines all the downloaded data into a single stream.

const fs = require("fs");
const { Readable } = require("stream");

// Suppose we have a list of URLs
const urls = [
  "https://example.com/file1.txt",
  "https://example.com/file2.txt",
  "https://example.com/file3.txt",
];

// Create a stream of URLs
const urlStream = Readable.from(urls);

// Create a function that downloads a file and returns a stream of its contents
const downloadFile = (url) => {
  // This function would actually make a request to the URL and return a stream of the response data
  return Readable.from([`Downloaded file ${url}`]);
};

// Use flatMap to transform the URL stream into a stream of downloaded files
const downloadedFilesStream = urlStream.flatMap(downloadFile);

// Save the combined stream to a file
downloadedFilesStream.pipe(fs.createWriteStream("downloaded-files.txt"));

In this example, flatMap() creates a stream that combines the content of all the downloaded files into a single stream. We can then pipe this stream to a file to save its contents.

Potential Applications:

  • Data transformation: Transform each chunk of data in a stream into a new stream or iterable.

  • Aggregation: Combine multiple streams or iterables into a single stream.

  • Data processing: Process data in parallel by applying a function to each chunk of data and combining the results.


readable.drop(limit[, options])

This method returns a new stream with the first limit chunks dropped.

Parameters:

  • limit: The number of chunks to drop from the readable.

  • options: An optional object that can contain the following properties:

    • signal: An AbortSignal that can be used to abort the stream.

Returns:

A new stream with the first limit chunks dropped.

Example:

import { Readable } from "node:stream";

const readable = Readable.from([1, 2, 3, 4]);

readable.drop(2).on("data", (chunk) => {
  console.log(chunk); // Prints: 3, 4
});

Real-world application:

This method can be used to skip the first few chunks of a stream. For example, if you are reading a file and you only want to process the last few lines, you can use this method to drop the first lines of the file.

Potential applications:

  • Skipping the header lines of a CSV file.

  • Ignoring the first few lines of a log file.

  • Dropping the first few frames of a video stream.


Simplified Explanation

readable.take(limit[, options])

This function creates a new stream that takes only the first limit chunks from the original readable stream.

Parameters

  • limit: The number of chunks to take.

  • options: An optional object with the following property:

    • signal: An AbortSignal object that can be used to stop the stream if the signal is aborted.

Return Value

A new readable stream with only the first limit chunks.

Code Example

const { Readable } = require("stream");

const readableStream = Readable.from([1, 2, 3, 4]);

// Create a new stream that takes only the first 2 chunks
const limitedStream = readableStream.take(2);

// Read the chunks from the limited stream
limitedStream.on("data", (chunk) => {
  console.log(chunk); // Output: 1, 2
});

Real-World Applications

  • Limiting the number of results: For example, you could use take() to limit the number of search results returned by a database query.

  • Creating a preview: You could use take() to create a preview of a large file, such as a video or an audio recording.

  • Stopping a stream: You could use take() in conjunction with an AbortSignal to stop a stream when a certain condition is met, such as when a user clicks a "cancel" button.


readable.reduce(fn[, initial[, options]])

The reduce() method is used to combine all the data chunks from a readable stream into a single value. It takes a reducer function, an initial value, and an optional options object as arguments.

Reducer Function

The reducer function is called once for each chunk of data in the stream. It takes two arguments:

  • previousValue: The result of the previous call to the reducer function, or the initial value if this is the first call.

  • currentValue: The current chunk of data from the stream.

The reducer function should return a new value that will be used as the previousValue argument in the next call to the reducer function.

Initial Value

The initial value is the value that will be used as the previousValue argument in the first call to the reducer function. If no initial value is provided, the first chunk of data from the stream will be used as the initial value.

Options Object

The options object can contain the following properties:

  • signal: An AbortSignal object that can be used to abort the stream. If the signal is aborted, the reduce() method will be cancelled and the promise returned by the reduce() method will be rejected with an AbortError.

Example

The following example shows how to use the reduce() method to calculate the total size of all the files in a directory:

const { Readable } = require('stream');
const { readdir, stat } = require('fs/promises');
const { join } = require('path');

const directoryPath = './src';

async function main() {
  const filesInDir = await readdir(directoryPath);

  const totalSize = await Readable.from(filesInDir)
    .reduce(async (totalSize, file) => {
      const filePath = join(directoryPath, file);
      return totalSize + (await stat(filePath)).size;
    }, 0);

  console.log(`Total size of files in ${directoryPath}: ${totalSize}`);
}

main();

Potential Applications

The reduce() method can be used for a variety of tasks, including:

  • Calculating the sum, average, or other aggregate value of a set of data.

  • Concatenating all the data chunks from a stream into a single string or buffer (see the sketch after this list).

  • Filtering out unwanted data from a stream.

  • Transforming the data in a stream into a different format.
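
As a quick sketch of the concatenation case:

const { Readable } = require("stream");

// Combine every chunk of the stream into one string, starting from ""
Readable.from(["Hello", " ", "World"])
  .reduce((acc, chunk) => acc + chunk, "")
  .then((result) => {
    console.log(result); // "Hello World"
  });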


Duplex Streams

Imagine Duplex streams as pipes that can both send and receive data. They're like two-way streets for streaming data.

How it works:

  • A Duplex stream can read data from one end and write data to the other end simultaneously.

  • It's like having a conversation where you can talk and listen at the same time.

Code snippet:

const { Duplex } = require("stream");

const duplexStream = new Duplex({
  // How to read data from this stream
  read(size) {
    // Push data into the readable side (push(null) signals the end)
    this.push("Data from the readable side");
    this.push(null);
  },

  // How to write data to this stream
  write(chunk, encoding, callback) {
    // Handle the written data, then call the callback when done
    console.log(`Written: ${chunk.toString()}`);
    callback();
  },
});

// Reading from the stream
duplexStream.on("data", (chunk) => {
  console.log(chunk.toString());
});

// Writing to the stream
duplexStream.write("Hello world");

Real-world application:

  • Handling data transfer between two network sockets.

Transform Streams

Think of Transform streams as filters that process data as it flows through them. They can modify, add, or remove data.

How it works:

  • A Transform stream takes data as input, processes it, and outputs the transformed data.

  • It's like a machine that can read, modify, and write data.

Code snippet:

const { Transform } = require("stream");

const transformStream = new Transform({
  // How to transform the data
  transform(chunk, encoding, callback) {
    const transformedData = chunk.toString().toUpperCase();
    callback(null, transformedData);
  },
});

// Reading from the stream
transformStream.on("data", (chunk) => {
  console.log(chunk.toString());
});

// Writing to the stream
transformStream.write("Hello world");

Real-world application:

  • Converting text to uppercase.

  • Encrypting data as it passes through the stream.


Duplex Streams

What are Duplex Streams?

Duplex streams are like double-sided pipes that allow data to flow in both directions. They can both read and write data, making them suitable for many real-world applications.

Examples of Duplex Streams:

  • TCP Sockets: Used for communication on the internet. Data flows in both directions to send and receive information.

  • zlib Streams: Used for compressing and decompressing data. You write raw data to the stream and read the compressed data back out (or the reverse when decompressing).

  • crypto Streams: Used for encrypting and decrypting data. You write plaintext to the stream and read ciphertext back out (or the reverse when decrypting).

How to Use Duplex Streams:

You can create a duplex stream using the Duplex class in Node.js. This class provides methods for reading and writing data.

Example:

const { Duplex } = require("stream");

const duplexStream = new Duplex({
  read() {}, // Data is pushed manually below
  write(chunk, encoding, callback) {
    console.log(`Received: ${chunk.toString()}`);
    callback();
  },
});

duplexStream.on("data", (chunk) => {
  // Handle incoming data from the readable side
  console.log(`Read: ${chunk.toString()}`);
});

duplexStream.write("Hello from the other side!");
duplexStream.push("Hello from the readable side!");

Real-World Applications:

Duplex streams are used in various applications, including:

  • Communication protocols

  • Data compression and decompression

  • Encryption and decryption

  • Websockets

  • File transfers


duplex.allowHalfOpen

In a duplex stream, data can flow in both directions. The allowHalfOpen option controls what happens when the readable side of the stream ends: if it is set to false, the stream automatically ends its writable side as well, so both directions finish together.

When allowHalfOpen is true (the default for Duplex), the writable side stays open even after the readable side has ended. This is useful when you still have data to send after the other side has finished sending to you.

For example, in a TCP connection (a net.Socket is a duplex stream), the remote peer may finish sending its data and close its sending direction. With allowHalfOpen enabled, your side of the connection can keep writing its remaining data before closing.

Real-world example

One potential application of allowHalfOpen is a streaming video server built on raw sockets. A viewer's connection (a duplex stream) may finish its sending side as soon as it has written its request, but the server still needs to keep writing video data over that same connection. With allowHalfOpen enabled, the server's side of the socket stays writable after the viewer's side has ended, so the video can keep streaming.

Code implementation

To set the allowHalfOpen option, you can use the following code:

const { Duplex } = require("stream");

const duplex = new Duplex({
  allowHalfOpen: true,
});

Once the allowHalfOpen option is set, the duplex stream will remain open even after one side of the stream has ended.
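
A minimal runnable sketch of the difference (with allowHalfOpen: false, the write inside the 'end' handler would typically fail with a "write after end" error):

const { Duplex } = require("stream");

const duplex = new Duplex({
  allowHalfOpen: true,
  read() {}, // Data is pushed manually below
  write(chunk, encoding, callback) {
    console.log(`Writable side received: ${chunk.toString()}`);
    callback();
  },
});

duplex.on("data", (chunk) => console.log(`Readable side: ${chunk.toString()}`));

duplex.on("end", () => {
  // The readable side has ended, but the writable side is still open
  duplex.write("still writing after the readable side ended");
  duplex.end();
});

// End the readable side
duplex.push("last readable chunk");
duplex.push(null);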


Simplified Explanation of Transform Streams

What are Transform Streams?

Imagine a stream as a pipe that carries data. A transform stream is like a water filter attached to the pipe. It takes in water (or data) and changes it in some way before letting it flow out the other end.

How Do Transform Streams Work?

Transform streams have two main parts:

  • Readable Side: This side reads data from the input stream and passes it to the transform function.

  • Writable Side: This side receives the transformed data from the transform function and writes it to the output stream.

Transform Function:

The transform function is the heart of a transform stream. It takes the input data and transforms it in some way. For example, it could:

  • Convert text to uppercase

  • Encrypt data

  • Compress or decompress data

Real-World Examples:

Here are some real-world examples of transform streams:

  • Gzip stream: Compresses data before sending it to a server.

  • Cryptographic stream: Encrypts data before sending it over a secure connection.

  • Text converter stream: Converts text to uppercase, lowercase, or another character set.

Applications:

Transform streams have a wide range of applications, including:

  • Data compression and decompression

  • Data encryption and decryption

  • Text processing

  • Image and video processing

Code Implementation Example

Creating a Transform Stream:

const { Transform } = require("stream");

const uppercaseTransform = new Transform({
  transform(chunk, encoding, callback) {
    // Convert the chunk to uppercase
    const transformedChunk = chunk.toString().toUpperCase();
    // Push the transformed chunk to the output stream
    callback(null, transformedChunk);
  },
});

Using a Transform Stream:

const { createReadStream, createWriteStream } = require("fs");

// Create a readable stream
const readableStream = createReadStream("input.txt");

// Create a writable stream
const writableStream = createWriteStream("output.txt");

// Pipe the readable stream through the uppercase transform stream to the writable stream
readableStream.pipe(uppercaseTransform).pipe(writableStream);

This example will read data from input.txt, convert it to uppercase using the uppercaseTransform, and write the transformed data to output.txt.


transform.destroy([error])

Simplified Explanation:

When you want to end a stream and clean it up, you can call the destroy() method. This will stop the stream and release any resources it's using.

Diagram:

                                 +--------------+
                          +------->| Transform    |
                          |        +--------------+
                          |        |  Data Flow  |
                          |        +--------------+
                          |        |  Resource   |  <---- Clean up
                          |        +--------------+
                          |        /
                          |       /
                          +------->

Details:

  • Purpose: Stops the data flow and cleans up the stream.

  • Error Handling: You can pass an error to the destroy() method, which will be emitted as an 'error' event.

  • No More Errors: Once you've called destroy(), the stream won't emit any more errors except the one you passed in (if any).

  • Custom Behavior: You shouldn't override the destroy() method directly. Instead, implement the _destroy() method in your own stream class.

  • Close Event: By default, the destroy() method also emits a 'close' event unless you set the emitClose option to false.

Example:

const { Transform } = require("stream");

// A pass-through transform: when new data comes in, just pass it along.
const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  },
});

// Stop the stream and release resources.
myTransform.destroy();

Real-World Applications:

  • File processing: Splitting a large file into smaller chunks.

  • Data filtering: Filtering out specific data points from a stream.

  • Encryption: Encrypting or decrypting data as it flows through the stream.


Simplified Explanation of stream.finished()

What is stream.finished()?

stream.finished() is a function that tells you when a stream is done reading or writing. It's like waiting for the traffic light to turn green before driving through an intersection.

Options for stream.finished()

You can customize how stream.finished() works by providing options:

  • error: Set to false to ignore errors when checking if the stream is finished.

  • readable: Set to false to ignore if the stream is still readable.

  • writable: Set to false to ignore if the stream is still writable.

  • signal: An abort signal that can abort the wait for the stream finish. If aborted, the callback will be called with an AbortError.

  • cleanup: Set to true to remove all registered listeners when the stream is finished or an error occurs.

How to Use stream.finished()

To use stream.finished(), pass in a stream and a callback function:

finished(stream, (err) => {
  if (err) {
    console.error("Stream failed.", err);
  } else {
    console.log("Stream is done reading.");
  }
});

The callback will be called with an error if the stream encountered any problems, or with null if the stream finished successfully.

Real-World Applications

stream.finished() is useful in several scenarios:

  • Error handling: If a stream fails, stream.finished() can be used to capture the error and handle it appropriately.

  • Premature closing: If a stream is closed unexpectedly, stream.finished() can be used to detect this and take action, such as logging the event or retrying the operation.

  • Graceful shutdown: When shutting down a server or application, stream.finished() can be used to wait for all active streams to finish before proceeding with the shutdown process.

Complete Code Example

Here's a complete example that reads a file and listens for when it's finished reading:

const fs = require("fs");
const { finished } = require("stream");

const rs = fs.createReadStream("archive.tar");

finished(rs, (err) => {
  if (err) {
    console.error("File reading failed.", err);
  } else {
    console.log("File reading completed.");
  }
});

rs.resume(); // Drain the stream.

Simplified Explanation:

What is stream.pipeline()? It's like a plumbing system that connects different parts of your stream. It takes data from one stream (the source) and sends it through a series of transformations (like filters or mappers) before finally sending it to a destination stream (like a file or database).

How it Works:

  1. Source: This is where the data comes from, like a file, socket, or server response.

  2. Transforms (optional): These are stream objects that modify the data as it flows through, like filters, mappers, or splitters.

  3. Destination: This is where the final transformed data ends up, like a file, database, or another stream.

  4. Callback (optional): Once the pipeline is complete, a function can be called to handle the result.

Example:

// Read a file, convert its content to uppercase, and write it to another file
const fs = require("fs");
const { pipeline, Transform } = require("stream");

pipeline(
  fs.createReadStream("input.txt"), // Source
  new Transform({
    transform(chunk, encoding, callback) {
      callback(null, chunk.toString().toUpperCase());
    },
  }), // Transform
  fs.createWriteStream("output.txt"), // Destination
  (err) => {
    if (err) {
      console.error("Error writing to file", err);
    } else {
      console.log("File written successfully");
    }
  }
);

Real-World Applications:

  • Data processing: Manipulate, filter, and transform data in real-time.

  • Data export and import: Move data between different systems or databases.

  • Logging: Collect logs from multiple sources and send them to a central storage.

  • Web development: Stream data from a server to a client or vice versa.


What is a Stream?

A stream is a sequence of data that flows over time. In Node.js, streams are represented by objects that implement the stream.Readable, stream.Writable, and stream.Transform interfaces.

Readable streams emit data events, writable streams accept data events, and transform streams modify data events.

What is stream.pipeline()?

stream.pipeline() is a function that connects a series of streams together. You pass it the streams (either as separate arguments or as a single array) followed by a callback function. The callback is called when the pipeline is complete, with an error as its first argument if anything failed.

How to Use stream.pipeline()?

To use stream.pipeline(), you pass in the streams in order (source first, destination last) followed by a callback function. The callback will be called when the pipeline is complete, with an error as its first argument if anything failed.

For example, the following code uses stream.pipeline() to pipe a file from disk to a gzip stream and then to a file on disk:

const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

const source = fs.createReadStream('input.txt');
const transform = zlib.createGzip();
const destination = fs.createWriteStream('output.gz');

pipeline(
  source,
  transform,
  destination,
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);

What are the Benefits of Using stream.pipeline()?

stream.pipeline() is a convenient way to connect a series of streams together. It handles all of the error handling and cleanup for you, so you don't have to worry about it.

Real-World Applications of stream.pipeline()

stream.pipeline() can be used in a variety of real-world applications, such as:

  • Compressing or decompressing files

  • Converting data from one format to another

  • Filtering data

  • Combining data from multiple sources

Potential Applications in Real World for stream.pipeline()

  • To create a server that streams a video file to clients

  • To process a large amount of data in a distributed system

  • To build a data pipeline that transforms and analyzes data in real time


stream.compose()

Simplified Explanation:

Imagine you have multiple streams of water (think garden hoses). stream.compose() allows you to connect these streams in a way that the water flows from the first stream into the second, from the second into the third, and so on. You can connect as many streams as you want!

Detailed Explanation:

stream.compose() combines multiple streams into a single "duplex" stream. A duplex stream can both receive data (read from) and send data (write to).

When you call stream.compose(), you pass it a series of streams (or stream-like functions) as arguments. The first stream is where the data you write will go in (similar to a garden hose connected to a sink). The last stream is where the transformed data comes out (like a hose connected to a lawn sprinkler).

If any of the streams in the array encounter an error, all the streams will be stopped, including the duplex stream returned by stream.compose().

Real-World Example:

Let's say you want to remove spaces from a piece of text, convert it to uppercase, and then print it out. You can use stream.compose() to create a duplex stream that performs these operations:

import { compose, Transform } from 'node:stream';

const removeSpaces = new Transform({
  // This function removes spaces from the input data.
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''));
  }
});

async function* toUpper(source) {
  // This function converts the input data to uppercase.
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

// Create a duplex stream that combines the two transformations.
const duplexStream = compose(removeSpaces, toUpper);

// Write some text to the stream.
duplexStream.end('Hello World');

// Read the transformed text from the stream.
duplexStream.on('data', (data) => {
  console.log(data.toString());  // Output: 'HELLOWORLD'
});

Applications:

stream.compose() is useful in any situation where you need to perform multiple operations on a stream of data. For example, you could use it to:

  • Filter out certain data from a stream.

  • Transform data into a different format.

  • Combine multiple streams of data into a single stream.

  • Send data to multiple destinations simultaneously.


stream.Readable.from(iterable[, options])

This method helps you create a readable stream out of an iterable (an object that implements the Symbol.asyncIterator or Symbol.iterator iterable protocol, such as arrays, sets, or generators). It's a convenient way to turn non-stream data into a stream that can be processed or piped to other streams.

How it Works:

Imagine you have a list of words like ["hello", "there", "world"]:

const words = ["hello", "there", "world"];

You can create a readable stream from this list using Readable.from():

const { Readable } = require("stream");

const readableStream = Readable.from(words);

Now, you can use this stream just like any other readable stream. For example, you can attach event listeners to it to read the data:

readableStream.on("data", (chunk) => {
  console.log(chunk);
});

This will print each word from the list to the console.

Options:

You can specify additional options when creating the stream. One common option is objectMode, which specifies whether the stream emits arbitrary JavaScript values or only strings/Buffers. By default, Readable.from() sets objectMode to true, meaning it can emit any value. You can set it to false if the source only produces strings or Buffers:

Readable.from(words, { objectMode: false }); // Chunks are treated as strings/Buffers

Real-World Use Cases:

Readable.from() can be useful in various scenarios:

  • Generating streams from arrays or custom iterables: You can easily create readable streams from arrays or any data structure that you can iterate over.

  • Converting non-stream data to streams: If you have data in a non-stream format, such as an object or a promise, you can convert it to a stream using Readable.from().

Improved Code Snippet:

Here's an improved example that uses async/await to create a readable stream from a generator function:

const { Readable } = require("stream");

async function* generateWords() {
  yield "hello";
  yield "there";
  yield "world";
}

const readableStream = Readable.from(generateWords());

// Consume the stream with its async iterator
async function readWords() {
  for await (const chunk of readableStream) {
    console.log(chunk);
  }
}

readWords();

This example creates a readable stream from a generator function that yields the words. You can read the data from the stream either with the data event or, as shown here, with the async iterator provided by for await...of (but not both at once, since each chunk is only delivered to one consumer).

Potential Applications:

  • File processing: You can create a readable stream from a list of files and process them one at a time.

  • Database querying: You can create a readable stream from a database query result and process the rows as they are fetched.

  • Data transformation: You can use Readable.from() to convert data from one format to another. For example, you can create a stream that emits JSON objects from an array of raw data.


stream.Readable.fromWeb(readableStream[, options])

This method turns a readable stream into a Node.js Readable stream.

Parameters:

  • readableStream: The readable stream to convert.

  • options (optional): An object with the following properties:

    • encoding: The encoding to use for the data.

    • highWaterMark: The high water mark for the stream.

    • objectMode: If true, the stream will emit objects instead of buffers.

    • signal: An AbortSignal object that can be used to cancel the stream.

Returns:

A Node.js Readable stream.

Example:

const { Readable } = require("stream");
const { ReadableStream } = require("stream/web");

const readableStream = new ReadableStream({
  start(controller) {
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

const nodeReadableStream = Readable.fromWeb(readableStream);

nodeReadableStream.on("data", (data) => {
  console.log(data.toString());
});

nodeReadableStream.on("close", () => {
  console.log("stream closed");
});

Output:

hello
world
stream closed

Real-world applications:

This method can be used to convert a readable stream from a web source, such as a fetch request or a WebSocket, into a Node.js Readable stream. This can be useful for working with web streams in Node.js applications.


stream.Readable.isDisturbed(stream)

Simplified Explanation:

When you read from a ReadableStream, a flag is set in the stream to indicate that it has been accessed. This flag is called isDisturbed.

Detailed Explanation:

Readable.isDisturbed(stream) is a static method that checks whether the stream has been read from or cancelled. It returns true if the stream has been disturbed, and false if it hasn't.

Code Example:

const { Readable } = require("stream");

const stream = new Readable({ read() {} });

console.log(Readable.isDisturbed(stream)); // false

stream.push("some data");
stream.read();

console.log(Readable.isDisturbed(stream)); // true

Real-World Implementation:

The isDisturbed() method can be used to determine if a stream has been accessed, which can be useful in various scenarios:

  • Error handling: If a stream has been disturbed, it may have been closed or corrupted. Checking the isDisturbed() flag can help you handle errors gracefully.

  • Resource management: If a stream is not being used, you can close it to free up resources. Checking the isDisturbed() flag can help you determine if a stream is still in use.

Potential Applications:

  • Logging: Check if a log file has been accessed to determine if it needs to be rotated or archived.

  • Data pipelines: Check if a data stream has been disrupted to reroute or recover the data.

  • Caching: Check if a cached resource has been modified to invalidate the cache.


Simplified Explanation:

What is stream.isErrored(stream)?

It's like a "doctor" for your stream. It checks if the stream has any problems or "errors."

How does it work?

Imagine your stream is a river flowing smoothly. If there's a big rock or obstacle in the river, the water flow will get blocked or interrupted. In the same way, if there's a problem with your stream, it will stop or slow down.

stream.isErrored(stream) checks if there's any such obstacle or problem in your stream. If there is, it says "Yes, there's an error." If not, it says "No, everything's fine."

Real-World Examples:

  • You're downloading a file from the internet. The download is going smoothly, but suddenly, it stops. You can use stream.isErrored(downloadStream) to check if there was an error that caused the download to stop.

  • You're sending data to a server. The data is being sent successfully, but then the connection drops. stream.isErrored(uploadStream) can tell you if the error was caused by the connection drop.

Potential Applications:

  • Error handling: Respond to errors gracefully by stopping the stream or retrying the operation.

  • Monitoring: Keep track of stream health and identify potential issues before they become major problems.

  • Debugging: Use error information to understand why a stream failed and how to resolve the issue.

Improved Code Example:

const fs = require("fs");
const { isErrored } = require("stream");

const readableStream = fs.createReadStream("file.txt");

readableStream.on("error", (err) => {
  console.error("An error occurred:", err);

  if (isErrored(readableStream)) {
    // The stream is now in an errored state; handle cleanup or retry here
  }
});

This example shows how to handle errors in a readable stream. When an error occurs, the 'error' event is emitted. You can attach a listener to this event to catch the error and take appropriate action, such as logging the error or handling it in a custom way.


  • stream.isReadable(stream): Checks if a given stream is readable.

    • Parameters:

      • stream: The stream to check.

    • Returns:

      • boolean: true if the stream is readable, false otherwise.

Example:

const fs = require("fs");
const { isReadable } = require("stream");

const readableStream = fs.createReadStream("file.txt");

console.log(isReadable(readableStream)); // true

Explanation:

Streams are objects that allow data to flow through them. They can be used to read data from a file, write data to a file, or manipulate data in-memory.

Readable streams are streams that can be read from. They have a read() method that can be used to read data from the stream. The isReadable() method checks whether a stream is readable. It returns true if the stream is readable, and false otherwise.

In the example above, we create a readable stream using the createReadStream() method of the fs module. We then pass the stream to isReadable() to check if it is readable. The output is true, indicating that the stream is readable.

Real-world Applications:

  • Reading data from a file: Readable streams can be used to read data from a file. This is useful for tasks such as loading data into a database or displaying data on a web page.

  • Manipulating data in-memory: Readable streams can be used to manipulate data in-memory. This is useful for tasks such as filtering data or sorting data.


Topics:

  • stream.Readable.toWeb

  • ReadableStream

Simplified Explanations:

  • stream.Readable.toWeb: Converts a Node.js readable stream (stream.Readable) to a web readable stream (ReadableStream).

  • ReadableStream: A type of stream that can be read from in a structured manner. It allows for backpressure, which prevents buffer overflow.

Code Snippets:

// Example: Converting a file stream to a web readable stream
const fs = require("fs");
const { Readable } = require("stream");

const fileStream = fs.createReadStream("file.txt");
const webReadableStream = Readable.toWeb(fileStream);

Real-World Applications:

  • Streaming data from a server to a web application: Convert a Node.js server-side readable stream to a web readable stream to send data to a client-side web application.

  • Processing large files in a web application: Create a web readable stream from a large file to process it in chunks, preventing browser crashes due to memory overflow.

Potential Applications:

  • Video and audio streaming: Streaming video and audio files from a server to a web application.

  • Large file uploads: Allowing users to upload large files to a web application without blocking the browser.

  • Data analysis and processing: Processing large datasets on the client side in a streaming fashion.

  • WebSockets: Sending continuous data from a server to a web application through WebSocket connections.


Writable.fromWeb(writableStream[, options])

Converts a WritableStream into a Node.js Writable stream.

The WritableStream can be a writable stream from the Web Streams API. Data written to the resulting Node.js Writable stream is forwarded to the underlying WritableStream.

The options object can contain the following properties:

  • decodeStrings: When true, strings written to the Node.js Writable stream are converted to Buffers before being passed on to the underlying WritableStream.

  • highWaterMark: The high water mark for the Node.js Writable stream.

  • objectMode: When true, the Node.js Writable stream will be in object mode.

  • signal: An AbortSignal that can be used to cancel the transfer.

The following example shows how to use Writable.fromWeb() to pipe data from a file into a web WritableStream:

const fs = require("fs");
const { Writable } = require("stream");

const file = fs.createReadStream("file.txt");
const writableStream = new WritableStream();

const writable = Writable.fromWeb(writableStream);
file.pipe(writable);

writableStream
  .getWriter()
  .close()
  .then(() => {
    console.log("File uploaded");
  });

In this example, the file.txt file is read from disk and piped into the Node.js Writable returned by Writable.fromWeb(). Each chunk is forwarded to the underlying Web Streams API WritableStream, which logs it.

Once the file has been fully piped, the Node.js Writable stream emits the 'finish' event and the underlying WritableStream is closed, which runs its close() callback.


stream.Writable.toWeb(streamWritable)

The Writable.toWeb() method in Node.js converts a Node.js Writable stream into a web WritableStream. Data written to the returned WritableStream is forwarded to the underlying Node.js stream.

This method is useful when you need to hand a Node.js writable stream to an API that expects a WHATWG WritableStream, for example code that is shared with the browser or that runs in a worker.

Syntax:

static toWeb(streamWritable: stream.Writable): WritableStream;

Parameters:

  • streamWritable: The Node.js writable stream to convert. Data written to the returned WritableStream is forwarded to this stream.

Returns:

  • A WritableStream object.

Example:

const { Writable } = require("stream");

const writableStream = new Writable();

// Create a web worker and pipe data to it.
const worker = new Worker("worker.js");
worker.postMessage({
  writableStream: writableStream.toWeb(),
});

// Write data to the stream.
writableStream.write("Hello world!");

Applications:

This method can be used wherever a WHATWG WritableStream is expected, for example when passing a destination to web-style APIs or transferring it to a worker thread, while the actual writing is still handled by a Node.js Writable. This makes it easier to share streaming code between Node.js and the browser, or to offload work to a worker while the main thread continues running.


stream.Duplex.from(src)

The stream.Duplex.from() method is a utility function that helps you create a duplex stream from a variety of sources. A duplex stream is a stream that can both read and write data. This method takes a single argument, src, which can be one of the following types:

  • A stream

  • A Blob

  • An ArrayBuffer

  • A string

  • An iterable

  • An async iterable

  • An async generator function

  • An async function

  • A promise

  • An object with writable and readable properties

  • A ReadableStream

  • A WritableStream

Depending on the type of src you provide, the Duplex.from() method will create a duplex stream that reads from or writes to the appropriate source or destination.

Here are some examples of how to use the Duplex.from() method:

const fs = require("fs");
const { Duplex } = require("stream");
const { ReadableStream, WritableStream } = require("stream/web");

// Create a duplex stream from a readable stream
const fromReadable = Duplex.from(fs.createReadStream("file.txt"));

// Create a duplex stream from a writable stream
const fromWritable = Duplex.from(fs.createWriteStream("out.txt"));

// Create a duplex stream from a string
const fromString = Duplex.from("Hello, world!");

// Create a duplex stream from an ArrayBuffer
const fromArrayBuffer = Duplex.from(new ArrayBuffer(10));

// Create a duplex stream from an iterable
const iterable = ["a", "b", "c", "d", "e"];
const fromIterable = Duplex.from(iterable);

// Create a duplex stream from an async iterable
const asyncIterable = (async function* () {
  yield "a";
  yield "b";
  yield "c";
})();
const fromAsyncIterable = Duplex.from(asyncIterable);

// Create a duplex stream from an async generator function
// (the function receives the upstream data as its `source` argument)
const fromAsyncGenerator = Duplex.from(async function* (source) {
  for await (const chunk of source) {
    yield chunk;
  }
});

// Create a duplex stream from a promise (calling an async function returns one)
const fromPromise = Duplex.from((async () => "Hello, world!")());

// Create a duplex stream from an object with writable and readable properties
const fromPair = Duplex.from({
  readable: fs.createReadStream("file.txt"),
  writable: fs.createWriteStream("out.txt"),
});

// Create a duplex stream from a web ReadableStream
const fromWebReadable = Duplex.from(new ReadableStream());

// Create a duplex stream from a web WritableStream
const fromWebWritable = Duplex.from(new WritableStream());

Real-world applications

The Duplex.from() method can be used in a variety of real-world applications, such as:

  • Data transformation: You can use the Duplex.from() method to create a duplex stream that transforms data as it flows through the stream. For example, you could use a duplex stream to convert a CSV file to a JSON file, or to encrypt or decrypt data (see the sketch at the end of this section).

  • Data buffering: You can use the Duplex.from() method to create a duplex stream that buffers data. This can be useful for applications that need to handle large amounts of data, or for applications that need to ensure that data is delivered in a consistent manner.

  • Data piping: You can use the Duplex.from() method to create a duplex stream that pipes data from one stream to another. This can be useful for applications that need to chain together multiple streams, or for applications that need to send data to multiple destinations.

Potential applications

The Duplex.from() method has the potential to be used in a wide variety of real-world applications, including:

  • Streaming media: The Duplex.from() method can be used to create duplex streams that stream media data, such as video or audio data. This can be useful for applications that need to deliver media data to multiple clients, or for applications that need to transcode media data on the fly.

  • Data analytics: The Duplex.from() method can be used to create duplex streams that perform data analytics on the fly. This can be useful for applications that need to analyze large amounts of data, or for applications that need to generate real-time insights from data.

  • Machine learning: The Duplex.from() method can be used to create duplex streams that train machine learning models on the fly. This can be useful for applications that need to train models on large amounts of data, or for applications that need to deploy models in real time.
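To make the data-transformation use above concrete, here is a minimal sketch (the uppercase transform and the file names are illustrative) that wraps an async generator function with Duplex.from() and uses the resulting duplex inside a pipeline:

const fs = require("fs");
const { Duplex, pipeline } = require("stream");

// The async generator receives the upstream data as `source`
// and yields the transformed chunks downstream.
const upperCase = Duplex.from(async function* (source) {
  for await (const chunk of source) {
    yield chunk.toString().toUpperCase();
  }
});

pipeline(
  fs.createReadStream("input.txt"),
  upperCase,
  fs.createWriteStream("output.txt"),
  (err) => {
    if (err) console.error("pipeline failed:", err);
    else console.log("pipeline succeeded");
  }
);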


Creating a Duplex Stream from Web Streams

Concept:

A duplex stream is a type of stream that can both read and write data. Node.js provides a Duplex.fromWeb() method to create a duplex stream from a pair of web streams, which are streams that can be used in the browser.

Usage:

const { Duplex } = require("stream");
const { ReadableStream, WritableStream } = require("stream/web");

const readable = new ReadableStream({
  // Start the stream by enqueuing some data
  start(controller) {
    controller.enqueue("world");
  },
});

const writable = new WritableStream({
  // Process incoming chunks by printing them to the console
  write(chunk) {
    console.log("writable", chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: "utf8", objectMode: true });

duplex.write("hello");

for await (const chunk of duplex) {
  console.log("readable", chunk);
}

Explanation:

In this example:

  • We create a readable stream that enqueues the string "world" as its initial data.

  • We create a writable stream that simply prints incoming chunks to the console.

  • We create a pair object that contains the readable and writable streams.

  • We use the Duplex.fromWeb() method to create a duplex stream from the pair, specifying the encoding and object mode options.

  • We write the string "hello" to the duplex stream.

  • We use a for await...of loop to read the incoming chunks from the duplex stream and print them to the console.

Real-World Applications:

Duplex streams are useful in various real-world applications, such as:

  • Creating interactive command-line interfaces (CLIs) that allow users to input data and receive responses.

  • Building web servers that handle both requests and responses.

  • Implementing data pipelines or transformations that process data received from one source and write it to another.

Potential Improvements:

  • The code snippet can be improved by handling errors and closing the streams properly:

const { Duplex } = require("stream");
const { ReadableStream, WritableStream } = require("stream/web");

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue("world");
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log("writable", chunk);
  },
  close() {
    console.log("writable closed");
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: "utf8", objectMode: true });

duplex.write("hello");

for await (const chunk of duplex) {
  console.log("readable", chunk);
}

duplex.end();

stream.Duplex.toWeb()

Stability: 1 - Experimental

The stream.Duplex.toWeb() method bridges the Node.js and web streams APIs. It converts a Node.js duplex stream into a pair of web streams: a ReadableStream that exposes the duplex's readable side and a WritableStream that feeds its writable side.

Functionality:

  • Exposes the readable side of a Node.js Duplex as a web ReadableStream.

  • Exposes the writable side of a Node.js Duplex as a web WritableStream.

  • Allows for interoperability between Node.js and web streams.

Usage:

const { Duplex } = require('stream');

// Create a Node.js duplex stream with simple read/write implementations
const duplex = new Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

// Convert the Node.js duplex into a pair of web streams
const { readable, writable } = Duplex.toWeb(duplex);

// Write through the web WritableStream; the chunk reaches the Node.js write()
writable.getWriter().write('hello');

// Read from the web ReadableStream; the chunk comes from the Node.js read()
readable.getReader().read().then(({ value }) => {
  console.log('readable', value);
});

Real-World Applications:

  • Integrating web applications with Node.js for real-time data processing.

  • Facilitating data transfer between web browsers and Node.js servers.

  • Enhancing interoperability in microservices architectures that use multiple programming languages.


stream.addAbortSignal(signal, stream)

  • signal {AbortSignal} A signal representing possible cancellation

  • stream {Stream|ReadableStream|WritableStream} A stream to attach a signal to.

Attaches an AbortSignal to a readable or writable stream. This lets code control stream destruction using an AbortController.

Calling abort on the AbortController corresponding to the passed AbortSignal will behave the same way as calling .destroy(new AbortError()) on the stream, and controller.error(new AbortError()) for webstreams.

Simplified Explanation:

Imagine you have a stream (like a water pipe) that's sending data. You can attach a "cancel signal" to the stream, which is like a switch that can stop the data flow if you want.

Code Snippet:

const fs = require("fs");
const { addAbortSignal } = require("stream");

// Create an abort controller (the switch)
const controller = new AbortController();

// Attach the abort signal (the switch's handle) to a readable stream (the water pipe)
const read = addAbortSignal(
  controller.signal,
  fs.createReadStream("object.json")
);

// Later, you can "switch off" the stream by calling abort on the controller
controller.abort();

Real-World Application:

  • You can use this to cancel a long-running operation, like downloading a file, if you no longer need the data.

Or using an AbortSignal with a readable stream as an async iterable:

// Create an abort controller (the switch)
const controller = new AbortController();

// Attach the abort signal (the switch's handle) to a readable stream (the water pipe)
const stream = addAbortSignal(
  controller.signal,
  fs.createReadStream("object.json")
);

// Set a timeout to automatically cancel the operation after 10 seconds
setTimeout(() => controller.abort(), 10_000);

// Start reading the stream
(async () => {
  try {
    // Loop through each chunk of data (like sips of water)
    for await (const chunk of stream) {
      // Process the chunk (like drinking the water);
      // process() is a placeholder for your own async handler
      await process(chunk);
    }
  } catch (e) {
    // If the operation was cancelled (like someone flipping the switch)
    if (e.name === "AbortError") {
      // Handle the cancellation
    } else {
      // Handle any other errors
      throw e;
    }
  }
})();

Real-World Application:

  • You can use this to prevent a stream from consuming too much memory or resources if there's a risk of it taking too long to complete.

Or using an AbortSignal with a ReadableStream:

const { addAbortSignal, finished } = require("stream");
const { ReadableStream } = require("stream/web");

// Create an abort controller (the switch)
const controller = new AbortController();

// Create a readable stream (the water pipe)
const rs = new ReadableStream({
  start(controller) {
    // Send some data chunks (like filling the pipe with water)
    controller.enqueue("hello");
    controller.enqueue("world");
    controller.close();
  },
});

// Attach the abort signal (the switch's handle) to the readable stream
addAbortSignal(controller.signal, rs);

// Start reading the stream (like opening the tap)
const reader = rs.getReader();

reader.read().then(({ value, done }) => {
  // Read the first chunk (like taking a sip of water)
  console.log(value); // hello
  console.log(done); // false

  // Cancel the operation (like closing the tap)
  controller.abort();
});

// Wait for the stream to finish (like waiting for the pipe to empty)
finished(rs, (err) => {
  if (err) {
    // If the operation was cancelled (like someone flipping the switch)
    if (err.name === "AbortError") {
      // Handle the cancellation
    }
  }
});

Real-World Application:

  • You can use this to cancel a stream that's generating data too quickly or that you're no longer interested in.


stream.getDefaultHighWaterMark(objectMode)

  • objectMode {boolean}

  • Returns: {integer}

The getDefaultHighWaterMark() method returns the default highWaterMark value.

The default highWaterMark differs depending on objectMode: for objectMode streams it is 16 (objects), while for non-objectMode streams it is 16384 bytes (16 KiB). The highWaterMark is the amount of data a stream will buffer internally before it starts applying backpressure.

Example

const stream = require("stream");

// Create a new stream
const myStream = new stream.Readable();

// Get the default highWaterMark for the stream
const highWaterMark = myStream.getDefaultHighWaterMark();

// Log the default highWaterMark
console.log(highWaterMark); // 16384

stream.setDefaultHighWaterMark(objectMode, value)

Sets the default highWaterMark used by new streams. The objectMode argument selects whether the objectMode default or the byte-stream default is changed, and value is an integer giving the new default. highWaterMark signifies the maximum amount of data that can be buffered before the stream signals backpressure.

Simplified Explanation

Think of a stream as a pipe that carries data. The highWaterMark is like a threshold that determines when the pipe gets too full. If the pipe fills beyond the threshold, the stream pauses automatically to prevent overflowing.

Code Snippet

const stream = require("stream");
const { Readable } = stream;

// Set the default highWaterMark for byte (non-objectMode) streams to 1024 bytes
stream.setDefaultHighWaterMark(false, 1024);

// Individual streams can still override the default with their own highWaterMark
const myStream = new Readable({
  highWaterMark: 2048,
});

Applications

  • Preventing memory leaks: If the highWaterMark is too high, it can cause the server to run out of memory. Setting a reasonable highWaterMark can prevent this.

  • Improving performance: A lower highWaterMark can improve performance by pausing the stream earlier, preventing excessive buffering.

  • Customizing stream behavior: The default highWaterMark may not be suitable for all applications. Setting a custom highWaterMark allows for fine-tuning the stream's behavior.


What is a Stream?

Think of a stream as a water pipe. Data flows through the pipe, and you can read or write data to the pipe.

Types of Streams

There are four main types of streams:

  • Readable: You can read data from the stream. Like drinking water from a tap.

  • Writable: You can write data to the stream. Like filling a water bottle.

  • Duplex: You can both read and write data to the stream. Like a water fountain where you can drink and refill your bottle.

  • Transform: You can read data from the stream, do something to it, and then write it out. Like a water filter that removes impurities.

Implementing a Stream

To create a custom stream, you need to extend one of the basic stream classes and implement specific methods:

  • Readable: Implement the _read() method to read data.

  • Writable: Implement the _write() method to write data.

  • Duplex: Implement both _read() and _write().

  • Transform: Implement the _transform() method to transform data.

Example: Creating a Readable Stream

Here's an example of creating a readable stream that reads data from a file:

const { Readable } = require("stream");

class FileStream extends Readable {
  constructor(filename) {
    super();
    this.filename = filename;
  }

  _read() {
    // Read data from the file and push it to the stream
  }
}

Example: Creating a Writable Stream

Here's an example of creating a writable stream that writes data to a file:

const { Writable } = require("stream");

class FileStream extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
  }

  _write(chunk, encoding, callback) {
    // Write the chunk of data to the file
    callback();
  }
}

Example: Creating a Duplex Stream

Here's an example of creating a duplex stream that reads data from a socket and writes it to a file:

const { Duplex } = require("stream");

class SocketFileStream extends Duplex {
  constructor(socket, filename) {
    super();
    this.socket = socket;
    this.filename = filename;
  }

  _read() {
    // Read data from the socket and push it to the stream
  }

  _write(chunk, encoding, callback) {
    // Write the chunk of data to the file
    callback();
  }
}

Example: Creating a Transform Stream

Here's an example of creating a transform stream that converts uppercase letters to lowercase:

const { Transform } = require("stream");

class LowercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // Convert the chunk of data to lowercase
    callback(null, chunk.toString().toLowerCase());
  }
}

Applications of Streams

Streams are used in many applications, such as:

  • HTTP servers: To read and write data to HTTP requests and responses.

  • File systems: To read and write data to files.

  • Databases: To read and write data to databases.

  • Data processing: To transform and filter data.


Simplified Construction of Node.js Streams

Overview

Streams are a fundamental part of Node.js for handling data in a continuous way. Instead of working with data as a whole, streams break it down into smaller chunks and process them one at a time. This allows for more efficient handling of large or real-time data.

Creating Streams without Inheritance

In some cases, you may not need to extend the built-in stream classes (Writable, Readable, Duplex, Transform). You can create streams directly by providing your own custom implementations for the required methods.

Writable Streams

What they do: Writable streams allow you to write data to a destination. Think of them as a pipe where you can send data to be stored or processed.

Creating a Writable Stream:

const { Writable } = require("stream");

const myWritable = new Writable({
  // Initialize any internal state or resources
  construct(callback) {
    // ...
    callback(); // Signal that setup is complete
  },

  // The actual writing function
  write(chunk, encoding, callback) {
    // Write the chunk of data to the destination
    // ...
    callback(); // Signal that the writing is complete
  },

  // When the stream is destroyed (closed or aborted)
  destroy(err, callback) {
    // Perform any necessary cleanup actions
    // ...
    callback(err); // Signal that cleanup is complete
  },
});

In the above example, construct() is called to set up the stream, write() is used to write data to the destination, and destroy() is used to clean up when the stream is finished. Callbacks are used to signal to Node.js that each operation is complete.

Readable Streams

What they do: Readable streams allow you to read data from a source. They provide a way to access data in a continuous manner, one chunk at a time.

Creating a Readable Stream:

const { Readable } = require("stream");

const myReadable = new Readable({
  // Initialize any internal state or resources
  construct(callback) {
    this.chunks = ["some", " ", "data"]; // example in-memory source
    callback(); // Signal that setup is complete
  },

  // The actual reading function
  read(size) {
    // Read a chunk of data from the source
    const chunk = this.chunks.shift();

    // If there is more data to be read, push it to the stream;
    // if there is no more data, push null to signal the end of the stream
    if (chunk !== undefined) {
      this.push(chunk);
    } else {
      this.push(null);
    }
  },
});

In this example, construct() is used to set up the stream, read() is used to read data from the source, and push() is used to push data into the stream. push() can be called multiple times to send multiple chunks of data.

Real-World Applications

Streams have a wide range of applications in the real world:

  • File I/O: Reading and writing data to and from files

  • Networking: Sending and receiving data over a network

  • Data processing: Performing operations on data as it is streamed through a series of transformations

  • Data analytics: Reading and processing large amounts of data in real-time

By understanding and utilizing streams, you can handle data efficiently and effectively in your Node.js applications.


Implementing a Writable Stream

In Node.js, a writable stream allows you to write data to a destination. You can create your own custom writable streams by extending the stream.Writable class.

Creating a Custom Writable Stream

To create a custom writable stream, you need to:

  • Call the new stream.Writable([options]) constructor: This initializes the stream with default options or any custom options you provide.

  • Implement the writable._write() or writable._writev() method: These methods are called when data is written to the stream.

// Example: Custom Writable Stream

const { Writable } = require("stream");

class MyWritableStream extends Writable {
  _write(chunk, encoding, callback) {
    // Do something with the data chunk, for example log it
    console.log("writing:", chunk.toString());
    callback(); // Signal that the chunk has been handled
  }
}

const myStream = new MyWritableStream();

Real-World Implementation

A writable stream can be used for various tasks, such as:

  • Logging: Writing messages or errors to a file or console.

  • File writing: Saving data to a file.

  • Network communication: Sending data over a network socket.

Example: Custom Log Stream

// Custom Log Stream

const fs = require("fs");
const { Writable } = require("stream");

class LogStream extends Writable {
  _write(chunk, encoding, callback) {
    fs.appendFile("log.txt", chunk, callback);
  }
}

const logStream = new LogStream();

// Append log messages to the file
logStream.write("application started\n");
logStream.end("application finished\n");

This script appends every message written to the stream to a log.txt file.

Applications

Writable streams are essential for handling data output in various applications, including:

  • Web servers (e.g., Express.js): Sending responses to HTTP requests.

  • Database access: Writing data to databases.

  • File processing: Reading and writing large files efficiently.

  • Cloud computing: Sending data to cloud services like Amazon S3.


new stream.Writable([options])

Writable streams are used to write data to a destination, such as a file or a network socket. They are created using the new Writable([options]) constructor.

Passing Options

You can pass options to the Writable constructor to customize its behavior.

  • highWaterMark:

    • The amount of data the stream can buffer before writable.write() starts returning false (backpressure).

    • Default: 16384 (16 KiB) for normal streams and 16 for objectMode streams.

  • decodeStrings:

    • Whether to convert strings to buffers before writing them.

    • Default: true.

  • defaultEncoding:

    • The default encoding to use when no encoding is specified.

    • Default: 'utf8'.

  • objectMode:

    • Whether the stream can write JavaScript values other than strings, buffers, or Uint8Arrays.

    • Default: false.

  • emitClose:

    • Whether the stream should emit a 'close' event after it has been destroyed.

    • Default: true.

  • write:

    • The function that will be called when data is written to the stream.

  • writev:

    • The function that will be called to write multiple buffered chunks at once.

  • destroy:

    • The function that will be called when the stream is destroyed.

  • final:

    • The function that will be called when the stream is ended.

  • construct:

    • The function that will be called when the stream is constructed.

  • autoDestroy:

    • Whether the stream should automatically call .destroy() on itself after ending.

    • Default: true.

  • signal:

    • An AbortSignal representing possible cancellation.

Usage

You can use a Writable stream to write data to a destination.

const fs = require("fs");
const { Writable } = require("stream");

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // Append the chunk to a file.
    fs.appendFile("myfile.txt", chunk, callback);
  },
});

myWritable.write("Hello, world!");
myWritable.end();

Real-World Applications

Writable streams can be used in a variety of real-world applications, such as:

  • Writing data to files

  • Sending data over a network

  • Compressing data

  • Encrypting data

  • Transforming data (e.g., converting a CSV file to JSON)


Writable Stream Construction and Initialization

writable._construct(callback) Method

Purpose:

Initializes the writable stream and allows for asynchronous resource setup before the stream is ready for writing.

Parameters:

  • callback: A function that is called when initialization is complete, optionally with an error argument.

Usage:

This method is not called directly by the user but is implemented by child classes of Writable. It is typically used to perform setup tasks such as:

  • Opening files or databases

  • Establishing network connections

  • Loading data or resources

Example:

const fs = require("fs");
const { Writable } = require("stream");

class CustomWritable extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }

  // Called automatically by the stream before the first _write()
  _construct(callback) {
    // Asynchronously open the file...
    fs.open(this.filename, "w", (err, fd) => {
      if (err) {
        callback(err); // The stream is destroyed with this error
      } else {
        this.fd = fd;
        callback(); // Setup complete, writing can begin
      }
    });
  }

  _write(chunk, encoding, callback) {
    // Write the chunk to the file opened in _construct()
    fs.write(this.fd, chunk, callback);
  }
}

In this example, the CustomWritable class asynchronously opens a file before it is ready for writing. The _construct() method performs this setup and calls the callback when complete; any writes made before then are buffered until initialization has finished.

Applications:

The _construct() method is useful in situations where the stream requires additional setup or resources before it can be used. It allows you to defer the actual writing operations until the stream is fully initialized.


writable._write() Method

Explanation:

Imagine you have a sink that you use to drain water. The writable._write() method is like the pipe that connects your faucet to the sink. It takes in water (data) from your faucet and sends it down to the sink (the underlying resource).

Parameters:

  • chunk: The data you want to send down the pipe. This can be a string, a Buffer, or any other type of data.

  • encoding: The character encoding of the data if it's a string. For example, "utf-8" or "ascii".

  • callback: A function that you can call when the data has been successfully sent down the pipe.

How it Works:

When you call stream.write(data), the stream converts the data to a Buffer if necessary and then calls writable._write() to send it down the pipe. The writable._write() implementation delivers the data to the underlying resource and calls the callback once that chunk has been handled.

Real-World Example:

Let's say you have a web server that generates HTML pages and sends them to clients. The writable._write() method would be responsible for sending the HTML pages to the clients' browsers.

Potential Applications:

  • Sending data to a database

  • Sending data to a file

  • Sending data over a network

Important Notes:

  • The writable._write() method should never be called directly by application code. It should only be called by the stream class itself.

  • The callback function must be called exactly once, even if there is an error.

  • If the callback function is not called, the stream will stop sending data.
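To tie the pieces together, here is a minimal sketch (the ConsoleSink class and its console destination are illustrative, not part of Node.js) of a Writable whose _write() forwards each chunk to its destination and then calls the callback exactly once:

const { Writable } = require("stream");

// A sink that "drains" each chunk to the console.
class ConsoleSink extends Writable {
  _write(chunk, encoding, callback) {
    console.log("draining:", chunk.toString());
    callback(); // Must be called exactly once per chunk
  }
}

const sink = new ConsoleSink();
sink.write("Hello, ");
sink.end("world!");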


What is writable._writev() in Node.js?

writable._writev() is an internal method in Node.js streams that allows stream implementations to write multiple chunks of data at once. It's typically used when the stream implementation can process data in a more efficient way when multiple chunks are provided together.

How does writable._writev() work?

When a stream receives data to write faster than it can be processed, the chunks are buffered internally. If the stream implementation provides writable._writev(), the buffered chunks (for example, those accumulated while the stream is corked or while a previous write is still in progress) are grouped together and passed to writable._writev() in a single call instead of being written one at a time.

When is writable._writev() used?

writable._writev() is typically used in stream implementations that can process data more efficiently when multiple chunks are provided together. For example, a stream implementation that writes data to a file system may use writable._writev() to write multiple chunks to the file system in a single operation.

Real-world example

Here's an example of a simplified stream implementation that uses writable._writev():

// Example of a simplified stream implementation that uses writable._writev()

const { Writable } = require("stream");

class MyWritable extends Writable {
  _writev(chunks, callback) {
    // chunks is an array of { chunk, encoding } objects
    for (const { chunk } of chunks) {
      console.log("writing:", chunk.toString());
    }

    // Invoke the callback when processing is complete
    callback();
  }
}

// Create a new instance of MyWritable
const myWritable = new MyWritable();

// Write some data to the stream
myWritable.write("Hello ");
myWritable.write("World!");

// Listen for the 'finish' event, which is emitted when all data has been written
myWritable.on("finish", () => {
  console.log("All data has been written");
});

// Signal that no more data will be written so 'finish' can fire
myWritable.end();

This stream implementation simply processes the data in memory. However, a real-world implementation may write the data to a file system, a database, or another destination.

Potential applications

writable._writev() can be used in a variety of stream implementations, including:

  • File system writers

  • Database writers

  • Network writers

  • Compression writers

Any stream implementation that can process data more efficiently when multiple chunks are provided together can benefit from using writable._writev().


writable._destroy(err, callback)

Simplified Explanation:

When you're done writing to a file or a network connection, the _destroy() method destroys the stream, releasing any resources it holds.

Topics in Detail:

  • Purpose: Called internally by writable.destroy() when you want to stop writing to the stream and clean up.

  • Parameters:

    • err: An optional error that occurred.

    • callback: A function to call when the stream is destroyed.

  • Usage:

    • You cannot call _destroy() directly. It's called by the destroy() method defined on the Writable class.

    • If you override _destroy() in a child class, make sure the callback is invoked once cleanup is done so the stream can finish destroying itself.

  • Callback:

    • The callback takes an optional error argument.

    • It's executed asynchronously when the stream is fully destroyed.

Code Example:

// A custom stream that writes to a file and closes it when destroyed
const fs = require("fs");
const { Writable } = require("stream");

class MyWritableStream extends Writable {
  _construct(callback) {
    fs.open("output.txt", "w", (err, fd) => {
      this.fd = fd;
      callback(err);
    });
  }

  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }

  _destroy(err, callback) {
    if (err) {
      console.error("Error during stream destruction:", err);
    }

    if (this.fd) {
      fs.close(this.fd, (closeError) => callback(closeError || err));
    } else {
      callback(err);
    }
  }
}

const stream = new MyWritableStream();
stream.end("some data"); // _destroy() runs after the stream finishes (autoDestroy)

Real-World Applications:

  • Saving data to a database and closing the connection when finished.

  • Writing a log file and closing it after all the logs are written.

  • Sending data over a network socket and closing the connection when the transmission is complete.


Writable Stream

A writable stream is a stream that you can write data to.

_final() Method

The _final() method is a special method that is called when the writable stream has finished writing all of its data. It is not a public API and should not be called directly. It may be implemented by child classes of Writable to perform cleanup tasks before the stream closes.

When to Use _final()

The _final() method is useful for performing tasks such as:

  • Closing resources that were opened during the writing process.

  • Flushing any buffered data to the underlying file system or network.

  • Performing any other necessary cleanup actions before the stream closes.

Example

Here is an example of how the _final() method might be used in a child class of Writable:

const { Writable } = require("stream");

class MyWritableStream extends Writable {
  // Assumes this.fileHandle is an fs.promises FileHandle that was opened
  // elsewhere (for example in _construct()) and written to in _write().
  _final(callback) {
    // Flush any remaining data to disk, then close the file handle.
    this.fileHandle
      .sync()
      .then(() => this.fileHandle.close())
      .then(() => callback()) // Signal that the stream has finished writing
      .catch(callback);
  }
}

Real-World Application

The _final() method can be used in any situation where you need to perform cleanup actions before a writable stream closes. For example, it could be used to close a database connection or to flush a cache to disk.


Errors While Writing

When writing to a stream, errors can occur. These errors must be handled properly to avoid undefined behavior.

Handling Errors in Writable Streams

Writable streams handle errors by passing the error as the first argument to the callback function provided to the _write() or _writev() methods. It's important to use the callback to propagate errors correctly rather than throwing.

// Example:
const { Writable } = require("stream");

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // If the chunk contains the letter 'a', it's invalid.
    if (chunk.toString().indexOf("a") >= 0) {
      callback(new Error("chunk is invalid"));
    } else {
      callback(); // No error
    }
  },
});

Error Propagation in Piped Streams

If a Readable stream is piped into a Writable stream, and the Writable stream emits an error, the Readable stream will be automatically unpiped. This prevents errors from propagating to the Readable stream.
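A minimal sketch of that behavior (the data and the error are made up): when the writable's callback reports an error, the 'error' and 'unpipe' events fire on the writable and the readable stops sending it data.

const { Readable, Writable } = require("stream");

const source = Readable.from(["a", "b", "c"]);

const failing = new Writable({
  write(chunk, encoding, callback) {
    callback(new Error("write failed")); // Fail on the first chunk
  },
});

failing.on("error", (err) => console.error("writable error:", err.message));
failing.on("unpipe", () => console.log("the readable was unpiped from this writable"));

source.pipe(failing);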

Real-World Applications

Proper error handling in writable streams is essential for robust streaming applications. For example, in a data processing pipeline, errors in writing to a database can be handled gracefully, preventing data loss or corruption.

Simplified Explanation

Imagine you have a water pipe (Writable stream) that you want to fill with water (data). If the pipe has a leak (error), we need to tell someone (callback) so they can fix it. By following these error handling rules, we make sure that the pipe doesn't overflow or damage other parts of the system (Readable stream).


Writable Streams

Imagine a water hose. You can connect it to a faucet (a source of water) and let water flow out. Similarly, a writable stream in Node.js allows you to write data to a destination.

Creating a Writable Stream

To create a writable stream, use the Writable class:

const { Writable } = require("node:stream");

const myWritableStream = new Writable();

Write Data to the Stream

To write data to the stream, use the write() method:

myWritableStream.write("Hello, world!");

Handling Errors

If there's an error while writing data, the stream will emit an 'error' event. You should handle this event to prevent the program from crashing.

myWritableStream.on("error", (err) => {
  console.error("Error:", err);
});

Custom Writable Stream

Sometimes, you need to customize how the stream writes data. For example, you might want to validate the data before writing it. You can create a custom writable stream by extending the Writable class:

class MyCustomStream extends Writable {
  _write(chunk, encoding, callback) {
    // Validate the data here
    const isValid = chunk.length > 0; // example check

    if (isValid) {
      // If valid, call the callback with no arguments
      callback();
    } else {
      // If invalid, call the callback with an error
      callback(new Error("Invalid data"));
    }
  }
}

const myCustomStream = new MyCustomStream();

Real-World Applications

  • Logging: Writable streams can be used to write logs to a file or the console.

  • Data processing: Writable streams can be used to process data as it's being written, such as filtering or transforming it.

  • Data storage: Writable streams can be used to write data to a database or other storage system.

Example: Logging to a File

const fs = require("node:fs");

const writableStream = fs.createWriteStream("log.txt");

writableStream.write("This is a log message");
writableStream.end();

This example creates a writable stream that writes to a file called log.txt. The write() method writes data to the stream, and the end() method ends the stream and closes the file.


Decoding Buffers in a Writable Stream

Decoding Buffers

Buffers are a way to store binary data (e.g., images, videos) in Node.js. However, they can't be directly read as strings. Decoding buffers means converting them into a format we can easily read and understand.

StringDecoder

The StringDecoder class helps decode buffers into strings. It takes care of handling multi-byte characters, such as those used in languages like Chinese and Korean.

Writable Stream

A writable stream is a stream that we can write data to. In our case, we'll use it to write decoded data.

StringWritable

StringWritable is a custom writable stream that handles both strings and buffers. When we write buffers, it automatically decodes them using StringDecoder.

Example Code

const { Writable } = require("node:stream");
const { StringDecoder } = require("node:string_decoder");

class StringWritable extends Writable {
  constructor() {
    super();
    this._decoder = new StringDecoder();
    this.data = "";
  }
  _write(chunk, encoding, callback) {
    if (encoding === "buffer") {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xe2, 0x82], [0xac]].map(Buffer.from);
const w = new StringWritable();

w.write("currency: ");
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // 'currency: €'

In this example:

  • We create a custom StringWritable stream.

  • We write both a string ("currency: ") and a buffer containing the euro symbol (euro[0]) to the stream.

  • Finally, we end the stream by writing another buffer (euro[1]).

  • The decoded data is stored in the data property of the stream, and we print it out.

Real-World Applications

Decoding buffers is useful in various scenarios:

  • Web servers: Decoding requests containing data in non-string formats (e.g., multipart form data).

  • Data processing: Converting binary files into human-readable formats.

  • Network communication: Reading and writing data over sockets that use buffers.


Readable Streams in Node.js

Streams are a simple yet powerful way to handle data as it becomes available. They provide a way to read and write data in a continuous flow, making them ideal for processing large amounts of data efficiently.

A Readable Stream is a special type of stream that allows you to read data from it. It's like a water pipe, where data flows through it and you can tap into it at any point to extract the data you need.

Implementing a Readable Stream:

To create a Readable Stream, you need to extend the stream.Readable class and implement the _read() method. The _read() method is responsible for reading data from your data source and making it available to the stream.

const { Readable } = require("stream");

class MyReadableStream extends Readable {
  constructor(options) {
    super(options);
    this.data = ["a", "b", "c"];
  }

  _read() {
    // Read data from your data source and push it to the stream using `push()`.
    if (this.data.length) {
      this.push(this.data.shift());
    } else {
      // If there's no more data, call `push(null)` to signal the end of the stream.
      this.push(null);
    }
  }
}

Real-World Examples:

Readable Streams can be used in various scenarios:

  • File Reading: Reading data from a file in chunks, avoiding loading the entire file into memory.

  • Database Querying: Streaming results from a database query, allowing you to process them as they become available.

  • Network Communication: Reading data from a network socket, allowing you to handle incoming data in real time.

Potential Applications:

  • Data Processing: Pipelining data through a series of transformations, filtering, and aggregation.

  • Streaming Video/Audio: Sending and receiving video or audio data in real time, allowing for smooth playback.

  • Log Analysis: Processing large log files and extracting valuable insights.


What is a Readable Stream?

A Readable Stream allows you to read data from a source, like a file or a network connection. Node.js has a built-in Readable stream class that you can use to create your own readable streams.

Creating a Readable Stream

To create a readable stream, you can use the following code:

const { Readable } = require("stream");

const myReadable = new Readable({
  read(size) {
    // This function will be called when data is requested from the stream.
    // You should implement this function to read data from your source and
    // push it into the stream.
  },
});

Options for Readable Streams

The Readable constructor takes an options object as an argument. This object can contain the following properties:

  • highWaterMark: The maximum number of bytes that can be buffered in the stream before it stops reading from the source.

  • encoding: The encoding used to decode buffers into strings.

  • objectMode: A boolean value that indicates whether the stream should emit objects instead of buffers.

  • emitClose: A boolean value that indicates whether the stream should emit a 'close' event after it has been destroyed.

  • read: The function that will be called when data is requested from the stream.

  • destroy: The function that will be called when the stream is destroyed.

  • construct: The function that will be called when the stream is constructed.

  • autoDestroy: A boolean value that indicates whether the stream should automatically call .destroy() on itself after ending.

  • signal: A signal representing possible cancellation.

Real-World Example

One common use case for readable streams is reading data from a file. Here is an example of how you could use a readable stream to read data from a file:

const fs = require("fs");
const { Readable } = require("stream");

const myReadable = new Readable({
  read(size) {
    const chunk = fs.readSync(fd, Buffer.alloc(size));
    if (chunk === null) {
      this.push(null);
    } else {
      this.push(chunk);
    }
  },
});

myReadable.pipe(process.stdout);

This code will create a readable stream that reads data from a file and pipes it to the standard output.

Applications in the Real World

Readable streams have a wide range of applications in the real world, including:

  • Reading data from files

  • Reading data from network connections

  • Processing data from databases

  • Parsing data from web pages

  • Compressing and decompressing data


Custom Readable Stream with Async Initialization

_construct(callback)

Purpose:

This method is called by the stream internally during construction. It allows you to perform asynchronous initialization before the stream can be used for reading.

Usage:

Instead of calling _construct() directly, extend the Readable class and implement your own _construct() method. This method will be called during stream construction.

Example:

const { Readable } = require("stream");

class MyReadableStream extends Readable {
  constructor() {
    super();

    // Initialize data asynchronously
    this._construct((err) => {
      if (err) {
        this.destroy(err);
      } else {
        // Initialization complete, start reading
        this._read();
      }
    });
  }

  _construct(callback) {
    fs.readFile("data.txt", (err, data) => {
      if (err) {
        callback(err);
      } else {
        this._data = data;
        callback();
      }
    });
  }

  _read() {
    if (this._data) {
      this.push(this._data);
      this._data = null;
    } else {
      this.push(null); // End of stream
    }
  }
}

const stream = new MyReadableStream();

// Usage:
stream.on("data", (data) => {
  console.log(data.toString());
});

Applications:

  • Initialize network connections

  • Load data from a database

  • Perform authentication or security checks

  • Any other asynchronous task needed before reading from the stream.


What is readable._read()?

readable._read() is a function that is automatically called by the Readable stream class to fetch data from the underlying source, such as a file or network connection.

How does readable._read() work?

When readable._read() is called, it starts fetching data from the source. If data is available, it uses the this.push() method to push the data into the read queue. After data has been pushed, the stream will call _read() again when it is ready for more; _read() is not called again until at least one this.push() has happened.

Advisory size parameter

The size parameter is a suggestion for how much data to fetch. Implementations may ignore this parameter and simply fetch data whenever it becomes available. There's no need to wait for size bytes before calling this.push().

Example

Here's an example of a custom Readable stream implementation that reads data from a file:

const { Readable } = require("stream");

class MyReadable extends Readable {
  _read(size) {
    // Fetch data from the file...
    const data = readFileSync("some-file.txt");

    // Push the data into the read queue
    this.push(data);

    // Tell the stream that we're done reading
    this.push(null);
  }
}

// Create an instance of the custom stream class and pipe it to a destination
const myReadable = new MyReadable();
myReadable.pipe(process.stdout);

Applications in the real world

Readable streams are used in many applications, including:

  • Reading files, database records, and network data

  • Parsing and processing CSV and JSON files

  • Creating pipelines of data transformations, such as filtering and sorting

  • Implementing web servers and APIs to transmit data over the network

By understanding how readable._read() works, you can create custom stream implementations to handle any type of data and perform any type of processing.


_destroy() Method in Node.js Streams

Simplified Explanation:

_destroy() is a special method used to clean up and close a readable stream when it's no longer needed.

Detailed Explanation:

  • Called by readable.destroy(): _destroy() is only called when you call stream.destroy().

  • Must not be called directly: You shouldn't call _destroy() yourself. Instead, use stream.destroy() to trigger it.

  • Can be overridden: Child classes can override _destroy() to perform additional cleanup tasks when the stream closes.

Code Snippet:

// Example of overriding _destroy()

const stream = require("stream");

class MyReadableStream extends stream.Readable {
  _destroy(err, callback) {
    // Perform additional cleanup
    super._destroy(err, callback);
  }
}

Real-World Implementation:

Example: Let's say you're reading data from a file and want to close the file properly when the stream is finished.

const fs = require("fs");

const readableStream = fs.createReadStream("file.txt");

readableStream.on("end", () => {
  readableStream.destroy();
});

readableStream.on("close", () => {
  console.log("File closed successfully.");
});

Potential Applications:

  • Closing files and other resources when streams are no longer needed

  • Implementing custom cleanup logic for specialized streams

  • Ensuring that streams are properly closed to prevent data leaks or errors


readable.push(chunk, encoding)

Summary: The readable.push() method in Node.js allows you to push data into a readable stream.

Parameters:

  • chunk: The data to push into the stream. It can be a Buffer, Uint8Array, string, null, or any other value (for object mode streams).

  • encoding (optional): The encoding of the chunk if it's a string. Default is 'utf8'.

Returns:

  • A boolean indicating whether more chunks can be pushed or not.

Explanation:

  • When you push data into a stream, it gets stored in an internal queue.

  • If the stream is in paused mode, you can read the data later by calling readable.read() when the 'readable' event is emitted.

  • If the stream is in flowing mode, the data will be automatically emitted as a 'data' event.

Real-World Example:

Let's say you have a file stream that you want to read and push into a readable stream:

const fs = require("fs");
const { Readable } = require("stream");

// A no-op read() lets us push data into the stream manually
const stream = new Readable({ read() {} });

fs.createReadStream("./file.txt")
  .on("data", (chunk) => {
    stream.push(chunk);
  })
  .on("end", () => {
    stream.push(null); // Signal the end of the stream
  });

In this example, we create a readable stream and pipe the data from a file into it using fs.createReadStream(). The readable.push() method is used to push the file data into the stream.

Applications:

  • Data processing: You can use readable.push() to push data from a source into a stream for further processing.

  • Data buffering: If you want to buffer data before processing or emitting it, you can use readable.push() to store it in the stream's internal queue.

  • Object mode streams: For object mode streams, you can use readable.push() to push any type of data, not just strings or buffers. This is useful for passing complex objects through streams.


Errors while reading

When a readable stream encounters an error during processing, it must use the readable.destroy(err) method to propagate the error. Throwing an error or manually emitting an 'error' event instead can lead to unpredictable behavior.

Simplified explanation:

Imagine a water pipe that's supposed to deliver water to your house. If a blockage occurs in the pipe, the pipe shouldn't burst or leak. Instead, it should shut off the water flow and notify you of the problem.

Similarly, a readable stream should handle errors by "shutting off" the stream and sending an error message through the readable.destroy(err) method. This allows the code that's using the stream to be notified of the error and take appropriate action.

Code snippet:

const { Readable } = require("node:stream");

const myReadable = new Readable({
  read(size) {
    // Check for an error condition (someErrorCondition is a placeholder
    // for whatever failure your data source can report)
    if (someErrorCondition) {
      // If an error occurs, destroy the stream with the error message
      this.destroy(new Error("An error occurred while processing the data"));
    } else {
      // If there's no error, continue processing the data
      // ...
    }
  },
});

Real-world applications:

  • Error handling in data processing pipelines: In data pipelines where multiple streams are used to process data, errors can occur at any stage. Using readable.destroy(err) ensures that errors are propagated properly and the entire pipeline can be shut down gracefully.

  • Fault tolerance in web servers: Web servers often use readable streams to handle incoming HTTP requests. If an error occurs while processing a request, the stream can be destroyed to prevent the server from crashing.

  • Error reporting in logging systems: Logging systems often use readable streams to process and store log messages. By destroying streams with error messages, logs can be generated even when errors occur during logging.


Simplified Explanation of Node.js Stream Module

Streams

A stream is a sequence of data that can be read or written in a continuous flow. In Node.js, streams are implemented using the stream module.

Readable Streams

Readable streams are streams that emit data. They are used to read data from a source, such as a file or a network connection. To create a readable stream, you can use the Readable class.

Writable Streams

Writable streams are streams that receive data. They are used to write data to a destination, such as a file or a network connection. To create a writable stream, you can use the Writable class.

Duplex Streams

Duplex streams are streams that can both read and write data. They are used for situations where you need to read and write data to the same source or destination. To create a duplex stream, you can use the Duplex class.

Transform Streams

Transform streams are streams that can read data, transform it, and then write it to a destination. They are used for situations where you need to modify data as it is being streamed. To create a transform stream, you can use the Transform class.

Example: Counting Stream

The following is a simple example of a readable stream that emits the numbers from 1 to 1,000,000 in ascending order:

const { Readable } = require('stream');

class Counter extends Readable {
  constructor() {
    super();
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    if (this._index > this._max) {
      this.push(null); // End the stream
    } else {
      const str = String(this._index++);
      const buf = Buffer.from(str);
      this.push(buf);
    }
  }
}

const counter = new Counter();

counter.on('data', (data) => {
  console.log(data.toString());
});

counter.on('end', () => {
  console.log('Finished counting.');
});

Real-World Applications

Streams are used in a wide variety of applications, including:

  • File I/O

  • Network I/O

  • Data processing

  • Real-time data streaming


Duplex Streams

What are Duplex Streams?

Duplex streams allow data to flow in both directions, like a two-way street. Imagine a chat application where you can send and receive messages with another person.

How to Create a Duplex Stream

To create a duplex stream, you need to use the Duplex class:

const { Duplex } = require('stream');

const duplexStream = new Duplex();

Implementing Duplex Streams

You need to implement two methods:

  1. _read(): This method is called when data is flowing from the source (e.g., the keyboard) to the stream.

  2. _write(): This method is called when data is flowing from the stream to the destination (e.g., the screen).

Example:

class ChatStream extends Duplex {
  _read() {
    // Read data from the keyboard and pass it to the stream.
  }

  _write(data, encoding, callback) {
    // Write data to the screen.
    callback();  // Call this when done writing.
  }
}

const chatStream = new ChatStream();

Real-World Applications:

Duplex streams are used in various applications, including:

  • Chat Applications: For sending and receiving messages between users.

  • WebSockets: For establishing bidirectional communication between a web client and server.

  • Database Connections: For reading and writing data to a database.

Conclusion:

Duplex streams allow for flexible and efficient data handling in applications where data flows in both directions.


Duplex Streams

A Duplex stream is a stream that can both read and write data. It's like a pipe that connects two streams together.

Creating a Duplex Stream

To create a Duplex stream, you can use the Duplex class. The Duplex class takes an options object as an argument. The options object can contain the following properties:

  • allowHalfOpen: If set to false, then the stream will automatically end the writable side when the readable side ends. Default: true.

  • readable: Sets whether the Duplex should be readable. Default: true.

  • writable: Sets whether the Duplex should be writable. Default: true.

  • readableObjectMode: Sets objectMode for readable side of the stream. Has no effect if objectMode is true. Default: false.

  • writableObjectMode: Sets objectMode for writable side of the stream. Has no effect if objectMode is true. Default: false.

  • readableHighWaterMark: Sets highWaterMark for the readable side of the stream. Has no effect if highWaterMark is provided.

  • writableHighWaterMark: Sets highWaterMark for the writable side of the stream. Has no effect if highWaterMark is provided.
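
As an illustration, here is a sketch of constructing a Duplex with a few of these options; the read and write implementations are placeholders:

const { Duplex } = require("stream");

const duplex = new Duplex({
  allowHalfOpen: false,        // end the writable side when the readable side ends
  readableObjectMode: true,    // readable side emits JavaScript values
  readableHighWaterMark: 16,   // buffer at most 16 objects on the readable side
  writableHighWaterMark: 1024, // buffer at most 1024 bytes on the writable side

  read() {
    // Called when the consumer wants more data; push values with this.push().
  },
  write(chunk, encoding, callback) {
    // Called for each chunk written to the stream.
    callback();
  },
});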

Consuming a Duplex Stream

To consume a Duplex stream, you can use the pipe() method. The pipe() method connects the readable side of one stream to the writable side of another stream.

For example, the following code creates a Duplex stream and then pipes it to a Writable stream:

const { Duplex, Writable } = require('stream');

// Minimal implementations so that the pipe actually works:
const duplex = new Duplex({
  read() {},
  write(chunk, encoding, callback) { this.push(chunk); callback(); },
});
const writable = new Writable({
  write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); },
});

duplex.pipe(writable);

Writing to a Duplex Stream

To write data to a Duplex stream, you can use the write() method. The write() method takes a chunk of data as an argument.

For example, the following code writes data to a Duplex stream:

duplex.write('Hello, world!');

Reading from a Duplex Stream

To read data from a Duplex stream, you can use the read() method. The read() method takes a size as an argument. The size is the number of bytes that you want to read.

For example, the following code reads data from a Duplex stream:

const data = duplex.read(100);

Real-World Applications

Duplex streams can be used in a variety of real-world applications, such as:

  • Data compression: A Duplex stream can be used to compress data as it is being written.

  • Data encryption: A Duplex stream can be used to encrypt data as it is being written.

  • Data transformation: A Duplex stream can be used to transform data as it is being written.

  • Network communication: A Duplex stream can be used to communicate data over a network.


Simplified Explanation of Duplex Streams

What is a Duplex Stream?

Imagine a pipe with two ends: one end for pouring water in, and the other end for pouring water out. A "Duplex" stream is like that pipe, but instead of water, it's a way to transfer data in both directions.

How it Works:

A Duplex stream acts as both a "Writable" (like a faucet) and a "Readable" (like a tap). You can write data into the stream like you're filling a cup, and you can read data out of it like you're pouring water from a tap.

Example:

A bare Duplex does nothing by itself; you supply the read and write behaviour. For instance, here is a minimal Duplex whose writable side echoes whatever you write back out through its readable side:

const { Duplex } = require("stream");

const echoStream = new Duplex({
  write(chunk, encoding, callback) {
    this.push(chunk); // make the written data available on the readable side
    callback();
  },
  read() {}, // data is pushed from write(), so nothing to do here
});

echoStream.on("data", (chunk) => {
  // This will be called whenever data is read from the stream.
  console.log(`Read data: ${chunk}`);
});

echoStream.write("Hello, world!\n");
echoStream.end(); // Signal that we're done writing.

// When all written data has been processed, 'finish' will be emitted.
echoStream.on("finish", () => {
  console.log("All data written successfully.");
});

Real-World Applications:

Duplex streams are used in various applications, including:

  • Networking: For sending and receiving data over a network.

  • File manipulation: For reading and writing files.

  • Data transformation: For modifying data as it flows through a stream.

Note: The above example uses the core Node.js Duplex stream class. However, there are many third-party libraries that provide more advanced Duplex stream implementations with additional features like compression or encryption.


Object Mode Duplex Streams

In Node.js, streams are used to handle data in a sequential manner. They can be used for reading, writing, or transforming data.

Duplex streams are streams that can both read and write data. They are commonly used in situations where you need to transform data in some way, such as converting text to JSON or encrypting data.

Readable and Writable Sides

Duplex streams have two independent sides: a readable side that you read data from, and a writable side that you write data to.

By default, streams are created in "byte mode," meaning that they handle data as a sequence of bytes. However, you can also set streams to "object mode," which allows them to handle data as objects.

Object Mode

When a stream is in object mode, it can handle data as objects rather than as bytes. This can be useful when you are working with data that is already in object form, such as JSON or XML.

By setting the objectMode option to true when creating a stream, you can enable object mode for either the readable or writable side of the stream.

Example

The code below provides an example of a Transform stream (a type of Duplex stream) that converts numbers written to the writable side of the stream into hexadecimal strings and reads from the readable side.

const { Transform } = require("stream");

// Create a Transform stream
const myTransform = new Transform({
  // Set the writable side to object mode
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Convert the chunk to a number
    const number = parseInt(chunk);

    // Convert the number to a hexadecimal string
    const hexString = number.toString(16);

    // Push the hex string to the readable side
    callback(null, hexString);
  },
});

// Write a number to the writable side
myTransform.write(100);

// Read the hex string from the readable side
myTransform.on("data", (chunk) => {
  console.log(chunk);
});

Real-World Applications

Object mode streams can be used in a variety of real-world applications, such as:

  • Converting data between different formats (e.g., JSON to XML)

  • Encrypting or decrypting data

  • Validating data

  • Parsing data into objects
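
As a small sketch of the last item, here is a Transform whose readable side is in object mode and which parses lines of "key=value" text into objects (the input format is just an example):

const { Transform } = require("stream");

// The writable side receives plain text; the readable side emits parsed objects.
const parseLines = new Transform({
  readableObjectMode: true,

  transform(chunk, encoding, callback) {
    for (const line of chunk.toString().split("\n")) {
      if (!line.trim()) continue;
      const [key, value] = line.split("=");
      this.push({ key, value }); // objects are allowed because the readable side is in object mode
    }
    callback();
  },
});

parseLines.on("data", (obj) => {
  console.log(obj); // e.g. { key: 'name', value: 'Ada' }
});

parseLines.write("name=Ada\ncity=London\n");
parseLines.end();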


What is a Transform Stream?

In Node.js, data flows through streams. A transform stream is a type of stream that can change the data as it flows through. This allows you to perform operations on the data, such as encryption or compression, before it reaches its destination.

How to Create a Transform Stream

To create a transform stream, you can use the stream.Transform class. This class inherits from the stream.Duplex class, which means that it can both read and write data.

The following code shows how to create a simple transform stream that converts all incoming data to uppercase:

const { Transform } = require("stream");

const uppercaseStream = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase());
    callback();
  },
});

Using Transform Streams

To use a transform stream, you can pipe it into another stream. For example, the following code pipes the uppercase stream into a write stream that writes the data to a file:

const fs = require("fs");

uppercaseStream.pipe(fs.createWriteStream("output.txt")).on("finish", () => {
  console.log("Data written to file");
});

uppercaseStream.write("hello world");
uppercaseStream.end(); // Without ending the source stream, 'finish' never fires.

Real World Applications

Transform streams have many potential applications, such as:

  • Encryption: Encrypting data before it is sent over a network.

  • Compression: Compressing data to reduce its size.

  • Data filtering: Removing or modifying specific data from a stream.

  • Data transformation: Changing the format or structure of data.

Additional Methods

In addition to the transform function, transform streams can also implement a flush hook: the transform._flush() method (supplied as the flush option when using the constructor, or implemented as _flush when subclassing). This hook is called when the input stream has ended and all the data has been processed. It allows you to push any remaining output and perform final cleanup.

Example:

The following code shows how to use the flush option to write a final message to the output stream:

const { Transform } = require("stream");

const messageStream = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString());
    callback();
  },

  flush(callback) {
    this.push("All data processed");
    callback();
  },
});

new stream.Transform([options])

The Transform class is a duplex stream that allows you to modify the data as it flows through the stream.

Basic Usage

To create a transform stream, you need to pass a transform function to the constructor. The transform function will be called for each chunk of data that flows through the stream. The function takes three arguments:

  1. chunk: The chunk of data that is being processed.

  2. encoding: The encoding of the chunk of data.

  3. callback: A callback function that you must call when you are finished processing the chunk of data.

The following code shows how to create a simple transform stream that converts all of the data to uppercase:

const { Transform } = require("stream");

const transform = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

transform.on("data", (chunk) => {
  console.log(chunk.toString());
});

transform.write("hello");
transform.write("world");
transform.end();

Output:

HELLO
WORLD

Options

The Transform constructor takes an optional options object. The following options are supported:

  • highWaterMark: The high water mark for the stream. This is the amount of data the stream will buffer internally before writes start returning false; once the buffer drains, the stream emits the 'drain' event.

  • encoding: The default encoding for the stream.

  • objectMode: If set to true, the stream will operate in object mode. In object mode, the chunk argument to the transform function can be any JavaScript value instead of a Buffer or string.

  • transform: The transform function.

  • flush: A function that is called when the stream is finished writing data. The flush function takes one argument, which is a callback function. The callback function must be called when the flush operation is complete.

Real-World Applications

Transform streams can be used for a variety of real-world applications, such as:

  • Data filtering

  • Data encryption

  • Data compression

  • Data transformation

For example, you could use a transform stream to filter out all of the vowels from a string, or to encrypt all of the data that flows through the stream.


Event: 'end'

The 'end' event is emitted by a stream when all data has been output. This happens after the callback in the transform._flush() function has been called.

Real-World Example:

Imagine you are reading a file from disk and creating a stream of data from it. When the entire file has been read, the 'end' event will be emitted, signaling that there is no more data to read.

Potential Applications:

  • Closing the stream and releasing resources.

  • Triggering the next step in a data processing pipeline.

  • Displaying a message to the user indicating that the data is finished loading.

Code Example:

const fs = require("fs");
const stream = fs.createReadStream("file.txt");

stream.on("end", () => {
  console.log("All data has been read from the file.");
});

Event: 'finish'

Simplified Explanation:

When you're writing data to a file or something similar, this event is triggered when all the data has been successfully written and the stream is finished.

Detailed Explanation:

  • stream.Writable: This is a special kind of stream that allows data to be written to it, like a file or a network socket.

  • stream.end(): This function tells the stream that you're done writing data and it should finish up.

  • stream._transform(): This is an internal function that processes the data being written to the stream.

  • 'finish' event: This event is triggered after stream.end() is called and all the data has been processed.

Real-World Example:

Suppose you're writing a program that saves data to a file. You would write something like this:

const fs = require('fs');
const writeStream = fs.createWriteStream('data.txt');

// Write some data to the file
writeStream.write('Hello, world!');

// When done writing, finish the stream
writeStream.end();

// The 'finish' event will be triggered when the file is saved
writeStream.on('finish', () => {
  console.log('File saved!');
});

Potential Applications:

  • Logging: Writing error messages or activity logs to a file.

  • Backups: Regularly saving data to a backup location.

  • Data transfer: Sending large files over a network and notifying the receiver when the transfer is complete.


transform._flush(callback)

When you're using a transform operation, you may need to send out one last bit of data at the end of the stream. For instance, when using zlib compression, it stores some internal state to compress the output. But when the stream ends, you need to flush that extra data so the compressed data can be complete.

Custom Transform implementations can include the transform._flush() method. It's called when there's no more data to be consumed, but before the 'end' event is sent out signaling the end of the Readable stream.

Inside the transform._flush() method, the transform.push() method can be called any number of times, if needed. The callback function must be called when the flush operation is done.

Here's an example of how you might use transform._flush() in a custom transform stream:

const { Transform } = require('stream');

class MyTransform extends Transform {
  _flush(callback) {
    // Send out any extra data here, if needed
    this.push('extra data');
    callback();
  }
}

const myTransform = new MyTransform();
myTransform.on('data', (data) => {
  console.log(data.toString()); // Prints 'extra data'
});
myTransform.end();

Potential applications

transform._flush() can be used in any situation where you need to send out one last bit of data at the end of a stream. For example, it could be used to:

  • Add a footer to a file

  • Send a summary of the data that was processed

  • Flush any remaining data from a buffer

Simplified explanation

Imagine you're making a sandwich. You have all the ingredients, and you've put them together. But before you can eat it, you need to put the last slice of bread on top. That's what transform._flush() does. It puts the finishing touch on your stream of data.


_transform(chunk, encoding, callback)

The _transform method must be implemented by every Transform stream. It is called internally by the stream whenever a chunk that was written to the writable side is ready to be processed. The chunk parameter is the data to be processed, the encoding parameter is the encoding of the chunk (if it is a string), and the callback parameter is a function that must be called when the chunk has been processed.

How it works

Unless the stream is in object mode or was created with decodeStrings: false, string chunks are converted to Buffers before _transform is called, so your implementation usually receives a Buffer. The implementation processes the chunk and produces some output; in the default mode the output must be a Buffer, string, or Uint8Array, while in object mode it can be any value.

Once the output has been produced, the method calls the callback with an error (or null) as the first argument and the output as the second argument. Alternatively, the implementation can call transform.push() any number of times and then invoke the callback with no output.

Example

The following code shows an example of a _transform method:

const { Transform } = require("stream");

class MyTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // Process the data
    const output = chunk.toString().toUpperCase();

    // Call the callback with the output
    callback(null, output);
  }
}

This _transform method converts the input data to uppercase and then calls the callback parameter with the uppercase data as the first argument.

Real-world applications

The _transform method can be used for a variety of purposes, such as:

  • Converting data from one format to another

  • Filtering data

  • Compressing data

  • Encrypting data

Potential applications

Here are some potential applications of the _transform method:

  • A web server that converts HTTP requests to uppercase before processing them

  • A data pipeline that filters out unwanted data

  • A compression algorithm that compresses data before sending it over a network

  • An encryption algorithm that encrypts data before storing it in a database

Simplified explanation

The _transform method is called by the stream whenever a chunk of written data is ready to be processed. The method processes the chunk and produces some output, which in the default mode must be a Buffer or string (object mode allows any value). It then calls the callback, passing an error (or null) as the first argument and the output as the second argument.


Class: stream.PassThrough

The stream.PassThrough class is a simple stream that just passes the data it receives to the next stream in the pipeline. It's like a pipe that lets data flow through it without changing it.

Example:

const { PassThrough } = require("stream");

// Create a PassThrough stream
const passThroughStream = new PassThrough();

// Pipe some data into the stream
passThroughStream.write("Hello world!");

// Listen for data events on the stream
passThroughStream.on("data", (chunk) => {
  // The data that was piped in is now available in the chunk
  console.log(chunk.toString()); // Prints: Hello world!
});

// End the stream to indicate that no more data will be sent
passThroughStream.end();

Potential Applications:

  • Tapping into a pipeline: Because a PassThrough forwards data unchanged, you can pipe through one and listen to its 'data' events to log or inspect the data flowing between two other streams.

  • Placeholder streams: A PassThrough is handy wherever an API expects a stream but you have nothing to transform; the data simply passes straight through.

  • Buffering: You can use a PassThrough stream to buffer data before sending it to the next stream in the pipeline. This can be useful if the next stream is attached later or is temporarily slow.

Real-World Example:

PassThrough itself never modifies or drops data (and does not accept a filter option); to filter, use a Transform stream instead. A common real-world use of PassThrough is as a "tap" for observing data as it flows between two other streams (the file names below are placeholders):

const fs = require("fs");
const { PassThrough } = require("stream");

const tap = new PassThrough();

// Log every chunk that flows through, without changing it.
tap.on("data", (chunk) => {
  console.log(`Passing along ${chunk.length} bytes`);
});

fs.createReadStream("input.txt").pipe(tap).pipe(fs.createWriteStream("copy.txt"));

Introduction to Node.js Streams

Concept: Streams in Node.js provide a mechanism for handling data in a continuous, flowing manner, similar to a pipe where data flows from one end to another.

Types of Streams

1. Readable Streams:

  • Purpose: Emit chunks of data that can be read from a source.

  • Example: A file reader or an HTTP response.

  • Code Snippet:

// Create a readable stream from a file
const fs = require("fs");
const readableStream = fs.createReadStream("myfile.txt");

2. Writable Streams:

  • Purpose: Accept chunks of data and write them to a destination.

  • Example: A file writer or an HTTP request.

  • Code Snippet:

// Create a writable stream to a file
const fs = require("fs");
const writableStream = fs.createWriteStream("myfile.txt");

3. Duplex Streams:

  • Purpose: Can both read and write data, combining the functionality of Readable and Writable streams.

  • Example: A network socket or a file that can be read and updated.

  • Code Snippet:

// Create a duplex stream for a network socket
const net = require("net");
const duplexStream = net.connect(3000, "localhost");

4. Transform Streams:

  • Purpose: Process chunks of data as they flow through and emit modified chunks downstream.

  • Example: A filter that removes specific characters from a data stream.

  • Code Snippet:

// Create a transform stream to remove spaces from a data stream
const { Transform } = require("stream");
class RemoveSpacesTransform extends Transform {
  _transform(chunk, encoding, callback) {
    callback(null, chunk.toString().replace(/\s/g, ""));
  }
}

Stream Events

Streams emit various events to inform about their state:

  • 'data': Emitted when a chunk of data is available in a Readable stream.

  • 'end': Emitted when the Readable stream has finished emitting data.

  • 'error': Emitted if an error occurs in a stream.

  • 'finish': Emitted when the Writable stream has successfully written all data.
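
A small sketch wiring handlers for these events onto a simple file copy (the file names are placeholders):

const fs = require("fs");

const readable = fs.createReadStream("input.txt");
const writable = fs.createWriteStream("output.txt");

readable.on("data", (chunk) => console.log(`read ${chunk.length} bytes`));
readable.on("end", () => console.log("no more data to read"));
readable.on("error", (err) => console.error("read failed:", err.message));

writable.on("finish", () => console.log("all data written"));
writable.on("error", (err) => console.error("write failed:", err.message));

readable.pipe(writable);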

Stream Piping

Piping allows you to connect streams in a sequence so that the output of one stream is fed as the input to the next stream.

Example: Pipe a file reader to a file writer:

const fs = require("fs");
fs.createReadStream("myfile.txt").pipe(fs.createWriteStream("myfile_copy.txt"));

Real-World Applications

Streams have numerous applications, including:

  • File I/O (reading, writing)

  • Audio and video streaming

  • Data processing (filtering, transforming)

  • HTTP request and response handling

  • Logging and debugging


What are Async Generators and Async Iterators?

Imagine you have a machine that can make delicious food, like a pizza oven. A regular generator is like a pizza maker that's not very efficient. You have to tell it how to make each ingredient separately, and then wait for it to finish before moving on.

An async generator, on the other hand, is like a pizza oven that's super efficient. You can tell it to make all the ingredients at once, and it will let you know when each ingredient is ready. This way, you don't have to wait for each ingredient to finish before moving on, and you can make pizza much faster.

Consuming Streams as Async Iterators

Streams are a way to read or write data in chunks. Readable streams are already async iterable (they implement Symbol.asyncIterator), so no conversion step is needed.

const fs = require('fs');
const stream = fs.createReadStream('my-file.txt');

Now, you can iterate over the stream directly to get each chunk of data as it becomes available.

(async () => {
  for await (const chunk of stream) {
    console.log(chunk.toString());
  }
})();

Converting Async Generators to Streams

You can also feed an async generator into a writable stream. Async generators do not have a pipe() method, so wrap the generator with Readable.from(), or use stream.pipeline(), which also handles backpressure.

const { Readable, Writable, pipeline } = require('stream');

async function* generate() {
  yield 'some data';
}

const writable = new Writable({
  write(chunk, encoding, next) {
    // Do something with the chunk
    next();
  }
});

Now, you can pipe the generator's output into the writable stream, and it will be processed chunk by chunk.

pipeline(generate(), writable, (err) => {
  if (err) console.error(err);
});

// Alternatively: Readable.from(generate()).pipe(writable);

Real-World Applications

  • Streaming large files: Async generators can be used to stream large files in a more efficient way.

  • Processing data in parallel: Async generators can be used to process data in parallel, which can improve performance for certain tasks.

  • Creating custom streams: You can create your own custom streams by combining async generators with Readable.from() and stream.pipeline().

Potential Applications

  • File upload: You can use async generators to stream a file upload to a server.

  • Real-time data processing: You can use async generators to process data in real time, such as streaming data from a sensor.

  • Custom data pipelines: You can create your own custom data pipelines using async generators and streams.


Consuming Readable Streams with Async Iterators

Async iterators provide a convenient and efficient way to consume data from readable streams in Node.js. Here's how it works:

1. Async Iterators

Async iterators are a special type of iterator that can be used to iterate over asynchronous data sources, such as streams. They allow you to write code that resembles synchronous loops, but is actually asynchronous:

for await (const chunk of readable) {
  console.log(chunk);
}

In this example, the for await...of loop will iterate over the chunks of data emitted by the readable stream. The chunk variable will hold the current chunk of data.

2. Error Handling

Async iterators automatically register a permanent error handler on the stream. This ensures that any unhandled post-destroy errors are caught and handled appropriately, preventing the application from crashing.
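
If the stream emits an error, the for await...of loop rejects, so you can handle it with an ordinary try/catch. A sketch, assuming the file path may not exist:

const fs = require("fs");

async function printFile(path) {
  const readable = fs.createReadStream(path);
  try {
    for await (const chunk of readable) {
      console.log(chunk.toString());
    }
  } catch (err) {
    // An 'error' emitted by the stream (e.g. a missing file) lands here.
    console.error("Stream failed:", err.message);
  }
}

printFile("does-not-exist.txt");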

Real-World Applications

Async iterators can be used in a variety of real-world scenarios, including:

  • File processing: Iterate over the lines of a large text file or a sequence of JSON objects.

  • Data streaming: Process data as it is streamed from a server or external source.

  • Queue management: Iterate over items in a queue or buffer.

Improved Code Example

Consider the following complete code example that demonstrates how to use async iterators to consume data from a readable stream:

const { Readable } = require("stream");

// Create a readable stream (in object mode) that emits the numbers from 1 to 5
const readable = new Readable({
  objectMode: true, // needed so that plain numbers can be pushed
  read() {
    for (let i = 1; i <= 5; i++) {
      this.push(i);
    }
    this.push(null); // Indicates that the stream has finished
  },
});

// Consume the data from the stream using an async iterator
(async () => {
  for await (const chunk of readable) {
    console.log(chunk);
  }
})();

In this example, the Readable stream emits a sequence of numbers from 1 to 5. The async iterator loop iterates over these numbers and logs them to the console.

Conclusion

Async iterators provide a simple and efficient way to consume data from readable streams in Node.js, while also ensuring proper error handling. They are a valuable tool for a variety of data processing tasks.


Creating Readable Streams with Async Generators

What is a readable stream?

A readable stream is like a data pipe. It lets you read data from one end and send it to another end. You can think of it like a water hose. You can turn on the water to start the flow of data, and you can turn it off to stop the flow.

What is an async generator?

An async generator is a special type of function that can pause and resume its execution. This means you can write code that generates data over time, and the generator will pause when it needs to wait for something.

How to create a readable stream from an async generator:

You can use the Readable.from() utility method to create a readable stream from an async generator. This method takes an async generator as input and returns a readable stream.

Here's an example:

const { Readable } = require("stream");

// someLongRunningFn is a placeholder for any asynchronous work, e.g. a timer or an I/O call.
const someLongRunningFn = () => new Promise((resolve) => setTimeout(resolve, 100));

async function* generate() {
  yield "a";
  await someLongRunningFn();
  yield "b";
  yield "c";
}

const readable = Readable.from(generate());

This code creates a readable stream that will emit the values 'a', 'b', and 'c'. The someLongRunningFn() function is a long-running function that pauses the generator until it finishes.

How to use a readable stream:

Once you have a readable stream, you can use it to read data. You can listen for the 'data' event to get notified when new data is available.

Here's an example:

readable.on("data", (chunk) => {
  console.log(chunk);
});

This code prints the chunks of data to the console.

Real-world applications:

Readable streams are used in a variety of applications, including:

  • Streaming audio and video: Readable streams can be used to stream audio and video data from a server to a client.

  • File downloads: Readable streams can be used to download files from a server to a client.

  • Data processing: Readable streams can be used to process data in chunks.

Potential applications:

Here are some potential applications for creating readable streams from async generators:

  • Generate data in chunks: You can use an async generator to generate data in chunks. This can be useful for large datasets or for data that takes a long time to generate.

  • Pause and resume data generation: You can use an async generator to pause and resume data generation. This can be useful for situations where you need to wait for external events.

  • Cancel data generation: You can use an async generator to cancel data generation. This can be useful for situations where you need to stop the flow of data.
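
A sketch of the cancellation case: breaking out of a for await...of loop over a stream built with Readable.from() stops pulling from the generator, and its finally block runs:

const { Readable } = require("stream");

async function* numbers() {
  let i = 1;
  try {
    while (true) {
      yield String(i++);
    }
  } finally {
    // Runs when the consumer stops early and the stream tears the generator down.
    console.log("generator finished");
  }
}

(async () => {
  for await (const chunk of Readable.from(numbers())) {
    console.log(chunk);
    if (chunk === "3") break; // cancel the rest of the data
  }
})();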


Piping to Writable Streams from Async Iterators

What is a Stream?

Think of a stream as a pipe that carries data from one place to another. In Node.js, streams are used to read and write data efficiently.

Writable Streams

A writable stream is like a sink that accepts data and writes it somewhere, like a file or a network socket.

Async Iterators

An async iterator is like a generator that produces data one item at a time, asynchronously.

Backpressure

When a writable stream is receiving data too fast, it can't keep up and starts to buffer the data. This is called backpressure.
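
For comparison, here is a sketch of what manual backpressure handling looks like without stream.pipeline(): write() returns false when the buffer is full, and you wait for 'drain' before continuing (the output file name is a placeholder):

const fs = require("fs");

const writable = fs.createWriteStream("output.txt");

function writeMany(count) {
  let i = 0;
  function writeChunk() {
    while (i < count) {
      const ok = writable.write(`line ${i++}\n`);
      if (!ok) {
        // The internal buffer is over the highWaterMark; wait for 'drain'.
        writable.once("drain", writeChunk);
        return;
      }
    }
    writable.end();
  }
  writeChunk();
}

writeMany(100000);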

stream.pipeline()

The stream.pipeline() function handles the backpressure for you. It connects an async iterator to a writable stream and automatically manages the flow of data between them.

Callback Pattern

In the callback pattern, you pass a function to the stream.pipeline() function. This function is called when the pipeline is complete or an error occurs.

const { pipeline } = require("node:stream");

// 'iterator' is any async iterable and 'writable' is a writable stream.
pipeline(iterator, writable, (err, value) => {
  // Handle error or value
});

Promise Pattern

If you prefer promises, use the pipeline function from the node:stream/promises module instead. It returns a promise that resolves when the pipeline is complete and rejects if an error occurs.

const { pipeline } = require("node:stream/promises");

pipeline(iterator, writable)
  .then((value) => {
    // Handle value
  })
  .catch((err) => {
    // Handle error
  });

Real-World Example

You can use stream.pipeline() to write the rows returned by a database query to a file. Here, connectToDatabase is a hypothetical helper, and the query result is assumed to be an async iterable of CSV lines:

const fs = require("fs");
const { pipeline } = require("node:stream");

async function exportFiles() {
  const db = await connectToDatabase(); // hypothetical database client
  const query = await db.query("SELECT * FROM files"); // assumed to be an async iterable

  pipeline(query, fs.createWriteStream("output.csv"), (err) => {
    if (err) {
      console.error(err);
    } else {
      console.log("File written successfully");
    }
  });
}

exportFiles();

Potential Applications

  • File processing: Read from a file, process each line, and write the output to another file.

  • Streaming data: Send data from one service to another over a network connection.

  • Batch processing: Process large datasets in chunks, writing the results to a database or file.


Node.js Stream Module Compatibility with Older Node.js Versions

Overview

In Node.js, streams are used to handle data in a continuous flow, like reading files, receiving data from the network, etc. Prior to Node.js 0.10, the Readable stream interface was different from the current version. This document explains the compatibility changes and provides workarounds for backward compatibility.

Changes in Node.js 0.10

In Node.js 0.10, the Readable class was introduced, bringing several improvements to the stream handling. Here's what changed:

  1. Event-based data consumption: In older Node.js versions, data events would start emitting immediately, requiring applications to store received data to avoid losing it. In Node.js 0.10, data consumption is controlled by the stream.read() method, giving applications more control over data handling.

  2. Guaranteed stream pausing: The stream.pause() method was originally advisory, meaning that data events could still be emitted even when the stream was paused. In Node.js 0.10, this is guaranteed, ensuring that data is not emitted when the stream is paused.

Backward Compatibility for Older Code

To support older Node.js code, Readable streams switch into "flowing mode" when a 'data' event handler is added or when the stream.resume() method is called. This means that even if you're not using the new stream.read() and 'readable' event methods, you can still safely handle data without worrying about losing it.

However, there's an important exception to this: If you have code that:

  • Does not have a 'data' event listener

  • Does not call stream.resume()

  • Is not piped to any writable destination

Then the stream will remain paused indefinitely, and data will never be processed.

Workaround

To fix this issue, add a stream.resume() call to your code. Here's an example of the corrected code:

const net = require("net");

// Original code (broken in Node.js 0.10+)
net
  .createServer((socket) => {
    // We add an 'end' listener, but never consume the data.
    socket.on("end", () => {
      // It will never get here.
      socket.end("The message was received but was not processed.");
    });
  })
  .listen(1337);

// Corrected code (works in Node.js 0.10+)
net
  .createServer((socket) => {
    socket.on("end", () => {
      socket.end("The message was received but was not processed.");
    });

    // Start the flow of data, discarding it.
    socket.resume();
  })
  .listen(1337);

Real-World Applications

Streams are widely used in Node.js applications, including:

  • Reading and writing files

  • Handling incoming and outgoing network connections

  • Processing data from databases

  • Building pipelines of data transformations


What is readable.read(0)?

Imagine a stream of water flowing through a pipe. The readable.read(0) function is like opening a faucet for a brief moment without actually letting any water out.

Why would I want to do this?

Sometimes, you need to "refresh" the stream without consuming any data. For example, if data is arriving slowly from a network source, you might call readable.read(0) to prompt the stream to pull more data from the underlying resource without taking anything out of the buffer yourself.

How does it work?

When you call readable.read(0), it triggers the stream's internal read machinery without consuming anything from the buffer. The call always returns null, and no data is removed from the stream.

Code example:

const fs = require("fs");

const readableStream = fs.createReadStream("./my-file.txt");

// Check for new data every 100 milliseconds without consuming it
setInterval(() => {
  readableStream.read(0);
}, 100);

Real-world applications:

  • Continuous data monitoring: You can use readable.read(0) to monitor a stream of data without buffering it, which can be useful for applications that need to respond to changes in the data.

  • System alerts: You can set up a system to send alerts when a certain condition is met; for example, you can call readable.read(0) to check for new log entries and send an alert if there's a critical error message.

  • Resource optimization: By avoiding unnecessary buffering, readable.read(0) can help you improve the performance of your application, especially when dealing with large data streams.


readable.push('') should not be used

In Node.js, streams are used to handle data in a continuous way. When dealing with data streams, it's important to use the appropriate methods to add data to the stream.

One of the methods for adding data to a readable stream is readable.push(). However, it's not recommended to use readable.push('') to add an empty string to a stream that is not in object mode.

Why is that?

Pushing an empty string to a non-object mode stream will end the reading process, but no data will be added to the readable buffer. This means that there is nothing for the user to consume.

Object mode streams

In object mode streams, data is handled as JavaScript values instead of buffers. You can push any value other than null to the stream, and an empty string is a perfectly valid value, so pushing an empty string to an object mode stream is acceptable.

Non-object mode streams

In non-object mode streams, data is handled as buffers; strings that you push are converted to buffers. Pushing an empty string is allowed, but since it contains zero bytes, nothing is added to the readable buffer, which is why it is not recommended.

Real-world example using a readable stream

Here's a simple example of a readable stream that reads data from a file and outputs it to the console:

const fs = require("fs");
const stream = fs.createReadStream("file.txt");

stream.on("data", (data) => {
  console.log(data.toString());
});

stream.on("end", () => {
  console.log("Finished reading file");
});

Pushing an empty string to a non-object mode stream

If you push an empty string to the readable stream in the above example, no error is thrown, but nothing is added to the readable buffer either, so the consumer never sees it:

stream.push(""); // Allowed, but adds no data for the consumer

Pushing an empty string to an object mode stream

Object mode is enabled with the objectMode option when the stream is created (setEncoding() does not change the mode). In an object mode stream, an empty string is an ordinary value:

const { Readable } = require("stream");

const objectStream = new Readable({ objectMode: true, read() {} });
objectStream.push(""); // Delivered to consumers as the value ''

Potential applications

Readable streams can be used in a variety of applications, such as:

  • File reading

  • Network communication

  • Data transformation

  • Data filtering


Stream Overview

Streams are a powerful way to handle data in Node.js. They allow you to process data incrementally, without having to load the entire dataset into memory.

There are three types of streams:

  • Readable streams emit data events, which you can listen to and process.

  • Writable streams allow you to write data to them.

  • Duplex streams can both read and write data.

High Water Mark Anomaly

When using readable streams, the highWaterMark option specifies how much data the stream will buffer internally before it stops pulling more from the underlying resource (and before readable.push() starts returning false).

After calling the setEncoding() method on a readable stream, the highWaterMark will be measured in characters instead of bytes. This can lead to unexpected behavior, especially when working with multi-byte characters.

Real World Example

Consider a stream that reads a large text file and counts the number of words.

const fs = require("fs");
const Readable = require("stream").Readable;

const readableStream = new Readable({ read() {} });
// For simplicity the whole file is pushed at once; fs.createReadStream() would stream it in chunks.
readableStream.push(fs.readFileSync("large-text-file.txt"));
readableStream.push(null); // Signal end of stream

readableStream.on("data", (chunk) => {
  // Count the number of words in the chunk
  const words = chunk.toString().split(" ");
});

In this example, the highWaterMark option is not set, so the default applies. However, once you call setEncoding('utf8') on a stream, its highWaterMark is interpreted as a count of characters in the decoded string rather than a count of bytes, so the same limit may correspond to a different amount of raw data.

The implication of using setEncoding() is that the highWaterMark value should be thought of in terms of the encoding, not the byte count. This is why it's important to be mindful of this behavior when working with strings that could contain multi-byte characters.
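
A small sketch of the difference, assuming a placeholder file name and a 1 KiB highWaterMark:

const fs = require("fs");

// highWaterMark is given in bytes here, but once setEncoding() is called the
// stream's internal buffering is counted in characters of the decoded string.
const readable = fs.createReadStream("large-text-file.txt", { highWaterMark: 1024 });
readable.setEncoding("utf8");

readable.on("data", (chunk) => {
  // chunk is a string; with multi-byte characters its length in characters
  // can differ from the number of bytes read from disk.
  console.log(`Received a chunk of ${chunk.length} characters`);
});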

Object Mode

In addition to the default buffer mode, streams also support object mode. In object mode, streams emit objects instead of buffers. This can be useful when working with complex data structures.

To use object mode, you must set the objectMode option to true when creating the stream.

const readableStream = new Readable({
  objectMode: true,
});

Real World Example

Consider a stream that reads JSON data from a file and parses it into objects.

const fs = require("fs");
const Readable = require("stream").Readable;

const readableStream = new Readable({
  objectMode: true,
  read() {},
});
// Parse the JSON up front and push the resulting object into the stream.
readableStream.push(JSON.parse(fs.readFileSync("json-data-file.json", "utf8")));
readableStream.push(null); // Signal end of stream

readableStream.on("data", (object) => {
  // Process the parsed object
  console.log(object);
});

In this example, the stream is in object mode, which means it emits JavaScript objects instead of buffers. The data event listener receives the object produced by parsing the JSON file.

Piping Streams

Streams can be piped together to create complex data processing pipelines. Piping a stream to another stream connects the output of the first stream to the input of the second stream.

To pipe streams, you can use the pipe() method.

readableStream.pipe(writableStream);

Real World Example

Consider a pipeline that reads data from a file, converts it to upper case, and writes it to a new file.

const fs = require("fs");
const Readable = require("stream").Readable;
const Transform = require("stream").Transform;
const Writable = require("stream").Writable;

const readableStream = new Readable({ read() {} });
readableStream.push(fs.readFileSync("input-file.txt"));
readableStream.push(null); // Signal end of stream

const transformStream = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  },
});

const writableStream = new Writable({
  write(chunk, encoding, callback) {
    // Append each chunk; writeFileSync would overwrite the file on every chunk.
    fs.appendFileSync("output-file.txt", chunk);
    callback();
  },
});

readableStream.pipe(transformStream).pipe(writableStream);

In this example, the readableStream is piped to the transformStream, which converts the data to upper case. The transformStream is then piped to the writableStream, which writes the data to a file.

Applications

Streams are used in a wide variety of applications, including:

  • File handling

  • Network communication

  • Data processing

  • Logging

  • Testing