C# 多线程与高并发处理并且具备暂停、继续、停止功能 - 渴死的鱼丶 - 博客园
19-09-27
C 多线程与高并发处理并且具备暂停、继续、停止功能
C# 多线程与高并发处理并且具备暂停、继续、停止功能
--近期有一个需要运用多线程的项目,会有并发概率,所以写了一份代码,可能有写地方还不完善,后续有需求在改
2 /// 并发对象
3 ///
4 public class MeterAsyncQueue
5 {
6 public MeterAsyncQueue()
7 {
8 MeterInfoTask = new MeterInfo();
9 }
10
11 public MeterInfo MeterInfoTask { get; set; }
12 }
13 public class MeterInfo
14 {
15 public MeterInfo()
16 {
17
18 }
19 public int Id { get; set; }
20
21 }
1 ///
2 /// 线程通用类
3 ///
4 public class TaskCommand
5 {
6 CancellationTokenSource tokenSource = new CancellationTokenSource();
7 ManualResetEvent resetEvent = new ManualResetEvent(true);
8 Thread thread = null;
9 ///
10 /// 开始任务
11 ///
12 public void StartData()
13 {
14 tokenSource = new CancellationTokenSource();
15 resetEvent = new ManualResetEvent(true);
16
17 List Ids = new List();
18 for (int i = 0; i < 10000; i++)
19 {
20 Ids.Add(i);
21 }
22 thread = new Thread(new ThreadStart(() => StartTask(Ids)));
23 thread.Start();
24 }
25 ///
26 /// 暂停任务
27 ///
28 public void OutData()
29 {
30 //task暂停
31 resetEvent.Reset();
32 }
33 ///
34 /// 继续任务
35 ///
36 public void ContinueData()
37 {
38 //task继续
39 resetEvent.Set();
40 }
41 ///
42 /// 取消任务
43 ///
44 public void Cancel()
45 {
46 //释放对象
47 resetEvent.Dispose();
48 foreach (var CurrentTask in ParallelTasks)
49 {
50 if (CurrentTask != null)
51 {
52 if (CurrentTask.Status == TaskStatus.Running) { }
53 {
54 //终止task线程
55 tokenSource.Cancel();
56 }
57 }
58 }
59 thread.Abort();
60 }
61 ///
62 /// 执行数据
63 ///
64 ///
65 public void Execute(int Index)
66 {
67 //阻止当前线程
68 resetEvent.WaitOne();
69
70 Console.WriteLine("当前第" + Index + "个线程");
71
72 Thread.Sleep(1000);
73
74 }
75 //队列对象
76 private Queue AsyncQueues { get; set; }
77
78 ///
79 /// 并发任务数
80 ///
81 private int ParallelTaskCount { get; set; }
82
83
84 ///
85 /// 并行任务集合
86 ///
87 private List ParallelTasks { get; set; }
88 //控制线程并行数量
89 public void StartTask(List Ids)
90 {
91 IsInitTask = true;
92 ParallelTasks = new List();
93 AsyncQueues = new Queue();
94 //获取并发数
95 ParallelTaskCount = 5;
96
97 //初始化异步队列
98 InitAsyncQueue(Ids);
99 //开始执行队列任务
100 HandlingTask();
101
102 Task.WaitAll(new Task[] { Task.WhenAll(ParallelTasks.ToArray()) });
103 }
104 ///
105 /// 初始化异步队列
106 ///
107 private void InitAsyncQueue(List Ids)
108 {
109 foreach (var item in Ids)
110 {
111 MeterInfo info = new MeterInfo();
112 info.Id = item;
113 AsyncQueues.Enqueue(new MeterAsyncQueue()
114 {
115 MeterInfoTask = info
116 });
117 }
118 }
119 ///
120 /// 是否首次执行任务
121 ///
122 private bool IsInitTask { get; set; }
123 //锁
124 private readonly object _objLock = new object();
125
126 ///
127 /// 开始执行队列任务
128 ///
129 private void HandlingTask()
130 {
131 lock (_objLock)
132 {
133 if (AsyncQueues.Count <= 0)
134 {
135 return;
136 }
137
138 var loopCount = GetAvailableTaskCount();
139 //并发处理队列
140 for (int i = 0; i < loopCount; i++)
141 {
142 HandlingQueue();
143 }
144 IsInitTask = false;
145 }
146 }
147 ///
148 /// 获取队列锁
149 ///
150 private readonly object _queueLock = new object();
151
152 ///
153 /// 处理队列
154 ///
155 private void HandlingQueue()
156 {
157 CancellationToken token = tokenSource.Token;
158 lock (_queueLock)
159 {
160 if (AsyncQueues.Count > 0)
161 {
162 var asyncQueue = AsyncQueues.Dequeue();
163
164 if (asyncQueue == null) return;
165 var task = Task.Factory.StartNew(() =>
166 {
167 if (token.IsCancellationRequested)
168 {
169 return;
170 }
171 //阻止当前线程
172 resetEvent.WaitOne();
173 //执行任务
174 Execute(asyncQueue.MeterInfoTask.Id);
175
176 }, token).ContinueWith(t =>
177 {
178 HandlingTask();
179 }, TaskContinuationOptions.OnlyOnRanToCompletion | TaskContinuationOptions.ExecuteSynchronously);
180 ParallelTasks.Add(task);
181 }
182 }
183 }
184 ///
185 /// 获取当前有效并行的任务数
186 ///
187 ///
188 [MethodImpl(MethodImplOptions.Synchronized)]
189 private int GetAvailableTaskCount()
190 {
191 if (IsInitTask)
192 return ParallelTaskCount;
193 return 1;
194 }
195 }
相关文章
最新文章
热点推荐