partitioner抛出invalidoperationexception的根本原因是其依赖的数据源在并行划分过程中被外部修改,导致内部状态不一致。1. 当使用partitioner.create处理非线程安全集合(如list

C#中
Partitioner抛出的
InvalidOperationException,通常发生在当你尝试在并行处理过程中,修改了作为数据源的集合时。简单来说,就是
Partitioner在划分任务时,它所依赖的底层数据源被“动了手脚”,导致其内部状态不一致,无法继续安全地执行划分操作。这通常不是一个bug,而是系统在告诉你,你正在做的事情可能会导致数据混乱或不完整。
解决方案
遇到
Partitioner引发
InvalidOperationException,核心思路是确保在并行处理期间,作为数据源的集合是稳定的、不可变的,或者至少其结构不会发生改变。
一种最直接、也最常用的方法,就是在将集合传递给
Partitioner之前,先创建一个它的“快照”或副本。例如,如果你有一个
List<T>,并且你担心在
Parallel.ForEach执行过程中它会被修改,那么可以这样做:
List<int> originalList = new List<int> { 1, 2, 3, 4, 5 };
// ... 某个地方可能会修改originalList
// 在传递给Partitioner之前,创建一个副本
var stableSource = originalList.ToArray(); // 或者 .ToList()
// 现在,使用这个稳定的副本进行并行处理
Parallel.ForEach(Partitioner.Create(stableSource), item =>
{
// 对item进行操作
Console.WriteLine($"Processing {item}");
// 在这里不要修改originalList或stableSource
});这样做的好处是,
Partitioner操作的是一个独立的、不会被外部修改的数组或列表,从而避免了
InvalidOperationException。当然,这会引入一份内存开销,但对于大多数场景来说,这是可接受的权衡。
如果你的数据源必须是动态的,并且在并行处理期间会有并发修改,那么你需要考虑使用专门为并发设计的集合类型,比如
ConcurrentBag<T>或
ConcurrentQueue<T>。这些集合在设计上就考虑了多线程访问的安全性,它们通常能更好地与
Partitioner协同工作,尽管它们在某些特定场景下可能不会提供最佳的性能分区策略。
// 假设这是一个可能被并发修改的集合
ConcurrentBag<string> concurrentData = new ConcurrentBag<string>();
concurrentData.Add("Alpha");
concurrentData.Add("Beta");
// ... 可以在其他线程继续添加或移除
// Partitioner.Create可以直接处理ConcurrentBag
Parallel.ForEach(Partitioner.Create(concurrentData), item =>
{
Console.WriteLine($"Processing {item}");
// 在这里对concurrentData进行修改通常是安全的,但要理解其语义
});总而言之,问题的根源在于并行处理对数据源一致性的要求,解决方案则围绕着如何提供一个满足这一要求的数据源展开。
为什么Partitioner会抛出InvalidOperationException?
嗯,这事儿挺常见的,说实话,我也踩过这个坑。
Partitioner,特别是当它试图对一个非线程安全的集合(比如
List<T>或
Array)进行划分时,它需要一个稳定的“视图”来确定每个并行任务应该处理哪些数据。你可以想象一下,它就像一个工头,正在给一群工人分配任务:第一个工人处理1-10号零件,第二个工人处理11-20号零件。如果在他分配任务的过程中,或者工人已经开始拿零件的时候,有人突然往生产线上加了几个零件,或者移走了几个,那工头之前算好的分配方案就全乱套了。
InvalidOperationException就是系统在告诉你:“嘿,你的集合在被我划分的时候变了!我没法保证我分出去的任务是正确的,也没法保证不会出现重复处理或者遗漏数据的情况。”这是一种“快失败”(fail-fast)机制,它不是一个bug,而是一种保护,防止你的程序在不一致的状态下继续运行,从而产生更难以调试的错误结果。
具体来说,当你使用
Partitioner.Create(myList)这样的方式时,
Partitioner可能会根据
myList的当前大小和结构来计算出各个并行任务的起始和结束索引。如果
myList在此时被另一个线程添加、删除元素,或者甚至只是改变了元素顺序,那么之前计算好的索引可能就会指向错误的位置,或者一部分数据被遗漏,另一部分被重复处理。这种不确定性是并行编程的大忌,所以系统选择直接抛出异常,强制你处理这种并发修改。
它主要发生在以下几种情况:
你在Parallel.ForEach或PLINQ操作一个
List<T>、
Dictionary<TKey, TValue>等非线程安全集合时,同时有另一个线程在向这个集合添加、删除元素。 你自定义了一个
OrderablePartitioner,但你的
GetPartitions或
GetDynamicPartitions方法没有正确处理底层数据源的并发修改,或者它本身就依赖于一个非线程安全的快照。
如何安全地在并行处理中修改数据源?
这确实是一个常见的需求,但“安全地在并行处理中修改数据源”这个说法本身就有点陷阱。更准确的说法应该是:如何在并行处理中收集结果,或者在并行处理中处理动态变化的数据。直接修改作为
Partitioner数据源的集合,通常不是推荐的做法,因为这正是导致
InvalidOperationException的原因。
如果你需要在并行处理中产生新的数据,并把这些数据收集起来,你应该使用线程安全的集合来存储结果,而不是去修改原始的数据源。例如:
List<int> numbers = Enumerable.Range(1, 100).ToList();
// 使用ConcurrentBag来收集并行处理的结果
ConcurrentBag<double> results = new ConcurrentBag<double>();
Parallel.ForEach(Partitioner.Create(numbers), number =>
{
// 假设这是一个耗时的计算
double result = Math.Sqrt(number) * 10;
results.Add(result); // 安全地将结果添加到线程安全集合中
});
// 所有并行任务完成后,可以安全地访问results
foreach (var res in results)
{
Console.WriteLine(res);
}这里,
numbers集合在整个
Parallel.ForEach过程中是保持不变的,而
results是一个专门为并发写入设计的集合。
如果你的“数据源”本身就是动态的,比如一个队列,你希望在并行处理的同时,有新的数据不断地被添加到队列中,并且能够被处理,那么你需要从一开始就选择一个线程安全的集合作为数据源,并且
Partitioner能够很好地支持它。
ConcurrentQueue<T>就是一个很好的例子:
ConcurrentQueue<string> tasksQueue = new ConcurrentQueue<string>();
tasksQueue.Enqueue("Task A");
tasksQueue.Enqueue("Task B");
// 模拟另一个线程不断添加任务
Task.Run(() =>
{
for (int i = 0; i < 5; i++)
{
Thread.Sleep(500); // 模拟延迟
tasksQueue.Enqueue($"Dynamic Task {i}");
Console.WriteLine($"Added Dynamic Task {i}");
}
});
// Partitioner.Create可以直接处理ConcurrentQueue
// 注意:对于无限流或持续添加的队列,你可能需要一个停止条件
Parallel.ForEach(Partitioner.Create(tasksQueue), task =>
{
Console.WriteLine($"Processing: {task}");
Thread.Sleep(200); // 模拟处理时间
});
Console.WriteLine("All tasks processed."); // 这行可能在队列完全清空前出现在这种情况下,
Partitioner.Create(tasksQueue)能够适应
ConcurrentQueue的动态特性。然而,你需要注意的是,这种模式下,
Parallel.ForEach会在队列为空时停止,如果你的生产者线程还在持续生产,你可能需要更复杂的协调机制(比如使用
BlockingCollection)。
总的来说,避免在并行处理中直接修改作为
Partitioner数据源的非线程安全集合,而是将修改操作转移到结果收集阶段,或者从一开始就使用线程安全的集合作为数据源。
Partitioner.Create的不同重载有什么区别?
Partitioner.Create方法提供了多个重载,它们的设计是为了适应不同类型的数据源和不同的分区需求。理解这些区别对于优化并行性能和避免潜在问题至关重要。
Partitioner.Create<TSource>(IEnumerable<TSource> source)
IEnumerable<TSource>的集合。 它会根据传入的
source的具体类型,在内部选择一个合适的分区策略。例如,如果
source是
List<T>或
T[],它可能会使用基于索引的范围分区;如果是非索引集合,它可能会使用迭代器分区。 风险点: 如果
source是一个非线程安全的集合,并且在
Parallel.ForEach执行期间被修改,就非常容易抛出
InvalidOperationException。这是最常见的触发点。
Partitioner.Create<TSource>(IList<TSource> list)
IList<TSource>的集合。 由于
IList提供了索引访问能力,
Partitioner可以利用这一点进行更高效的范围分区。它通常会将列表划分为连续的块,然后将这些块分配给不同的并行任务。 风险点: 尽管它能更高效地利用索引,但如果
list在并行处理期间被修改(添加、删除元素),同样会引发
InvalidOperationException。
Partitioner.Create<TSource>(TSource[] array)
TSource[]的重载。 数组是固定大小的,提供了最直接的索引访问。
Partitioner可以非常高效地将数组划分为精确的、连续的子范围,这通常能提供最佳的性能。 安全性: 相对于
List<T>,数组的结构(大小)在创建后是不可变的,这使得它在作为
Partitioner的数据源时更安全,不会因为元素数量的变化而导致
InvalidOperationException。当然,如果你在并行处理中修改数组内部的元素值,那又是另一个层面的并发问题了,但至少不会因为结构变化而抛出
InvalidOperationException。
Partitioner.Create(int fromInclusive, int toExclusive)
Parallel.For(0, 100, i => { /* process item at index i */ }); 在内部就可能使用类似的分区策略。
Partitioner.Create<TSource>(IEnumerable<TSource> source, EnumerablePartitionerOptions options)
IEnumerable<TSource>源指定额外的选项,比如
EnumerablePartitionerOptions.NoBuffering。
NoBuffering选项会告诉
Partitioner不要在内部进行额外的缓冲。这在某些情况下可以减少内存使用,但可能会增加线程间的协调开销,从而影响性能。通常在处理非常大的数据集,或者数据生成速度很快时才会考虑。
OrderablePartitioner<TSource>
OrderablePartitioner意味着你需要自己处理所有并发和一致性问题。如果你的自定义分区逻辑没有正确处理底层数据源的并发修改,你同样会遇到
InvalidOperationException或其他并发问题。
总的来说,选择哪个重载取决于你的数据源类型、你对性能的需求以及你是否需要处理动态或并发修改的数据。对于大多数情况,如果数据源是静态的,
ToArray()后使用
Partitioner.Create(TSource[])是最稳妥且高效的选择。如果数据源本身就是为并发设计的(如
ConcurrentBag),那么直接使用
Partitioner.Create(IEnumerable<TSource>)通常也能很好地工作。
