24 March 2011

Request or Task Scheduling


I worked on a product that was developed for reconciling bank transactions. The reconciliation was performed on the basis of priority. In order to execute priority-based requests submitted from different threads, I wrote a scheduler for executing them. The source code of that scheduler follows.

Source Code.

using System;
using System.Collections.Generic;
using System.Text;
using System.Collections;
using System.Threading;

namespace EXtremecode.Common
{
    /// <summary>
    /// A unit of work that can be queued on a <see cref="ProcessSchedular"/>.
    /// Every request handed to the scheduler must implement this interface.
    /// </summary>
    public interface IScheduledProcess
    {
        /// <summary>
        /// Performs the work. The priority-based scheduler loop calls this once
        /// after removing the process from its queue; exceptions thrown here are
        /// caught by that loop and are not propagated to the scheduler thread.
        /// </summary>
        void Execute();
        
    }
 
 /// <summary>
 /// Orders integer priority values in ascending numeric order.
 /// </summary>
 public class PriorityComparer : IComparer<int>
    {
        /// <summary>
        /// Compares two priority values.
        /// </summary>
        /// <param name="x">First priority value.</param>
        /// <param name="y">Second priority value.</param>
        /// <returns>
        /// A negative number when <paramref name="x"/> is less than <paramref name="y"/>,
        /// zero when they are equal, and a positive number when it is greater.
        /// </returns>
        public int Compare(int x, int y)
        {
            // Do not use "x - y": the subtraction wraps around for operands of
            // opposite sign and large magnitude (e.g. int.MinValue - 1) and then
            // reports the wrong ordering.
            return x.CompareTo(y);
        }
    }

    /// <summary>
    /// Predefined priority values that can be passed to
    /// <c>ProcessSchedular.AddProcess</c>. The numeric value of each member is
    /// used as the key of the queue the process is placed in.
    /// </summary>
    public enum PriorityLevel
    {
        High = 10,
        Normal = 5,
        Low = 1
    }
    /// <summary>
    /// Executes work either by priority (<see cref="StartPriorityBasedScheduling"/>)
    /// or repeatedly on a fixed interval (<see cref="StartTimeBasedScheduling"/>).
    /// Both Start methods block the calling thread until <see cref="StopScheduling"/>
    /// is called, so they are meant to be run on a dedicated thread.
    /// Named instances are shared through <see cref="CreateInstance"/> and
    /// <see cref="GetInstance"/>.
    /// </summary>
    public class ProcessSchedular
    {
        // How long the priority loop waits for new work before it re-checks the
        // stop flag, in milliseconds.
        private const int IdleWaitMilliseconds = 15000;

        // Guards schedularDict, which is shared by every thread in the process.
        private static readonly object registryLock = new object();

        // Registry of named schedulers.
        private static readonly Dictionary<string, ProcessSchedular>
            schedularDict = new Dictionary<string, ProcessSchedular>();

        // Guards queuesDict and every queue stored in it. A private object is
        // used (rather than locking on "this") so outside code cannot interfere.
        private readonly object queueLock = new object();

        // One FIFO queue per priority value.
        private readonly SortedDictionary<int, Queue<IScheduledProcess>>
            queuesDict = new SortedDictionary<int, Queue<IScheduledProcess>>(
                new PriorityComparer());

        // Loop-control flag; volatile because it is written by the stopping
        // thread and read by the scheduling thread.
        private volatile bool isRunning = false;

        // Wakes the priority loop when work is added or a stop is requested.
        private readonly AutoResetEvent autoReset = new AutoResetEvent(false);

        /// <summary>
        /// Raised once per interval by <see cref="StartTimeBasedScheduling"/>.
        /// Subscribers are invoked one after another on the scheduling thread;
        /// no priority is applied.
        /// </summary>
        public event Action OnExecuteProcess;

        /// <summary>
        /// Returns the scheduler registered under <paramref name="key"/>,
        /// creating and registering a new one when none exists yet.
        /// </summary>
        /// <param name="key">Name that identifies the scheduler.</param>
        /// <returns>The existing or newly created scheduler; never null.</returns>
        public static ProcessSchedular CreateInstance(string key)
        {
            lock (registryLock)
            {
                ProcessSchedular processSchedular;
                if (!schedularDict.TryGetValue(key, out processSchedular))
                {
                    processSchedular = new ProcessSchedular();
                    schedularDict.Add(key, processSchedular);
                }

                return processSchedular;
            }
        }

        /// <summary>
        /// Returns the scheduler registered under <paramref name="key"/>.
        /// </summary>
        /// <param name="key">Name that identifies the scheduler.</param>
        /// <returns>The registered scheduler, or null when the key is unknown.</returns>
        public static ProcessSchedular GetInstance(string key)
        {
            lock (registryLock)
            {
                ProcessSchedular processSchedular;
                // TryGetValue leaves the out value null when the key is missing.
                schedularDict.TryGetValue(key, out processSchedular);
                return processSchedular;
            }
        }

        /// <summary>
        /// Queues <paramref name="process"/> with <see cref="PriorityLevel.Normal"/> priority.
        /// </summary>
        /// <param name="process">Work item to queue.</param>
        public void AddProcess(IScheduledProcess process)
        {
            AddProcess(process, PriorityLevel.Normal);
        }

        /// <summary>
        /// Queues <paramref name="process"/> with one of the predefined priorities.
        /// </summary>
        /// <param name="process">Work item to queue.</param>
        /// <param name="priorityLevel">Priority of the work item.</param>
        public void AddProcess(IScheduledProcess process, PriorityLevel priorityLevel)
        {
            AddProcess(process, (int)priorityLevel);
        }

        /// <summary>
        /// Queues <paramref name="process"/> with an arbitrary numeric priority and
        /// wakes the priority loop. Safe to call from any thread.
        /// </summary>
        /// <param name="process">Work item to queue.</param>
        /// <param name="priorityLevel">
        /// Priority of the work item; a larger value is executed earlier.
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="process"/> is null.
        /// </exception>
        public void AddProcess(IScheduledProcess process, int priorityLevel)
        {
            if (process == null)
            {
                throw new ArgumentNullException("process");
            }

            lock (queueLock)
            {
                GetQueue(priorityLevel).Enqueue(process);
            }

            // Release the scheduling thread if it is waiting for work.
            autoReset.Set();
        }

        /// <summary>
        /// Asks the running scheduling loop to exit. The priority loop finishes
        /// the work that is already queued before it returns.
        /// </summary>
        public void StopScheduling()
        {
            isRunning = false;

            // Wake the priority loop so it notices the flag immediately instead
            // of sitting out the remainder of its idle wait.
            autoReset.Set();
        }

        /// <summary>
        /// Raises <see cref="OnExecuteProcess"/> and then sleeps for the given
        /// interval, repeating until <see cref="StopScheduling"/> is called.
        /// Exceptions thrown by subscribers propagate to the caller and end the loop.
        /// </summary>
        /// <param name="objInterval">
        /// The pause between rounds in milliseconds, as a boxed <see cref="int"/>.
        /// NOTE(review): typed as object presumably so the method can be used
        /// as a ParameterizedThreadStart - confirm against callers.
        /// </param>
        /// <exception cref="InvalidCastException">
        /// <paramref name="objInterval"/> is not a boxed <see cref="int"/>.
        /// </exception>
        public void StartTimeBasedScheduling(object objInterval)
        {
            int interval = (int)objInterval;
            isRunning = true;
            while (isRunning)
            {
                // Copy the delegate first: a subscriber may be removed by another
                // thread between the null check and the invocation.
                Action handler = OnExecuteProcess;
                if (handler != null)
                {
                    handler();
                }

                Thread.Sleep(interval);
            }
        }

        /// <summary>
        /// Executes queued processes, highest priority first (FIFO within one
        /// priority), until <see cref="StopScheduling"/> is called. When no work
        /// is queued the loop waits until work is added or the idle wait elapses.
        /// </summary>
        public void StartPriorityBasedScheduling()
        {
            isRunning = true;
            while (isRunning)
            {
                IScheduledProcess process;
                while ((process = DequeueNextProcess()) != null)
                {
                    try
                    {
                        // Runs outside queueLock so producers are never blocked
                        // by a long-running process.
                        process.Execute();
                    }
                    catch (Exception ex)
                    {
                        // The scheduling thread must survive a failing process;
                        // record the failure instead of dropping it silently.
                        System.Diagnostics.Trace.TraceError(
                            "ProcessSchedular: scheduled process failed: {0}", ex);
                    }
                }

                // Nothing left to run: wait for AddProcess or StopScheduling.
                autoReset.WaitOne(IdleWaitMilliseconds, false);
            }
        }

        // Removes and returns the next process to run, or null when every queue
        // is empty. Selection and removal happen under one lock so a concurrent
        // AddProcess cannot interleave with them.
        private IScheduledProcess DequeueNextProcess()
        {
            lock (queueLock)
            {
                Queue<IScheduledProcess> queue = GetQueueToExecute();
                if (queue == null)
                {
                    return null;
                }

                return queue.Dequeue();
            }
        }

        // Returns the queue for the given priority, creating it on first use.
        // Callers must hold queueLock.
        private Queue<IScheduledProcess> GetQueue(int priorityLevel)
        {
            Queue<IScheduledProcess> queue;
            if (!queuesDict.TryGetValue(priorityLevel, out queue))
            {
                queue = new Queue<IScheduledProcess>();
                queuesDict.Add(priorityLevel, queue);
            }

            return queue;
        }

        // Returns the non-empty queue with the largest priority value, or null
        // when all queues are empty. Callers must hold queueLock.
        private Queue<IScheduledProcess> GetQueueToExecute()
        {
            Queue<IScheduledProcess> best = null;
            int bestKey = 0;

            // The key is compared explicitly so the result does not depend on
            // the enumeration order of the dictionary.
            foreach (KeyValuePair<int, Queue<IScheduledProcess>> entry in queuesDict)
            {
                if (entry.Value.Count > 0 && (best == null || entry.Key > bestKey))
                {
                    best = entry.Value;
                    bestKey = entry.Key;
                }
            }

            return best;
        }
    }
}


Digg Google Bookmarks reddit Mixx StumbleUpon Technorati Yahoo! Buzz DesignFloat Delicious BlinkList Furl

0 comments: on "Request or Task Scheduling"

Post a Comment