웹사이트 검색

Node.js 및 BullMQ로 비동기 작업을 처리하는 방법


저자는 Write for DOnations 프로그램을 선택했습니다.

소개

웹 애플리케이션에는 작업이 완료될 때까지 Node.js 스레드가 있습니다. 몇 초 또는 몇 분이 걸릴 수 있습니다. 사용자는 서버에서 응답을 받으려면 작업이 완료될 때까지 기다려야 합니다.

요청/응답 주기를 늦추지 않으려면 bullmq 작업자를 사용한 다음 대기열에서 각 작업을 제거하고 실행하여 완료되면 완료된 것으로 표시할 수 있습니다.

이 기사에서는 bulmq를 사용하여 시간 소모적인 작업을 백그라운드로 오프로드하여 애플리케이션이 사용자에게 신속하게 응답할 수 있도록 합니다. 먼저 bulmq를 사용하지 않고 시간이 많이 걸리는 작업으로 앱을 만듭니다. 그런 다음 bulmq를 사용하여 작업을 비동기식으로 실행합니다. 마지막으로 시각적 대시보드를 설치하여 Redis 대기열에서 bulmq 작업을 관리합니다.

전제 조건

이 자습서를 따르려면 다음이 필요합니다.

  • Node.js 개발 환경 설정. Ubuntu 22.04의 경우 Node.js를 설치하고 로컬 개발 환경을 만드는 방법에 대한 자습서를 따르십시오.\n
  • Redis가 시스템에 설치되었습니다. Ubuntu 22에서는 Redis 설치 및 보안 방법에 대한 자습서의 1~3단계를 따르십시오.\n
  • 이벤트 루프, 콜백, 약속 및 JavaScript의 Async/Await 이해에 익숙합니다.\n
  • Node.js 및 Express 시작 방법 사용 방법에 대한 기본 지식.\n
  • 자세한 내용은 EJS를 사용하여 노드 애플리케이션을 템플릿화하는 방법에 익숙합니다.\n
  • How To Process Images in Node.js with Sharp로 이미지를 처리하는 방법에 대한 기본적인 이해.\n

1단계 - 프로젝트 디렉토리 설정

이 단계에서는 디렉토리를 만들고 애플리케이션에 필요한 종속 항목을 설치합니다. 이 자습서에서 빌드할 애플리케이션은 사용자가 이미지를 업로드할 수 있도록 허용하며, 그런 다음 sharp 패키지를 사용하여 처리됩니다. 이미지 처리는 시간 집약적이며 요청/응답 주기를 늦출 수 있으므로 작업을 bullmq가 백그라운드로 오프로드하기에 좋은 후보로 만듭니다. 작업을 오프로드하는 데 사용할 기술은 시간이 많이 걸리는 다른 작업에도 적용됩니다.

시작하려면 image_processor라는 디렉터리를 만들고 디렉터리로 이동합니다.

  1. mkdir image_processor && cd image_processor

그런 다음 디렉터리를 npm 패키지로 초기화합니다.

  1. npm init -y

이 명령은 package.json 파일을 생성합니다. -y 옵션은 npm이 모든 기본값을 수락하도록 지시합니다.

명령을 실행하면 출력이 다음과 일치합니다.

Output
Wrote to /home/sammy/image_processor/package.json: { "name": "image_processor", "version": "1.0.0", "description": "", "main": "index.js", "scripts": { "test": "echo \"Error: no test specified\" && exit 1" }, "keywords": [], "author": "", "license": "ISC" }

출력에서 package.json 파일이 생성되었음을 확인합니다. 중요한 속성에는 앱 이름(name), 애플리케이션 버전 번호(version) 및 프로젝트 시작 지점(main)이 포함됩니다. ). 다른 속성에 대해 자세히 알아보려면 npm의 package.json 문서를 검토할 수 있습니다.

이 튜토리얼에서 빌드할 애플리케이션에는 다음 종속성이 필요합니다.

  • express: 웹 앱 구축을 위한 웹 프레임워크
  • express-fileupload: 양식에서 파일을 업로드할 수 있도록 하는 미들웨어입니다.
  • sharp: 이미지 처리 라이브러리.
  • ejs: Node.js로 HTML 마크업을 생성할 수 있는 템플릿 언어입니다.
  • bulmq: 분산 작업 대기열.
  • bull-board: bulmq를 기반으로 하는 대시보드로 멋진 사용자 인터페이스(UI)로 작업 상태를 표시합니다.

이러한 모든 종속성을 설치하려면 다음 명령을 실행하십시오.

  1. npm install express express-fileupload sharp ejs bullmq @bull-board/express

설치한 종속성 외에도 이 자습서의 뒷부분에서 다음 이미지를 사용합니다.

curl을 사용하여 로컬 컴퓨터의 원하는 위치에 이미지를 다운로드합니다.

  1. curl -O https://deved-images.nyc3.digitaloceanspaces.com/CART-68886/underwater.png

다음에 수행할 bullmq가 없는 Node.js 앱을 빌드하는 데 필요한 종속 항목이 있습니다.

2단계 — bullmq 없이 시간 집약적인 작업 구현

이 단계에서는 사용자가 이미지를 업로드할 수 있는 Express를 사용하여 애플리케이션을 빌드합니다. 앱은 sharp를 사용하여 시간이 많이 걸리는 작업을 시작하여 이미지 크기를 여러 크기로 조정한 다음 응답을 보낸 후 사용자에게 표시합니다. 이 단계는 시간 집약적인 작업이 요청/응답 주기에 미치는 영향을 이해하는 데 도움이 됩니다.

nano 또는 원하는 텍스트 편집기를 사용하여 index.js 파일을 만듭니다.

  1. nano index.js

index.js 파일에서 다음 코드를 추가하여 종속성을 가져옵니다.

const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

첫 번째 줄에서 Node로 파일 경로를 계산하기 위해 path 모듈을 가져옵니다. 두 번째 줄에서는 디렉터리와 상호 작용하기 위해 fs 모듈을 가져옵니다. 그런 다음 express 웹 프레임워크를 가져옵니다. body-parser 모듈을 가져와 미들웨어를 추가하여 HTTP 요청에서 데이터를 구문 분석합니다. 그런 다음 이미지 처리를 위해 sharp 모듈을 가져옵니다. 마지막으로 HTML 양식에서 업로드를 처리하기 위해 express-fileupload를 가져옵니다.

다음으로 다음 코드를 추가하여 앱에서 미들웨어를 구현합니다.

...
const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

먼저 app 변수를 Express 인스턴스로 설정합니다. 둘째, app 변수를 사용하여 set() 메서드는 ejs 템플릿 언어를 사용하도록 Express를 구성합니다. 그런 다음 use() 메서드와 함께 body-parser 모듈 미들웨어를 추가하여 HTTP 요청의 JSON 데이터를 JavaScript로 액세스할 수 있는 변수로 변환합니다. 다음 줄에서는 URL 인코딩 입력과 동일한 작업을 수행합니다.

다음으로 다음 줄을 추가하여 파일 업로드를 처리하고 정적 파일을 제공하는 미들웨어를 더 추가합니다.

...
app.use(fileUpload());
app.use(express.static("public"));

미들웨어를 추가하여 fileUpload() 메서드를 호출하여 업로드된 파일을 구문 분석하고 Express가 이미지 및 CSS와 같은 정적 파일을 보고 제공하는 디렉토리를 설정합니다.

미들웨어 세트를 사용하여 이미지 업로드를 위한 HTML 형식을 표시하는 경로를 만듭니다.

...
app.get("/", function (req, res) {
  res.render("form");
});

여기에서 Express 모듈의 get() 메서드를 사용하여 / 경로와 사용자가 홈페이지 또는 / 를 방문할 때 실행할 콜백을 지정합니다. 경로. 콜백에서 res.render()를 호출하여 views 디렉토리의 form.ejs 파일을 렌더링합니다. 아직 form.ejs 파일이나 views 디렉토리를 만들지 않았습니다.

그것을 만들려면 먼저 파일을 저장하고 닫으십시오. 터미널에서 다음 명령을 입력하여 프로젝트 루트 디렉터리에 views 디렉터리를 만듭니다.

  1. mkdir views

views 디렉터리로 이동합니다.

  1. cd views

편집기에서 form.ejs 파일을 생성합니다.

  1. nano form.ejs

form.ejs 파일에서 다음 코드를 추가하여 양식을 만듭니다.

<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="home-wrapper">
      <h1>Image Processor</h1>
      <p>
        Resizes an image to multiple sizes and converts it to a
        <a href="https://en.wikipedia.org/wiki/WebP">webp</a> format.
      </p>
      <form action="/upload" method="POST" enctype="multipart/form-data">
        <input
          type="file"
          name="image"
          placeholder="Select image from your computer"
        />
        <button type="submit">Upload Image</button>
      </form>
    </div>
  </body>
</html>

먼저 아직 만들지 않은 head.ejs 파일을 참조합니다. head.ejs 파일에는 다른 HTML 페이지에서 참조할 수 있는 HTML head 요소가 포함됩니다.

body 태그에서 다음 특성을 사용하여 양식을 만듭니다.

  • action은 양식이 제출될 때 양식 데이터가 전송되어야 하는 경로를 지정합니다.
  • method는 데이터 전송을 위한 HTTP 방법을 지정합니다. POST 메서드는 HTTP 요청에 데이터를 포함합니다.
  • encytype은 양식 데이터를 인코딩하는 방법을 지정합니다. multipart/form-data 값은 HTML input 요소가 파일 데이터를 업로드할 수 있도록 합니다.

form 요소에서 input 태그를 생성하여 파일을 업로드합니다. 그런 다음 양식을 제출할 수 있도록 submit으로 설정된 type 특성을 사용하여 button 요소를 정의합니다.

완료되면 파일을 저장하고 닫습니다.

다음으로 head.ejs 파일을 만듭니다.

  1. nano head.ejs

head.ejs 파일에서 다음 코드를 추가하여 앱의 헤드 섹션을 생성합니다.

<head>
  <meta charset="UTF-8" />
  <meta http-equiv="X-UA-Compatible" content="IE=edge" />
  <meta name="viewport" content="width=device-width, initial-scale=1.0" />
  <title>Image Processor</title>
  <link rel="stylesheet" href="css/main.css" />
</head>

여기에서 이 단계의 뒷부분에서 public 디렉터리에 만들 main.css 파일을 참조합니다. 해당 파일에는 이 애플리케이션의 스타일이 포함됩니다. 지금은 정적 자산에 대한 프로세스 설정을 계속합니다.

파일을 저장하고 닫습니다.

양식에서 제출된 데이터를 처리하려면 Express에서 post 메서드를 정의해야 합니다. 이렇게 하려면 프로젝트의 루트 디렉터리로 돌아갑니다.

  1. cd ..

index.js 파일을 다시 엽니다.

  1. nano index.js

index.js 파일에서 강조 표시된 줄을 추가하여 /upload 경로에서 양식 제출을 처리하는 방법을 정의합니다.

app.get("/", function (req, res) {
  ...
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

});

app 변수를 사용하여 /upload 경로에서 제출된 양식을 처리할 post() 메서드를 호출합니다. 다음으로 HTTP 요청에서 업로드된 이미지 데이터를 image 변수로 추출합니다. 그런 다음 사용자가 이미지를 업로드하지 않으면 400 상태 코드를 반환하도록 응답을 설정합니다.

업로드된 이미지에 대한 프로세스를 설정하려면 다음 강조 표시된 코드를 추가합니다.

...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
});

이 선은 앱이 이미지를 처리하는 방법을 나타냅니다. 먼저 업로드된 이미지에서 이미지 확장을 제거하고 imageName 변수에 이름을 저장합니다. 다음으로 processImage() 함수를 정의합니다. 이 함수는 크기를 조정하는 동안 이미지 크기를 결정하는 데 사용되는 값인 size 매개변수를 사용합니다. 함수에서 webp 이미지 형식인 image.datasharp()를 호출합니다. 그런 다음 public/images/ 디렉토리에 이미지를 저장합니다.

후속 숫자 목록은 업로드된 이미지의 크기를 조정하는 데 사용되는 크기를 정의합니다. 그런 다음 JavaScript의 map() 메서드를 사용하여 sizes 배열의 각 요소에 대해 processImage()를 호출한 후 새 배열을 반환합니다. . map() 메서드는 processImage() 함수를 호출할 때마다 새 배열에 약속을 반환합니다. Promise.all() 메서드를 사용하여 문제를 해결합니다.

컴퓨터 처리 속도는 사용자가 업로드할 수 있는 이미지 크기와 마찬가지로 다양하며, 이는 이미지 처리 속도에 영향을 미칠 수 있습니다. 데모 목적으로 이 코드를 지연하려면 강조 표시된 줄을 삽입하여 CPU 집약적인 증분 루프를 추가하고 강조 표시된 줄과 함께 크기 조정된 이미지를 표시할 페이지로 리디렉션합니다.

...
app.post("/upload", async function (req, res) {
  ...
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

루프는 카운터 변수를 증가시키기 위해 100억 번 실행됩니다. res.redirect() 함수를 호출하여 앱을 /result 경로로 리디렉션합니다. 경로는 public/images 디렉토리에 이미지를 표시할 HTML 페이지를 렌더링합니다.

/result 경로가 아직 존재하지 않습니다. 생성하려면 index.js 파일에 강조 표시된 코드를 추가합니다.

...

app.get("/", function (req, res) {
 ...
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  ...
});

app.get() 메서드를 사용하여 /result 경로를 정의합니다. 함수에서 imgDirPath 변수를 public/images 디렉토리의 전체 경로로 정의합니다. fs 모듈의 readdirSync() 메서드를 사용하여 지정된 디렉토리의 모든 파일을 읽습니다. 여기에서 map() 메서드를 연결하여 images/ 접두사가 붙은 이미지 경로가 있는 새 배열을 반환합니다.

마지막으로 res.render()를 호출하여 아직 존재하지 않는 result.ejs 파일을 렌더링합니다. 모든 이미지의 상대 경로 배열을 포함하는 imgFiles 변수를 result.ejs 파일에 전달합니다.

파일을 저장하고 닫습니다.

result.ejs 파일을 생성하려면 views 디렉토리로 돌아갑니다.

  1. cd views

편집기에서 result.ejs 파일을 만들고 엽니다.

  1. nano result.ejs

result.ejs 파일에서 다음 줄을 추가하여 이미지를 표시합니다.

<!DOCTYPE html>
<html lang="en">
  <%- include('./head'); %>
  <body>
    <div class="gallery-wrapper">
      <% if (imgFiles.length > 0){%>
      <p>The following are the processed images:</p>
      <ul>
        <% for (let imgFile of imgFiles){ %>
        <li><img src=<%= imgFile %> /></li>
        <% } %>
      </ul>
      <% } else{ %>
      <p>
        The image is being processed. Refresh after a few seconds to view the
        resized images.
      </p>
      <% } %>
    </div>
  </body>
</html>

먼저 head.ejs 파일을 참조합니다. body 태그에서 imgFiles 변수가 비어 있는지 확인합니다. 데이터가 있는 경우 각 파일을 반복하고 각 배열 요소에 대한 이미지를 만듭니다. imgFiles가 비어 있으면 사용자에게 크기가 조정된 이미지를 보려면 몇 초 후에 새로고침하라는 메시지를 인쇄합니다.

파일을 저장하고 닫습니다.

그런 다음 루트 디렉터리로 돌아가 정적 자산을 포함할 public 디렉터리를 만듭니다.

  1. cd .. && mkdir public

public 디렉터리로 이동합니다.

  1. cd public

업로드된 이미지를 보관할 images 디렉토리를 만듭니다.

  1. mkdir images

다음으로 css 디렉토리를 생성하고 해당 디렉토리로 이동합니다.

  1. mkdir css && cd css

편집기에서 이전에 head.ejs 파일에서 참조한 main.css 파일을 만들고 엽니다.

  1. nano main.css

main.css 파일에서 다음 스타일을 추가합니다.

body {
  background: #f8f8f8;
}

h1 {
  text-align: center;
}

p {
  margin-bottom: 20px;
}

a:link,
a:visited {
  color: #00bcd4;
}

/** Styles for the "Choose File"  button **/
button[type="submit"] {
  background: none;
  border: 1px solid orange;
  padding: 10px 30px;
  border-radius: 30px;
  transition: all 1s;
}

button[type="submit"]:hover {
  background: orange;
}

/** Styles for the "Upload Image"  button **/
input[type="file"]::file-selector-button {
  border: 2px solid #2196f3;
  padding: 10px 20px;
  border-radius: 0.2em;
  background-color: #2196f3;
}

ul {
  list-style: none;
  padding: 0;
  display: flex;
  flex-wrap: wrap;
  gap: 20px;
}

.home-wrapper {
  max-width: 500px;
  margin: 0 auto;
  padding-top: 100px;
}

.gallery-wrapper {
  max-width: 1200px;
  margin: 0 auto;
}

이 선은 앱의 요소 스타일을 지정합니다. HTML 속성을 사용하여 16진수 코드 #2196f3(파란색 음영)로 파일 선택 버튼 배경의 스타일을 지정하고 이미지 업로드 버튼 테두리를 주황색으로 지정합니다. 또한 /result 경로의 요소에 스타일을 지정하여 보기 쉽게 만듭니다.

완료되면 파일을 저장하고 닫습니다.

프로젝트 루트 디렉터리로 돌아갑니다.

  1. cd ../..

편집기에서 index.js를 엽니다.

  1. nano index.js

index.js에서 다음 코드를 추가하면 서버가 시작됩니다.

...
app.listen(3000, function () {
  console.log("Server running on port 3000");
});

이제 전체 index.js 파일이 다음과 일치합니다.

const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);

app.use(fileUpload());

app.use(express.static("public"));

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  }

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

변경을 마치면 파일을 저장하고 닫습니다.

node 명령을 사용하여 앱을 실행합니다.

  1. node index.js

다음과 같은 출력을 받게 됩니다.

Output
Server running on port 3000

이 출력은 서버가 문제 없이 실행되고 있음을 확인합니다.

원하는 브라우저를 열고 http://localhost:3000/을 방문하세요.

참고: 원격 서버에서 자습서를 따르는 경우 포트 전달을 사용하여 로컬 브라우저에서 앱에 액세스할 수 있습니다.

Node.js 서버가 실행되는 동안 다른 터미널을 열고 다음 명령을 입력합니다.

  1. ssh -L 3000:localhost:3000 your-non-root-user@yourserver-ip

서버에 연결했으면 node index.js를 실행한 다음 로컬 시스템의 웹 브라우저에서 http://localhost:3000/로 이동합니다.

페이지가 로드되면 다음과 일치합니다.

다음으로 파일 선택 버튼을 누르고 로컬 컴퓨터에서 underwater.png 이미지를 선택합니다. 디스플레이가 No file selected에서 underwater.png로 전환됩니다. 그런 다음 이미지 업로드 버튼을 누릅니다. 이미지를 처리하고 증분 루프를 실행하는 동안 앱이 잠시 로드됩니다.

작업이 완료되면 /result 경로가 크기 조정된 이미지와 함께 로드됩니다.

이제 CTRL+C로 서버를 중지할 수 있습니다. Node.js는 파일이 변경될 때 서버를 자동으로 다시 로드하지 않으므로 파일을 업데이트할 때마다 서버를 중지했다가 다시 시작해야 합니다.

이제 시간 집약적인 작업이 애플리케이션의 요청/응답 주기에 어떤 영향을 미칠 수 있는지 알게 되었습니다. 다음에 작업을 비동기적으로 실행합니다.

3단계 — 시간 집약적인 작업을 bullmq로 비동기식으로 실행

이 단계에서는 bulmq를 사용하여 시간이 많이 걸리는 작업을 백그라운드로 오프로드합니다. 이 조정은 요청/응답 주기를 해제하고 이미지가 처리되는 동안 앱이 사용자에게 즉시 응답할 수 있도록 합니다.

그렇게 하려면 작업에 대한 간결한 설명을 만들고 bullmq를 사용하여 대기열에 추가해야 합니다. FIFO(First-In, First-Out) 프로세스에서는 대기열에 추가된 첫 번째 항목이 제거될 첫 번째 항목입니다(대기열에서 빼기). bulmq를 사용하면 생산자가 작업을 대기열에 추가하고 소비자(또는 작업자)가 작업을 제거합니다. 대기열에서 작업을 가져와서 실행합니다.

bulmq의 대기열은 Redis에 있습니다. 작업을 설명하고 대기열에 추가하면 작업 항목이 Redis 대기열에 생성됩니다. 작업 설명은 bullmq가 나중에 작업을 실행할 수 있도록 허용하는 데이터에 대한 참조 또는 최소한의 데이터를 포함하는 속성이 있는 문자열 또는 개체일 수 있습니다. 대기열에 작업을 추가하는 기능을 정의한 후에는 시간 집약적인 코드를 별도의 함수로 옮깁니다. 나중에 bulmq는 작업이 대기열에서 제거될 때 대기열에 저장한 데이터로 이 함수를 호출합니다. 작업이 완료되면 bulmq는 완료된 것으로 표시하고 대기열에서 다른 작업을 가져와서 실행합니다.

편집기에서 index.js를 엽니다.

  1. nano index.js

index.js 파일에서 강조 표시된 줄을 추가하여 bullmq를 사용하여 Redis에서 대기열을 만듭니다.

...
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}
...

Redis에서 큐를 만드는 데 사용되는 bulmq에서 Queue 클래스를 추출하여 시작합니다. 그런 다음 redisOptions 변수를 Queue 클래스 인스턴스가 Redis와의 연결을 설정하는 데 사용할 속성이 있는 객체로 설정합니다. Redis가 로컬 시스템에서 실행 중이므로 host 속성 값을 localhost로 설정합니다.

참고: Redis가 앱과 별도의 원격 서버에서 실행 중인 경우 host 속성 값을 원격 서버의 IP 주소로 업데이트합니다. 또한 port 속성 값을 Redis가 연결을 수신하는 데 사용하는 기본 포트인 6379로 설정합니다.

Redis와 앱을 함께 실행하는 원격 서버에 대한 포트 포워딩을 설정한 경우 host 속성을 업데이트할 필요는 없지만 로그인할 때마다 포트 포워딩 연결을 사용해야 합니다. 앱을 실행하기 위해 서버에.

다음으로 imageJobQueue 변수를 Queue 클래스의 인스턴스로 설정하고 대기열의 이름을 첫 번째 인수로 사용하고 개체를 두 번째 인수로 사용합니다. 객체에는 redisOptions 변수의 객체로 설정된 값이 있는 connection 속성이 있습니다. Queue 클래스를 인스턴스화한 후 Redis에 imageJobQueue라는 큐가 생성됩니다.

마지막으로 imageJobQueue에 작업을 추가하는 데 사용할 addJob() 함수를 정의합니다. 이 함수는 작업에 대한 정보를 포함하는 job 매개변수를 사용합니다. \u2060(대기열에 저장하려는 데이터와 함께 addJob() 함수를 호출합니다). 함수에서 imageJobQueueadd() 메서드를 호출하고 작업 이름을 첫 번째 인수로 사용하고 작업 데이터를 두 번째 인수로 사용합니다.

강조 표시된 코드를 추가하여 addJob() 함수를 호출하여 대기열에 작업을 추가합니다.

...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  ...
  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

여기에서 작업을 설명하는 객체와 함께 addJob() 함수를 호출합니다. 개체에는 작업 이름 값이 있는 type 특성이 있습니다. 두 번째 속성인 image는 사용자가 업로드한 이미지 데이터를 포함하는 개체로 설정됩니다. image.data의 이미지 데이터는 버퍼(바이너리 형식)에 있으므로 JavaScript의 toString() 메서드를 호출하여 Redis에 저장할 수 있는 문자열로 변환합니다. , data 속성을 결과로 설정합니다. image 속성은 업로드된 이미지의 이름으로 설정됩니다(이미지 확장 포함).

이제 bulmq가 나중에 이 작업을 실행하는 데 필요한 정보를 정의했습니다. 작업에 따라 작업 정보를 더 많이 추가하거나 더 적게 추가할 수 있습니다.

경고: Redis는 메모리 내 데이터베이스이므로 작업에 대한 대량의 데이터를 대기열에 저장하지 마십시오. 작업을 처리해야 하는 큰 파일이 있는 경우 파일을 디스크나 클라우드에 저장한 다음 파일에 대한 링크를 문자열로 대기열에 저장합니다. bulmq가 작업을 실행하면 Redis에 저장된 링크에서 파일을 가져옵니다.

파일을 저장하고 닫습니다.

다음으로 이미지 처리 코드를 포함할 utils.js 파일을 만들고 엽니다.

  1. nano utils.js

utils.js 파일에서 다음 코드를 추가하여 이미지 처리 기능을 정의합니다.

const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
}

module.exports = { processUploadedImages };

처음 두 줄에서 이미지를 처리하고 경로를 계산하는 데 필요한 모듈을 가져옵니다. 그런 다음 시간 집약적인 이미지 처리 작업을 포함할 processUploadedImages() 함수를 정의합니다. 이 함수는 작업자가 대기열에서 작업 데이터를 가져온 다음 대기열 데이터로 processUploadedImages() 함수를 호출할 때 채워지는 job 매개변수를 사용합니다. 또한 다른 파일에서 참조할 수 있도록 processUploadedImages() 함수를 내보냅니다.

파일을 저장하고 닫습니다.

index.js 파일로 돌아갑니다.

  1. nano index.js

index.js 파일에서 강조 표시된 줄을 복사한 다음 이 파일에서 삭제합니다. 잠시 복사한 코드가 필요하므로 클립보드에 저장하십시오. nano를 사용하는 경우 다음 줄을 강조 표시하고 마우스 오른쪽 버튼을 클릭하여 줄을 복사할 수 있습니다.

...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage))
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
...
  res.redirect("/result");
});

upload 경로에 대한 post 메서드는 이제 다음과 일치합니다.

...
app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: image.data.toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});
...

이 파일을 저장하고 닫은 다음 utils.js 파일을 엽니다.

  1. nano utils.js

utils.js 파일에서 /upload 경로 콜백에 대해 방금 복사한 줄을 processUploadedImages 함수에 붙여넣습니다.

...
function processUploadedImages(job) {
  const imageName = path.parse(image.name).name;
  const processImage = (size) =>
    sharp(image.data)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}
...

이제 이미지 처리를 위한 코드를 이동했으므로 앞에서 정의한 processUploadedImages() 함수의 job 매개변수의 이미지 데이터를 사용하도록 업데이트해야 합니다.

그렇게 하려면 아래 강조 표시된 줄을 추가하고 업데이트하십시오.


function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);
  ...
}

Buffer.from() 메서드를 사용하여 이미지 데이터의 문자열화된 버전을 다시 바이너리로 변환합니다. 그런 다음 대기열에 저장된 이미지 이름에 대한 참조로 path.parse()를 업데이트합니다. 그런 다음 sharp() 메서드를 업데이트하여 imageFileData 변수에 저장된 이미지 이진 데이터를 가져옵니다.

완전한 utils.js 파일은 이제 다음과 일치합니다.

const path = require("path");
const sharp = require("sharp");

function processUploadedImages(job) {
  const imageFileData = Buffer.from(job.image.data, "base64");
  const imageName = path.parse(job.image.name).name;
  const processImage = (size) =>
    sharp(imageFileData)
      .resize(size, size)
      .webp({ lossless: true })
      .toFile(`./public/images/${imageName}-${size}.webp`);

  sizes = [90, 96, 120, 144, 160, 180, 240, 288, 360, 480, 720, 1440];
  Promise.all(sizes.map(processImage));
  let counter = 0;
  for (let i = 0; i < 10_000_000_000; i++) {
    counter++;
  };
}

module.exports = { processUploadedImages };

파일을 저장하고 닫은 다음 index.js로 돌아갑니다.

  1. nano index.js

이미지가 이제 utils.js 파일에서 처리되기 때문에 sharp 변수는 더 이상 종속성으로 필요하지 않습니다. 파일에서 강조 표시된 줄을 삭제합니다.

const bodyParser = require("body-parser");
const sharp = require("sharp");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
...

파일을 저장하고 닫습니다.

이제 Redis에서 대기열을 생성하고 작업을 추가하는 기능을 정의했습니다. 업로드된 이미지를 처리하기 위해 processUploadedImages() 함수도 정의했습니다.

남은 작업은 대기열에서 작업을 가져오고 processUploadedImages() 함수를 작업 데이터.

편집기에서 worker.js 파일을 만듭니다.

  1. nano worker.js

worker.js 파일에 다음 코드를 추가합니다.

const { Worker } = require("bullmq");

const { processUploadedImages } = require("./utils");

const workerHandler = (job) => {
  console.log("Starting job:", job.name);
  processUploadedImages(job.data);
  console.log("Finished job:", job.name);
  return;
};

첫 번째 줄에서는 bulmq에서 Worker 클래스를 가져옵니다. 인스턴스화되면 Redis의 대기열에서 작업을 제거하고 실행하는 작업자를 시작합니다. 다음으로 utils.js 파일에서 processUploadedImages() 함수를 참조하여 작업자가 대기열의 데이터로 함수를 호출할 수 있도록 합니다.

대기열의 작업 데이터를 포함하는 job 매개변수를 사용하는 workerHandler() 함수를 정의합니다. 함수에서 작업이 시작되었음을 기록한 다음 작업 데이터로 processUploadedImages()를 호출합니다. 그런 다음 성공 메시지를 기록하고 null을 반환합니다.

작업자가 Redis에 연결하고 대기열에서 작업을 제거하고 작업 데이터로 workerHandler()를 호출하도록 허용하려면 파일에 다음 줄을 추가합니다.

...
const workerOptions = {
  connection: {
    host: "localhost",
    port: 6379,
  },
};

const worker = new Worker("imageJobQueue", workerHandler, workerOptions);

console.log("Worker started!");

여기에서 workerOptions 변수를 Redis의 연결 설정을 포함하는 개체로 설정합니다. worker 변수를 다음 매개변수를 사용하는 Worker 클래스의 인스턴스로 설정합니다.

  • imageJobQueue: 작업 대기열의 이름입니다.
  • workerHandler: 작업이 Redis 대기열에서 제거된 후 실행될 함수입니다.
  • workerOptions: 작업자가 Redis와의 연결을 설정하는 데 사용하는 Redis 구성 설정입니다.

마지막으로 성공 메시지를 기록합니다.

줄을 추가한 후 파일을 저장하고 닫습니다.

이제 대기열에서 작업을 제거하고 실행하는 bulmq 작업자 기능을 정의했습니다.

터미널에서 public/images 디렉토리의 이미지를 제거하여 앱 테스트를 새로 시작할 수 있습니다.

  1. rm public/images/*

다음으로 index.js 파일을 실행합니다.

  1. node index.js

앱이 시작됩니다:

Output
Server running on port 3000

이제 작업자를 시작합니다. 두 번째 터미널 세션을 열고 프로젝트로 직접 이동합니다.

  1. cd image_processor/

다음 명령을 사용하여 작업자를 시작합니다.

  1. node worker.js

작업자는 다음을 시작합니다.

Output
Worker started!

브라우저에서 http://localhost:3000/을 방문하세요. 파일 선택 버튼을 누르고 컴퓨터에서 underwater.png를 선택한 다음 이미지 업로드 버튼을 누릅니다.

몇 초 후에 페이지를 새로고침하라는 즉각적인 응답을 받을 수 있습니다.

또는 다른 이미지가 아직 처리되는 동안 페이지에서 일부 처리된 이미지에 대한 즉각적인 응답을 받을 수 있습니다.

크기가 조정된 모든 이미지를 로드하려면 페이지를 몇 번 새로 고칠 수 있습니다.

작업자가 실행 중인 터미널로 돌아갑니다. 해당 터미널에는 다음과 일치하는 메시지가 있습니다.

Output
Worker started! Starting job: processUploadedImages Finished job: processUploadedImages

출력은 bulmq가 작업을 성공적으로 실행했음을 확인합니다.

작업자가 실행되고 있지 않더라도 앱은 여전히 시간 집약적인 작업을 오프로드할 수 있습니다. 이를 시연하려면 CTRL+C를 사용하여 두 번째 터미널에서 작업자를 중지합니다.

초기 터미널 세션에서 Express 서버를 중지하고 public/images에서 이미지를 제거합니다.

  1. rm public/images/*

그런 다음 서버를 다시 시작하십시오.

  1. node index.js

브라우저에서 http://localhost:3000/을 방문하여 underwater.png 이미지를 다시 업로드하세요. /result 경로로 리디렉션되면 작업자가 실행되고 있지 않기 때문에 페이지에 이미지가 표시되지 않습니다.

작업자를 실행한 터미널로 돌아가 작업자를 다시 시작합니다.

  1. node worker.js

출력은 다음과 일치하여 작업이 시작되었음을 알 수 있습니다.

Output
Worker started! Starting job: processUploadedImages

작업이 완료되고 출력에 Finished job: processUploadedImages라는 줄이 포함된 후 브라우저를 새로 고칩니다. 이제 이미지가 로드됩니다.

서버와 작업자를 중지합니다.

이제 시간 집약적인 작업을 백그라운드로 오프로드하고 bulmq를 사용하여 비동기식으로 실행할 수 있습니다. 다음 단계에서는 대시보드를 설정하여 대기열의 상태를 모니터링합니다.

4단계 - bullmq 대기열 모니터링을 위한 대시보드 추가

이 단계에서는 bul-board 패키지를 사용하여 시각적 대시보드에서 Redis 대기열의 작업을 모니터링합니다. 이 패키지는 Redis 대기열에 저장된 bulmq 작업에 대한 정보를 표시하고 구성하는 사용자 인터페이스(UI) 대시보드를 자동으로 생성합니다. 브라우저를 사용하여 터미널에서 Redis CLI를 열지 않고도 완료되었거나 대기 중이거나 실패한 작업을 모니터링할 수 있습니다.

텍스트 편집기에서 index.js 파일을 엽니다.

  1. nano index.js

강조 표시된 코드를 추가하여 bul-board를 가져옵니다.

...
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");
...

이전 코드에서는 bullboard에서 createBullBoard() 메서드를 가져옵니다. 또한 bulmq 큐에 대한 bulmq 액세스를 허용하는 BullMQAdapter와 다음을 위한 기능을 제공하는 ExpressAdapter를 가져옵니다. Express는 대시보드를 표시합니다.

다음으로 강조 표시된 코드를 추가하여 bulmqbul-board를 연결합니다.

...
async function addJob(job) {
  ...
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
...

먼저 serverAdapterExpressAdapter의 인스턴스로 설정합니다. 다음으로 createBullBoard()를 호출하여 bullmq 대기열 데이터로 대시보드를 초기화합니다. 함수에 queuesserverAdapter 속성이 있는 개체 인수를 전달합니다. 첫 번째 속성인 queues는 여기서는 imageJobQueuebullmq로 정의한 대기열의 배열을 허용합니다. 두 번째 속성 serverAdapter에는 Express 서버 어댑터의 인스턴스를 허용하는 개체가 포함되어 있습니다. 그런 다음 setBasePath() 메서드로 대시보드에 액세스할 /admin 경로를 설정합니다.

다음으로 /admin 경로에 대한 serverAdapter 미들웨어를 추가합니다.

app.use(express.static("public"))

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  ...
});

전체 index.js 파일은 다음과 일치합니다.

const path = require("path");
const fs = require("fs");
const express = require("express");
const bodyParser = require("body-parser");
const fileUpload = require("express-fileupload");
const { Queue } = require("bullmq");
const { createBullBoard } = require("@bull-board/api");
const { BullMQAdapter } = require("@bull-board/api/bullMQAdapter");
const { ExpressAdapter } = require("@bull-board/express");

const redisOptions = { host: "localhost", port: 6379 };

const imageJobQueue = new Queue("imageJobQueue", {
  connection: redisOptions,
});

async function addJob(job) {
  await imageJobQueue.add(job.type, job);
}

const serverAdapter = new ExpressAdapter();
const bullBoard = createBullBoard({
  queues: [new BullMQAdapter(imageJobQueue)],
  serverAdapter: serverAdapter,
});
serverAdapter.setBasePath("/admin");

const app = express();
app.set("view engine", "ejs");
app.use(bodyParser.json());
app.use(
  bodyParser.urlencoded({
    extended: true,
  })
);
app.use(fileUpload());

app.use(express.static("public"));

app.use("/admin", serverAdapter.getRouter());

app.get("/", function (req, res) {
  res.render("form");
});

app.get("/result", (req, res) => {
  const imgDirPath = path.join(__dirname, "./public/images");
  let imgFiles = fs.readdirSync(imgDirPath).map((image) => {
    return `images/${image}`;
  });
  res.render("result", { imgFiles });
});

app.post("/upload", async function (req, res) {
  const { image } = req.files;

  if (!image) return res.sendStatus(400);

  await addJob({
    type: "processUploadedImages",
    image: {
      data: Buffer.from(image.data).toString("base64"),
      name: image.name,
    },
  });

  res.redirect("/result");
});

app.listen(3000, function () {
  console.log("Server running on port 3000");
});

변경을 완료한 후 파일을 저장하고 닫습니다.

index.js 파일을 실행합니다.

  1. node index.js

브라우저로 돌아가서 http://localhost:3000/admin을 방문하십시오. 대시보드는 다음을 로드합니다.

대시보드에서 작업 유형, 사용하는 데이터 및 작업에 대한 추가 정보를 검토할 수 있습니다. 완료된 작업에 대한 정보를 보려면 완료됨 탭, 실패한 작업에 대한 자세한 정보를 보려면 실패 탭, 일시 중지된 작업에 대한 자세한 정보를 보려면 일시 중지됨 탭과 같은 다른 탭으로 전환할 수도 있습니다.

이제 게시판 대시보드를 사용하여 대기열을 모니터링할 수 있습니다.

결론

이 문서에서는 bulmq를 사용하여 시간이 많이 걸리는 작업을 작업 대기열로 오프로드했습니다. 먼저 bulmq를 사용하지 않고 요청/응답 주기가 느린 시간 집약적인 작업으로 앱을 만들었습니다. 그런 다음 bulmq를 사용하여 시간 집약적인 작업을 오프로드하고 비동기식으로 실행하여 요청/응답 주기를 높였습니다. 그런 다음 bull-board를 사용하여 Redis에서 bulmq 대기열을 모니터링하는 대시보드를 만들었습니다.

게시판 문서를 방문하여 대시보드 기능에 대해 자세히 알아볼 수 있습니다.