ConcurrentQueue
ConcurrentQueue<T>
는 .NET의 동시성 컬렉션 중 하나로, FIFOFirst In, First Out 방식으로 데이터를 관리하는 큐 구조를 제공합니다. ConcurrentQueue<T>
는 여러 스레드에서 동시에 데이터를 추가하거나 제거할 때도 안전하게 사용할 수 있도록 설계되었습니다. 이는 스레드 간 동시 접근이 빈번한 환경에서 효율적으로 큐를 사용할 수 있도록 돕습니다.
ConcurrentQueue의 특성
FIFO 구조
- 큐의 특성상 먼저 추가된 데이터가 먼저 제거됩니다.
ConcurrentQueue<T>
는 멀티스레드 환경에서 큐를 사용하는 경우에 적합하며, 비슷한 기능을 제공하는 다른 동시성 컬렉션(ConcurrentStack, ConcurrentBag)과의 차별점은 데이터의 처리 순서에 있습니다.ConcurrentQueue<T>
는 선입선출 방식으로 데이터를 처리하기 때문에 먼저 추가된 데이터를 먼저 처리하고자 할 때 유용합니다.
주요 메서드와 사용법
생성자와 초기화
기본 생성자
빈 ConcurrentQueue
를 생성합니다.
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
컬렉션으로 초기화
다른 컬렉션을 사용해 ConcurrentQueue
를 초기화할 수 있습니다.
int[] numbers = { 1, 2, 3, 4, 5 };
ConcurrentQueue<int> queueFromCollection = new ConcurrentQueue<int>(numbers);
요소 추가 및 제거
Enqueue
큐의 끝에 요소를 추가합니다.
queue.Enqueue(10);
queue.Enqueue(20);
- 큐에 여러 요소를 한 번에 추가하는 기능은 제공되지 않으며, 요소를 개별적으로 추가해야 합니다.
TryDequeue
큐의 앞에서 요소를 제거하고 반환합니다.
if (queue.TryDequeue(out int result))
{
Console.WriteLine(result); // 큐에서 꺼낸 요소 출력
}
- 큐가 비어 있는 경우
TryDequeue
는false
를 반환하고,result
는 기본값으로 설정됩니다.
TryPeek
큐의 앞에 있는 요소를 제거하지 않고 반환합니다.
if (queue.TryPeek(out int frontElement))
{
Console.WriteLine(frontElement); // 큐의 앞 요소 출력
}
기타 유용한 메서드
IsEmpty
큐가 비어 있는지 확인합니다.
if (queue.IsEmpty)
{
Console.WriteLine("큐가 비어 있습니다.");
}
Count
큐에 있는 요소의 개수를 반환합니다.
int count = queue.Count;
Console.WriteLine($"큐에 있는 요소의 개수: {count}");
ToArray
큐의 요소를 배열로 복사합니다.
int[] array = queue.ToArray();
- 큐의 순서가 반영된 배열이 생성됩니다.
기본 사용 예제
다음은 ConcurrentQueue<T>
의 기본적인 사용 예제를 보여줍니다.
using System;
using System.Collections.Concurrent;
class Program
{
static void Main()
{
// ConcurrentQueue 생성
ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
// 요소 추가
queue.Enqueue(1);
queue.Enqueue(2);
queue.Enqueue(3);
// 요소 제거 및 출력
if (queue.TryDequeue(out int result))
{
Console.WriteLine($"Dequeue된 요소: {result}");
// 출력: Dequeue된 요소: 1
}
// 큐의 앞 요소 확인
if (queue.TryPeek(out int frontElement))
{
Console.WriteLine($"현재 앞 요소: {frontElement}");
// 출력: 현재 앞 요소: 2
}
// 큐의 모든 요소 출력
Console.WriteLine("큐의 모든 요소:");
foreach (var item in queue)
{
Console.WriteLine(item);
// 출력: 2, 3
}
}
}
위 예제에서는 ConcurrentQueue
에 요소를 추가하고, 앞 요소를 제거하거나 확인하는 기본적인 작업을 보여줍니다.
비동기 작업 처리 및 동시성 제어
ConcurrentQueue<T>
는 멀티스레드 환경에서 데이터에 동시 접근이 가능하므로, 비동기 작업을 큐에 저장하고 여러 스레드가 안전하게 처리하도록 할 수 있습니다. 이를 통해 스레드가 여러 작업을 효율적으로 분배받아 처리할 수 있으며, 데이터 경합을 방지할 수 있습니다.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class AsyncLogger
{
private ConcurrentQueue<string> logQueue = new ConcurrentQueue<string>();
public void Log(string message)
{
logQueue.Enqueue(message); // 로그를 큐에 추가
}
public async Task ProcessLogsAsync()
{
while (true)
{
if (logQueue.TryDequeue(out string log))
{
await SaveLogAsync(log);
}
else
{
await Task.Delay(100); // 큐가 비었으면 잠시 대기
}
}
}
private Task SaveLogAsync(string log)
{
Console.WriteLine($"Saving log: {log}");
return Task.CompletedTask; // 실제로는 파일/DB에 저장하는 코드가 들어감
}
}
- 비동기적으로 큐의 데이터를 처리하여 로그 저장의 효율성을 높입니다.
- 다른 작업과 독립적으로 운영됩니다.
스트리밍 파이프라인 구축
ConcurrentQueue<T>
는 실시간 데이터를 수집하고, 이를 큐에 저장하여 순차적으로 처리하는 데이터 스트리밍 파이프라인을 구현하는 데 적합합니다. 최신 데이터가 앞에서부터 처리되므로 실시간 분석이 필요한 경우 유용하게 사용할 수 있습니다.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
public class SensorDataStreamer
{
private ConcurrentQueue<int> sensorDataQueue = new ConcurrentQueue<int>();
public void AddData(int data)
{
sensorDataQueue.Enqueue(data); // 센서 데이터를 큐에 추가
}
public async Task ProcessDataStream(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
if (sensorDataQueue.TryDequeue(out int data))
{
ProcessData(data);
}
else
{
await Task.Delay(50); // 큐가 비었으면 잠시 대기
}
}
}
private void ProcessData(int data)
{
Console.WriteLine($"Processing sensor data: {data}");
// 실제 처리 로직
}
}
- 센서 데이터를 실시간으로 수집하고, 큐를 통해 데이터를 순차적으로 처리하는 파이프라인입니다.
- 최신 데이터를 실시간으로 처리하면서도, 큐를 이용한 순차적 처리를 통해 데이터 유실을 방지합니다.
스레드풀 및 병렬 작업과의 통합
ConcurrentQueue<T>
는 스레드풀과 결합하여 병렬로 작업을 처리할 수 있습니다. 이를 통해 큐에 들어온 작업을 여러 스레드가 병렬로 나누어 처리하여, 높은 처리량을 달성할 수 있습니다.
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class ParallelTaskProcessor
{
private ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
public void EnqueueTask(string task)
{
taskQueue.Enqueue(task); // 작업을 큐에 추가
}
public void ProcessTasksInParallel()
{
Parallel.For(0, Environment.ProcessorCount, i =>
{
while (taskQueue.TryDequeue(out string task))
{
ProcessTask(task);
}
});
}
private void ProcessTask(string task)
{
Console.WriteLine($"Processing task: {task} on Thread {Task.CurrentId}");
// 작업 처리 로직
}
}
- CPU 코어 수에 맞춰 병렬로 작업을 처리하므로, 작업의 처리 속도를 최적화할 수 있습니다.
ConcurrentQueue를 활용한 이중 버퍼링
이중 버퍼링 기법을 통해 쓰기 큐와 읽기 큐를 교차하여 사용하면, 한쪽 큐에 데이터가 쌓이는 동안 다른 쪽 큐에서 데이터를 비동기적으로 처리할 수 있습니다. 이 방식은 특히 다중 스레드 환경에서 데이터를 안전하고 효율적으로 처리하는 데 유리합니다.
예제 코드: ConcurrentQueue를 활용한 이중 버퍼링
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class DoubleBufferQueue<T>
{
private ConcurrentQueue<T> _writeQueue = new ConcurrentQueue<T>();
private ConcurrentQueue<T> _readQueue = new ConcurrentQueue<T>();
// 데이터 추가 메서드
public void Enqueue(T item)
{
_writeQueue.Enqueue(item); // 쓰기 큐에 데이터 추가
}
// 큐 교체 메서드
public void SwapBuffers()
{
// 읽기 큐와 쓰기 큐 교체
var temp = _readQueue;
_readQueue = _writeQueue;
_writeQueue = temp;
}
// 비동기적으로 읽기 큐 데이터를 처리하는 메서드
public async Task ProcessQueueAsync()
{
while (!_readQueue.IsEmpty)
{
if (_readQueue.TryDequeue(out T item))
{
await ProcessItemAsync(item); // 각 항목 비동기 처리
}
}
}
// 비동기 작업을 수행하는 예제 메서드
private Task ProcessItemAsync(T item)
{
return Task.Run(() => Console.WriteLine($"Processing: {item}"));
}
}
// 사용 예제
var bufferQueue = new DoubleBufferQueue<int>();
// 데이터 추가
bufferQueue.Enqueue(1);
bufferQueue.Enqueue(2);
bufferQueue.Enqueue(3);
// 큐 교체 후 읽기 큐 데이터 비동기 처리
bufferQueue.SwapBuffers();
await bufferQueue.ProcessQueueAsync();
- Enqueue 메서드: 쓰기 큐에 데이터를 추가합니다. 이때
ConcurrentQueue<T>
의 스레드 안전성을 활용하여 다중 스레드 환경에서도 안전하게 데이터를 추가할 수 있습니다. - SwapBuffers 메서드: 읽기 큐와 쓰기 큐를 교체합니다. 이로써 쓰기 큐에 데이터를 추가하는 동안 읽기 큐에서 데이터가 처리될 수 있도록 하여, 데이터를 병렬로 처리할 수 있습니다.
- ProcessQueueAsync 메서드: 읽기 큐의 데이터를 비동기적으로 처리합니다.
TryDequeue
를 사용해 데이터가 있는지 확인하고, 데이터가 있으면ProcessItemAsync
메서드를 통해 비동기 작업을 실행합니다.