BlockingCollection

BlockingCollection<T>는 .NET에서 제공하는 동시성 컬렉션으로, 스레드 간의 안전한 데이터 공유를 가능하게 하며, 주로 생산자-소비자Producer-Consumer 패턴을 구현하는 데 사용됩니다. BlockingCollection<T>는 내부적으로 다양한 데이터 구조(IProducerConsumerCollection<T>)를 지원하여, queue 스택stack 등 여러 형태로 활용할 수 있습니다. 이는 스레드 안전성을 보장하며, 동기화 메커니즘을 자동으로 처리하여 개발자가 별도의 락lock 없이도 안전하게 데이터를 추가하거나 가져갈 수 있게 합니다.

BlockingCollection이란?

BlockingCollection<T>는 스레드 간의 안전한 데이터 공유를 위해 설계된 동시성 컬렉션입니다. 이는 생산자 스레드가 데이터를 추가하고, 소비자 스레드가 데이터를 소비하는 구조에서 주로 사용됩니다. BlockingCollection<T>는 내부적으로 IProducerConsumerCollection<T> 인터페이스를 구현한 다양한 컬렉션(예: ConcurrentQueue, ConcurrentStack)을 기반으로 동작할 수 있어, 큐queue 또는 스택stack 형태로 데이터를 관리할 수 있습니다.

주요 개념

  • 생산자-소비자 패턴: 하나 이상의 생산자 스레드가 데이터를 생성하고, 하나 이상의 소비자 스레드가 데이터를 소비하는 패턴입니다.
  • 블로킹 연산: 데이터가 없을 경우 소비자 스레드는 데이터가 추가될 때까지 대기하고, 컬렉션이 가득 찬 경우 생산자 스레드는 공간이 생길 때까지 대기합니다.

주요 특징

  • 자동 동기화: BlockingCollection<T>는 내부적으로 동기화를 처리하여, 여러 스레드에서 안전하게 데이터를 추가하거나 제거할 수 있습니다. 별도의 락lock 없이도 스레드 안전성을 보장합니다.
  • 유연한 데이터 구조: 기본적으로 큐queue 형태로 동작하지만, 생성 시 다양한 IProducerConsumerCollection<T> 구현체를 지정하여 스택stack이나 다른 형태로 사용할 수 있습니다.
  • 한계 설정: 컬렉션에 저장될 수 있는 항목의 수를 제한하여, 생산자가 너무 많은 데이터를 생성하는 것을 방지할 수 있습니다.
  • 블로킹 및 비블로킹 연산: 데이터가 없는 경우 소비자는 대기하고, 컬렉션이 가득 찬 경우 생산자는는 대기하게 됩니다. 또한, 타임아웃이나 취소 토큰을 사용한 비블로킹 연산도 지원합니다.

기본 사용법

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
    public static void Main()
    {
        var collection = new BlockingCollection<int>(boundedCapacity: 5);
        // 프로듀서 스레드
        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                collection.Add(i);
                Console.WriteLine($"Produced: {i}");
                Thread.Sleep(100); // 생산 속도 조절
            }
            collection.CompleteAdding();
        });
        // 컨슈머 스레드
        Task consumer = Task.Run(() =>
        {
            foreach (var item in collection.GetConsumingEnumerable())
            {
                Console.WriteLine($"Consumed: {item}");
                Thread.Sleep(150); // 소비 속도 조절
            }
        });
        Task.WaitAll(producer, consumer);
    }
}
  • BlockingCollection<int> collection = new BlockingCollection<int>(boundedCapacity: 5);
    • 최대 5개의 항목을 저장할 수 있는 BlockingCollection을 생성합니다.
  • collection.Add(i);
    • 생산자가 데이터를 추가하며, 컬렉션이 가득 찬 경우 대기합니다.
  • collection.GetConsumingEnumerable()
    • 소비자가 데이터가 추가될 때까지 대기하며, 항목을 가져갑니다. 데이터가 더 이상 추가되지 않으면 반복이 종료됩니다.

BlockingCollection의 주요 메서드

  • Add(item): 컬렉션에 항목을 추가합니다. 컬렉션이 가득 찬 경우 공간이 생길 때까지 대기합니다.
  • Take(): 컬렉션에서 항목을 가져옵니다. 컬렉션이 비어 있으면 데이터가 추가될 때까지 대기합니다.
  • CompleteAdding(): 더 이상 항목이 추가되지 않음을 표시합니다. 소비자 측에서 반복을 종료하는 데 사용됩니다.
  • TryAdd(item, timeout): 지정된 시간 동안 항목을 추가하려 시도합니다.
  • TryTake(out item, timeout): 지정된 시간 동안 항목을 가져오려 시도합니다.
  • GetConsumingEnumerable(): 소비자가 데이터를 가져갈 때까지 대기하는 열거자를 반환합니다.

BlockingCollection 고급 활용

제한된 용량과 메모리 최적화

BlockingCollection<T>는 용량 제한을 설정할 수 있어 메모리 사용량을 효율적으로 관리할 수 있습니다. 특히 대용량 데이터의 처리 시 메모리 오버로드를 방지하고, 제한된 크기 내에서 데이터를 관리해 성능을 최적화할 수 있습니다.

  • 큐의 용량을 설정하여 대용량 데이터 추가 시 스레드가 데이터 공간이 생길 때까지 기다리도록 해 메모리를 효율적으로 사용합니다.
  • 제한된 용량으로 대기 시간을 최소화하고, 소비자가 데이터를 빠르게 처리할 수 있도록 생산자와 소비자를 조율합니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
    public static void Main()
    {
        var limitedCollection = new BlockingCollection<int>(boundedCapacity: 5);
        // 프로듀서 스레드
        Task producer = Task.Run(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                limitedCollection.Add(i);
                Console.WriteLine($"Produced: {i}");
                Thread.Sleep(100); // 생산 속도 조절
            }
            limitedCollection.CompleteAdding();
        });
        // 컨슈머 스레드
        Task consumer = Task.Run(() =>
        {
            foreach (var item in limitedCollection.GetConsumingEnumerable())
            {
                Console.WriteLine($"Consumed: {item}");
                Thread.Sleep(150); // 소비 속도 조절
            }
        });
        Task.WaitAll(producer, consumer);
    }
}

파이프라인 처리 시스템 구현

BlockingCollection<T>를 활용하여 여러 단계의 작업을 처리하는 파이프라인 시스템을 구현할 수 있습니다. 각각의 작업 단계는 BlockingCollection<T> 인스턴스로 연결되며, 각 단계에서 데이터를 처리한 후 다음 단계로 넘겨집니다.

  • 여러 BlockingCollection<T>를 연결하여 작업을 순차적으로 처리하고, 다중 스레드 파이프라인을 구현합니다.
  • 예를 들어, 이미지 처리 시스템에서 단계별로 이미지 읽기, 처리, 저장 등의 작업을 수행할 때 사용합니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
    public static void Stage1Producer(BlockingCollection<int> output)
    {
        for (int i = 0; i < 10; i++)
        {
            output.Add(i);
            Console.WriteLine($"Stage1 Produced: {i}");
            Thread.Sleep(100);
        }
        output.CompleteAdding();
    }
    public static void StageProcessor(BlockingCollection<int> input, BlockingCollection<int> output, string stageName)
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            int processedItem = item * 2; // 예: 데이터 처리
            output.Add(processedItem);
            Console.WriteLine($"{stageName} Processed: {processedItem}");
            Thread.Sleep(100);
        }
        output.CompleteAdding();
    }
    public static void Stage3Consumer(BlockingCollection<int> input)
    {
        foreach (var item in input.GetConsumingEnumerable())
        {
            Console.WriteLine($"Stage3 Consumed: {item}");
            Thread.Sleep(100);
        }
    }
    public static void Main()
    {
        var stage1 = new BlockingCollection<int>();
        var stage2 = new BlockingCollection<int>();
        var stage3 = new BlockingCollection<int>();
        Task.Run(() => Stage1Producer(stage1));
        Task.Run(() => StageProcessor(stage1, stage2, "Stage2"));
        Task.Run(() => StageProcessor(stage2, stage3, "Stage3"));
        Task.Run(() => Stage3Consumer(stage3));
        // 모든 작업이 완료될 때까지 대기
        Task.Delay(5000).Wait();
    }
}
  • 이 파이프라인 구조에서는 각 단계에서 BlockingCollection<T>를 통해 데이터가 안전하게 전달됩니다.
  • 작업이 끝난 후 CompleteAdding()을 호출해 각 단계가 완료되었음을 알립니다.

멀티 큐와 우선순위 작업 분배

BlockingCollection<T>를 여러 개 생성하여 우선순위 큐처럼 사용할 수 있습니다. 우선순위에 따라 작업이 할당된 여러 큐Multi-Queue를 소비자 스레드가 동시에 처리함으로써, 중요한 작업을 우선적으로 처리하는 구조를 구현할 수 있습니다.

  • 여러 큐를 우선순위별로 설정하고, 소비자가 우선순위가 높은 큐부터 작업을 처리합니다.
  • 일정 시간이 지나면 낮은 우선순위 큐의 작업도 처리할 수 있도록 하여 우선순위 기반의 동적 작업 분배를 수행합니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
    public static void Producer(BlockingCollection<int> highPriority, BlockingCollection<int> lowPriority)
    {
        for (int i = 0; i < 10; i++)
        {
            if (i % 3 == 0)
            {
                highPriority.Add(i);
                Console.WriteLine($"Produced High Priority: {i}");
            }
            else
            {
                lowPriority.Add(i);
                Console.WriteLine($"Produced Low Priority: {i}");
            }
            Thread.Sleep(100);
        }
        highPriority.CompleteAdding();
        lowPriority.CompleteAdding();
    }
    public static void PriorityConsumer(BlockingCollection<int> highPriority, BlockingCollection<int> lowPriority)
    {
        while (!highPriority.IsCompleted || !lowPriority.IsCompleted)
        {
            int item;
            if (highPriority.TryTake(out item, TimeSpan.FromMilliseconds(100)))
            {
                Console.WriteLine($"Consumed High Priority: {item}");
            }
            else if (lowPriority.TryTake(out item, TimeSpan.FromMilliseconds(100)))
            {
                Console.WriteLine($"Consumed Low Priority: {item}");
            }
        }
    }
    public static void Main()
    {
        var highPriorityQueue = new BlockingCollection<int>();
        var lowPriorityQueue = new BlockingCollection<int>();
        Task.Run(() => Producer(highPriorityQueue, lowPriorityQueue));
        Task.Run(() => PriorityConsumer(highPriorityQueue, lowPriorityQueue));
        // 모든 작업이 완료될 때까지 대기
        Task.Delay(3000).Wait();
    }
}
  • 높은 우선순위 작업이 먼저 처리되고, 높은 우선순위 큐가 비어 있을 때 낮은 우선순위 큐의 작업이 처리됩니다.
  • 이는 우선순위에 따른 작업 처리의 유연성을 제공합니다.

타임아웃 및 취소 기능 활용

BlockingCollection<T>는 타임아웃과 취소 토큰을 지원하여 데이터 소비 시 대기 시간을 제어하거나 작업을 중단할 수 있습니다. 이 기능을 통해 특정 시간 내에 작업이 완료되지 않으면 대기 중인 스레드를 취소하거나 타임아웃이 발생할 수 있도록 설정할 수 있습니다.

  • 데이터 처리 시간이 오래 걸릴 때, 지정된 시간 내에 완료되지 않으면 타임아웃을 발생시켜 다른 작업으로 넘어갑니다.
  • CancellationToken을 통해 외부에서 작업을 중단할 수 있습니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class Program
{
    public static void Producer(BlockingCollection<int> collection, CancellationToken token)
    {
        try
        {
            for (int i = 0; i < 10; i++)
            {
                collection.Add(i, token);
                Console.WriteLine($"Produced: {i}");
                Thread.Sleep(100); // 생산 속도 조절
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Producer canceled.");
        }
        finally
        {
            collection.CompleteAdding();
        }
    }
    public static void ConsumerWithTimeout(BlockingCollection<int> collection, CancellationToken token)
    {
        try
        {
            foreach (var item in collection.GetConsumingEnumerable(token))
            {
                Console.WriteLine($"Consumed: {item}");
                Thread.Sleep(150); // 소비 속도 조절
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Consumer canceled.");
        }
    }
    public static void Main()
    {
        var collection = new BlockingCollection<int>();
        var cts = new CancellationTokenSource();
        Task.Run(() => Producer(collection, cts.Token));
        Task.Run(() => ConsumerWithTimeout(collection, cts.Token));
        // 2초 후에 작업 취소
        cts.CancelAfter(2000);
        // 모든 작업이 완료될 때까지 대기
        Task.Delay(3000).Wait();
    }
}
  • 타임아웃을 설정하여 지정된 시간이 초과되면 자동으로 작업을 취소합니다.
  • CancellationToken을 사용하여 생산자와 소비자 작업을 외부에서 중단할 수 있습니다.

성능 고려사항

메모리 사용량

BlockingCollection<T>는 내부적으로 사용하는 컬렉션의 특성에 따라 메모리 사용량이 달라집니다. 큐queue나 스택stack을 기반으로 할 때, 각 구조의 메모리 오버헤드를 고려해야 합니다.

생산자와 소비자의 속도 불일치

생산자가 소비자보다 훨씬 빠르게 데이터를 생성하면, 컬렉션이 가득 차 대기 상태가 될 수 있습니다. 반대로 소비자가 생산자보다 빠르면 컬렉션이 자주 비게 되어 효율성이 떨어질 수 있습니다.

동시성 수준

여러 스레드가 동시에 데이터를 추가하거나 제거할 경우, 내부 동기화 비용이 발생할 수 있습니다. 이는 성능에 영향을 줄 수 있으므로, 필요한 만큼의 동시성 수준을 설정하는 것이 중요합니다.

기본 컬렉션 선택

BlockingCollection<T>를 생성할 때 사용하는 기본 컬렉션(ConcurrentQueue, ConcurrentStack 등)을 상황에 맞게 선택하여 성능을 최적화할 수 있습니다. 예를 들어, FIFOFirst-In-First-Out 방식이 필요한 경우 ConcurrentQueue를 사용하고, LIFOLast-In-First-Out 방식이 필요한 경우 ConcurrentStack를 사용하는 것이 적합합니다.