BlockingCollection
BlockingCollection<T>
는 .NET에서 제공하는 동시성 컬렉션으로, 스레드 간의 안전한 데이터 공유를 가능하게 하며, 주로 생산자-소비자Producer-Consumer 패턴을 구현하는 데 사용됩니다. BlockingCollection<T>
는 내부적으로 다양한 데이터 구조(IProducerConsumerCollection<T>
)를 지원하여, 큐queue 스택stack 등 여러 형태로 활용할 수 있습니다. 이는 스레드 안전성을 보장하며, 동기화 메커니즘을 자동으로 처리하여 개발자가 별도의 락lock 없이도 안전하게 데이터를 추가하거나 가져갈 수 있게 합니다.
BlockingCollection이란?
BlockingCollection<T>
는 스레드 간의 안전한 데이터 공유를 위해 설계된 동시성 컬렉션입니다. 이는 생산자 스레드가 데이터를 추가하고, 소비자 스레드가 데이터를 소비하는 구조에서 주로 사용됩니다. BlockingCollection<T>
는 내부적으로 IProducerConsumerCollection<T>
인터페이스를 구현한 다양한 컬렉션(예: ConcurrentQueue, ConcurrentStack)을 기반으로 동작할 수 있어, 큐queue 또는 스택stack 형태로 데이터를 관리할 수 있습니다.
주요 개념
- 생산자-소비자 패턴: 하나 이상의 생산자 스레드가 데이터를 생성하고, 하나 이상의 소비자 스레드가 데이터를 소비하는 패턴입니다.
- 블로킹 연산: 데이터가 없을 경우 소비자 스레드는 데이터가 추가될 때까지 대기하고, 컬렉션이 가득 찬 경우 생산자 스레드는 공간이 생길 때까지 대기합니다.
주요 특징
- 자동 동기화:
BlockingCollection<T>
는 내부적으로 동기화를 처리하여, 여러 스레드에서 안전하게 데이터를 추가하거나 제거할 수 있습니다. 별도의 락lock 없이도 스레드 안전성을 보장합니다. - 유연한 데이터 구조: 기본적으로 큐queue 형태로 동작하지만, 생성 시 다양한
IProducerConsumerCollection<T>
구현체를 지정하여 스택stack이나 다른 형태로 사용할 수 있습니다. - 한계 설정: 컬렉션에 저장될 수 있는 항목의 수를 제한하여, 생산자가 너무 많은 데이터를 생성하는 것을 방지할 수 있습니다.
- 블로킹 및 비블로킹 연산: 데이터가 없는 경우 소비자는 대기하고, 컬렉션이 가득 찬 경우 생산자는는 대기하게 됩니다. 또한, 타임아웃이나 취소 토큰을 사용한 비블로킹 연산도 지원합니다.
기본 사용법
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
public static void Main()
{
var collection = new BlockingCollection<int>(boundedCapacity: 5);
// 프로듀서 스레드
Task producer = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
collection.Add(i);
Console.WriteLine($"Produced: {i}");
Thread.Sleep(100); // 생산 속도 조절
}
collection.CompleteAdding();
});
// 컨슈머 스레드
Task consumer = Task.Run(() =>
{
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(150); // 소비 속도 조절
}
});
Task.WaitAll(producer, consumer);
}
}
BlockingCollection<int> collection = new BlockingCollection<int>(boundedCapacity: 5);
- 최대 5개의 항목을 저장할 수 있는 BlockingCollection을 생성합니다.
collection.Add(i);
- 생산자가 데이터를 추가하며, 컬렉션이 가득 찬 경우 대기합니다.
collection.GetConsumingEnumerable()
- 소비자가 데이터가 추가될 때까지 대기하며, 항목을 가져갑니다. 데이터가 더 이상 추가되지 않으면 반복이 종료됩니다.
BlockingCollection의 주요 메서드
Add(item)
: 컬렉션에 항목을 추가합니다. 컬렉션이 가득 찬 경우 공간이 생길 때까지 대기합니다.Take()
: 컬렉션에서 항목을 가져옵니다. 컬렉션이 비어 있으면 데이터가 추가될 때까지 대기합니다.CompleteAdding()
: 더 이상 항목이 추가되지 않음을 표시합니다. 소비자 측에서 반복을 종료하는 데 사용됩니다.TryAdd(item, timeout)
: 지정된 시간 동안 항목을 추가하려 시도합니다.TryTake(out item, timeout)
: 지정된 시간 동안 항목을 가져오려 시도합니다.GetConsumingEnumerable()
: 소비자가 데이터를 가져갈 때까지 대기하는 열거자를 반환합니다.
BlockingCollection 고급 활용
제한된 용량과 메모리 최적화
BlockingCollection<T>
는 용량 제한을 설정할 수 있어 메모리 사용량을 효율적으로 관리할 수 있습니다. 특히 대용량 데이터의 처리 시 메모리 오버로드를 방지하고, 제한된 크기 내에서 데이터를 관리해 성능을 최적화할 수 있습니다.
- 큐의 용량을 설정하여 대용량 데이터 추가 시 스레드가 데이터 공간이 생길 때까지 기다리도록 해 메모리를 효율적으로 사용합니다.
- 제한된 용량으로 대기 시간을 최소화하고, 소비자가 데이터를 빠르게 처리할 수 있도록 생산자와 소비자를 조율합니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
public static void Main()
{
var limitedCollection = new BlockingCollection<int>(boundedCapacity: 5);
// 프로듀서 스레드
Task producer = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
limitedCollection.Add(i);
Console.WriteLine($"Produced: {i}");
Thread.Sleep(100); // 생산 속도 조절
}
limitedCollection.CompleteAdding();
});
// 컨슈머 스레드
Task consumer = Task.Run(() =>
{
foreach (var item in limitedCollection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(150); // 소비 속도 조절
}
});
Task.WaitAll(producer, consumer);
}
}
파이프라인 처리 시스템 구현
BlockingCollection<T>
를 활용하여 여러 단계의 작업을 처리하는 파이프라인 시스템을 구현할 수 있습니다. 각각의 작업 단계는 BlockingCollection<T>
인스턴스로 연결되며, 각 단계에서 데이터를 처리한 후 다음 단계로 넘겨집니다.
- 여러
BlockingCollection<T>
를 연결하여 작업을 순차적으로 처리하고, 다중 스레드 파이프라인을 구현합니다. - 예를 들어, 이미지 처리 시스템에서 단계별로 이미지 읽기, 처리, 저장 등의 작업을 수행할 때 사용합니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
public static void Stage1Producer(BlockingCollection<int> output)
{
for (int i = 0; i < 10; i++)
{
output.Add(i);
Console.WriteLine($"Stage1 Produced: {i}");
Thread.Sleep(100);
}
output.CompleteAdding();
}
public static void StageProcessor(BlockingCollection<int> input, BlockingCollection<int> output, string stageName)
{
foreach (var item in input.GetConsumingEnumerable())
{
int processedItem = item * 2; // 예: 데이터 처리
output.Add(processedItem);
Console.WriteLine($"{stageName} Processed: {processedItem}");
Thread.Sleep(100);
}
output.CompleteAdding();
}
public static void Stage3Consumer(BlockingCollection<int> input)
{
foreach (var item in input.GetConsumingEnumerable())
{
Console.WriteLine($"Stage3 Consumed: {item}");
Thread.Sleep(100);
}
}
public static void Main()
{
var stage1 = new BlockingCollection<int>();
var stage2 = new BlockingCollection<int>();
var stage3 = new BlockingCollection<int>();
Task.Run(() => Stage1Producer(stage1));
Task.Run(() => StageProcessor(stage1, stage2, "Stage2"));
Task.Run(() => StageProcessor(stage2, stage3, "Stage3"));
Task.Run(() => Stage3Consumer(stage3));
// 모든 작업이 완료될 때까지 대기
Task.Delay(5000).Wait();
}
}
- 이 파이프라인 구조에서는 각 단계에서
BlockingCollection<T>
를 통해 데이터가 안전하게 전달됩니다. - 작업이 끝난 후
CompleteAdding()
을 호출해 각 단계가 완료되었음을 알립니다.
멀티 큐와 우선순위 작업 분배
BlockingCollection<T>
를 여러 개 생성하여 우선순위 큐처럼 사용할 수 있습니다. 우선순위에 따라 작업이 할당된 여러 큐Multi-Queue를 소비자 스레드가 동시에 처리함으로써, 중요한 작업을 우선적으로 처리하는 구조를 구현할 수 있습니다.
- 여러 큐를 우선순위별로 설정하고, 소비자가 우선순위가 높은 큐부터 작업을 처리합니다.
- 일정 시간이 지나면 낮은 우선순위 큐의 작업도 처리할 수 있도록 하여 우선순위 기반의 동적 작업 분배를 수행합니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
public static void Producer(BlockingCollection<int> highPriority, BlockingCollection<int> lowPriority)
{
for (int i = 0; i < 10; i++)
{
if (i % 3 == 0)
{
highPriority.Add(i);
Console.WriteLine($"Produced High Priority: {i}");
}
else
{
lowPriority.Add(i);
Console.WriteLine($"Produced Low Priority: {i}");
}
Thread.Sleep(100);
}
highPriority.CompleteAdding();
lowPriority.CompleteAdding();
}
public static void PriorityConsumer(BlockingCollection<int> highPriority, BlockingCollection<int> lowPriority)
{
while (!highPriority.IsCompleted || !lowPriority.IsCompleted)
{
int item;
if (highPriority.TryTake(out item, TimeSpan.FromMilliseconds(100)))
{
Console.WriteLine($"Consumed High Priority: {item}");
}
else if (lowPriority.TryTake(out item, TimeSpan.FromMilliseconds(100)))
{
Console.WriteLine($"Consumed Low Priority: {item}");
}
}
}
public static void Main()
{
var highPriorityQueue = new BlockingCollection<int>();
var lowPriorityQueue = new BlockingCollection<int>();
Task.Run(() => Producer(highPriorityQueue, lowPriorityQueue));
Task.Run(() => PriorityConsumer(highPriorityQueue, lowPriorityQueue));
// 모든 작업이 완료될 때까지 대기
Task.Delay(3000).Wait();
}
}
- 높은 우선순위 작업이 먼저 처리되고, 높은 우선순위 큐가 비어 있을 때 낮은 우선순위 큐의 작업이 처리됩니다.
- 이는 우선순위에 따른 작업 처리의 유연성을 제공합니다.
타임아웃 및 취소 기능 활용
BlockingCollection<T>
는 타임아웃과 취소 토큰을 지원하여 데이터 소비 시 대기 시간을 제어하거나 작업을 중단할 수 있습니다. 이 기능을 통해 특정 시간 내에 작업이 완료되지 않으면 대기 중인 스레드를 취소하거나 타임아웃이 발생할 수 있도록 설정할 수 있습니다.
- 데이터 처리 시간이 오래 걸릴 때, 지정된 시간 내에 완료되지 않으면 타임아웃을 발생시켜 다른 작업으로 넘어갑니다.
CancellationToken
을 통해 외부에서 작업을 중단할 수 있습니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
public static void Producer(BlockingCollection<int> collection, CancellationToken token)
{
try
{
for (int i = 0; i < 10; i++)
{
collection.Add(i, token);
Console.WriteLine($"Produced: {i}");
Thread.Sleep(100); // 생산 속도 조절
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Producer canceled.");
}
finally
{
collection.CompleteAdding();
}
}
public static void ConsumerWithTimeout(BlockingCollection<int> collection, CancellationToken token)
{
try
{
foreach (var item in collection.GetConsumingEnumerable(token))
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(150); // 소비 속도 조절
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Consumer canceled.");
}
}
public static void Main()
{
var collection = new BlockingCollection<int>();
var cts = new CancellationTokenSource();
Task.Run(() => Producer(collection, cts.Token));
Task.Run(() => ConsumerWithTimeout(collection, cts.Token));
// 2초 후에 작업 취소
cts.CancelAfter(2000);
// 모든 작업이 완료될 때까지 대기
Task.Delay(3000).Wait();
}
}
- 타임아웃을 설정하여 지정된 시간이 초과되면 자동으로 작업을 취소합니다.
- CancellationToken을 사용하여 생산자와 소비자 작업을 외부에서 중단할 수 있습니다.
성능 고려사항
메모리 사용량
BlockingCollection<T>
는 내부적으로 사용하는 컬렉션의 특성에 따라 메모리 사용량이 달라집니다. 큐queue나 스택stack을 기반으로 할 때, 각 구조의 메모리 오버헤드를 고려해야 합니다.
생산자와 소비자의 속도 불일치
생산자가 소비자보다 훨씬 빠르게 데이터를 생성하면, 컬렉션이 가득 차 대기 상태가 될 수 있습니다. 반대로 소비자가 생산자보다 빠르면 컬렉션이 자주 비게 되어 효율성이 떨어질 수 있습니다.
동시성 수준
여러 스레드가 동시에 데이터를 추가하거나 제거할 경우, 내부 동기화 비용이 발생할 수 있습니다. 이는 성능에 영향을 줄 수 있으므로, 필요한 만큼의 동시성 수준을 설정하는 것이 중요합니다.
기본 컬렉션 선택
BlockingCollection<T>
를 생성할 때 사용하는 기본 컬렉션(ConcurrentQueue, ConcurrentStack 등)을 상황에 맞게 선택하여 성능을 최적화할 수 있습니다. 예를 들어, FIFOFirst-In-First-Out 방식이 필요한 경우 ConcurrentQueue를 사용하고, LIFOLast-In-First-Out 방식이 필요한 경우 ConcurrentStack를 사용하는 것이 적합합니다.