본문 바로가기
Node.Js
2020.11.17 09:30

RabbitMQ Wrapper 클래스

다물칸 주소복사
조회 수 63 추천 수 0 댓글 0
?

단축키

Prev이전 문서

Next다음 문서

크게 작게 위로 아래로 댓글로 가기 인쇄
?

단축키

Prev이전 문서

Next다음 문서

크게 작게 위로 아래로 댓글로 가기 인쇄
Extra Form
구분 팁&트릭
출처 내가작성

RabbitMQ 3.8.0 기준으로 작성된 클래스입니다. 

/**
 *  Copyright 2002-2021 Enjoydev.
 *  + RabbitMQ wrapping class
 */
'use strict'

const amqp = require('amqplib');

class RabbitmqWrapper {
  constructor(rabbitMQUrl, queueName, options) {
    this._url = rabbitMQUrl;
    this._queueName = queueName;
    this._options = options || {};

    // public
    this.channel = undefined;
    this.queue = undefined;
  }

  async setup() {
    const connect = await amqp.connect(this._url);
    console.log('RabbitMQ Connect to ' + this._url + '/ Queue Name = ' + this._queueName);
    const channel = await connect.createChannel();
    this.channel = channel;
  }

  async assertQueue() {
    const queue = await this.channel.assertQueue(this._queueName, { durable: false });
    this.queue = queue;
  }

  async sendToQueue(msg) {
    const sending = await this.channel.sendToQueue(this._queueName, this.encode(msg));
    return sending;
  }

  async recvFromQueue() {
    const message = await this.channel.get(this._queueName, {});
    if (message) {
      this.channel.ack(message);
      return message.content.toString();
    } else {
      return null;
    }
  }

  encode(doc) {
    return Buffer.from(JSON.stringify(doc));
  }
}

module.exports = RabbitmqWrapper;

사용법

const rqURL = 'amqp://localhost:5672';
const rqName = 'channel1';
// 클래스선언
const RabbitmqWrapperClass = require('..');
// 클래스 초기화
const rq = new RabbitmqWrapperClass(rqURL, rqName);
await rq.setup();
await rq.assertQueue();

// 큐에 발행
const payload = {
  a: 1,
  b: 'string'
};
await rq.sendToQueue(payload);

// 큐에 있는것 구독 
// 만약 구독프로세스가 따로 존재한다면 위에 클래스 초기화를 똑같이 선언하면 됩니다.
// 구독은 settimer로 돌리면 됩니다. 저는 아래처럼 구현했습니다.
  async setTimer() {
    // Consume은 10ms 주기적으로 실행한다. (실행중이면 SKIP)
    setInterval(async () => {
      if (!this.procConsuming) {
        await this.procConsume();
      }
    }, 100);
  }

실제 구독코드

async procConsume() {
    this.procConsuming = true;

    // 이거 실행하면 queue에서 삭제된다. 저장 프로세스 완료되면 삭제되어야 할것 같은데
    let queData = await this.rq.recvFromQueue();
    if (!queData) {
      this.procConsuming = false;
      return;
    }

    queData = JSON.parse(queData);
    const a = queData.a;
    const b = queData.b;

    // 처리프로세스 진행

    this.procConsuming = false;
}

 

위에서 저장프로세스 완료한 후, 큐에서 삭제하는 방법

큐가 죽었을 경우 처리하는 방법