Stop busboy from reading more of a request with unpipe() and a work queue

Busboy is the go-to library for efficiently reading a multipart/form-data file upload request. Instead of writing intermediate files to disk or buffering them in memory, busboy uses streams to give you access to the file data as it arrives. You can then use those streams to write a resilient app that offloads the files from your Node.js server to external file storage such as AWS S3.

It's easy to set up busboy and start reading the incoming request. It's a little trickier to terminate busboy and stop it from reading the request any further. You might need to terminate processing early if an internal error happens in your app or you notice that the received data is invalid. In that case you'd want to send the HTTP response as soon as possible and not waste resources processing the rest of the request.

You'd set up busboy with code like this:

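// Written against the busboy 0.x API; busboy 1.x exports a factory function
// and passes (name, stream, info) to "file" handlers.
const Busboy = require("busboy");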
const express = require("express");

const router = new express.Router();

router.post("/", (req, res, next) => {
  const busboy = new Busboy({ headers: req.headers });

  busboy.on("field", (name, value) => {
    // process fields
  });
  busboy.on("file", (name, stream, filename, encoding, contentType) => {
    // process files
  });
  busboy.on("finish", () => {
    // send response
  });

  req.pipe(busboy);
});

Cancelling the processing takes two steps. First, prevent more data from being fed from the request to busboy by calling unpipe(). Second, prevent any more events from firing for data that has already been piped into busboy. You can do that by routing all handlers through a work queue that you can pause.

Wrap in common error handling code

First we need a place to put our error handling code. We'll wrap the 'field', 'file', and 'finish' event handlers in a function that runs them inside a try-catch. If an error occurs, we'll delegate it to Express's built-in error handler by calling next(e). We get code like this:

  async function handleError(fn) {
    try {
      await fn();
    } catch (e) {
      next(e);
    }
  }

  busboy.on("field", (name, value) => {
    handleError(() => {
      // process fields
    });
  });
  busboy.on("file", (name, stream, filename, encoding, contentType) => {
    handleError(() => {
      // process files
    });
  });
  busboy.on("finish", () => {
    handleError(() => {
      // send response
    });
  });
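One detail worth spelling out: the await inside handleError is what lets the try-catch see errors from async handlers as well as synchronous ones. The following is a minimal sketch rather than part of the route above. makeHandleError and the recording next callback are hypothetical stand-ins, introduced only so the snippet runs without Express.

```javascript
// Hypothetical factory: builds a handleError bound to a given next callback.
function makeHandleError(next) {
  return async function handleError(fn) {
    try {
      // await turns a rejected promise into an exception we can catch here
      await fn();
    } catch (e) {
      next(e);
    }
  };
}

async function runDemo() {
  const errors = [];
  // Stand-in for Express's next(): just records the error message.
  const handleError = makeHandleError((e) => errors.push(e.message));

  await handleError(() => {
    throw new Error("sync failure");
  });
  await handleError(async () => {
    throw new Error("async failure");
  });
  await handleError(() => "ok"); // no error, next is not called

  return errors;
}

runDemo().then((errors) => console.log(errors));
```

Both the thrown error and the rejected promise end up in next. Without the await, the async failure would escape the try-catch as an unhandled rejection.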

Prevent more data being fed to busboy with unpipe()

If an error happens, we'll stop feeding the incoming request to busboy by calling unpipe() on the request object that we previously pipe()'d.

  async function handleError(fn) {
    try {
      await fn();
    } catch (e) {
      req.unpipe(busboy);
      next(e);
    }
  }

Prevent already piped data in busboy's internal buffer from firing more events

Now busboy won't receive any more pieces of the incoming request. But streams deliver data in chunks (16 kB is a common default buffer size), so the data that has already been piped to busboy may still contain unprocessed fields and files. This is especially likely if the uploaded files are small.
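The effect is easy to reproduce with plain Node.js streams. The sketch below is an illustration only: LineParser is a hypothetical stand-in for busboy that emits one "field" event per line, and a PassThrough stream plays the role of the request. We call unpipe() as soon as the first field arrives.

```javascript
const { PassThrough, Writable } = require("node:stream");

// Hypothetical stand-in for busboy: one "field" event per line in a chunk.
class LineParser extends Writable {
  _write(chunk, encoding, callback) {
    for (const line of chunk.toString().split("\n").filter(Boolean)) {
      this.emit("field", line);
    }
    callback();
  }
}

function runDemo() {
  return new Promise((resolve) => {
    const source = new PassThrough(); // plays the role of req
    const parser = new LineParser();
    const seen = [];

    parser.on("field", (line) => {
      seen.push(line);
      if (seen.length === 1) {
        source.unpipe(parser); // stop feeding the parser on the first field
      }
    });

    source.pipe(parser);
    source.write("a=1\nb=2\nc=3\n"); // a single chunk holding three fields

    setImmediate(() => {
      source.write("d=4\n"); // written after unpipe(), never reaches the parser
      setImmediate(() => resolve(seen));
    });
  });
}

runDemo().then((seen) => console.log(seen));
```

The parser still reports b=2 and c=3, because they were in the chunk it had already been handed when unpipe() was called. Only d=4, which is written afterwards, is held back.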

We need to prevent the previously piped data from firing any more 'field' or 'file' event handlers. We'll do this by routing the work done by the event handlers through a work queue. If an error occurs, we can stop the queue from processing more work, and with it stop any further events from being processed.

We'll use the p-queue package as the work queue implementation. Setting { concurrency: 1 } ensures only one event is processed at a time, giving us the result we want.

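// Written against busboy 0.x and a p-queue release that supports this
// require() form; p-queue 6 needs require("p-queue").default and later
// releases are ESM-only.
const Busboy = require("busboy");
const express = require("express");
const PQueue = require("p-queue");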

const router = new express.Router();

router.post("/", (req, res, next) => {
  const busboy = new Busboy({ headers: req.headers });
  const workQueue = new PQueue({ concurrency: 1 });

  async function handleError(fn) {
    workQueue.add(async () => {
      try {
        await fn();
      } catch (e) {
        req.unpipe(busboy);
        workQueue.pause();
        next(e);
      }
    });
  }

  busboy.on("field", (name, value) => {
    handleError(() => {
      // process fields
    });
  });
  busboy.on("file", (name, stream, filename, encoding, contentType) => {
    handleError(() => {
      // process files
    });
  });
  busboy.on("finish", () => {
    handleError(() => {
      // send response
    });
  });

  req.pipe(busboy);
});

If an error happens, we pause() the work queue so that it processes neither the events already queued nor any events still to be emitted from data busboy has already received.

And this is how we can stop busboy from processing any more of the incoming request and respond swiftly to the caller.
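To see why pausing is enough, here is a dependency-free sketch of the same idea. SerialQueue is a hypothetical, heavily simplified stand-in for a queue with a concurrency of 1. It is not how p-queue is implemented, and it assumes every task catches its own errors, as the tasks created by handleError do. The next callback is again a stand-in that only records the error.

```javascript
// Hypothetical minimal queue: runs one task at a time and can be paused.
class SerialQueue {
  constructor() {
    this.tasks = [];
    this.running = false;
    this.paused = false;
  }

  add(task) {
    this.tasks.push(task);
    this.drain();
  }

  pause() {
    this.paused = true;
  }

  async drain() {
    if (this.running) return;
    this.running = true;
    // Stop picking up tasks as soon as the queue is paused.
    while (!this.paused && this.tasks.length > 0) {
      const task = this.tasks.shift();
      await task();
    }
    this.running = false;
  }
}

async function runDemo() {
  const queue = new SerialQueue();
  const log = [];
  let reported = null;
  const next = (e) => { reported = e.message; }; // stand-in for Express's next()

  function handleError(fn) {
    queue.add(async () => {
      try {
        await fn();
      } catch (e) {
        queue.pause();
        next(e);
      }
    });
  }

  // Four events fired back to back, as if parsed from one chunk.
  handleError(async () => { log.push("field 1"); });
  handleError(async () => { throw new Error("invalid field 2"); });
  handleError(async () => { log.push("file 3"); });
  handleError(() => { log.push("finish"); });

  await new Promise((resolve) => setImmediate(resolve));
  return { log, reported, pending: queue.tasks.length };
}

runDemo().then((result) => console.log(result));
```

Only the first handler does its work. The second one fails and pauses the queue, so the "file 3" and "finish" handlers stay queued and never run, even though their events had already fired.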
