Passthrough streams

Handling large files in Node.js can be challenging, especially when the work involves compression and uploading to cloud storage. A typical scenario: you need to zip multiple files that are stored on a cloud provider and upload the zipped file elsewhere. Downloading these files first, compressing them, and then uploading the compressed file can be extremely memory-inefficient, especially if the files are large. In this post, we'll explore how to use passthrough streams in Node.js to handle this process efficiently, without ever storing whole files on your server's disk or holding them fully in memory.

What are Streams?

Streams in Node.js are powerful abstractions for handling data flow. They enable processing data piece by piece, rather than loading entire datasets into memory. This approach is crucial for efficiently handling large files, network communications, or any scenario involving substantial data transfer. Streams support various data types, including text, JSON, and binary data (like images or video).
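
To make "piece by piece" concrete, here is a minimal sketch that totals the size of a stream while only ever looking at one chunk at a time. The countBytes helper and the in-memory source are assumptions made for illustration; with a real file the source could be something like fs.createReadStream(path).

```javascript
const { Readable } = require('stream');

// Hypothetical helper: totals the size of a stream without holding all of it.
async function countBytes(readable) {
  let total = 0;
  // Readable streams are async iterable, so each loop turn sees a single chunk.
  for await (const chunk of readable) {
    total += Buffer.byteLength(chunk);
  }
  return total;
}

// Stand-in source: three 1 KiB chunks held in memory for the demo.
const source = Readable.from([
  Buffer.alloc(1024),
  Buffer.alloc(1024),
  Buffer.alloc(1024),
]);

countBytes(source).then((total) => console.log(`${total} bytes`));
```

Because the loop only keeps a running total, memory use stays roughly constant no matter how large the source is.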

Node.js offers four fundamental types of streams, each serving a specific purpose in data flow management:

  1. Readable Streams: Sources of data that can be read from. Examples include reading from a file, incoming HTTP requests, or process.stdin.

  2. Writable Streams: Destinations for data. They allow writing data to a specific target, such as files, HTTP responses, or process.stdout.

  3. Duplex Streams: Combine both readable and writable capabilities. They can both read and write data independently, like network sockets.

  4. Transform Streams: A special type of duplex stream that modifies data as it's read and written. Useful for tasks like compression, encryption, or data parsing.
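
As a rough sketch of how three of these types fit together, the snippet below chains a Readable, a Transform, and a Writable with pipeline() from stream/promises. The shoutAll helper and its in-memory source and sink are assumptions made for illustration, not part of any library.

```javascript
const { Readable, Writable, Transform } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical helper: runs strings through Readable -> Transform -> Writable.
async function shoutAll(words) {
  const received = [];

  // Readable: a source that emits one chunk per array element.
  const source = Readable.from(words);

  // Transform: modifies each chunk on its way through.
  const upper = new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      callback(null, String(chunk).toUpperCase());
    },
  });

  // Writable: a destination that collects whatever it is given.
  const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      received.push(chunk);
      callback();
    },
  });

  // pipeline() connects the stages and rejects if any of them errors.
  await pipeline(source, upper, sink);
  return received;
}

shoutAll(['one', 'two', 'three']).then((out) => console.log(out));
```

Using pipeline() rather than chained .pipe() calls is a design choice here: it propagates errors from every stage and cleans up the streams when one of them fails.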

For a detailed introduction, head over to the official documentation at https://nodejs.org/api/stream.html

How is a Transform stream a variation of a Duplex stream?

In a regular Duplex stream, the readable and writable sides operate independently. Imagine a separate readable stream and a separate writable stream that happen to live on a single object and can share state. In a Transform stream, by contrast, the two sides are connected: data written to the writable side is modified and then made available on the readable side. When you write to a Transform stream, it automatically calls the _transform() method, which simplifies modifying data as it passes through.
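
The independence of the two sides can be sketched with a small Duplex whose readable side emits a fixed countdown while its writable side only records what it receives. The createIndependentDuplex helper is hypothetical and exists only for this illustration.

```javascript
const { Duplex } = require('stream');

// Hypothetical example: the two sides of this Duplex never talk to each other.
function createIndependentDuplex() {
  const written = [];
  let remaining = 3;

  const duplex = new Duplex({
    // Readable side: emits 3, 2, 1 and then ends. It ignores written data.
    read() {
      this.push(remaining > 0 ? String(remaining--) : null);
    },
    // Writable side: stores incoming chunks. Nothing reaches the readable side.
    write(chunk, encoding, callback) {
      written.push(chunk.toString());
      callback();
    },
  });

  return { duplex, written };
}

const { duplex, written } = createIndependentDuplex();
duplex.on('data', (chunk) => console.log('read:', chunk.toString()));
duplex.on('end', () => console.log('written:', written));
duplex.write('hello');
duplex.end('world');
```

What is read from this stream has nothing to do with what was written to it. A Transform stream removes that independence by deriving the readable output from the written input.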

In a Transform stream, the _transform method effectively combines the functionality of both _write and _read:

  • It receives input data (like _write)

  • It processes that data

  • It makes the processed data available for reading (like _read) by calling this.push()

The Transform stream automatically pipes the data from its writable side to its readable side through the _transform method.

const { Transform } = require('stream');

class UppercaseTransform extends Transform {
    // notice how _read and _write methods are not defined
    _transform(chunk, encoding, callback) {
        try {
            const upperCaseChunk = chunk.toString().toUpperCase();
            this.push(upperCaseChunk);
            callback();
        } catch (err) {
            callback(err);
        }
    }
}

// Usage
const upperCaseStream = new UppercaseTransform();
process.stdin.pipe(upperCaseStream).pipe(process.stdout);

Passthrough Streams

PassThrough streams are a type of Transform stream that lets data pass through unchanged. They are useful when you need a stream object to sit between a producer and a consumer without modifying the data. This may sound trivial: why do we need a separate stream type for this? Couldn't a Transform stream simply receive the data and push it unchanged? It could, and that is exactly what PassThrough is: a ready-made implementation, so you don't have to write your own class each time you need this behavior.
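
A small sketch of that equivalence: a hand-written Transform that forwards every chunk untouched, next to the built-in PassThrough. The roundTrip helper is an assumption introduced only to compare the two.

```javascript
const { PassThrough, Transform } = require('stream');

// A hand-written Transform that forwards every chunk untouched...
const manual = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  },
});

// ...does the same job as the ready-made PassThrough.
const builtIn = new PassThrough();

// Hypothetical helper: writes `input` into a stream and collects what comes out.
function roundTrip(stream, input) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    stream.on('data', (chunk) => chunks.push(chunk));
    stream.on('end', () => resolve(Buffer.concat(chunks).toString()));
    stream.on('error', reject);
    stream.end(input);
  });
}

Promise.all([
  roundTrip(manual, 'same bytes'),
  roundTrip(builtIn, 'same bytes'),
]).then(([fromManual, fromBuiltIn]) => {
  console.log(fromManual === fromBuiltIn);
});
```

Both streams hand back exactly what was written to them, which is why PassThrough works well as a placeholder between a producer and a consumer.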

Efficiently archiving large files

For the compression logic, we rely on the archiver package (https://www.npmjs.com/package/archiver). The utility module below takes a list of input file links, generates the archive, and streams the result to the target location.

const archiver = require('archiver');
const { PassThrough } = require('stream');
const { basename } = require('path');

/**
 * This utility can be used to generate a zip file out of a list of cloud storage files.
 *
 * @param {object} params
 * @param {Array<LinkObject>} params.sourceLinks 
 * @param {LinkObject} params.targetLink
 * @param {CloudStorageSDK} cloudSDK - An instance of the cloud storage SDK.
 * 
 * @returns {Promise<void>}
 */
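const ZipGenerator = async function (params, cloudSDK) {
  const { sourceLinks, targetLink } = params;

  const archiveStream = archiver('zip');
  const passthrough = new PassThrough();

  // Throwing inside an event handler would surface as an uncaught exception
  // instead of rejecting this function, so archive errors reject a promise.
  const archiveFailed = new Promise((resolve, reject) => {
    archiveStream.on('error', reject);
  });

  const { bucket, key } = targetLink;
  const putParams = {
    Bucket: bucket,
    Key: key,
    Body: passthrough,
    ContentType: 'application/zip'
  };

  // Start the upload first so it consumes the archive as it is produced.
  const uploadPromise = cloudSDK.uploadObject(putParams);

  archiveStream.pipe(passthrough);

  const buildArchive = async () => {
    await Promise.all(sourceLinks.map(async (source) => {
      const s3Params = {
        Bucket: source.bucket,
        Key: source.key
      };
      const objectStream = await cloudSDK.getObject(s3Params);
      archiveStream.append(objectStream, { name: basename(source.key) });
    }));

    await archiveStream.finalize();
    await uploadPromise;
  };

  try {
    await Promise.race([buildArchive(), archiveFailed]);
  } catch (err) {
    // Tear down the placeholder stream so the upload does not wait forever,
    // and mark the upload's own rejection as handled before rethrowing.
    passthrough.destroy(err);
    uploadPromise.catch(() => {});
    throw err;
  }
};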

module.exports = ZipGenerator;

Step-by-Step Implementation

  1. Input Files - Each input file is fetched as a readable stream from its location in cloud storage. For example, with the AWS SDK for JavaScript v3, sending a GetObjectCommand returns a response whose Body is a readable stream. These streams are then handed to the archiver, which reads their contents for compression.

  2. Output File Object - The final output file does not exist initially as a physical file. Instead, we create a stream that acts as a placeholder for the file. This stream will receive the compressed data as it is being created. This stream is a PassThrough stream, which allows the data to be piped through various processing steps without being buffered fully in memory. This ensures that the process remains memory efficient, even for large files.

  3. Archiver Instance - The archiver instance needs a writable destination for the compressed data, and the PassThrough stream mentioned above fills that role. The archiver reads the data from the input files, compresses it, and writes the compressed data to the PassThrough stream. The same PassThrough stream is passed to the cloud storage SDK as the upload body (e.g., the Body of an S3 upload), so the SDK reads from it and uploads the data as a file while the archive is still being produced.

  4. Appending Input File Streams to the Archive - Each input file stream is appended to the archiveStream instance. The archiveStream is a transform stream that compresses the input data. As each file is appended, the archiver handles the transformation (compression), and the transformed (compressed) data is then written to the output stream (the PassThrough stream).

  5. Transformation and Output Stream - The archiveStream is a transform stream, meaning it has both a readable and a writable side. The input file streams are handed to the archiveStream through append(). The transformation (compression) occurs within the archiveStream, and the resulting compressed chunks are made available on its readable side. These compressed chunks are then piped to the PassThrough stream, which the cloud storage SDK reads from to store the final file.
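
The steps above can be tried end to end without any cloud account using the dependency-free sketch below. It rests on two loudly stated assumptions: createFakeCloud is a hypothetical in-memory stand-in for the cloud storage SDK (not a real API), and zlib's gzip stream stands in for archiver, so it compresses a single file rather than building a zip.

```javascript
const { PassThrough, Readable } = require('stream');
const { pipeline } = require('stream/promises');
const zlib = require('zlib');

// Hypothetical in-memory stand-in for a cloud storage SDK (not a real API).
function createFakeCloud(objects) {
  return {
    // Returns the stored object as a readable stream.
    async getObject({ Key }) {
      return Readable.from([objects.get(Key)]);
    },
    // Consumes Body chunk by chunk, the way a streaming upload would.
    async uploadObject({ Key, Body }) {
      const chunks = [];
      for await (const chunk of Body) chunks.push(chunk);
      objects.set(Key, Buffer.concat(chunks));
    },
  };
}

// Assumption: gzip stands in for archiver so the sketch needs no packages.
async function compressAndUpload(cloud, sourceKey, targetKey) {
  const passthrough = new PassThrough();

  // Start the upload first: it reads from the placeholder stream
  // while the compressed data is still being produced.
  const uploadPromise = cloud.uploadObject({ Key: targetKey, Body: passthrough });

  const source = await cloud.getObject({ Key: sourceKey });

  // source -> compressor -> placeholder. pipeline() forwards errors
  // and ends the PassThrough once the source is exhausted.
  await Promise.all([
    pipeline(source, zlib.createGzip(), passthrough),
    uploadPromise,
  ]);
}

const store = new Map([['report.txt', Buffer.from('hello '.repeat(1000))]]);
compressAndUpload(createFakeCloud(store), 'report.txt', 'report.txt.gz')
  .then(() => console.log(store.get('report.txt.gz').length, 'compressed bytes'));
```

The shape is the same as in the archiver version: the upload is started with the PassThrough as its body before any data exists, and the compressor then feeds that placeholder chunk by chunk.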