Java 동시성 컬렉션에서 Collections.synchronizedXxx()를 살펴보았다. 이번에는 그 한계를 극복한 java.util.concurrent 패키지와 고성능 동시성 컬렉션들을 알아보자
왜 별도의 동시성 컬렉션이 필요할까?
Collections.synchronized의 문제
- 모든 메서드에 단순하게 synchronized 적용
- 잠금 범위가 너무 넓음
- 한 번에 하나의 스레드만 접근 가능
- 성능 최적화 불가능
Concurrent 컬렉션의 해결책
- 정교한 잠금 메커니즘 (세그먼트 락, CAS 등)
- 최소한의 잠금 범위
- 여러 스레드의 동시 접근 지원
- 다양한 최적화 기법 적용
동시성 컬렉션 개요
java.util.concurrent 패키지 핵심 특징
- synchronized: 기본 동기화
- ReentrantLock: 고급 락
- CAS 연산: Compare-And-Swap (락 없는 동기화)
- volatile 변수: 가시성 보장
- 세그먼트 락: 분할 잠금 기술
- 락-프리 알고리즘: Non-blocking 알고리즘
제공되는 컬렉션 종류
| 타입 | 일반 컬렉션 | Concurrent 컬렉션 | 특징 |
| List | ArrayList | CopyOnWriteArrayList | 읽기 많을 때 최적 |
| Set | HashSet | CopyOnWriteArraySet | 읽기 많을 때 최적 |
| TreeSet | ConcurrentSkipListSet | 정렬 + 동시성 | |
| Map | HashMap | ConcurrentHashMap | 가장 많이 사용 |
| TreeMap | ConcurrentSkipListMap | 정렬 + 동시성 | |
| Queue | – | ConcurrentLinkedQueue | 비차단 큐 |
| Deque | ConcurrentLinkedDeque | 비차단 덱 |
- LinkedHashSet, LinkedHashMap의 동시성 버전은 제공되지 않는다. 필요시 Collections.synchronizedXxx()를 사용해야 한다
List 구현체
CopyOnWriteArrayList
ArrayList의 Thread-safe 버전
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class ListExample {
public static void main(String[] args) {
List<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);
System.out.println("list = " + list);
}
}
출력
list = [1, 2, 3]
Copy-On-Write 전략
핵심 아이디어: 쓰기 시 전체 배열을 복사
// 쓰기 작업 (add, remove 등)
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 전체 배열 복사
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
// 읽기 작업 (get 등)
public E get(int index) {
// 동기화 없음
return get(getArray(), index);
}
장점
- 읽기 작업은 락 없이 매우 빠르다
- Iterator가 ConcurrentModificationException을 던지지 않는다
- 반복 중 수정이 가능하다
List<String> list = new CopyOnWriteArrayList<>();
list.add("A");
list.add("B");
list.add("C");
// 안전하게 반복 중 수정 가능
for (String item : list) {
if (item.equals("B")) {
list.remove(item); // OK
}
}
단점
- 쓰기 작업이 매우 느리다 (전체 배열 복사)
- 메모리 사용량이 증가한다
- 쓰기가 많으면 성능이 급격히 저하된다
사용 시나리오
// 적합: 읽기 작업이 쓰기보다 훨씬 많은 경우
// 예: 이벤트 리스너 리스트, 설정 정보
public class EventBus {
private final List<EventListener> listeners =
new CopyOnWriteArrayList<>();
public void register(EventListener listener) {
listeners.add(listener); // 가끔 발생
}
public void fireEvent(Event event) {
for (EventListener listener : listeners) { // 자주 발생
listener.onEvent(event);
}
}
}
// 부적합: 쓰기가 빈번한 경우
// 예: 실시간 로그 수집, 채팅 메시지 리스트
Set 구현체
CopyOnWriteArraySet
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class SetExample {
public static void main(String[] args) {
Set<Integer> set = new CopyOnWriteArraySet<>();
set.add(1);
set.add(2);
set.add(3);
set.add(2); // 중복 제거됨
System.out.println("copySet = " + set);
}
}
출력
copySet = [1, 2, 3]
특징
- HashSet의 대안
- 내부적으로 CopyOnWriteArrayList 사용
- 순서 보장이 안 된다(입력 순서가 나올 수도 있지만 보장하지 않음)
- 읽기 많을 때 최적
ConcurrentSkipListSet
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
public class SetExample {
public static void main(String[] args) {
Set<Integer> set = new ConcurrentSkipListSet<>();
// 순서와 관계없이 추가
set.add(3);
set.add(1);
set.add(2);
System.out.println("skipSet = " + set);
}
}
출력
skipSet = [1, 2, 3] // 항상 정렬된 순서
특징
- TreeSet의 대안
- 정렬된 순서를 유지한다
- Comparator 사용이 가능하다
- Skip List 자료구조 기반
// 커스텀 정렬
Set<String> set = new ConcurrentSkipListSet<>(
Comparator.comparing(String::length)
.thenComparing(String::compareTo)
);
set.add("apple");
set.add("pie");
set.add("banana");
System.out.println(set); // [pie, apple, banana] (길이순, 같으면 사전순)
Map 구현체
ConcurrentHashMap
가장 많이 사용되는 동시성 컬렉션
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MapExample {
public static void main(String[] args) {
Map<Integer, String> map = new ConcurrentHashMap<>();
map.put(3, "data3");
map.put(2, "data2");
map.put(1, "data1");
System.out.println("map = " + map);
}
}
출력
map = {1=data1, 2=data2, 3=data3}
세그먼트 락 메커니즘
// 개념적인 구조
전체 맵을 여러 세그먼트로 분할
┌──────────┬──────────┬──────────┬──────────┐
│Segment 0 │Segment 1 │Segment 2 │Segment 3 │
└──────────┴──────────┴──────────┴──────────┘
↓ ↓ ↓ ↓
Thread-1 Thread-2 Thread-3 Thread-4
→ 각 세그먼트는 독립적으로 락
→ 서로 다른 세그먼트는 동시 접근 가능
성능 최적화 기법
// 1. 읽기 작업 - 대부분 락 없음
V get(K key) {
// volatile 읽기로 최신 값 보장
// 락 획득 없이 빠르게 처리
}
// 2. 쓰기 작업 - 해당 세그먼트만 락
V put(K key, V value) {
int hash = hash(key);
int segment = hash & segmentMask;
// 해당 세그먼트만 락
synchronized (segments[segment]) {
// 실제 작업
}
}
// 3. CAS 연산 활용
boolean putIfAbsent(K key, V value) {
// Compare-And-Swap으로 락 없이 처리
}
유용한 메서드들
Map<String, Integer> map = new ConcurrentHashMap<>();
// 원자적 연산들 - 복합 작업도 안전
map.putIfAbsent("count", 0);
map.computeIfAbsent("count", k -> 0);
map.computeIfPresent("count", (k, v) -> v + 1);
map.merge("count", 1, Integer::sum);
// 일반 Map vs ConcurrentHashMap
// 일반 Map - 경쟁 상태 발생
if (!map.containsKey("count")) {
map.put("count", 1); // 다른 스레드가 끼어들 수 있음
}
// ConcurrentHashMap - 안전
map.putIfAbsent("count", 1); // 원자적 연산
실무 활용 예시
// 캐시 구현
public class CacheService {
private final Map<String, User> cache = new ConcurrentHashMap<>();
public User getUser(String id) {
return cache.computeIfAbsent(id, this::loadFromDatabase);
}
private User loadFromDatabase(String id) {
// DB 조회
return userRepository.findById(id);
}
}
// 카운터
public class AccessCounter {
private final Map<String, AtomicInteger> counts = new ConcurrentHashMap<>();
public void increment(String page) {
counts.computeIfAbsent(page, k -> new AtomicInteger())
.incrementAndGet();
}
public int getCount(String page) {
return counts.getOrDefault(page, new AtomicInteger()).get();
}
}
ConcurrentSkipListMap
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
public class MapExample {
public static void main(String[] args) {
Map<Integer, String> map = new ConcurrentSkipListMap<>();
map.put(2, "data2");
map.put(3, "data3");
map.put(1, "data1");
System.out.println("map = " + map);
}
}
출력
map = {1=data1, 2=data2, 3=data3} // 항상 키 순서로 정렬
특징
- TreeMap의 대안
- 정렬된 순서를 유지한다
- Comparator 사용 가능
- NavigableMap 인터페이스 구현
ConcurrentSkipListMap<String, Integer> map = new ConcurrentSkipListMap<>();
map.put("apple", 1);
map.put("banana", 2);
map.put("cherry", 3);
// NavigableMap 메서드 활용
System.out.println(map.firstKey()); // apple
System.out.println(map.lastKey()); // cherry
System.out.println(map.higherKey("apple")); // banana
Queue 구현체
ConcurrentLinkedQueue
비차단(Non-Blocking) 큐
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
public class QueueExample {
public static void main(String[] args) {
Queue<String> queue = new ConcurrentLinkedQueue<>();
queue.offer("task1");
queue.offer("task2");
queue.offer("task3");
System.out.println(queue.poll()); // task1
System.out.println(queue.poll()); // task2
System.out.println(queue.peek()); // task3 (제거 안 됨)
}
}
특징
- 무한 크기
- CAS 기반 구현 (락 없음)
- 매우 빠른 성능
- 큐기 비어도 블록하지 않음 (null 반환)
// 작업 큐 예시
public class TaskQueue {
private final Queue<Task> queue = new ConcurrentLinkedQueue<>();
public void addTask(Task task) {
queue.offer(task);
}
public Task getTask() {
return queue.poll(); // 없으면 null, 블록 안 함
}
}
ConcurrentLinkedDeque
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
Deque<String> deque = new ConcurrentLinkedDeque<>();
deque.addFirst("first");
deque.addLast("last");
System.out.println(deque.removeFirst()); // first
System.out.println(deque.removeLast()); // last
특징
- 양방향 큐
- 앞뒤 모두 추가/제거 가능
- 비차단 방식
BlockingQueue – 생산자-소비자 패턴
스레드를 차단(Block)하는 큐로 생산자 – 소비자 패턴의 핵심
ArrayBlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
// 생산자
new Thread(() -> {
try {
System.out.println("생산자: 데이터 추가");
queue.put("data"); // 큐가 가득 차면 대기
System.out.println("생산자: 추가 완료");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer").start();
// 소비자
new Thread(() -> {
try {
Thread.sleep(1000); // 1초 대기
System.out.println("소비자: 데이터 가져가기");
String data = queue.take(); // 큐가 비면 대기
System.out.println("소비자: " + data);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer").start();
}
}
출력
생산자: 데이터 추가
생산자: 추가 완료
(1초 대기)
소비자: 데이터 가져가기
소비자: data
특징
- 크기 고정
- 큐가 가득 차면 생산자 대기 (put)
- 큐가 비면 소비자 대기 (take)
- 공정(Fair) 모드 지원
// 공정 모드 - FIFO 순서로 스레드에게 락 부여 BlockingQueue<String> fairQueue = new ArrayBlockingQueue<>(10, true); // 일반 모드 - 성능은 좋지만 순서 보장 안 됨 BlockingQueue<String> normalQueue = new ArrayBlockingQueue<>(10, false);
BlockingQueue 메서드 비교
| 동작 | 예외 발생 | 특별한 값 반환 | 블록 | 타임아웃 |
| 삽입 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 제거 | remove() | poll() | take() | poll(time, unit) |
| 검사 | element() | peek() | – | – |
BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
// 1. 예외 발생
queue.add("A");
queue.add("B");
queue.add("C"); // IllegalStateException - 큐가 가득 참
// 2. 특별한 값 반환
queue.offer("A"); // true
queue.offer("B"); // true
queue.offer("C"); // false - 큐가 가득 참
// 3. 블록
queue.put("A"); // 성공
queue.put("B"); // 성공
queue.put("C"); // 여기서 대기... 공간이 생길 때까지
// 4. 타임아웃
boolean success = queue.offer("C", 1, TimeUnit.SECONDS);
// 1초 동안 대기, 실패하면 false 반환
LinkedBlockingQueue
// 무한 크기 BlockingQueue<String> unboundedQueue = new LinkedBlockingQueue<>(); // 크기 제한 BlockingQueue<String> boundedQueue = new LinkedBlockingQueue<>(100);
특징
- 크기 무한 또는 고정 가능
- 링크드 리스트 기반
- ArrayBlockingQueue보다 처리량 높음
- 생산자와 소비자가 다른 락 사용(더 나은 동시성)
// 실무 예시: 작업 큐
public class WorkerPool {
private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
private final List<Thread> workers = new ArrayList<>();
public WorkerPool(int workerCount) {
for (int i = 0; i < workerCount; i++) {
Thread worker = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Runnable task = taskQueue.take(); // 작업 대기
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
worker.start();
workers.add(worker);
}
}
public void submit(Runnable task) {
try {
taskQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
PriorityBlockingQueue
import java.util.concurrent.PriorityBlockingQueue;
public class PriorityQueueExample {
static class Task implements Comparable<Task> {
String name;
int priority;
Task(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public int compareTo(Task o) {
return Integer.compare(this.priority, o.priority);
}
@Override
public String toString() {
return name + "(우선순위: " + priority + ")";
}
}
public static void main(String[] args) throws InterruptedException {
PriorityBlockingQueue<Task> queue = new PriorityBlockingQueue<>();
queue.put(new Task("낮은 작업", 3));
queue.put(new Task("높은 작업", 1));
queue.put(new Task("중간 작업", 2));
System.out.println(queue.take()); // 높은 작업(우선순위: 1)
System.out.println(queue.take()); // 중간 작업(우선순위: 2)
System.out.println(queue.take()); // 낮은 작업(우선순위: 3)
}
}
특징
- 우선순위 기반 처리
- 힙(Heap) 자료 구조
- 무한 크기
- put()은 블록 안 함(무한 크기라서)
SynchronousQueue – 직거래 큐
import java.util.concurrent.SynchronousQueue;
public class SynchronousQueueExample {
public static void main(String[] args) {
SynchronousQueue<String> queue = new SynchronousQueue<>();
// 생산자
new Thread(() -> {
try {
System.out.println("생산자: 대기 중...");
queue.put("data");
System.out.println("생산자: 전달 완료");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 소비자 (2초 후 실행)
new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("소비자: 수신 - " + queue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
출력
생산자: 대기 중...
(2초 대기 - 생산자가 블록됨)
소비자: 수신 - data
생산자: 전달 완료
특징
- 크기가 0인 큐(버퍼 없음)
- 생산자와 소비자가 직접 핸드오프
- 생산자는 소비자가 받을 때까지 대기
- 소비자는 생산자가 줄 때가지 대기
- 매우 빠른 직접 전달
DelayQueue
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueExample {
static class DelayedTask implements Delayed {
private final String name;
private final long startTime;
DelayedTask(String name, long delayMillis) {
this.name = name;
this.startTime = System.currentTimeMillis() + delayMillis;
}
@Override
public long getDelay(TimeUnit unit) {
long diff = startTime - System.currentTimeMillis();
return unit.convert(diff, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.startTime, ((DelayedTask) o).startTime);
}
@Override
public String toString() {
return name;
}
}
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
System.out.println("작업 추가: " + System.currentTimeMillis());
queue.put(new DelayedTask("3초 후 작업", 3000));
queue.put(new DelayedTask("1초 후 작업", 1000));
queue.put(new DelayedTask("2초 후 작업", 2000));
while (!queue.isEmpty()) {
DelayedTask task = queue.take(); // 지연 시간까지 대기
System.out.println(System.currentTimeMillis() + ": " + task);
}
}
}
출력
작업 추가: 1704067200000
1704067201000: 1초 후 작업
1704067202000: 2초 후 작업
1704067203000: 3초 후 작업
특징
- 지정된 지연 시간 후 처리
- 스케줄링 작업에 유용
- 만료 시간 관리에 최적
컬렉션 선택 플로우차트
시작
↓
단일 스레드인가?
├─ Yes → ArrayList, HashMap 등 일반 컬렉션
└─ No
↓
List가 필요한가?
├─ Yes
│ └─ 읽기가 쓰기보다 훨씬 많은가?
│ ├─ Yes → CopyOnWriteArrayList
│ └─ No → Collections.synchronizedList()
│
Set이 필요한가?
├─ Yes
│ ├─ 정렬이 필요한가?
│ │ ├─ Yes → ConcurrentSkipListSet
│ │ └─ No → CopyOnWriteArraySet
│
Map이 필요한가?
├─ Yes
│ ├─ 정렬이 필요한가?
│ │ ├─ Yes → ConcurrentSkipListMap
│ │ └─ No → ConcurrentHashMap ⭐
│
Queue가 필요한가?
└─ Yes
├─ 블록킹이 필요한가?
│ ├─ Yes → ArrayBlockingQueue / LinkedBlockingQueue
│ └─ No → ConcurrentLinkedQueue
└─ 우선순위가 필요한가?
└─ Yes → PriorityBlockingQueue
실무 상황별 추천
캐시 구현
// ConcurrentHashMap
public class Cache<K, V> {
private final Map<K, V> cache = new ConcurrentHashMap<>();
public V get(K key, Function<K, V> loader) {
return cache.computeIfAbsent(key, loader);
}
public void invalidate(K key) {
cache.remove(key);
}
}
이벤트 리스너 관리
// CopyOnWriteArrayList
public class EventBus {
private final List<EventListener> listeners =
new CopyOnWriteArrayList<>();
public void register(EventListener listener) {
listeners.add(listener); // 가끔
}
public void fire(Event event) {
for (EventListener listener : listeners) { // 자주
listener.onEvent(event);
}
}
}
작업 큐 (ThreadPool)
// LinkedBlockingQueue
public class SimpleThreadPool {
private final BlockingQueue<Runnable> workQueue;
private final List<Worker> workers;
public SimpleThreadPool(int threadCount, int queueSize) {
this.workQueue = new LinkedBlockingQueue<>(queueSize);
this.workers = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
Worker worker = new Worker(workQueue);
worker.start();
workers.add(worker);
}
}
public void submit(Runnable task) throws InterruptedException {
workQueue.put(task);
}
private static class Worker extends Thread {
private final BlockingQueue<Runnable> queue;
Worker(BlockingQueue<Runnable> queue) {
this.queue = queue;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Runnable task = queue.take();
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
통계/카운터
// ⭐ 추천: ConcurrentHashMap + AtomicInteger
public class Statistics {
private final Map<String, AtomicInteger> counters =
new ConcurrentHashMap<>();
public void increment(String key) {
counters.computeIfAbsent(key, k -> new AtomicInteger())
.incrementAndGet();
}
public Map<String, Integer> getSnapshot() {
Map<String, Integer> snapshot = new HashMap<>();
counters.forEach((k, v) -> snapshot.put(k, v.get()));
return snapshot;
}
}
주의 사항
복합 연산은 여전히 주의 필요
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 잘못된 방법
if (!map.containsKey("count")) {
map.put("count", 0); // 경쟁 상태!
}
// 올바른 방법
map.putIfAbsent("count", 0);
반복 중 수정
// CopyOnWriteArrayList - 안전
List<String> list = new CopyOnWriteArrayList<>();
for (String item : list) {
list.remove(item); // OK
}
// ConcurrentHashMap - 약한 일관성
Map<String, String> map = new ConcurrentHashMap<>();
for (String key : map.keySet()) {
map.remove(key); // 일부만 제거될 수 있음
}
LinkedHashMap 동시성 버전 없음
// 동시성 구현 없음 // LinkedHashSet, LinkedHashMap // 필요시 이렇게 Set<String> set = Collections.synchronizedSet(new LinkedHashSet<>()); Map<String, String> map = Collections.synchronizedMap(new LinkedHashMap<>());
핵심 원칙
- 단일 스레드 → 일반 컬렉션 (가장 빠르다)
- 멀티 스레드 + 읽기 많음 → CopyOnWrite 계열
- 멀티 스레드 + 일반적 → Concurrent 계열 (특히 ConcurrentHashMap)
- 생산자-소비자 → BlockingQueue 계열
실무에서 조심해야할 것
// 대부분의 경우 이것만 알아도 충분 Map<K, V> map = new ConcurrentHashMap<>(); // 99% 케이스 List<T> list = new CopyOnWriteArrayList<>(); // 리스너, 설정 BlockingQueue<T> queue = new LinkedBlockingQueue<>(); // 작업 큐 // 정렬이 필요한 경우만 Map<K, V> sortedMap = new ConcurrentSkipListMap<>(); Set<T> sortedSet = new ConcurrentSkipListSet<>();
절대 잊지 말 것
멀티스레드 환경에서 일반 컬렉션을 사용하면 해결하기 매우 어려운 버그가 발생한다