Skip to content

Commit

Permalink
Merge pull request #24 from PizzaPickle/deploy
Browse files Browse the repository at this point in the history
Deploy
  • Loading branch information
sooyeon-kr authored Sep 9, 2024
2 parents 719d203 + 2418884 commit 38f8ee3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export const USER_TYPES = {

// 큐 이름 상수
export const QUEUE_NAMES = {
CONSULTING_ROOM_CREATION: 'consulting.room.creation',
CONSULTING_ROOM_CREATION: 'consultingRoomCreationQueue',
};

// 디바이스 타입 상수
Expand Down
113 changes: 63 additions & 50 deletions src/mq_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,72 @@ import { saveConsultingRoomInfo } from './redis_client.js';
import { QUEUE_NAMES } from './constants.js';

function setupMQ() {
amqp.connect(
`amqp://${ENV.RABBITMQ_USER}:${ENV.RABBITMQ_PASSWORD}@${ENV.RABBITMQ_HOST}:${ENV.RABBITMQ_PORT}`,

(error0, connection) => {
if (error0) {
throw error0;
}
console.log(`rabbitMQ(${ENV.RABBITMQ_HOST})에 연결 완료`);

connection.createChannel((error1, channel) => {
if (error1) {
throw error1;
}
amqp.connect(
`amqp://${ENV.RABBITMQ_USER}:${ENV.RABBITMQ_PASSWORD}@${ENV.RABBITMQ_HOST}:${ENV.RABBITMQ_PORT}`,

(error0, connection) => {
if (error0) {
throw error0;
}
console.log(`rabbitMQ(${ENV.RABBITMQ_HOST})에 연결 완료`);

const queues = [QUEUE_NAMES.CONSULTING_ROOM_CREATION];

queues.forEach(queue => {
// channel.deleteQueue(queue);
channel.assertQueue(queue, {
durable: true,
});

console.log(
`[*] Queue(${queue})에서 메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.`
);

channel.consume(
queue,
async message => {
if (message != null) {
try {
const messageContent = JSON.parse(message.content.toString());
console.log(` [x] Received at Queue(${queue})`);
console.log(messageContent);

if (queue === QUEUE_NAMES.CONSULTING_ROOM_CREATION) {
const { roomId } = messageContent;
await saveConsultingRoomInfo({ roomId, roomInfo: messageContent });
channel.ack(message);
}
} catch (error) {
console.error('메시지 처리 중 오류 발생: ', error);
// channel.nack(message, false, true);
connection.createChannel((error1, channel) => {
if (error1) {
throw error1;
}
}
},
{ noAck: false }
);
});
});
}
);

const queues = [QUEUE_NAMES.CONSULTING_ROOM_CREATION];

queues.forEach((queue) => {
// channel.deleteQueue(queue);
channel.assertQueue(queue, {
durable: true,
});

console.log(
`[*] Queue(${queue})에서 메시지를 기다리고 있습니다. 종료하려면 CTRL+C를 누르세요.`
);

channel.consume(
queue,
async (message) => {
if (message != null) {
try {
// 메시지 내용이 JSON 형식인지 검증
const messageContent = JSON.parse(
message.content.toString()
);
console.log(
` [x] Received at Queue(${queue})`
);
console.log(messageContent);

if (
queue ===
QUEUE_NAMES.CONSULTING_ROOM_CREATION
) {
const { roomId } = messageContent;
await saveConsultingRoomInfo({
roomId,
roomInfo: messageContent,
});
channel.ack(message);
}
} catch (error) {
console.error(
'메시지 처리 중 오류 발생: ',
error
);
channel.nack(message, false, false);
}
}
},
{ noAck: false }
);
});
});
}
);
}

export default setupMQ;

0 comments on commit 38f8ee3

Please sign in to comment.