Node.Js
2020.11.17 09:30
RabbitMQ Wrapper 클래스
다물칸 주소복사
조회 수 63 추천 수 0 댓글 0
구분 | 팁&트릭 |
---|---|
출처 | 내가작성 |
RabbitMQ 3.8.0 기준으로 작성된 클래스입니다.
/** * Copyright 2002-2021 Enjoydev. * + RabbitMQ wrapping class */ 'use strict' const amqp = require('amqplib'); class RabbitmqWrapper { constructor(rabbitMQUrl, queueName, options) { this._url = rabbitMQUrl; this._queueName = queueName; this._options = options || {}; // public this.channel = undefined; this.queue = undefined; } async setup() { const connect = await amqp.connect(this._url); console.log('RabbitMQ Connect to ' + this._url + '/ Queue Name = ' + this._queueName); const channel = await connect.createChannel(); this.channel = channel; } async assertQueue() { const queue = await this.channel.assertQueue(this._queueName, { durable: false }); this.queue = queue; } async sendToQueue(msg) { const sending = await this.channel.sendToQueue(this._queueName, this.encode(msg)); return sending; } async recvFromQueue() { const message = await this.channel.get(this._queueName, {}); if (message) { this.channel.ack(message); return message.content.toString(); } else { return null; } } encode(doc) { return Buffer.from(JSON.stringify(doc)); } } module.exports = RabbitmqWrapper;
사용법
const rqURL = 'amqp://localhost:5672'; const rqName = 'channel1'; // 클래스선언 const RabbitmqWrapperClass = require('..'); // 클래스 초기화 const rq = new RabbitmqWrapperClass(rqURL, rqName); await rq.setup(); await rq.assertQueue(); // 큐에 발행 const payload = { a: 1, b: 'string' }; await rq.sendToQueue(payload); // 큐에 있는것 구독 // 만약 구독프로세스가 따로 존재한다면 위에 클래스 초기화를 똑같이 선언하면 됩니다. // 구독은 settimer로 돌리면 됩니다. 저는 아래처럼 구현했습니다. async setTimer() { // Consume은 10ms 주기적으로 실행한다. (실행중이면 SKIP) setInterval(async () => { if (!this.procConsuming) { await this.procConsume(); } }, 100); }
실제 구독코드
async procConsume() { this.procConsuming = true; // 이거 실행하면 queue에서 삭제된다. 저장 프로세스 완료되면 삭제되어야 할것 같은데 let queData = await this.rq.recvFromQueue(); if (!queData) { this.procConsuming = false; return; } queData = JSON.parse(queData); const a = queData.a; const b = queData.b; // 처리프로세스 진행 this.procConsuming = false; }