I worked on the product which was developed for reconciling bank transaction. That reconciliation was performed on the bases of priority. In order to execute priority based requests from different threads, I have written an scheduler for executing them. Following is the source code of that scheduler.
Source Code.
using System;
using System.Collections.Generic;
using System.Text;
using System.Collections;
using System.Threading;
namespace EXtremecode.Common
{
//each request must implement this interface.
public interface IScheduledProcess
{
void Execute();
}
public class PriorityComparer : IComparer<int>
{
#region IComparer<int> Members
public int Compare(int x, int y)
{
return x - y;
}
#endregion
}
public enum PriorityLevel
{
High = 10
, Normal = 5
, Low = 1
}
public class ProcessSchedular
{
//for identifying a schedular
private static Dictionary<string, ProcessSchedular>
schedularDict = new Dictionary<string, ProcessSchedular>();
//for priority based execution
private SortedDictionary<int, Queue<IScheduledProcess>>
queuesDict = new SortedDictionary<int, Queue<IScheduledProcess>>(
new PriorityComparer());
//for controlling termination of main thread.
private volatile bool isRunning = false;
//for letting single thread to be executed at a time.
private AutoResetEvent autoReset = new AutoResetEvent(false);
//it is used for time based schdular (without priority). All registed process will be executed simultaneously
public event Action OnExecuteProcess;
public static ProcessSchedular CreateInstance(string key)
{
ProcessSchedular processSchedular;
if (schedularDict.ContainsKey(key))
{
processSchedular = schedularDict[key];
}
else
{
processSchedular = new ProcessSchedular();
schedularDict.Add(key, processSchedular);
}
return processSchedular;
}
public static ProcessSchedular GetInstance(string key)
{
ProcessSchedular processSchedular = null;
if (schedularDict.ContainsKey(key))
{
processSchedular = schedularDict[key];
}
return processSchedular;
}
//Add process to prioriy based schedular with normal priority
public void AddProcess(IScheduledProcess process)
{
AddProcess(process, PriorityLevel.Normal);
}
//Add process to prioriy based schedular with provided priority
public void AddProcess(IScheduledProcess process, PriorityLevel priorityLevel)
{
AddProcess(process, (int)priorityLevel);
}
[System.Runtime.CompilerServices.MethodImpl
(System.Runtime.CompilerServices.MethodImplOptions.Synchronized)]
public void AddProcess(IScheduledProcess process, int priorityLevel)
{
GetQueue(priorityLevel).Enqueue(process);
autoReset.Set(); //every time single thread will me released.
}
public void StopScheduling()
{
isRunning = false;
}
public void StartTimeBasedScheduling(object objInterval)
{
int interval = (int)objInterval;
isRunning = true;
while (isRunning)
{
try
{
if (OnExecuteProcess != null)
{
OnExecuteProcess();
}
Thread.Sleep(interval);
}
catch
{
throw;
}
}
}
public void StartPriorityBasedScheduling()
{
isRunning = true;
while (isRunning)
{
while (true)
{
Queue<IScheduledProcess> queue
= GetQueueToExecute();
if (queue == null)
{
break;
//wait for next processes
}
else
{
try
{
queue.Dequeue().Execute();
}
catch
{
//thread should not be terminated
//process should handle this exception.
}
}
}
autoReset.WaitOne(15000,false);
}
}
private Queue<IScheduledProcess> GetQueue(int priorityLevel)
{
Queue<IScheduledProcess> queue;
if (queuesDict.ContainsKey(priorityLevel))
{
queue = queuesDict[priorityLevel];
}
else
{
queue = new Queue<IScheduledProcess>();
queuesDict.Add(priorityLevel, queue);
}
return queue;
}
private Queue<IScheduledProcess> GetQueueToExecute()
{
foreach (int key in queuesDict.Keys)
{
if (queuesDict[key].Count > 0)
{
return queuesDict[key];
}
}
return null;
}
}
}

0 comments: on "Request or Task Scheduling"
Post a Comment