생산자 – 소비자 문제와 BlockingQueue

문제 정의

생산자 – 소비자 문제는 한정된 크기의 버퍼를 통해 데이터를 주고 받는 멀티스레드 환경에서 발생하는 동시성 문제이다

  • 생산자 (Producer): 데이터를 생성하여 버퍼에 저장
  • 소비자 (Consumer): 버퍼에서 데이터를 꺼내 소비
  • 버퍼 (Buffer): 한정된 크기의 공유자원

핵심 문제

  • 버퍼가 가득 차면 생산자는 대기해야 한다
  • 버퍼가 비면 소비자는 대기해야 한다

해결 방안의 진화

단순 synchronized (문제 발생)

public synchronized void put(String data) {
    if (queue.size() == max) {
        log("큐가 가득 참, 버림: " + data);
        return;  // 데이터 버림
    }
    queue.offer(data);
}

public synchronized String take() {
    if (queue.isEmpty()) {
        return null;  // null 반환
    }
    return queue.poll();
}
  • 문제점: 버퍼가 가득 차면 데이터를 버리고 버퍼가 비면 null을 반환한다

sleep()으로 대기 (더 심각한 문제)

public synchronized void put(String data) {
    while (queue.size() == max) {
        log("큐가 가득 참, 생산자 대기");
        sleep(1000);  // 락을 잡은 채로 대기!
    }
    queue.offer(data);
}
  • 문제점: 락을 보유한 채 sleep하여 다른 스레드가 진입 불가 → 무한 대기 (데드락)

wait/nofity 사용 (비효율 존재)

public synchronized void put(String data) {
    while (queue.size() == max) {
        log("큐가 가득 참, 생산자 대기");
        wait();  // 락 반납하고 대기
        log("생산자 깨어남");
    }
    queue.offer(data);
    notify();  // 대기 중인 스레드 하나 깨움
}

public synchronized String take() {
    while (queue.isEmpty()) {
        log("큐에 데이터가 없음, 소비자 대기");
        wait();  // 락 반납하고 대기
        log("소비자 깨어남");
    }
    String data = queue.poll();
    notify();  // 대기 중인 스레드 하나 깨움
    return data;
}
  • 개선점: 락을 반납하고 대기하여 무한 대기 문제 해결
  • 한계점
    • 생산자/소비자가 같은 대기 집한(wait set) 사용한다
    • notify()가 같은 종류의 스레드를 꺠울 수 있다 (비효율)
    • 예시: 소비자가 소비자를 꺠우면 큐가 비어있어 다시 대기한다

Lock + Condition으로 개선

핵심은 생산자와 소비자의 대기 공간을 분리하는 것이다

public class BoundedQueueV5 implements BoundedQueue {
    private final Lock lock = new ReentrantLock();
    private final Condition producerCond = lock.newCondition();  // 생산자 전용
    private final Condition consumerCond = lock.newCondition();  // 소비자 전용
    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    @Override
    public void put(String data) {
        lock.lock();
        try {
            while (queue.size() == max) {
                log("큐가 가득 참, 생산자 대기");
                producerCond.await();  // 생산자 전용 대기 공간에서 대기
                log("생산자 깨어남");
            }
            queue.offer(data);
            log("생산자 데이터 저장, consumerCond.signal() 호출");
            consumerCond.signal();  // 소비자만 깨움!
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                log("큐에 데이터가 없음, 소비자 대기");
                consumerCond.await();  // 소비자 전용 대기 공간에서 대기
                log("소비자 깨어남");
            }
            String data = queue.poll();
            log("소비자 데이터 획득, producerCond.signal() 호출");
            producerCond.signal();  // 생산자만 깨움!
            return data;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }
}
  • 장점
    • 생산자는 항상 소비자를 깨우고 소비자는 항상 생산자를 깨운다.
    • 같은 종류의 스레드를 깨우는 비효율을 제거한다

BlockingQueue – 자바 표준 API

실무에서는 직접 구현하는 것보다 java.util.concurrent.BlockingQueue를 사용한다

public class BoundedQueueV6_1 implements BoundedQueue {
    private BlockingQueue<String> queue;

    public BoundedQueueV6_1(int max) {
        queue = new ArrayBlockingQueue<>(max);
    }

    public void put(String data) {
        try {
            queue.put(data);  // 버퍼가 가득 차면 대기
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public String take() {
        try {
            return queue.take();  // 버퍼가 비면 대기
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

BlockingQueue의 다양한 메서드

동작예외 발생즉시 반환대기시간 제한 대기
추가add(e)offer(e)put(e)offer(e, time, unit)
제거remove()poll()take()poll(time, unit)
조회element()peek()

사용 예시

BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

// 1. 예외 발생 방식
queue.add("data");  // 가득 차면 IllegalStateException

// 2. 즉시 반환 방식  
boolean result = queue.offer("data");  // 가득 차면 false 반환

// 3. 무한 대기 방식
queue.put("data");  // 공간 생길 때까지 대기

// 4. 시간 제한 대기
boolean result = queue.offer("data", 1, TimeUnit.SECONDS);  // 1초만 대기

주요 구현체

  • ArrayBlockingQueue: 배열 기반, 고정 크기
  • LinkedBlockingQueue: 링크 기반, 크기 조정 가능

스레드 대기 메커니즘 비교

synchronized + wait/notify

모니터 락 → 락 대기 집합(BLOCKED) → 스레드 대기 집합(WAITING)
↑ ↓
└────────── notify() ─────┘

ReentrantLock + Condition

ReentrantLock → 대기 큐(WAITING) → Condition 대기 공간(WAITING)
↑ ↓
└────────── signal() ─────┘
  • 차이점
    • ReentrantLock은 여러 Condition 생성 가능 (대기 공간 분리)
    • Condition.signal()은 일반적으로 FIFO 순서로 깨운다
    • Object.notify()는 임의의 스레드를 깨운다

주의사항

while 루프 사용이 필수
// 잘못된 예 (if 사용)
if (queue.isEmpty()) {
    wait();
}
   
// 올바른 예 (while 사용)
while (queue.isEmpty()) {
    wait();  // Spurious wakeup 대응
}
notifyAll() vs signal()
  • notifyAll(): 모든 대기 스레드 깨움 (비효율적이지만 안전하다)
  • signal(): 하나만 깨운다 (효율적이지만 정확한 조건 필요)
  • Condition 분리 시 signal() 사용이 가능하다
InterruptedException 처리
  • BlockingQueue 메서드들은 인터럽트 가능하다
  • 적절한 예외 처리가 필요하다

실무 권장사항

  • BlockingQueue 사용: 직접 구현하지 말고 자바 표준 AP를 활용
  • 적절한 메서드 선택
    • 중요한 데이터: put() / take() (대기)
    • 타임아웃 필요: offer(e, time) / poll(time) (시간 제한)
    • 빠른 실패 필요: offer() / poll() (즉시 반환)
  • 적절한 큐 크기 설정: 너무 작으면 대기가 빈번해지고 너무 크면 메모리 낭비가 심해진다

출처 – 김영한 님의 강의 중 김영한의 실전 자바 – 고급 1편, 멀티스레드와 동시성