From 3b9b84c88060d21406f1f458d4f8c7bbfec14f15 Mon Sep 17 00:00:00 2001 From: sooyeon-kr Date: Mon, 9 Sep 2024 13:14:15 +0900 Subject: [PATCH] =?UTF-8?q?feat:=20queue=20=EC=9D=B4=EB=A6=84=20=EB=B3=80?= =?UTF-8?q?=EA=B2=BD=20=EB=B0=8F=20=EC=B2=98=EB=A6=AC=20=EC=A4=91=20?= =?UTF-8?q?=EC=98=A4=EB=A5=98=20=EB=B0=9C=EC=83=9D=20=EC=8B=9C=20=EB=A9=94?= =?UTF-8?q?=EC=8B=9C=EC=A7=80=ED=81=90=EC=97=90=20=EB=8B=A4=EC=8B=9C=20?= =?UTF-8?q?=EB=84=A3=EA=B8=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/constants.js | 2 +- src/mq_handler.js | 113 ++++++++++++++++++++++++++-------------------- 2 files changed, 64 insertions(+), 51 deletions(-) diff --git a/src/constants.js b/src/constants.js index dafc390..a9ec811 100644 --- a/src/constants.js +++ b/src/constants.js @@ -6,7 +6,7 @@ export const USER_TYPES = { // 큐 이름 상수 export const QUEUE_NAMES = { - CONSULTING_ROOM_CREATION: 'consulting.room.creation', + CONSULTING_ROOM_CREATION: 'consultingRoomCreationQueue', }; // 디바이스 타입 상수 diff --git a/src/mq_handler.js b/src/mq_handler.js index 0603ed5..653a651 100644 --- a/src/mq_handler.js +++ b/src/mq_handler.js @@ -4,59 +4,72 @@ import { saveConsultingRoomInfo } from './redis_client.js'; import { QUEUE_NAMES } from './constants.js'; function setupMQ() { - amqp.connect( - `amqp://${ENV.RABBITMQ_USER}:${ENV.RABBITMQ_PASSWORD}@${ENV.RABBITMQ_HOST}:${ENV.RABBITMQ_PORT}`, - - (error0, connection) => { - if (error0) { - throw error0; - } - console.log(`rabbitMQ(${ENV.RABBITMQ_HOST})에 연결 완료`); - - connection.createChannel((error1, channel) => { - if (error1) { - throw error1; - } + amqp.connect( + `amqp://${ENV.RABBITMQ_USER}:${ENV.RABBITMQ_PASSWORD}@${ENV.RABBITMQ_HOST}:${ENV.RABBITMQ_PORT}`, + + (error0, connection) => { + if (error0) { + throw error0; + } + console.log(`rabbitMQ(${ENV.RABBITMQ_HOST})에 연결 완료`); - const queues = [QUEUE_NAMES.CONSULTING_ROOM_CREATION]; - - queues.forEach(queue => { - // channel.deleteQueue(queue); - channel.assertQueue(queue, { - durable: true, - }); - - console.log( - `[*] Queue(${queue})에서 메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.` - ); - - channel.consume( - queue, - async message => { - if (message != null) { - try { - const messageContent = JSON.parse(message.content.toString()); - console.log(` [x] Received at Queue(${queue})`); - console.log(messageContent); - - if (queue === QUEUE_NAMES.CONSULTING_ROOM_CREATION) { - const { roomId } = messageContent; - await saveConsultingRoomInfo({ roomId, roomInfo: messageContent }); - channel.ack(message); - } - } catch (error) { - console.error('메시지 처리 중 오류 발생: ', error); - // channel.nack(message, false, true); + connection.createChannel((error1, channel) => { + if (error1) { + throw error1; } - } - }, - { noAck: false } - ); - }); - }); - } - ); + const queues = [QUEUE_NAMES.CONSULTING_ROOM_CREATION]; + + queues.forEach((queue) => { + // channel.deleteQueue(queue); + channel.assertQueue(queue, { + durable: true, + }); + + console.log( + `[*] Queue(${queue})에서 메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.` + ); + + channel.consume( + queue, + async (message) => { + if (message != null) { + try { + // 메시지 내용이 JSON 형식인지 검증 + const messageContent = JSON.parse( + message.content.toString() + ); + console.log( + ` [x] Received at Queue(${queue})` + ); + console.log(messageContent); + + if ( + queue === + QUEUE_NAMES.CONSULTING_ROOM_CREATION + ) { + const { roomId } = messageContent; + await saveConsultingRoomInfo({ + roomId, + roomInfo: messageContent, + }); + channel.ack(message); + } + } catch (error) { + console.error( + '메시지 처리 중 오류 발생: ', + error + ); + channel.nack(message, false, false); + } + } + }, + { noAck: false } + ); + }); + }); + } + ); } export default setupMQ;