ConcurrentQueue

ConcurrentQueue<T>는 .NET의 동시성 컬렉션 중 하나로, FIFOFirst In, First Out 방식으로 데이터를 관리하는 큐 구조를 제공합니다. ConcurrentQueue<T>는 여러 스레드에서 동시에 데이터를 추가하거나 제거할 때도 안전하게 사용할 수 있도록 설계되었습니다. 이는 스레드 간 동시 접근이 빈번한 환경에서 효율적으로 큐를 사용할 수 있도록 돕습니다.

ConcurrentQueue의 특성

FIFO 구조

  • 큐의 특성상 먼저 추가된 데이터가 먼저 제거됩니다. ConcurrentQueue<T>는 멀티스레드 환경에서 큐를 사용하는 경우에 적합하며, 비슷한 기능을 제공하는 다른 동시성 컬렉션(ConcurrentStack, ConcurrentBag)과의 차별점은 데이터의 처리 순서에 있습니다. ConcurrentQueue<T>는 선입선출 방식으로 데이터를 처리하기 때문에 먼저 추가된 데이터를 먼저 처리하고자 할 때 유용합니다.

주요 메서드와 사용법

생성자와 초기화

기본 생성자

ConcurrentQueue를 생성합니다.

ConcurrentQueue<int> queue = new ConcurrentQueue<int>();

컬렉션으로 초기화

다른 컬렉션을 사용해 ConcurrentQueue를 초기화할 수 있습니다.

int[] numbers = { 1, 2, 3, 4, 5 };
ConcurrentQueue<int> queueFromCollection = new ConcurrentQueue<int>(numbers);

요소 추가 및 제거

Enqueue

큐의 끝에 요소를 추가합니다.

queue.Enqueue(10);
queue.Enqueue(20);
  • 큐에 여러 요소를 한 번에 추가하는 기능은 제공되지 않으며, 요소를 개별적으로 추가해야 합니다.

TryDequeue

큐의 앞에서 요소를 제거하고 반환합니다.

if (queue.TryDequeue(out int result))
{
    Console.WriteLine(result);  // 큐에서 꺼낸 요소 출력
}
  • 큐가 비어 있는 경우 TryDequeuefalse를 반환하고, result는 기본값으로 설정됩니다.

TryPeek

큐의 앞에 있는 요소를 제거하지 않고 반환합니다.

if (queue.TryPeek(out int frontElement))
{
    Console.WriteLine(frontElement);  // 큐의 앞 요소 출력
}

기타 유용한 메서드

IsEmpty

큐가 비어 있는지 확인합니다.

if (queue.IsEmpty)
{
    Console.WriteLine("큐가 비어 있습니다.");
}

Count

큐에 있는 요소의 개수를 반환합니다.

int count = queue.Count;
Console.WriteLine($"큐에 있는 요소의 개수: {count}");

ToArray

큐의 요소를 배열로 복사합니다.

int[] array = queue.ToArray();
  • 큐의 순서가 반영된 배열이 생성됩니다.

기본 사용 예제

다음은 ConcurrentQueue<T>의 기본적인 사용 예제를 보여줍니다.

using System;
using System.Collections.Concurrent;
class Program
{
    static void Main()
    {
        // ConcurrentQueue 생성
        ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
        // 요소 추가
        queue.Enqueue(1);
        queue.Enqueue(2);
        queue.Enqueue(3);
        // 요소 제거 및 출력
        if (queue.TryDequeue(out int result))
        {
            Console.WriteLine($"Dequeue된 요소: {result}");  
            // 출력: Dequeue된 요소: 1
        }
        // 큐의 앞 요소 확인
        if (queue.TryPeek(out int frontElement))
        {
            Console.WriteLine($"현재 앞 요소: {frontElement}");  
            // 출력: 현재 앞 요소: 2
        }
        // 큐의 모든 요소 출력
        Console.WriteLine("큐의 모든 요소:");
        foreach (var item in queue)
        {
            Console.WriteLine(item);  
            // 출력: 2, 3
        }
    }
}

위 예제에서는 ConcurrentQueue에 요소를 추가하고, 앞 요소를 제거하거나 확인하는 기본적인 작업을 보여줍니다.

비동기 작업 처리 및 동시성 제어

ConcurrentQueue<T>는 멀티스레드 환경에서 데이터에 동시 접근이 가능하므로, 비동기 작업을 큐에 저장하고 여러 스레드가 안전하게 처리하도록 할 수 있습니다. 이를 통해 스레드가 여러 작업을 효율적으로 분배받아 처리할 수 있으며, 데이터 경합을 방지할 수 있습니다.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class AsyncLogger
{
    private ConcurrentQueue<string> logQueue = new ConcurrentQueue<string>();
    public void Log(string message)
    {
        logQueue.Enqueue(message); // 로그를 큐에 추가
    }
    public async Task ProcessLogsAsync()
    {
        while (true)
        {
            if (logQueue.TryDequeue(out string log))
            {
                await SaveLogAsync(log);
            }
            else
            {
                await Task.Delay(100); // 큐가 비었으면 잠시 대기
            }
        }
    }
    private Task SaveLogAsync(string log)
    {
        Console.WriteLine($"Saving log: {log}");
        return Task.CompletedTask; // 실제로는 파일/DB에 저장하는 코드가 들어감
    }
}
  • 비동기적으로 큐의 데이터를 처리하여 로그 저장의 효율성을 높입니다.
  • 다른 작업과 독립적으로 운영됩니다.

스트리밍 파이프라인 구축

ConcurrentQueue<T>는 실시간 데이터를 수집하고, 이를 큐에 저장하여 순차적으로 처리하는 데이터 스트리밍 파이프라인을 구현하는 데 적합합니다. 최신 데이터가 앞에서부터 처리되므로 실시간 분석이 필요한 경우 유용하게 사용할 수 있습니다.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class SensorDataStreamer
{
    private ConcurrentQueue<int> sensorDataQueue = new ConcurrentQueue<int>();
    public void AddData(int data)
    {
        sensorDataQueue.Enqueue(data); // 센서 데이터를 큐에 추가
    }
    public async Task ProcessDataStream(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            if (sensorDataQueue.TryDequeue(out int data))
            {
                ProcessData(data);
            }
            else
            {
                await Task.Delay(50); // 큐가 비었으면 잠시 대기
            }
        }
    }
    private void ProcessData(int data)
    {
        Console.WriteLine($"Processing sensor data: {data}");
        // 실제 처리 로직
    }
}
  • 센서 데이터를 실시간으로 수집하고, 큐를 통해 데이터를 순차적으로 처리하는 파이프라인입니다.
  • 최신 데이터를 실시간으로 처리하면서도, 큐를 이용한 순차적 처리를 통해 데이터 유실을 방지합니다.

스레드풀 및 병렬 작업과의 통합

ConcurrentQueue<T>는 스레드풀과 결합하여 병렬로 작업을 처리할 수 있습니다. 이를 통해 큐에 들어온 작업을 여러 스레드가 병렬로 나누어 처리하여, 높은 처리량을 달성할 수 있습니다.

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class ParallelTaskProcessor
{
    private ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
    public void EnqueueTask(string task)
    {
        taskQueue.Enqueue(task); // 작업을 큐에 추가
    }
    public void ProcessTasksInParallel()
    {
        Parallel.For(0, Environment.ProcessorCount, i =>
        {
            while (taskQueue.TryDequeue(out string task))
            {
                ProcessTask(task);
            }
        });
    }
    private void ProcessTask(string task)
    {
        Console.WriteLine($"Processing task: {task} on Thread {Task.CurrentId}");
        // 작업 처리 로직
    }
}
  • CPU 코어 수에 맞춰 병렬로 작업을 처리하므로, 작업의 처리 속도를 최적화할 수 있습니다.

ConcurrentQueue를 활용한 이중 버퍼링

이중 버퍼링 기법을 통해 쓰기 큐와 읽기 큐를 교차하여 사용하면, 한쪽 큐에 데이터가 쌓이는 동안 다른 쪽 큐에서 데이터를 비동기적으로 처리할 수 있습니다. 이 방식은 특히 다중 스레드 환경에서 데이터를 안전하고 효율적으로 처리하는 데 유리합니다.

예제 코드: ConcurrentQueue를 활용한 이중 버퍼링

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class DoubleBufferQueue<T>
{
    private ConcurrentQueue<T> _writeQueue = new ConcurrentQueue<T>();
    private ConcurrentQueue<T> _readQueue = new ConcurrentQueue<T>();
    // 데이터 추가 메서드
    public void Enqueue(T item)
    {
        _writeQueue.Enqueue(item); // 쓰기 큐에 데이터 추가
    }
    // 큐 교체 메서드
    public void SwapBuffers()
    {
        // 읽기 큐와 쓰기 큐 교체
        var temp = _readQueue;
        _readQueue = _writeQueue;
        _writeQueue = temp;
    }
    // 비동기적으로 읽기 큐 데이터를 처리하는 메서드
    public async Task ProcessQueueAsync()
    {
        while (!_readQueue.IsEmpty)
        {
            if (_readQueue.TryDequeue(out T item))
            {
                await ProcessItemAsync(item); // 각 항목 비동기 처리
            }
        }
    }
    // 비동기 작업을 수행하는 예제 메서드
    private Task ProcessItemAsync(T item)
    {
        return Task.Run(() => Console.WriteLine($"Processing: {item}"));
    }
}
// 사용 예제
var bufferQueue = new DoubleBufferQueue<int>();
// 데이터 추가
bufferQueue.Enqueue(1);
bufferQueue.Enqueue(2);
bufferQueue.Enqueue(3);
// 큐 교체 후 읽기 큐 데이터 비동기 처리
bufferQueue.SwapBuffers();
await bufferQueue.ProcessQueueAsync();
  • Enqueue 메서드: 쓰기 큐에 데이터를 추가합니다. 이때 ConcurrentQueue<T>의 스레드 안전성을 활용하여 다중 스레드 환경에서도 안전하게 데이터를 추가할 수 있습니다.
  • SwapBuffers 메서드: 읽기 큐와 쓰기 큐를 교체합니다. 이로써 쓰기 큐에 데이터를 추가하는 동안 읽기 큐에서 데이터가 처리될 수 있도록 하여, 데이터를 병렬로 처리할 수 있습니다.
  • ProcessQueueAsync 메서드: 읽기 큐의 데이터를 비동기적으로 처리합니다. TryDequeue를 사용해 데이터가 있는지 확인하고, 데이터가 있으면 ProcessItemAsync 메서드를 통해 비동기 작업을 실행합니다.