I worked on a product that was developed for reconciling bank transactions. That reconciliation was performed on the basis of priority. In order to execute priority-based requests coming from different threads, I wrote a scheduler for executing them. The following is the source code of that scheduler.
Source Code.
using System;
using System.Collections.Generic;
using System.Text;
using System.Collections;
using System.Threading;

namespace EXtremecode.Common
{
    /// <summary>
    /// A unit of work that can be queued on a <see cref="ProcessSchedular"/>.
    /// Each request must implement this interface.
    /// </summary>
    public interface IScheduledProcess
    {
        /// <summary>Performs the work. Called on the scheduler's thread.</summary>
        void Execute();
    }

    /// <summary>
    /// Orders integer priority values in ascending numeric order.
    /// </summary>
    public class PriorityComparer : IComparer<int>
    {
        #region IComparer<int> Members

        /// <summary>
        /// Compares two priority values.
        /// </summary>
        /// <returns>
        /// A negative value if <paramref name="x"/> is smaller than <paramref name="y"/>,
        /// zero if they are equal, a positive value otherwise.
        /// </returns>
        public int Compare(int x, int y)
        {
            // CompareTo instead of "x - y": the subtraction silently overflows when the
            // operands have opposite signs and large magnitude, yielding the wrong sign.
            return x.CompareTo(y);
        }

        #endregion
    }

    /// <summary>
    /// Predefined priorities. A larger numeric value means the process is executed earlier.
    /// </summary>
    public enum PriorityLevel
    {
        High = 10,
        Normal = 5,
        Low = 1
    }

    /// <summary>
    /// Executes queued <see cref="IScheduledProcess"/> instances one at a time in priority
    /// order (highest numeric priority first, FIFO within one priority), or alternatively
    /// raises <see cref="OnExecuteProcess"/> at a fixed interval.
    /// Producers may call <c>AddProcess</c> from any thread; the caller is expected to run
    /// one of the <c>Start...</c> methods on a dedicated thread, because they block.
    /// </summary>
    public class ProcessSchedular
    {
        // Upper bound on how long the priority loop sleeps before re-checking its state.
        private const int IdleWaitMilliseconds = 15000;

        // For identifying a scheduler by key.
        private static readonly Dictionary<string, ProcessSchedular> schedularDict =
            new Dictionary<string, ProcessSchedular>();

        // Guards schedularDict; Dictionary is not safe for concurrent reads and writes.
        private static readonly object registryGate = new object();

        // For priority based execution: one FIFO queue per priority value.
        private readonly SortedDictionary<int, Queue<IScheduledProcess>> queuesDict =
            new SortedDictionary<int, Queue<IScheduledProcess>>(new PriorityComparer());

        // Guards queuesDict and every queue stored in it. A private object is used
        // rather than locking on "this" so outside code cannot interfere with the lock.
        private readonly object queueGate = new object();

        // For controlling termination of the scheduling loop.
        private volatile bool isRunning = false;

        // Signalled when work is added or a stop is requested, to wake the scheduling loop.
        private readonly AutoResetEvent autoReset = new AutoResetEvent(false);

        /// <summary>
        /// Used by the time based scheduler (no priorities). Every registered handler is
        /// invoked once per interval on the thread running
        /// <see cref="StartTimeBasedScheduling"/>.
        /// </summary>
        public event Action OnExecuteProcess;

        /// <summary>
        /// Returns the scheduler registered under <paramref name="key"/>, creating and
        /// registering a new one when none exists yet.
        /// </summary>
        public static ProcessSchedular CreateInstance(string key)
        {
            lock (registryGate)
            {
                ProcessSchedular processSchedular;
                if (!schedularDict.TryGetValue(key, out processSchedular))
                {
                    processSchedular = new ProcessSchedular();
                    schedularDict.Add(key, processSchedular);
                }
                return processSchedular;
            }
        }

        /// <summary>
        /// Returns the scheduler registered under <paramref name="key"/>,
        /// or <c>null</c> when no scheduler has been created for that key.
        /// </summary>
        public static ProcessSchedular GetInstance(string key)
        {
            lock (registryGate)
            {
                ProcessSchedular processSchedular;
                schedularDict.TryGetValue(key, out processSchedular);
                return processSchedular;
            }
        }

        /// <summary>Adds a process to the priority based scheduler with normal priority.</summary>
        public void AddProcess(IScheduledProcess process)
        {
            AddProcess(process, PriorityLevel.Normal);
        }

        /// <summary>Adds a process to the priority based scheduler with the given priority.</summary>
        public void AddProcess(IScheduledProcess process, PriorityLevel priorityLevel)
        {
            AddProcess(process, (int)priorityLevel);
        }

        /// <summary>
        /// Adds a process to the priority based scheduler with an arbitrary numeric priority
        /// and wakes the scheduling loop. Larger values are executed first.
        /// </summary>
        /// <exception cref="ArgumentNullException"><paramref name="process"/> is null.</exception>
        public void AddProcess(IScheduledProcess process, int priorityLevel)
        {
            if (process == null)
            {
                // Reject early: a null entry would only fail later, inside the scheduler loop.
                throw new ArgumentNullException("process");
            }

            lock (queueGate)
            {
                GetQueue(priorityLevel).Enqueue(process);
            }

            autoReset.Set(); // releases the scheduling thread if it is waiting
        }

        /// <summary>
        /// Asks the running scheduling loop to finish. The priority loop still drains
        /// everything that is already queued before it returns.
        /// </summary>
        public void StopScheduling()
        {
            isRunning = false;
            // Wake the priority loop so it notices the flag without waiting for the idle timeout.
            autoReset.Set();
        }

        /// <summary>
        /// Blocks the calling thread and raises <see cref="OnExecuteProcess"/> repeatedly,
        /// sleeping between rounds, until <see cref="StopScheduling"/> is called.
        /// An exception thrown by a handler propagates to the caller and ends the loop.
        /// </summary>
        /// <param name="objInterval">
        /// Boxed <see cref="int"/>: pause between rounds in milliseconds. Typed as
        /// <c>object</c>, presumably so the method can be used as a thread start routine.
        /// </param>
        public void StartTimeBasedScheduling(object objInterval)
        {
            int interval = (int)objInterval;
            isRunning = true;
            while (isRunning)
            {
                // Copy to a local so a concurrent unsubscribe cannot null the delegate
                // between the check and the invocation.
                Action handler = OnExecuteProcess;
                if (handler != null)
                {
                    handler();
                }
                Thread.Sleep(interval);
            }
        }

        /// <summary>
        /// Blocks the calling thread and executes queued processes one at a time, always
        /// taking the oldest process of the highest non-empty priority, until
        /// <see cref="StopScheduling"/> is called and the queues are empty.
        /// </summary>
        public void StartPriorityBasedScheduling()
        {
            isRunning = true;
            while (isRunning)
            {
                IScheduledProcess process;
                while ((process = DequeueNextProcess()) != null)
                {
                    try
                    {
                        // Runs outside the lock so producers are never blocked by a long process.
                        process.Execute();
                    }
                    catch (Exception ex)
                    {
                        // The scheduler thread must not be terminated by a failing process;
                        // the process is responsible for its own error handling. The failure
                        // is reported instead of being dropped silently.
                        System.Diagnostics.Trace.TraceError(
                            "ProcessSchedular: scheduled process failed: " + ex);
                    }
                }

                // Nothing left to do: wait for new work, a stop request, or the timeout.
                autoReset.WaitOne(IdleWaitMilliseconds, false);
            }
        }

        // Removes and returns the next process to run, or null when every queue is empty.
        private IScheduledProcess DequeueNextProcess()
        {
            lock (queueGate)
            {
                Queue<IScheduledProcess> queue = GetQueueToExecute();
                return queue == null ? null : queue.Dequeue();
            }
        }

        // Returns the queue for the given priority, creating it on first use.
        // Callers must hold queueGate.
        private Queue<IScheduledProcess> GetQueue(int priorityLevel)
        {
            Queue<IScheduledProcess> queue;
            if (!queuesDict.TryGetValue(priorityLevel, out queue))
            {
                queue = new Queue<IScheduledProcess>();
                queuesDict.Add(priorityLevel, queue);
            }
            return queue;
        }

        // Returns the non-empty queue with the highest priority, or null when all are empty.
        // Callers must hold queueGate.
        private Queue<IScheduledProcess> GetQueueToExecute()
        {
            Queue<IScheduledProcess> highest = null;
            // The dictionary enumerates in ascending priority, so the last non-empty
            // queue seen is the one with the highest priority.
            foreach (KeyValuePair<int, Queue<IScheduledProcess>> entry in queuesDict)
            {
                if (entry.Value.Count > 0)
                {
                    highest = entry.Value;
                }
            }
            return highest;
        }
    }
}
0 comments: on "Request or Task Scheduling"
Post a Comment