문제 정의
생산자 – 소비자 문제는 한정된 크기의 버퍼를 통해 데이터를 주고 받는 멀티스레드 환경에서 발생하는 동시성 문제이다
- 생산자 (Producer): 데이터를 생성하여 버퍼에 저장
- 소비자 (Consumer): 버퍼에서 데이터를 꺼내 소비
- 버퍼 (Buffer): 한정된 크기의 공유자원
핵심 문제
- 버퍼가 가득 차면 생산자는 대기해야 한다
- 버퍼가 비면 소비자는 대기해야 한다
해결 방안의 진화
단순 synchronized (문제 발생)
public synchronized void put(String data) {
if (queue.size() == max) {
log("큐가 가득 참, 버림: " + data);
return; // 데이터 버림
}
queue.offer(data);
}
public synchronized String take() {
if (queue.isEmpty()) {
return null; // null 반환
}
return queue.poll();
}
- 문제점: 버퍼가 가득 차면 데이터를 버리고 버퍼가 비면 null을 반환한다
sleep()으로 대기 (더 심각한 문제)
public synchronized void put(String data) {
while (queue.size() == max) {
log("큐가 가득 참, 생산자 대기");
sleep(1000); // 락을 잡은 채로 대기!
}
queue.offer(data);
}
- 문제점: 락을 보유한 채 sleep하여 다른 스레드가 진입 불가 → 무한 대기 (데드락)
wait/nofity 사용 (비효율 존재)
public synchronized void put(String data) {
while (queue.size() == max) {
log("큐가 가득 참, 생산자 대기");
wait(); // 락 반납하고 대기
log("생산자 깨어남");
}
queue.offer(data);
notify(); // 대기 중인 스레드 하나 깨움
}
public synchronized String take() {
while (queue.isEmpty()) {
log("큐에 데이터가 없음, 소비자 대기");
wait(); // 락 반납하고 대기
log("소비자 깨어남");
}
String data = queue.poll();
notify(); // 대기 중인 스레드 하나 깨움
return data;
}
- 개선점: 락을 반납하고 대기하여 무한 대기 문제 해결
- 한계점
- 생산자/소비자가 같은 대기 집한(wait set) 사용한다
- notify()가 같은 종류의 스레드를 꺠울 수 있다 (비효율)
- 예시: 소비자가 소비자를 꺠우면 큐가 비어있어 다시 대기한다
Lock + Condition으로 개선
핵심은 생산자와 소비자의 대기 공간을 분리하는 것이다
public class BoundedQueueV5 implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition(); // 생산자 전용
private final Condition consumerCond = lock.newCondition(); // 소비자 전용
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("큐가 가득 참, 생산자 대기");
producerCond.await(); // 생산자 전용 대기 공간에서 대기
log("생산자 깨어남");
}
queue.offer(data);
log("생산자 데이터 저장, consumerCond.signal() 호출");
consumerCond.signal(); // 소비자만 깨움!
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("큐에 데이터가 없음, 소비자 대기");
consumerCond.await(); // 소비자 전용 대기 공간에서 대기
log("소비자 깨어남");
}
String data = queue.poll();
log("소비자 데이터 획득, producerCond.signal() 호출");
producerCond.signal(); // 생산자만 깨움!
return data;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
- 장점
- 생산자는 항상 소비자를 깨우고 소비자는 항상 생산자를 깨운다.
- 같은 종류의 스레드를 깨우는 비효율을 제거한다
BlockingQueue – 자바 표준 API
실무에서는 직접 구현하는 것보다 java.util.concurrent.BlockingQueue를 사용한다
public class BoundedQueueV6_1 implements BoundedQueue {
private BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
queue = new ArrayBlockingQueue<>(max);
}
public void put(String data) {
try {
queue.put(data); // 버퍼가 가득 차면 대기
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public String take() {
try {
return queue.take(); // 버퍼가 비면 대기
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
BlockingQueue의 다양한 메서드
| 동작 | 예외 발생 | 즉시 반환 | 대기 | 시간 제한 대기 |
| 추가 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 제거 | remove() | poll() | take() | poll(time, unit) |
| 조회 | element() | peek() | – | – |
사용 예시
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 1. 예외 발생 방식
queue.add("data"); // 가득 차면 IllegalStateException
// 2. 즉시 반환 방식
boolean result = queue.offer("data"); // 가득 차면 false 반환
// 3. 무한 대기 방식
queue.put("data"); // 공간 생길 때까지 대기
// 4. 시간 제한 대기
boolean result = queue.offer("data", 1, TimeUnit.SECONDS); // 1초만 대기
주요 구현체
- ArrayBlockingQueue: 배열 기반, 고정 크기
- LinkedBlockingQueue: 링크 기반, 크기 조정 가능
스레드 대기 메커니즘 비교
synchronized + wait/notify
모니터 락 → 락 대기 집합(BLOCKED) → 스레드 대기 집합(WAITING)
↑ ↓
└────────── notify() ─────┘
ReentrantLock + Condition
ReentrantLock → 대기 큐(WAITING) → Condition 대기 공간(WAITING)
↑ ↓
└────────── signal() ─────┘
- 차이점
- ReentrantLock은 여러 Condition 생성 가능 (대기 공간 분리)
- Condition.signal()은 일반적으로 FIFO 순서로 깨운다
- Object.notify()는 임의의 스레드를 깨운다
주의사항
while 루프 사용이 필수
// 잘못된 예 (if 사용)
if (queue.isEmpty()) {
wait();
}
// 올바른 예 (while 사용)
while (queue.isEmpty()) {
wait(); // Spurious wakeup 대응
}
notifyAll() vs signal()
- notifyAll(): 모든 대기 스레드 깨움 (비효율적이지만 안전하다)
- signal(): 하나만 깨운다 (효율적이지만 정확한 조건 필요)
- Condition 분리 시 signal() 사용이 가능하다
InterruptedException 처리
- BlockingQueue 메서드들은 인터럽트 가능하다
- 적절한 예외 처리가 필요하다
실무 권장사항
- BlockingQueue 사용: 직접 구현하지 말고 자바 표준 AP를 활용
- 적절한 메서드 선택
- 중요한 데이터: put() / take() (대기)
- 타임아웃 필요: offer(e, time) / poll(time) (시간 제한)
- 빠른 실패 필요: offer() / poll() (즉시 반환)
- 적절한 큐 크기 설정: 너무 작으면 대기가 빈번해지고 너무 크면 메모리 낭비가 심해진다