频道栏目
首页 > 程序开发 > 软件开发 > C++ > 正文
线程池任务队列
2012-09-23 08:04:00           
收藏   我要投稿
[delphi]
unit uPool; 
 
{***********************************************************************
 
                       线程池+任务队列
 
         整个线程池调度图
         ==========================================================
         |  -----   ----------------------                         |
         |  |空 |   | 任务队列   ←----    | ⑴                     |
         |  |闲 |   ----------------------                         |
         |  |线 |     ↑空闲线程检查队列是否有任务                  |
         |  |程 |--①-- 有任务要执行时,加入到工作队列              |
         |  |队 |            |                                     |
         |  |列 |            ↓②               ----------------    |
         |  |   |   -----------------------    |  自动回收空   |   |
         |  |   |   |正在工作队列          |   |  闲定时器     |   |
         |  |   |   -----------------------    ----------------    |
         |  |   |     ③     | 任务做完后              |           |
         |  ----- ←----------| 调度到空闲队列          |           |
         |    |                                        |           |
         |    -----------------------------------------|           |
         |             ④定时回收空闲线程                          |
         |                                                         |
         ==========================================================
 
         使用方法:
 
         pool = TThreadPool.Create;
         pool.MinNums := 2; //最小线程
         pool.MaxNums := 6; //最大线程
         pool.TasksCacheSize := 10; //任务缓冲队列
 
         上面创建好之后,就可以往池中放任务
 
         pool.AddWorkTask(Task);
 
         线程池就开始工作了。
         同时线程池支持对任务进行优先级排序,排序算法默认
         为快速序,也可以外问进行队列排序
 
         这里把任务和池分开了。
         使用任务时,需要继承TWorkTask进开自己的任务设计。
         然后重写exectask;方法。如果方法中要进行毫时循环,
         请见如下例子;
         for i := 0 to 5000 do
          begin
            if tk.WorkState = tsFinished then break;
              inc(k);
              //caption := inttostr(k);
            edit2.Text := inttostr(k);
          end;
 
         如:TWirteFileTask = Class(TWorkTask);
 
 
        作者:边缘
        @RightCopy fsh
        QQ: 19985430
        date: 2012-09-22
        Email:fengsh998@163.com
***********************************************************************} 
 
interface 
 
uses 
    Classes,Windows,SysUtils,Messages,SyncObjs; 
 
Const 
    PRE_NUM = 5; 
    MAX_NUM = 100; 
    AUTO_FREE = 2; 
    MAX_TASKNUM = 100; 
    ONEMINUTE = 10000;//60000; 
 
  type 
    TLogLevel = (lDebug,lInfo,lError); 
 
    ILog = interface 
      procedure WriteLog(Const Msg:String;Level:TLogLevel = lDebug); 
    end; 
 
    TPoolLog = Class(TInterfacedObject,ILog) 
       private 
          procedure WriteLog(Const Msg:String;Level:TLogLevel = lDebug); 
       public 
          procedure OutputLog(Const Msg:String;Level:TLogLevel);virtual; 
    End; 
 
 
    Thandles = Array of Cardinal; 
 
    //任务级别  优先级高的任务先执行。 
    TTaskLevel = (tlLower,tlNormal,tlHigh); 
    TTaskState = (tsNone,tsDoing,tsWaiting,tsReStart,tsStop,tsFinished); 
    TWorkTask = Class 
       private 
          Work:TThread; 
          //任务ID 
          hTask:TCriticalSection; 
          FWorkId:Cardinal; 
          FWorkName:String; 
          FWorkLevel:TTaskLevel; //默认为普通 
          FWorkState : TTaskState; 
          procedure setWorkState(Const Value:TTaskState); 
       public 
          Constructor Create; 
          Destructor Destroy;override; 
          procedure execTask;virtual; abstract; 
          property WorkId:Cardinal read FWorkId write FWorkId; 
          property WorkName:String read FWorkName write FWorkName; 
          property WorkLevel:TTaskLevel read FWorkLevel write FWorkLevel; 
          property WorkState : TTaskState read FWorkState write setWorkState; 
    End; 
 
    TWorkTaskQueue = Array of TWorkTask; 
     
    TThreadPool = Class; 
 
    TWorkThreadState = (wtIdle,wtRunning,wtStop,wtFinished); 
    //工作线程(单个线程一次只能处理一个task) 
    TWorkThread = Class(TThread) 
      private 
        FPool:TThreadPool; 
        FState:TWorkThreadState; 
        procedure SetDefault; 
      protected 
        procedure Execute;override; 
      public 
        Constructor Create(Const pool:TThreadPool); 
        property State : TWorkThreadState read FState write FState; 
    End; 
 
    TWorkThreadQueue = Array of TWorkThread; 
 
    //查看缓冲情况事件 
    TListenCacheInfoEvent = procedure (Sender:TObject;Const IdleCount,BusyCount,TaskCount:Integer) of Object; 
    TTaskQueueFullEvent = procedure (Sender:TObject) of Object; 
    //任务处理完后 
    TTaskFinishedEvent = procedure (Const cTast:TWorkTask) of object; 
    //任务准备被处理前事件 
    TTaskWillDoBeforeEvent = procedure (Const thId:Cardinal;Const cTast:TWorkTask) of Object; 
    //外部排序任务队列算法,默认为快速排序,可自行在外部定制算法。 
    TSortTaskQueueEvent = procedure (Sender:TObject;var taskQueue:TWorkTaskQueue) of object; 
 
    TThreadPool = Class 
     private 
       Log:TPoolLog; 
       //自动回收标识 
       FAuto:Boolean; 
       //定时等待控制 
       FWaitFlag:Boolean; 
       //表示正在用于等待回收到的线程 
       Waiting:TWorkThread; 
       //提取任务通知信号 
       entTaskNotify:Tevent; 
       //时间事件HANDLE 
       hTimeJump:Cardinal; 
       //是否排序任务队列 
       FSorted:Boolean; 
       //对空闲队列操作锁 
       hIDleLock:TCriticalSection; 
       //对正在进行的线程锁 
       hBusyLock:TCriticalSection; 
       //任务队列锁 
       hTaskLock:TCriticalSection; 
       //预设线程数 默认为5 发现忙不过来时才进行自增直到Max 
       FMinNums:Integer; 
       //最大限制线程数,默认为100 
       FMaxNums:Integer; 
       //任务队列缓冲大小 默认100 
       FTasksCache:Integer; 
       //当线程空闲时间长达XX时自动回收 :单位为分钟 
       FRecoverInterval:Integer; 
       //是否允许队列中存在重复任务 (同一任务时要考虑线程同步),默认为否 
       FIsAllowTheSameTask:Boolean; 
       //任务队列 (不释放外部任务) 最大100个任务。当大于100个任务时,需要等待 
       //每抽取一个任务,立即从队列中删除。 
       TaskQueue:TWorkTaskQueue; 
       //工作线程 
       BusyQueue:TWorkThreadQueue; 
       //空闲线程 
       IdleQueue:TWorkThreadQueue; 
 
       //************************事件回调**********************// 
       //排序队列回调 
       FOnSortTask:TSortTaskQueueEvent; 
       FOnTaskWillDo:TTaskWillDoBeforeEvent; 
       FOnTaskFinished:TTaskFinishedEvent; 
       FOnTaskFull:TTaskQueueFullEvent; 
       FOnListenInfo:TListenCacheInfoEvent; 
       //*****************************************************// 
        
       //************************Get/Set操作*******************// 
       procedure SetMinNums(Const Value:Integer); 
       function getTaskQueueCount: Integer; 
       function getBusyQueueCount: Integer; 
       function getIdleQueueCount: Integer; 
       //*****************************************************// 
 
       //***********************同步量处理********************// 
       procedure CreateLock; 
       procedure FreeLock; 
       //*****************************************************// 
 
       //设置初值 
       procedure SetDefault; 
       //处理回调 
       procedure DoTaskFull; 
 
       //********************线程队列操作**********************// 
       //清空线程队列 
       procedure ClearQueue(var Queue:TWorkThreadQueue); 
       //得到队列的长度 
       function QueueSize(Const Queue:TWorkThreadQueue):Integer; 
       //调整队列 
       procedure DelQueueOfIndex(var Queue:TWorkThreadQueue;Const Index:Integer); 
       //移动队列; 
       procedure MoveQueue(Const wt:TWorkThread;flag:Integer); 
       //移除某个线程 
       procedure RemoveFromQueue(var Queue:TWorkThreadQueue;Const re:TWorkThread); 
       //*****************************************************// 
 
       //********************任务队列操作**********************// 
       //排序队列将优先级高的排前面。//可以交给外问进行排序算法 
       procedure SortTask(var Queue:TWorkTaskQueue); 
       //调整队列 
       procedure DelTaskOfIndex(var Queue:TWorkTaskQueue;Const Index:Integer); 
       //获取队列大小 
       function TaskSzie(Const Queue:TWorkTaskQueue):Integer; 
       //*****************************************************// 
       //查找任务(如果有好的算法,哪更高效) 
       function FindTask(Const tsk:TWorkTask):Integer; 
       //快速排序 
       procedure QuikeSortTask(var Queue:TWorkTaskQueue;Const s,e:Integer); 
       //自动回收空闲线程 
       procedure RecoverIDle(Const wait:TWorkThread); 
       //交换任务 
       procedure switch(var Queue: TWorkTaskQueue; m, n: Integer); 
       //判断当前运行线程是否使用在等待自动回收 
       function WaitAutoRecover(Const curThread:TWorkThread):Boolean; 
     protected 
       //求最小值 
       function Smaller(Const expresion:Boolean;Const tureValue,falseValue:Integer):Integer; 
       //按照先进选出进行提取任务 
       function PickupTask:TWorkTask; 
       //创建空闲线程 
       procedure CreateIdleThread(Const Nums:Integer = 1); 
       //添加到空闲线程队列 
       procedure AddThreadToIdleQueue(Const idle:TWorkThread); 
       //添加到工作队列 
       procedure AddThreadToBusyQueue(Const busy:TWorkThread); 
       //发送新任务到达信号 
       procedure PostNewTaskSign; 
        
     public 
       Constructor Create; 
       Destructor Destroy;override; 
       //***********************线程池管理方法******************************// 
       //停止执行的任务 
       procedure StopAll; 
       //开始任务 
       procedure StartAll; 
       //清空任务 
       procedure CleanTasks; 
       //运行中不能进行对调 
       function  SwitchTasks(Const aTask,bTask:TWorkTask):Boolean; 
       //移除某个任务 
       procedure RemoveTask(Const tk:TWorkTask);//只允许移除未执行的任务 
       //需要外部定时调用来得到动态数据效果 
       procedure ListenPool; 
       //******************************************************************// 
       //添加任务 
       function AddWorkTask(Const wtask:TWorkTask):Integer; 
 
       property MinNums:Integer read FMinNums write SetMinNums; 
       property MaxNums:Integer read FMaxNums write FMaxNums; 
       property TasksCacheSize:Integer read FTasksCache write FTasksCache; 
       property RecoverInterval:Integer read FRecoverInterval 
                write FRecoverInterval; 
       property IsAllowTheSameTask:Boolean read FIsAllowTheSameTask 
                write FIsAllowTheSameTask; 
       property Sorted:Boolean read FSorted write FSorted; 
       property TaskQueueCount:Integer read getTaskQueueCount; 
       property IdleQueueCount:Integer read getIdleQueueCount; 
       property BusyQueueCount:Integer read getBusyQueueCount; 
       property OnSortTask:TSortTaskQueueEvent read FOnSortTask write FOnSortTask; 
       property OnTaskWillDo:TTaskWillDoBeforeEvent read FOnTaskWillDo write FOnTaskWillDo; 
       property OnTaskFinished:TTaskFinishedEvent read FOnTaskFinished write FOnTaskFinished; 
       property OnTaskFull:TTaskQueueFullEvent read FOnTaskFull write FOnTaskFull; 
       property OnListenInfo:TListenCacheInfoEvent read FOnListenInfo write FOnListenInfo; 
    End; 
 
implementation 
 
{ TThreadPool } 
 
constructor TThreadPool.Create; 
var 
  tpError:Cardinal; 
begin 
   Log:=TPoolLog.Create; 
   SetDefault; 
   CreateLock; 
 
   tpError := 0; 
 
   entTaskNotify:=Tevent.create(nil,false,false, 'TaskNotify');//事件信号 
   hTimeJump := CreateEvent(nil,False,False,'Timer');//自动回收心跳事件 
   if hTimeJump = 0 then 
       tpError := GetLastError; 
        
   //the same name of sign exists. 
   Case tpError of 
     ERROR_ALREADY_EXISTS: 
                       begin 
                          hTimeJump := 0; 
                          Log.WriteLog('CreateTimerEvent Fail,the Same Name of Event Exists'); 
                       end; 
   End; 
   //预创建线程 
   CreateIdleThread(FMinNums); 
   Log.WriteLog('Thread Pool start run.',lInfo); 
end; 
 
destructor TThreadPool.Destroy; 
begin 
   ClearQueue(IdleQueue); 
   ClearQueue(BusyQueue); 
   FreeLock; 
   if hTimeJump > 0 then 
      CloseHandle(hTimeJump); 
   entTaskNotify.Free; 
   Log.Free; 
  inherited; 
  Log.WriteLog('Thread Pool end run.',lInfo); 
end; 
 
procedure TThreadPool.DoTaskFull; 
begin 
   if Assigned(FOnTaskFull) then 
      FOnTaskFull(self); 
end; 
 
procedure TThreadPool.SetDefault; 
begin 
   FMinNums := PRE_NUM; 
   FMaxNums := MAX_NUM; 
   FTasksCache := MAX_TASKNUM; 
   FRecoverInterval := AUTO_FREE; 
   FIsAllowTheSameTask := False; 
   FAuto :=False; 
   FWaitFlag := True; 
   Waiting := nil; 
   FSorted := False; 
end; 
 
procedure TThreadPool.CreateLock; 
begin 
   hIDleLock := TCriticalSection.Create; 
   hBusyLock := TCriticalSection.Create; 
   hTaskLock := TCriticalSection.Create; 
end; 
 
procedure TThreadPool.FreeLock; 
begin 
   hIDleLock.Free; 
   hBusyLock.Free; 
   hTaskLock.Free; 
end; 
 
function TThreadPool.getBusyQueueCount: Integer; 
begin 
   Result := QueueSize(BusyQueue); 
end; 
 
function TThreadPool.getIdleQueueCount: Integer; 
begin 
   Result := QueueSize(IdleQueue); 
end; 
 
function TThreadPool.getTaskQueueCount: Integer; 
begin 
   Result := TaskSzie(TaskQueue); 
end; 
 
procedure TThreadPool.CleanTasks; 
begin 
   hTaskLock.Enter; 
   SetLength(TaskQueue,0); 
   hTaskLock.Leave; 
end; 
 
procedure TThreadPool.ListenPool; 
begin 
   //正在执行任务的线程,空闲线程,队列中任务数 
   if Assigned(FOnListenInfo) then 
      FOnListenInfo(self,IdleQueueCount,BusyQueueCount,TaskQueueCount); 
end; 
 
procedure TThreadPool.ClearQueue(var Queue: TWorkThreadQueue); 
var 
   i:Integer; 
   sc:Integer; 
begin 
   sc := Length(Queue); 
   for i := 0 to sc - 1 do 
   begin 
       TWorkThread(Queue[i]).Terminate; 
       PostNewTaskSign; 
       //TWorkThread(Queue[i]).Free; //如果FreeOnTerminate为TRUE就不要使用这句了。 
   end; 
   SetLength(Queue,0); 
end; 
 
procedure TThreadPool.SetMinNums(const Value: Integer); 
begin 
   if Value = 0 then 
      FMinNums := PRE_NUM 
   else if FMinNums > Value then 
        begin 
          //先清容再创建 
          FMinNums := Value; 
          ClearQueue(IDleQueue); 
        end 
   else 
     FMinNums := Value; 
 
   CreateIdleThread(FMinNums); 
   Log.WriteLog('Reset MinNums Numbers is ' + inttostr(FMinNums) + ' .',lInfo); 
end; 
 
 
function TThreadPool.Smaller(const expresion: Boolean; const tureValue, 
  falseValue: Integer): Integer; 
begin 
   if expresion then 
      result := tureValue 
   else 
      result := falseValue; 
end; 
 
procedure TThreadPool.DelQueueOfIndex(var Queue: TWorkThreadQueue; 
  const Index: Integer); 
var 
   i:integer; 
   ic:integer; 
begin 
   ic := Length(Queue); 
   for i := Index to ic - 1 do 
       Queue[i] := Queue[i+1]; 
 
   setLength(Queue,ic-1); 
end; 
 
procedure TThreadPool.DelTaskOfIndex(var Queue: TWorkTaskQueue; 
  const Index: Integer); 
var 
   i:integer; 
   ic:integer; 
begin 
   ic := length(Queue); 
   for i := Index to ic -1 do 
       Queue[i] := Queue[i+1]; 
 
   setLength(Queue,ic-1); 
end; 
 
procedure TThreadPool.MoveQueue(const wt: TWorkThread; flag: Integer); 
var 
    k:integer; 
begin 
   if flag = 0 then 
   begin 
     hIDleLock.Enter; 
     for k := Low(IdleQueue) to High(IdleQueue) do 
     begin 
        if IdleQueue[k]=wt then 
        begin 
           AddThreadToBusyQueue(wt); 
           DelQueueOfIndex(IdleQueue,k); 
        end; 
     end; 
     hIDleLock.Leave; 
   end 
   else 
   begin 
     hBusyLock.Enter; 
     for k := Low(BusyQueue) to High(BusyQueue) do 
     begin 
        if BusyQueue[k]=wt then 
        begin 
           AddThreadToIdleQueue(wt); 
           DelQueueOfIndex(BusyQueue,k); 
        end; 
     end; 
     hBusyLock.Leave; 
   end; 
end; 
 
function TThreadPool.SwitchTasks(const aTask, bTask: TWorkTask): Boolean; 
var 
   aIndex,bIndex:Integer; 
begin 
   Result := true; 
   hTaskLock.Enter; 
   aIndex := FindTask(aTask); 
   bIndex := FindTask(bTask); 
    
   if (aIndex = -1) or (bIndex = -1) then 
   begin 
      Result := false; 
      hTaskLock.Leave; 
      exit; 
   end; 
   switch(TaskQueue,aIndex,bIndex); 
   hTaskLock.Leave; 
end; 
 
function TThreadPool.TaskSzie(const Queue: TWorkTaskQueue): Integer; 
begin 
   Result := Length(Queue); 
end; 
 
function TThreadPool.WaitAutoRecover(const curThread: TWorkThread): Boolean; 
begin 
   Result := Waiting = curThread; 
end; 
 
procedure TThreadPool.CreateIdleThread(const Nums: Integer); 
var 
   WorkThread:TWorkThread; 
   i:integer; 
begin 
   hIDleLock.Enter; 
   for i := 0 to Nums - 1 do 
   begin 
     WorkThread := TWorkThread.Create(self); 
     WorkThread.FreeOnTerminate := true; 
     AddThreadToIdleQueue(WorkThread); 
   end; 
   hIDleLock.Leave; 
end; 
 
procedure TThreadPool.AddThreadToBusyQueue(const busy: TWorkThread); 
var 
   sz:integer; 
begin 
   sz := QueueSize(BusyQueue); 
   setLength(BusyQueue,sz + 1); 
   BusyQueue[sz] := busy; 
end; 
 
procedure TThreadPool.AddThreadToIdleQueue(const idle: TWorkThread); 
var 
   sz:integer; 
begin 
   sz := Length(IdleQueue); 
   setLength(IdleQueue,sz + 1); 
   IdleQueue[sz] := idle; 
end; 
 
function TThreadPool.PickupTask: TWorkTask; 
begin 
   //先排序再取 
   hTaskLock.enter; 
 
   if FSorted then 
      SortTask(TaskQueue); 
 
   if length(TaskQueue) > 0 then 
   begin 
      Result := TaskQueue[0]; 
      DelTaskOfIndex(TaskQueue,0); 
   end 
   else 
      Result := Nil; 
   hTaskLock.Leave; 
end; 
 
function TThreadPool.AddWorkTask(Const wtask: TWorkTask):Integer; 
var 
   sz,ic,bc:Integer; 
begin 
   sz := Length(TaskQueue); 
   if sz >= FTasksCache  then 
   begin 
      Result := -1; 
      DoTaskFull; 
      exit; 
   end; 
 
   setLength(TaskQueue,sz+1); 
   wtask.WorkState := tsWaiting; 
   TaskQueue[sz] := wtask; 
 
   Result := sz + 1; 
 
   //未达到最大线程数时增加 
   ic := IdleQueueCount; 
   bc := BusyQueueCount; 
 
   //最大只能ic + bc = MaxNums 
   if (ic = 0) and (ic+ bc < FMaxNums) then 
      CreateIdleThread(); 
       
   FAuto := True; 
   //通知线程去取任务 
   PostNewTaskSign; 
   Log.WriteLog('Add a task to queue.',lInfo); 
end; 
 
function TThreadPool.FindTask(const tsk: TWorkTask): Integer; 
var 
   l:Integer; 
begin 
   Result := -1; 
   for l := Low(TaskQueue) to High(TaskQueue) do 
       if TaskQueue[l] = tsk then 
       begin 
         Result := l; 
         Break; 
       end; 
end; 
 
procedure TThreadPool.PostNewTaskSign; 
begin 
   entTaskNotify.SetEvent; 
end; 
 
procedure TThreadPool.switch(var Queue:TWorkTaskQueue;m,n:Integer); 
var 
 tem:TWorkTask; 
begin 
  tem := Queue[m]; 
  Queue[m] := Queue[n]; 
  Queue[n] := tem; 
end; 
 
procedure TThreadPool.QuikeSortTask(var Queue: TWorkTaskQueue; const s, 
  e: Integer); 
var 
   key:Integer; 
   k,j:Integer; 
begin 
   key := ord(Queue[s].WorkLevel); 
 
   if s > e then exit; 
 
   k := s; 
   j := e; 
 
   while (k <> j) do 
   begin 
     while (k < j) and (ord(Queue[j].WorkLevel) <= key) do //如果排序从小到大时改为 >= 
         dec(j); 
     switch(Queue,k,j); 
 
     while (k < j) and (ord(Queue[k].WorkLevel) >= key) do //如果排序从小到大时改为 <= 
         inc(k); 
     Switch(Queue,j,k); 
   end; 
 
   if s < k-1 then 
      QuikeSortTask(Queue,s,k-1); 
   if k+1 < e then 
      QuikeSortTask(Queue,k+1,e); 
end; 
 
procedure TThreadPool.SortTask(var Queue: TWorkTaskQueue); 
var 
   f,l:Integer; 
   ic:Integer; 
begin 
   ic := Length(Queue); 
   if ic = 0 then exit; 
    
   if Assigned(FOnSortTask) then 
      FOnSortTask(self,Queue) 
   else 
   begin 
      f := 0; 
      l := ic-1; 
      QuikeSortTask(Queue,f,l); 
   end; 
end; 
 
procedure TThreadPool.StartAll; 
var 
   I:Integer; 
begin 
   hBusyLock.Enter; 
   for I := Low(BusyQueue) to High(BusyQueue) do 
   begin 
     BusyQueue[i].Resume; 
     BusyQueue[i].State := wtRunning; 
   end; 
   hBusyLock.Leave; 
 
   hIDleLock.Enter; 
   for I := Low(IdleQueue) to High(IdleQueue) do 
   begin 
     IdleQueue[i].Resume; 
     IdleQueue[i].State := wtRunning; 
   end; 
   hIDleLock.Leave; 
end; 
 
procedure TThreadPool.StopAll; 
var 
   I:Integer; 
begin 
   hBusyLock.Enter; 
   for I := Low(BusyQueue) to High(BusyQueue) do 
   begin 
     BusyQueue[i].Suspend; 
     BusyQueue[i].State := wtStop; 
   end; 
   hBusyLock.Leave; 
 
   hIDleLock.Enter; 
   for I := Low(IdleQueue) to High(IdleQueue) do 
   begin 
     IdleQueue[i].Suspend; 
     IdleQueue[i].State := wtStop; 
   end; 
   hIDleLock.Leave; 
end; 
 
function TThreadPool.QueueSize(const Queue: TWorkThreadQueue):Integer; 
begin 
  Result := Length(Queue); 
end; 
 
//每次只留单线程进行空闲回收等待 
procedure TThreadPool.RecoverIDle(Const wait:TWorkThread); 
var 
   k:Integer; 
begin 
   FAuto:=False; 
   //等待时间超时 
   FWaitFlag := False; 
   Waiting := wait; 
   hBusyLock.Enter; 
   RemoveFromQueue(BusyQueue,wait); 
   hBusyLock.Leave; 
   //补给一个空闲线程 
   CreateIdleThread(); 
   WaitforSingleObject(hTimeJump,FRecoverInterval*ONEMINUTE); 
 
   //满足空闲时间到后并且空闲线程大于零,没有线程在执行任务,及任务队列为空 
   if (IdleQueueCount > 0) 
      and (BusyQueueCount = 0) //正在等待的是清空空闲线程 
      and (TaskQueueCount = 0) then 
   begin 
      hTaskLock.Enter; 
      //回收到最小设定线程 
      for k := High(IdleQueue) Downto FMinNums do 
      begin 
         TWorkThread(IdleQueue[k]).Terminate; 
         PostNewTaskSign; 
      end; 
      SetLength(IdleQueue,FMinNums); 
      hTaskLock.Leave; 
   end; 
   //定时完后线程释放 
   wait.Terminate; 
   FWaitFlag := True; 
end; 
 
procedure TThreadPool.RemoveFromQueue(var Queue: TWorkThreadQueue; 
  const re: TWorkThread); 
var 
   index ,i: integer; 
begin 
   index := -1; 
   for i := Low(Queue) to High(Queue) do 
   begin 
       if Queue[i] = re then 
       begin 
          index := i; 
          break; 
       end; 
   end; 
    
   if Index<>-1 then 
      DelQueueOfIndex(Queue,index); 
end; 
 
procedure TThreadPool.RemoveTask(const tk: TWorkTask); 
var 
   index:Integer; 
begin 
   index := FindTask(tk); 
   if index = -1 then Exit; 
   hTaskLock.Enter; 
   DelTaskOfIndex(TaskQueue,index); 
   hTaskLock.Leave; 
end; 
 
{ TWorkThread } 
 
constructor TWorkThread.Create(const pool: TThreadPool); 
begin 
   FPool := pool; 
   SetDefault; 
   inherited Create(false); 
end; 
 
procedure TWorkThread.Execute; 
var 
  hd:Array[0..0] of Cardinal; 
  ret:Cardinal; 
  task:TWorkTask; 
  nc:Integer; 
begin 
   //不断的在任务队列中取任务 
   hd[0]:= fPool.entTaskNotify.Handle; 
   while not Terminated do 
   begin 
      //跟踪时为什么会暂停不了,是因为前面在设置MinNums时有信号增加 
      ret := WaitForMultipleObjects(1,@hd,false,INFINITE); 
 
      if Terminated then break; 
 
      Case ret - WAIT_OBJECT_0 of 
      WAIT_OBJECT_0: 
           begin 
                if state <> wtRunning then  
                begin 
                    try 
                      //抽取一个任务 
                      task := FPool.PickupTask; 
 
                      if assigned(task) then 
                      begin 
                         //任务启动前 
                         if Assigned(fPool.FOnTaskWillDo) then 
                            fPool.FOnTaskWillDo(self.ThreadID,task); 
 
                         //需要线程同步,以防正在执行的任务被其它线程执行。 
                         task.hTask.Enter; 
                         //当有任务做时,将自己移到工作队列中 
                         fPool.MoveQueue(self,0); 
                         state := wtRunning; 
                         //指定执行线程 
                         task.Work := self; 
                         task.WorkState := tsDoing; 
                         task.execTask; 
                         state := wtFinished; 
                         task.WorkState := tsFinished; 
                         task.Work := nil; 
                         task.hTask.leave; 
                         //任务完成 
                         if Assigned(fPool.FOnTaskFinished) then 
                            fPool.FOnTaskFinished(task); 
                      end; 
 
                    finally 
 
                    end; 
 
                end; 
           end; 
         WAIT_OBJECT_0 + 1:;//Terminate  don't to do something 
      End; 
 
      nc := fPool.TaskQueueCount; 
      if (nc > 0) then 
        fpool.PostNewTaskSign 
      else if (fPool.FAuto) and (fPool.FWaitFlag) then 
         fPool.RecoverIDle(self);//任务空闲,线程空闲时间大于设定时间时自动回收空闲线程 
 
      state := wtIdle; 
      //将自己移至空闲线程 
      if not fPool.WaitAutoRecover(self) then //如果当前正在等待自动回收线程的 
         fPool.MoveQueue(self,1) 
      else 
         fPool.Waiting := nil; 
   end; 
end; 
 
procedure TWorkThread.SetDefault; 
begin 
   FState := wtIdle; 
end; 
 
{ TWorkTask } 
 
constructor TWorkTask.Create; 
begin 
   hTask := TCriticalSection.Create; 
   WorkState := tsNone; 
   FWorkLevel := tlNormal; 
   Work := nil; 
end; 
 
destructor TWorkTask.Destroy; 
begin 
   WorkState := tsFinished; 
   if Assigned(Work) then 
      Work.Resume; 
   hTask.Free; 
  inherited; 
end; 
 
procedure TWorkTask.setWorkState(Const Value:TTaskState); 
begin 
 
   FWorkState := Value; 
 
   case value of 
     tsReStart: 
          begin 
            if Assigned(Work) and (Work.Suspended)  then 
            begin 
                FWorkState := tsDoing; 
                Work.Resume; 
            end; 
          end; 
     tsStop: 
          begin 
            if Assigned(Work) then 
                Work.Suspend; 
          end; 
   end; 
end; 
 
{ TPoolLog } 
 
procedure TPoolLog.OutputLog(const Msg: String; Level: TLogLevel); 
begin 
   // to implement at sub class. 
end; 
 
procedure TPoolLog.WriteLog(const Msg: String; Level: TLogLevel); 
var 
   dt:TDatetime; 
begin 
   dt := now; 
   OutputLog(datetimetostr(dt) + ' : ' + Msg,Level); 
end; 
 
end. 
点击复制链接 与好友分享!回本站首页
相关TAG标签 队列 线程 任务
上一篇:C++与C#对比学习:模板,泛型
下一篇:C++与C#对比学习:让对象变得像常量
相关文章
图文推荐
点击排行

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站