指点成金-最美分享吧

登录

C#线程安全队列ConcurrentQueue

佚名 举报

篇首语:本文由小编为大家整理,主要介绍了C#线程安全队列ConcurrentQueue相关的知识,希望对你有一定的参考价值。

参考技术A ConcurrentQueue队列是一个高效的线程安全的队列,是.Net Framework 4.0,System.Collections.Concurrent命名空间下的一个数据结构。

入队(EnQueue) 、出队(TryDequeue) 、是否为空(IsEmpty)、获取队列内元素数量(Count)。

入队函数,当队列已满时会自动增加队列容量。

尝试出队函数,如果当前队列为空,返回false,否则返回队列的第一个元素。

跟TryDequeue()方法相似,但不删除队列中的元素。

返回当前队列中元素的个数。

判定当前队列为空。

清空并复位队列。

线程安全集合

>>返回《C# 并发编程》

  • 1. 简介
  • 2. 不可变栈和队列
  • 3. 不可变列表
  • 4. 不可变Set集合
  • 5. 不可变字典
  • 6. 线程安全字典
  • 7. 阻塞队列
  • 8. 阻塞栈和包
  • 9. 异步队列
  • 10. 异步栈和包
  • 11. 阻塞/异步队列

1. 简介

  • 不可变集合
    • 不可变集合之间通常共享了大部分存储空间,因此其实浪费并不大
    • 因为是无法修改的,所以是线程安全
  • 线程安全集合
    • 可同时被多个线程修改的可变集合
      • 线程安全集合混合使用了细粒度锁定无锁技术,以确保线程被阻塞的时间最短
        • 通常情况下是根本不阻塞
    • 对很多线程安全集合进行枚举操作时,内部创建了该集合的一个快照(snapshot),并对这个快照进行枚举操作。
    • 线程安全集合的主要优点是多个线程可以安全地对其进行访问,而代码只会被阻塞很短的时间,或根本阻塞。

下面对常用的属于不可变集合线程安全集合类型的,特定数据结构的集合进行说明。

2. 不可变栈和队列

不可变集合采用的模式是返回一个修改过的集合,原始的集合引用是不变化的。

  • 这意味着,如果引用了特定的不可变集合的实例,它是不会变化的。
var stack = ImmutableStack.Empty;stack = stack.Push(13);var biggerStack = stack.Push(7);// 先显示“7”,接着显示“13”。 foreach (var item in biggerStack)    Console.WriteLine($"biggerStack {item}");// 只显示“13”。foreach (var item in stack)    Console.WriteLine($"stack {item}");

输出:

biggerStack 7biggerStack 13stack 13

两个栈实际上在内部共享了存储项目 13 的内存。

  • 这种实现方式的效率很高,并且可以很方便地创建当前状态的快照
  • 每个不可变集合的实例都是绝对线程安全

ImmutableQueue 使用方法类似。

  • 不可变集合的一个实例是永远不改变的。
  • 因为不会改变,所以是绝对线程安全的。
  • 对不可变集合使用修改方法时,返回修改后的集合。
  • 不可变集合非常适用于共享状态,但不适合用来做交换数据的通道。

3. 不可变列表

不可变列表的内部是用二叉树组织数据的。这么做是为了让不可变列表的实例之间共享的内存最大化。

  • 这导致 ImmutableListList 在常用操作上有性能上的差别(参见下表)。
操 作ListImmutableList
Add平摊 O(1)O(log N)
InsertO(N)O(log N)
RemoveAtO(N)O(log N)
Item[index]O(1)O(log N)

不可变列表确实可以使用index获取数据项,但需要注意性能问题。不能简单地用它来替代 List

  • 这意味着应该尽量使用 foreach 而不是用 for

4. 不可变Set集合

  • ImmutableHashSet
    • 是一个不含重复元素的集合
  • ImmutableSortedSet
    • 是一个已排序的不含重复元素的集合
  • 都有相似的接口
//ImmutableHashSetvar hashSet = ImmutableHashSet.Empty;hashSet = hashSet.Add(13);hashSet = hashSet.Add(7);// 显示“7”和“13”,次序不确定。 foreach (var item in hashSet)    Console.Write(item + " ");System.Console.WriteLine();hashSet = hashSet.Remove(7);//ImmutableSortedSetvar sortedSet = ImmutableSortedSet.Empty;sortedSet = sortedSet.Add(13);sortedSet = sortedSet.Add(7);// 先显示“7”,接着显示“13”。 foreach (var item in sortedSet)    Console.Write(item + " ");var smallestItem = sortedSet[0];// smallestItem == 7sortedSet = sortedSet.Remove(7);

输出:

7 13 7 13 
操 作ImmutableHashSetImmutableSortedSet
AddO(log N)O(log N)
RemoveO(log N)O(log N)
Item[index]不可用O(log N)

ImmutableSortedSet 索引操作的时间复杂度是 O(log N),而不是 O(1),这跟 上节中 ImmutableList 的情况类似。

  • 这意味着它们适用同样的警告:使用 ImmutableSortedSet时,应该尽量用 foreach 而不是用 for 。

可以先快速地以可变方式构建,然后转换成不可变集合

5. 不可变字典

  • ImmutableDictionary
  • ImmutableSortedDictionar y
//ImmutableDictionaryvar dictionary = ImmutableDictionary.Empty;dictionary = dictionary.Add(10, "Ten");dictionary = dictionary.Add(21, "Twenty-One");dictionary = dictionary.SetItem(10, "Diez");// 显示“10Diez”和“21Twenty-One”,次序不确定。 foreach (var item in dictionary)    Console.WriteLine(item.Key + ":" + item.Value);var ten = dictionary[10]; // ten == "Diez"dictionary = dictionary.Remove(21);//ImmutableSortedDictionaryvar sortedDictionary = ImmutableSortedDictionary.Empty; sortedDictionary = sortedDictionary.Add(10, "Ten");sortedDictionary = sortedDictionary.Add(21, "Twenty-One");sortedDictionary = sortedDictionary.SetItem(10, "Diez");// 先显示“10Diez”,接着显示“21Twenty-One”。 foreach (var item in sortedDictionary)    Console.WriteLine(item.Key + ":" + item.Value);ten = sortedDictionary[10];// ten == "Diez"sortedDictionary = sortedDictionary.Remove(21);

输出:

10:Diez21:Twenty-One10:Diez21:Twenty-One操 作 I
操 作ImmutableDictionaryImmutableSortedDictionary
AddO(log N)O(log N)
SetItemO(log N)O(log N)
Item[key]O(log N)O(log N)
RemoveO(log N)O(log N)

6. 线程安全字典

var dictionary = new ConcurrentDictionary(); var newValue = dictionary.AddOrUpdate(0,key => "Zero",(key, oldValue) => "Zero");

AddOrUpdate 方法有些复杂,这是因为这个方法必须执行多个步骤,具体步骤取决于并发字典的当前内容。

  • 方法的第一个参数是
  • 第二个参数是一个委托,它把键(本例中为 0)转换成添加到字典的值(本例中为“Zero”)
    • 只有当字典中没有这个键时,这个委托才会运行。
  • 第三个参数也是一个委托,它把(0)和原来的值转换成字典中修改后的值
    (“Zero”)。
    • 只有当字典中已经存在这个键时,这个委托才会运行。
  • AddOrUpdate return 这个键对应的新值(与其中一个委托返回的值相同)。

AddOrUpdate 可能要多次调用其中一个(或两个)委托。这种情况很少,但确实会发生。

  • 因此这些委托必须简单、快速,并且不能有副作用
  • 这些委托只能创建新的值,不能修改程序中其他变量
  • 这个原则适用于所有 ConcurrentDictionary 的方法所使用的委托
// 使用与前面一样的“字典”。string currentValue;bool keyExists = dictionary.TryGetValue(0, out currentValue);// 使用与前面一样的“字典”。string removedValue;bool keyExisted = dictionary.TryRemove(0, out removedValue);
  • 如果多个线程读写一个共享集合, 使用 ConcurrentDictrionary 是最合适的
  • 如果不会频繁修改(很少修改), 那更适合使用 ImmutableDictionary

  • 如果一些线程只添加元素,另一些线程只移除元素,那最好使用生产者/消费者集合

7. 阻塞队列

  • GetConsumingEnumerable 会阻塞线程
  • CommpleteAdding 方法执行后所有被 GetConsumingEnumerable 阻塞的线程开始执行
  • 每个元素只会被消费一次
private static readonly BlockingCollection _blockingQueue = new BlockingCollection();public static async Task BlockingCollectionSP(){    Action consumerAction = () =>     {        Console.WriteLine($"started print({Thread.CurrentThread.ManagedThreadId}).");        // 先显示“7”,后显示“13”。        foreach (var item in _blockingQueue.GetConsumingEnumerable())        {             Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {item}");        }        Console.WriteLine($"ended print({Thread.CurrentThread.ManagedThreadId}).");     };    Task task1 = Task.Run(consumerAction);    Task task2 = Task.Run(consumerAction);    Task task3 = Task.Run(consumerAction);    _blockingQueue.Add(7);    System.Console.WriteLine($"added 7.");    _blockingQueue.Add(13);    System.Console.WriteLine($"added 13.");    _blockingQueue.CompleteAdding();    System.Console.WriteLine("CompleteAdding.");    try    {        _blockingQueue.Add(15);    }    catch (Exception ex)    {        System.Console.WriteLine($"{ex.GetType().Name}:{ex.Message}");    }    await Task.WhenAll(task1, task2, task3);}

输出:

started print(4).started print(3).started print(6).added 7.added 13.CompleteAdding.ended print(6).InvalidOperationException:The collection has been marked as complete with regards to additions.print(4) 7ended print(4).print(3) 13ended print(3).

8. 阻塞栈和包

  • 在默认情况下,.NET 中的 BlockingCollection 用作阻塞队列,但它也可以作为任何类型的生产者/消费者集合
  • BlockingCollection 实际上是对线程安全集合进行了封装, 实现了 IProducerConsumerCollection 接口。
    • 因此可以在创建 BlockingCollection 实例时指明规则
BlockingCollection _blockingStack = new BlockingCollection( new ConcurrentStack());BlockingCollection _blockingBag = new BlockingCollection( new ConcurrentBag());

替换到阻塞队列示例代码中试试。

9. 异步队列

public static async Task BufferBlockPS(){    BufferBlock _asyncQueue = new BufferBlock();    Func concurrentConsumerAction = async () =>     {         while (true)         {             int item;             try             {                 item = await _asyncQueue.ReceiveAsync();             }             catch (InvalidOperationException)             {                 System.Console.WriteLine($"exit({Thread.CurrentThread.ManagedThreadId}).");                 break;             }             Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {item}");         }     };    Func consumerAction = async () =>    {        try        {            // 先显示“7”,后显示“13”。 单线程可用            while (await _asyncQueue.OutputAvailableAsync())            {                Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {await _asyncQueue.ReceiveAsync()}");            }        }        catch (Exception ex)        {            System.Console.WriteLine($"{ex.GetType().Name}({Thread.CurrentThread.ManagedThreadId}):{ex.Message}");        }    };    Task t1 = consumerAction();    Task t2 = consumerAction();    // Task t1 = concurrentConsumerAction();    // Task t2 = concurrentConsumerAction();    // 生产者代码    await _asyncQueue.SendAsync(7);    await _asyncQueue.SendAsync(13);    await _asyncQueue.SendAsync(15);    System.Console.WriteLine("Added 7 13 15.");    _asyncQueue.Complete();    await Task.WhenAll(t1, t2);}

输出:

Added 7 13 15.print(4) 7print(6) 13print(4) 15InvalidOperationException(3):The source completed without providing data to receive.

10. 异步栈和包

Nito.AsyncEx 库

AsyncCollection _asyncStack = new AsyncCollection( new ConcurrentStack());AsyncCollection _asyncBag = new AsyncCollection( new ConcurrentBag());

11. 阻塞/异步队列

在阻塞队列中已经介绍了BufferBlock

这里介绍 ActionBlock

public static async Task ActionBlockPS(){    ActionBlock queue = new ActionBlock(u => Console.WriteLine($"print({Thread.CurrentThread.ManagedThreadId}) {u}"));    // 异步的生产者代码    await queue.SendAsync(7);    await queue.SendAsync(13);    System.Console.WriteLine("Added async.");    // 同步的生产者代码     queue.Post(15);    queue.Post(17);    System.Console.WriteLine("Added sync.");    queue.Complete();    System.Console.WriteLine($"Completed({Thread.CurrentThread.ManagedThreadId}).");}

输出:

Added async.Added sync.Completed(1).print(3) 7print(3) 13print(3) 15print(3) 17

以上是关于C#线程安全队列ConcurrentQueue的主要内容,如果未能解决你的问题,请参考以下文章