Beenslab Blog

비동기 처리는 언제 해야 할까? - Transactional Outbox 패턴으로 메시지 유실 방지하기

beenchangseo·2025년 3월 7일Hits

– Transactional Outbox 패턴으로 메시지 유실 방지하기

P2P 거래 및 에스크로 시스템을 설계하면서 가장 많이 고민했던 것 중 하나는 어디까지를 동기 처리로 하고, 어떤 시점부터 비동기 처리로 전환해야 할까? 였습니다. 특히 사용자 자산이 실시간으로 이동해야 하는 시나리오에서는 비동기 처리가 성능과 신뢰성 사이의 균형을 요구합니다.

이번 글에서는 실제로 겪었던 문제 상황과, Transactional Outbox 패턴을 통해 메시지 유실을 방지하고 안정적인 비동기 처리를 구현한 과정을 공유합니다.


❓ 비동기 처리는 언제 해야 할까?

처음 P2P/에스크로 시스템을 설계할 때 고민은 다음과 같았습니다:

⚙️ 초안 설계 흐름

mermaid1

✅ 1안: REST 요청 후 즉시 큐에 메시지 넣기 (SQS)

  • 장점: 사용자에게 빠른 응답 제공 (200 OK)
  • 단점:
    • 메시지 유실 가능성
    • 클라이언트는 결과를 알 방법이 없음 → UX 불안정

🛡 2안: 모든 처리 완료 후 응답 (동기)

  • 장점: 사용자는 확실한 처리 결과를 받음
  • 단점:
    • 트랜잭션 지속 시간 증가
    • 장애 발생 시 재시도 복잡, 타임아웃 가능성

🔁 3안: DB Insert → 큐 발송 → 비동기 처리

  • 장점: DB에는 저장됨
  • 단점: DB에는 저장되었지만 큐 발송 실패 → 메시지 유실 발생 가능

🧠 고민의 핵심: 메시지 유실을 어떻게 방지할까?

이 문제의 해결을 위해 Transactional Outbox 패턴을 도입했습니다.

핵심 아이디어

  • 메시지를 직접 외부로 발행하는 대신,
    같은 트랜잭션 내에서 DB에 안전하게 저장하고,
  • 별도 프로세스가 이를 읽어 외부 시스템으로 전달하는 구조입니다.

📢 Transactional Outbox 패턴이란?

DB 트랜잭션과 메시지 발행 사이의 일관성을 보장하는 패턴

Transactional Outbox 패턴은 하나의 트랜잭션 내에서 비즈니스 데이터와 이벤트 데이터를 동시에 저장합니다. 이벤트는 별도 Processor가 주기적으로 읽어 외부 시스템으로 전달하거나 비동기 작업을 수행합니다.

  • DB 트랜잭션 안에서 비즈니스 데이터와 이벤트를 함께 저장
  • 저장된 이벤트는 별도의 Outbox Processor가 주기적으로 읽어
    gRPC 또는 MQ 메시지로 전송

아키텍처 다이어그램

mermaid2


✍️ 스키마 설계

CREATE TABLE p2p.transaction_outbox (
    id uuid DEFAULT gen_random_uuid() NOT NULL PRIMARY KEY,
    idempotency_key TEXT NOT NULL, -- 멱등성 보장
    transaction_type TEXT NOT NULL, -- ex: 'ad_create', 'order_complete'
    transaction_ref_id TEXT NOT NULL, -- 광고ID, 주문ID 등
    status TEXT NOT NULL DEFAULT 'pending', -- 'pending', 'processing', 'processed', 'failed'
    payload JSONB NOT NULL, -- 실행에 필요한 데이터
    create_time TIMESTAMPTZ DEFAULT now(),
    processed_time TIMESTAMPTZ,
    last_attempt_time TIMESTAMPTZ,
    retry_count INT DEFAULT 0,
    error_message TEXT
);

🛠 Outbox Processor의 핵심 로직, 실전 코드로 살펴보기

1. 서버 재시작 시 processing 상태 복구

서버가 재시작될 때, 이전에 처리 중이던 항목(processing 상태)을 자동으로 pending으로 복구하여
장애 상황에서도 메시지 유실 없이 안정적으로 재처리가 가능하도록 했습니다.

// 서버 시작 시 processing 상태를 pending으로 복구
async onModuleInit() {
    const resetCount = await this.transactionOutboxRepository.resetStuckEntries();
    if (resetCount > 0) {
        this.logger.log(`Server is started. ${resetCount} items are reset to pending status.`);
    }
    this.isRunning = true;
}

// 실제 복구 쿼리
async resetStuckEntries(): Promise<number> {
    const result = await this.pgUtil.pool.query(sql`
        UPDATE p2p.transaction_outbox
        SET status = 'pending'
        WHERE status = 'processing'
    `);
    return result.rowCount;
}

2. Polling 및 동시성 제어

Outbox Processor는 1초마다 polling 하며,
isProcessingLock으로 동시 실행을 방지하여 멀티 인스턴스 환경에서도 중복 처리를 막습니다.

@Interval(1000) // 1초마다 실행
async processOutbox() {
    if (!this.isRunning) return;
    if (this.isProcessingLock) return;

    this.isProcessingLock = true;
    try {
        await this.processTransactionsByTypes([TransactionType.AD_CREATE, TransactionType.AD_CLOSE, TransactionType.AD_UPDATE]);
        await this.processTransactionsByTypes([TransactionType.ORDER_CREATE, TransactionType.ORDER_COMPLETE, TransactionType.ORDER_CANCEL]);
    } catch (error) {
        this.logger.error(`Error occurred while processing outbox: ${error.message}`, error.stack);
        this.isRunning = false;
    } finally {
        this.isProcessingLock = false;
    }
}

3. 상태 전이 및 재시도 처리

각 엔트리의 상태는 pendingprocessingprocessed/failed로 전이됩니다.
실패 시 retry_count를 증가시키고, 최대 재시도 초과 시 failed로 전환합니다.

private async processTransactionsByTypes(types: TransactionType[]) {
    const entries = await this.transactionOutboxRepository.getPendingOutboxEntries(10, types);
    for (const entry of entries) {
        // 처리 시작: processing 상태로 변경, retry_count 증가
        const updatedEntry = await this.transactionOutboxRepository.markOutboxEntryProcessing(entry.id);
        if (!updatedEntry) continue; // 이미 다른 프로세스에서 처리 중

        await this.processOutboxEntry(updatedEntry);
    }
}

async markOutboxEntryProcessing(transactionOutboxId: string): Promise<TransactionOutboxEntity | null> {
    const result = await this.pgUtil.pool.maybeOne<TransactionOutboxEntity>(sql`
        UPDATE p2p.transaction_outbox
        SET status = 'processing',
            last_attempt_time = NOW(),
            retry_count = retry_count + 1
        WHERE id = ${transactionOutboxId}
          AND status = 'pending'
        RETURNING *
    `);
    return result;
}

4. 실패/재시도 및 알림 처리

실패 시에는 retry_count를 증가시키고,
최대 재시도 횟수(MAX_RETRIES)를 초과하면 failed로 전환하며,
Slack 등 알림 시스템으로 장애를 즉시 감지할 수 있습니다.

private async processOutboxEntry(entry: TransactionOutboxEntity): Promise<void> {
    try {
        let success = false;
        // 트랜잭션 타입별 처리
        switch (entry.transaction_type) {
            case TransactionType.AD_CREATE:
                success = await this.processAdCreate(entry);
                break;
            // ... (생략)
        }
        if (success) {
            // 처리 성공: processed로 상태 변경
            await this.transactionOutboxRepository.transaction(async (conn) => {
                await this.transactionOutboxRepository.markOutboxEntryProcessed(conn, entry.id);
            });
        }
    } catch (error) {
        // 실패 시 재시도 또는 failed 처리
        if (entry.retry_count >= this.MAX_RETRIES) {
            await this.handleMaxRetryExceeded(entry, error);
        } else {
            await this.transactionOutboxRepository.markOutboxEntryRetry(entry.id, error.toString());
        }
    }
}

async markOutboxEntryRetry(transactionOutboxId: string, errorMessage: string): Promise<void> {
    await this.pgUtil.pool.oneFirst<string>(sql`
        UPDATE p2p.transaction_outbox
        SET status = 'pending',
            error_message = ${errorMessage}
        WHERE id = ${transactionOutboxId}
        RETURNING status
    `);
}

private async handleMaxRetryExceeded(entry: TransactionOutboxEntity, error: any) {
    await this.transactionOutboxRepository.transaction(async (conn) => {
        await this.transactionOutboxRepository.markOutboxEntryFailed(conn, entry.id, error.toString());
    });
    // Slack 등 알림 시스템 연동
    this.slackService.sendMessage(
        `[p2p-server-api escrow 에러 발생] Outbox 최대 재시도 초과 (수동介入 필요)`,
        error.message,
        error.stack,
    );
}

✅ 이 패턴을 도입한 효과

항목개선 전개선 후 (Outbox 도입)
메시지 유실 가능성높음없음 (트랜잭션 내 저장)
클라이언트 UX불안정안정된 처리 후 응답
장애 대응 및 추적 가능성부족로그 및 DB 기반 추적 가능, 이벤트 상태 기반 재시도/알림

⛏ 성능 및 운영 최적화 팁

  • Polling 빈도는 상황에 따라 조절 (짧게 하면 실시간성↑, 길게 하면 부하↓)
  • 멱등성 키 필수 (gRPC 서버에서 중복 요청 방지)
  • retry_count 제한 + 실패시 알림 시스템 연계 (Slack, Email 등)
  • Dead Letter Queue(DLQ) 연계 고려 (지속 실패 이벤트 별도 관리)
  • 장애 복구: 서버 재시작 시 processing 상태를 pending으로 자동 복구

🔚 마무리하며

처음에는 단순한 비동기 구조로 출발했지만,
메시지 유실이라는 실제 문제를 겪으면서
Transactional Outbox 패턴의 중요성을 절실히 느끼게 되었습니다.

이 패턴은 단순히 마이크로서비스 환경에 국한된 것이 아니라,
금융/거래/에스크로 시스템처럼 높은 신뢰성이 요구되는 분야에서
반드시 고려해야 할 설계 방식입니다.

비동기 처리를 "언제" 하느냐보다 중요한 것은,
그 비동기 처리가 "신뢰할 수 있느냐"는 점이다.