SQLXAgent - Jobs for SQL Express - Part 4 of 6

30 Sep 2017, CPOL
Create and run jobs kinda like SQL Server Enterprise - Job Scheduling Code

Part 1 - Using SQLXAgent
Part 2 - Architecture and Design Decisions
Part 3 - The CSV and Excel Importer Code
Part 4 - Job Scheduling Code (this article)
Part 5 - How Packages Are Run
Part 6 - Interesting Coding

Introduction

This article is the 4th part of a larger article series describing the SQLXAgent utility. In this article, I'll be describing the job schedule handling code. This was, without a doubt, the most difficult part of the entire solution. I think I had to refactor it half a dozen times before I got it right.

Due to the almost limitless number of schedule permutations available, the most practical way to test the scheduling piece was to create a test app that generates the future job execution date/times without actually running the jobs.

This article specifically discusses code found in the SQLXThreadManager assembly, and only code dealing with the jobs as they are processed by the SQLXAgentSvc application. In the code snippets below, most of the calls to DebugMsgs were removed in the interest of brevity, and formatting was modified in an attempt to eliminate horizontal scroll bars.

NOTE: Code snippets presented in the article may or may not reflect the absolutely latest and greatest version of the code. In the event that a snippet does not exactly match the actual code, it will be fairly close. Such is the nature of noticing issues while writing an article.

Welcome to System.Reactive (aka "Rx")

Coming up with a suitable model for running the scheduled jobs was a pain. At first, I tried using threads, and it quickly became a convoluted mess. Next I tried the TPL (Task Parallel Library), and in some ways, that was even worse. Then I stumbled on the System.Reactive library (referred to as Rx in this article). With Rx, I was able to significantly reduce the amount and complexity of the code, and the load on the CPU, necessary to track the schedule and perform the work required by the jobs.

With Rx, I set up a timer, and then subscribed to what amounts to the "tick" event. I wanted to make each job responsible for its own timer because job schedules don't necessarily run on even intervals. Here's the basic flowchart that describes schedule processing.

In point of fact, I use Rx to perform the following duties:

  • Keep the service alive. Despite having the JobThreadManager running, I wanted the service to be able to keep itself alive while everything else was at rest and waiting for the desired time. This timer runs every second. No actual work is performed.
  • Watch for config file changes in the job manager. The job manager monitors the config file for changes so that it knows when job definitions have changed. When you use SQLXAgent to add, delete, or otherwise change jobs and/or steps, that application saves the config file. This triggers a "job refresh" event. Only jobs that were added, removed, or changed are affected.
  • Establish and wait for the next execution date/time for a given job. When the "tick" event occurs, the job is executed.

It is far outside the scope of this article to provide anything resembling a tutorial on Rx, so please use Google to find relevant info. After all, that's how I found out about it.

The JobThreadManager Class

The job thread manager class is responsible for managing the loading and updating of jobs. It performs no other functionality. When the job thread manager is instantiated, it loads all jobs from the database, and then creates a list of jobs that can run (they're enabled and haven't exceeded their end date). There really is no point in using up memory for jobs that can't run, right?

The following subsections discuss the more important and interesting individual methods.

The Start() Method

This method starts the job manager, where we subscribe to the timer.

C#
/// <summary>
/// Starts the job manager thread. 
/// </summary>
public void Start()
{
    if (this.isValidJobList)
    {
        // we always start on the next even minute (or, if we're too close to 
        // that minute, the one after).
        DateTime dueStart = DateTime.Now;
        dueStart = dueStart.AddMinutes((dueStart.Second > 55) ? 2 : 1).ZeroSeconds();
        // we need to "do work" every second
        this.Timer = Observable.Timer(dueStart - DateTime.Now, TimeSpan.FromSeconds(1));
        this.JobsStarted = false;
        //subscribe to the timer event
        this.TimerSub = this.Timer.Subscribe(x => this.DoWork(), ex => this.WorkError());
    }
    else
    {
        DebugMsgs.Show(this, 
                       "Found no jobs that were enabled, or no jobs with steps that were enabled.", 
                       DebugLevel.Info);
    }
}
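
The due-time arithmetic in Start() is easy to check in isolation. The sketch below assumes that ZeroSeconds() simply truncates a DateTime to the whole minute; the real extension method lives elsewhere in the solution and may differ. StartTimeSketch and NextStart are hypothetical names used only for illustration.

```csharp
using System;

public static class StartTimeSketch
{
    // Assumed behavior of the article's ZeroSeconds() extension method:
    // drop the seconds and any sub-second ticks, keeping the DateTimeKind.
    public static DateTime ZeroSeconds(this DateTime value)
    {
        return new DateTime(value.Year, value.Month, value.Day,
                            value.Hour, value.Minute, 0, value.Kind);
    }

    // Returns the next whole minute, or the one after it when the current
    // second is past 55 (too close to the minute boundary).
    public static DateTime NextStart(DateTime now)
    {
        int minutesToAdd = (now.Second > 55) ? 2 : 1;
        return now.AddMinutes(minutesToAdd).ZeroSeconds();
    }
}
```

Under these assumptions, a call at 10:15:20 gives 10:16:00, while a call at 10:15:58 skips ahead to 10:17:00.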

The DoWork() Method

The DoWork method is called by the Rx timer every second. This method will start the jobs if not yet started, check the SQLXAgent.exe.config file to see if the jobs need to be updated, and if the date has changed, will remove expired history items.

C#
/// <summary>
/// Do the work we need to do
/// </summary>
/// <returns></returns>
private IObserver<long> DoWork()
{
    IObserver<long> result = null;
    try
    {
        // if we haven't started the jobs yet, do that
        if (!this.JobsStarted)
        {
            this.JobsStarted = true;
            this.StartJobs();
        }
        // otherwise, check the config file to see if it's changed, and refresh 
        // the jobs if necessary.
        else
        {
            // make sure the file still exists
            if (File.Exists(this.configFileInfo.FullName))
            {
                // if it does, refresh the FileInfo object
                this.configFileInfo.Refresh();
                // if the file date is > the last file date
                if (configFileInfo.LastWriteTimeUtc > this.lastConfigFileDate)
                {
                    this.lastConfigFileDate = configFileInfo.LastWriteTimeUtc;
                    this.HistoryExpireDays = Convert.ToInt32(SQLXCommon.Globals.
                                                             AppConfig.AppSettings.
                                                             Settings["HistoryExpireDays"].
                                                             Value);
                    this.RefreshJobs();
                }
            }
        }
        // see if we need to delete history items
        if (DateTime.Now.Date > this.LastHistoryDelete.Date)
        {
            this.RemoveExpiredHistoryItems();
        }
    }
    catch (Exception ex)
    {
        AppLog.Error(ex);
    }
    finally
    {
    }
    return result;
}
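
The config file check in DoWork() boils down to polling a file's last-write time. Here is a minimal, standalone sketch of that idea; ConfigFileWatcherSketch and HasChanged are hypothetical names, and the real class keeps this state in its own fields.

```csharp
using System;
using System.IO;

// Polls a file's last-write time; a hypothetical stand-in for the check
// performed in DoWork().
public class ConfigFileWatcherSketch
{
    private readonly FileInfo fileInfo;
    private DateTime lastWriteUtc;

    public ConfigFileWatcherSketch(string path)
    {
        this.fileInfo = new FileInfo(path);
        this.lastWriteUtc = this.fileInfo.Exists
                            ? this.fileInfo.LastWriteTimeUtc
                            : DateTime.MinValue;
    }

    // Returns true once for each observed change to the file.
    public bool HasChanged()
    {
        // FileInfo caches its data, so refresh it before reading
        this.fileInfo.Refresh();
        if (!this.fileInfo.Exists)
        {
            return false;
        }
        if (this.fileInfo.LastWriteTimeUtc > this.lastWriteUtc)
        {
            this.lastWriteUtc = this.fileInfo.LastWriteTimeUtc;
            return true;
        }
        return false;
    }
}
```

Calling HasChanged() from a once-per-second timer gives the same effect as the code above: the refresh work only happens on the tick that first sees a newer timestamp.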

The RefreshJobs() Method

C#
/// <summary>
/// Refreshes the jobs because of a config file change.
/// </summary>
public void RefreshJobs()
{
    // Start out unrefreshed - as jobs are added/updated, the Refreshed 
    // flag will be set to true.
    foreach (JobThread item in this)
    {
        item.Refreshed = false;
    }
    // get the jobs from the database
    JobList newJobs = new JobList();
    newJobs.GetFromDatabase();
    // check each job to see if it is new or changed
    foreach(JobItem newJob in newJobs)
    {
        // get the current instance of the job
        JobItem oldJob = this.Jobs.FirstOrDefault(x => x.ID == newJob.ID);
        // if we didn't find it, it must be a new job
        if (oldJob == null)
        {
            // create a new JobThread object
            JobThread job = new JobThread(newJob);
            // add it to this manager
            this.Add(job);
            // start it if the other jobs have already started
            if (this.JobsStarted)
            {
                job.Start();
            }
        }
        else
        {
            // get the existing job thread that is handling this job
            JobThread jobThread = this.FirstOrDefault(x => x.Job.ID == oldJob.ID);
            // if we found it, update it
            if (jobThread != null)
            {
                jobThread.UpdateJob(newJob);
                DebugMsgs.Show(this, 
                               string.Format("Job Manager - job [{0}] updated", 
                                             oldJob.Name), 
                               DebugLevel.Info);
            }
            else
            {
                // error msg
            }
        }
    }
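    // stop/remove jobs that were not refreshed. ToList() takes a snapshot so 
    // that we aren't removing items from the collection we're enumerating.
    List<JobThread> toRemove = this.Where(x => x.Refreshed == false).ToList();
    foreach(JobThread oldItem in toRemove)
    {
        oldItem.Stop();
        this.Remove(oldItem);
    }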
}
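
RefreshJobs() follows a mark-and-sweep pattern: clear a flag on every item, set it on everything still present in the source, and then remove whatever was not flagged. The sketch below shows the pattern with plain integers as IDs. WorkerSketch and RefreshSketch are hypothetical types, not part of SQLXAgent, and the sketch assumes that new items are flagged as refreshed when they are added.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class WorkerSketch
{
    public int Id { get; set; }
    public bool Refreshed { get; set; }
}

public static class RefreshSketch
{
    // Syncs 'workers' with 'currentIds' and returns the ids that were removed.
    public static List<int> Sync(List<WorkerSketch> workers, IEnumerable<int> currentIds)
    {
        // mark: nothing has been seen yet
        foreach (WorkerSketch worker in workers)
        {
            worker.Refreshed = false;
        }
        foreach (int id in currentIds)
        {
            WorkerSketch existing = workers.FirstOrDefault(x => x.Id == id);
            if (existing == null)
            {
                // new item: add it already flagged so the sweep keeps it
                workers.Add(new WorkerSketch { Id = id, Refreshed = true });
            }
            else
            {
                existing.Refreshed = true;
            }
        }
        // sweep: snapshot with ToList() first, because removing from a list
        // while enumerating it throws InvalidOperationException
        List<WorkerSketch> stale = workers.Where(x => !x.Refreshed).ToList();
        foreach (WorkerSketch worker in stale)
        {
            workers.Remove(worker);
        }
        return stale.Select(x => x.Id).ToList();
    }
}
```

Starting with workers 1, 2 and 3 and syncing against the IDs 2, 3 and 4 removes worker 1 and adds worker 4.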

The JobThread Class

The job thread is responsible for processing the schedule for the associated job. No other processing is performed.

Job thread flow chart

The Start() Method

The Start() method creates/hooks into the Observable.Timer object.

C#
/// <summary>
/// Starts the job "thread" if possible.
/// </summary>
public void Start()
{
    // we don't have an update pending
    this.UpdatePending = false;
    // if the job can start (is enabled and has at least one enabled step)
    if (this.Job.CanRestart)
    {
        // set the value used to determine the next interval. A value of 0 is how 
        // we indicate to the RunSchedule method that we're just starting out.
        this.NextTime = new DateTime(0);
        // start the timer with an initial interval of 1 second. The RunSchedule 
        // method will determine the actual interval.
        this.CreateAndSubscribe(TimeSpan.FromSeconds(1));
    }
    else
    {
        // the code that actually lives here merely supports the call to DebugMsgs
        // and is omitted in the interest of brevity
    }
}

The RunSchedule() Method

This method is the actual worker in the class. It is the delegate method that responds to timer tick events.

C#
/// <summary>
/// Runs the scheduled job. The method is called by the timer when the interval 
/// expires.
/// </summary>
public void RunSchedule()
{
    bool isRunning = true;
    // if the NextTime is 0 ticks, we're just starting out, so we don't have anything to do.
    bool canWork  = (this.NextTime.Ticks != 0);
    // determine the next execution datetime (this will cause canWork to be true next time around)
    DateTime now  = DateTime.Now;
    this.NextTime = (now + this.CalculateNextRunTime(now)).ZeroSeconds();
    // if we can do the work
    if (canWork)
    {
        try
        {
            // execute the job (method is in JobThreadBase class)
            this.ExecuteJob();
        }
        catch (Exception ex)
        {
            AppLog.Error(ex);
        }
        finally
        {
            // if our next execution datetime is at or past the schedule 
            // duration end date
            if (this.NextTime >= this.Job.SchedDurEndDate)
            {
                // stop the timer
                this.Stop();
                isRunning = false;
            }
        }
    }
    else
    {
        // DebugMsgs.Show(...);
    }
    // if we have an update pending, do the update
    if (this.UpdatePending)
    {
        this.UpdateJob(this.PendingJob);
        this.PendingJob = null;
    }
    // otherwise, if we're still running, reset the timer for the new 
    // interval.
    else if (isRunning)
    {
        this.ResetTimer();
    }
}

Calculating the Next Scheduled Job Execution

There are several methods involved in the calculation of job execution time, and which method it uses is determined by the job's schedule properties. Since I don't have a full-blown SQL Server instance at home, and because I can't install software (that can project a job's schedule) on my machine at work, there is no way for me to determine how close my scheduling code is to the way SQL Server does it. If someone has the ability and inclination to check this, I'd like to hear about how I'm not doing it right (my goal with all this code was to generally replicate the way SQL Server works).

Calculating the next scheduled job execution date/time starts out with the curiously named CalculateNextRunTime method.

C#
/// <summary>
/// Determines how to calculate the next execution datetime, and calls 
/// the appropriate methods.
/// </summary>
/// <param name="now">The current datetime</param>
/// <returns>A timespan that will be added to the current datetime to establish 
/// the subsequent execution datetime.</returns>
protected TimeSpan CalculateNextRunTime(DateTime now)
{
    TimeSpan result = new TimeSpan(0);
    // Establish our time of day based on the frequency properties
    TimeSpan time = CalculateDailyFrequencyTime(now);
    // call the appropriate period method
    switch (this.Job.SchedOccurPeriod)
    {
        case "Daily" :
            result = this.CalcNextDailyDay(now, time);
            break;
        case "Weekly" :
            result = this.CalcNextWeeklyDay(now, time);
            break;
        case "Monthly" :
            result = this.CalcNextMonthlyDay(now, time);
            break;
        default :
            result = new TimeSpan(0);
            break;
    }
    return result;
}

From this method, we call two additional methods. The first is CalculateDailyFrequencyTime, which establishes only the time of day of the next run.

C#
/// <summary>
/// Calculate the next time to execute based on the daily frequency. This method 
/// only sets the time of the next run.
/// </summary>
/// <param name="now"></param>
/// <returns></returns>
protected TimeSpan CalculateDailyFrequencyTime(DateTime now)
{
    DateTime result = now;
    switch (this.Job.SchedDailyFreq)
    {
        case true :
            {
                // set the time
                result = result.SetTime(this.Job.SchedFreqOnceAt.TimeOfDay);
                // if the current datetime is greater than new datetime, add a day.
                if (now >= result)
                {
                    result = result.AddDays(1);
                }
            }
            break;
        case false :
            {
                // determine the number of seconds we're using (minutes * 60, or hours * 3600)
                int periodSeconds = 0;
                switch (this.Job.SchedFreqEveryPeriod)
                {
                    case "minute(s)" : periodSeconds = this.Job.SchedFreqEveryN * 60; break;
                    case "hour(s)"   : periodSeconds = this.Job.SchedFreqEveryN * 3600; break;
                }

                // set the new time
                result = result.AddSeconds(periodSeconds);

                // if the new start time is less than the specified start time
                if (result.TimeOfDay < this.Job.SchedFreqStart.TimeOfDay)
                {
                    // set the time to the specified start time
                    result = result.SetTime(this.Job.SchedFreqStart.TimeOfDay);
                }

                // if the new time is past the specified end time
                else if (result.TimeOfDay > this.Job.SchedFreqEnd.TimeOfDay)
                {
                    // add a day and set the time to the specified start time
                    result = result.AddDays(1).SetTime(this.Job.SchedFreqStart.TimeOfDay);
                }
            }
            break;
    }
    // return the difference between the current datetime and the new datetime
    return (result - now);
}
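
To make the recurring branch easier to follow, here is the same windowing logic reduced to a pure function with the schedule passed in as parameters. It is a sketch only: DailyFrequencySketch and NextRecurring are hypothetical names, the interval is in minutes only, and the article's SetTime() extension is replaced with plain date arithmetic.

```csharp
using System;

public static class DailyFrequencySketch
{
    // Next run for an "every N minutes between start and end" schedule.
    public static DateTime NextRecurring(DateTime now, int everyMinutes,
                                         TimeSpan windowStart, TimeSpan windowEnd)
    {
        DateTime candidate = now.AddMinutes(everyMinutes);
        if (candidate.TimeOfDay < windowStart)
        {
            // too early: wait for that day's window to open
            candidate = candidate.Date + windowStart;
        }
        else if (candidate.TimeOfDay > windowEnd)
        {
            // too late: use the start of the next day's window
            candidate = candidate.Date.AddDays(1) + windowStart;
        }
        return candidate;
    }
}
```

With a 08:00 to 17:00 window and a 15-minute interval, a call at 10:00 yields 10:15 the same day, while a call at 16:50 yields 08:00 the next day, because 17:05 falls past the end of the window.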

After CalculateDailyFrequencyTime is called, one of the following methods is called to establish the date of the next execution.

C#
/// <summary>
/// Calculate the next appropriate date for a daily schedule
/// </summary>
/// <param name="now"></param>
/// <param name="time"></param>
/// <returns></returns>
protected TimeSpan CalcNextDailyDay(DateTime now, TimeSpan time)
{
    DateTime result = now;
    // add the time difference
    result = now.AddSeconds(time.TotalSeconds);
    // if the new day is not the current day
    if (result.Day != now.Day)
    {
        // add the number of days - 1 (because we already added a day by 
        // changing the time). DateTime is immutable, so the result of 
        // AddDays must be assigned back.
        result = result.AddDays(this.Job.SchedFreqEveryN-1); 
    }
    return (result - now);
}

C#
/// <summary>
/// Determines the next run date for weekly schedules
/// </summary>
/// <param name="now"></param>
/// <param name="time"></param>
/// <returns></returns>
protected TimeSpan CalcNextWeeklyDay(DateTime now, TimeSpan time)
{
    // the number of weeks between runs
    int weekInterval = this.Job.SchedOccurWeeklyInterval;

    // the weekdays we can run on
    List<DayOfWeek> weekDays = this.GetScheduledWeekdays();

    // add the time
    DateTime result = now.AddSeconds(time.TotalSeconds);

    // if the new day is not the current day
    if (result.Day != now.Day)
    {
        // determine if the new day is a selected day of the week
        // if it's not 
        if (weekDays.IndexOf(result.DayOfWeek) < 0)
        {
            // get the index of the current day of week
            int index = weekDays.IndexOf(now.DayOfWeek);

            // if the current day of week is not in the list
            if (index < 0)
            {
                // find the index of the next appropriate day of week
                List<DayOfWeek> daysAvailable = weekDays.Where(x=>x > now.DayOfWeek).ToList();

                // the index will be the nextDOW (if found) or 0
                index = (daysAvailable != null && daysAvailable.Count >= 1) 
                        ? weekDays.IndexOf(daysAvailable[0]) 
                        : 0;
            }
            else
            {
                // if the new index (index + 1) is out of range, we use 
                // the first item, otherwise, we use the new index
                index = (index+1 > weekDays.Count-1) ? 0 : index+1;
            }
            // get the actual new day of week based on our index value
            DayOfWeek newDOW = weekDays[index];

            // if the index is 0, we have to add the weekly interval 
            // (every n weeks) to the new date
            if (index == 0)
            {
                // add the days (interval * 7)
                result = result.AddDays(weekInterval * 7);

                // set the date to the first day of the new week
                result = result.FirstDayOfWeek();

                // backup one so we can find the correct next actual day
                result = result.AddDays(-1);
            }

            // this method starts at the current day, and adds one until it gets 
            // to the appropriate day of week.
            result = result.GetNextDayOfWeek(newDOW);
        } // if the new day is not a selected day of the week
    }
    return (result - now);
}
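
The weekly calculation leans on a GetNextDayOfWeek extension method that isn't shown in this article. As a rough idea of what such a helper could look like, the sketch below assumes it returns the first date strictly after the given one that falls on the requested weekday (which would explain the "back up one day" step above). The actual implementation may well differ.

```csharp
using System;

public static class WeekdaySketch
{
    // Assumed behavior: the first date strictly after 'date' that falls
    // on 'target'. The time of day is preserved.
    public static DateTime GetNextDayOfWeek(this DateTime date, DayOfWeek target)
    {
        // days to move forward, in the range 1..7
        int delta = ((int)target - (int)date.DayOfWeek + 7) % 7;
        if (delta == 0)
        {
            delta = 7;
        }
        return date.AddDays(delta);
    }
}
```

For example, starting from Saturday 30 Sep 2017 and asking for Monday returns 2 Oct 2017, and asking for Saturday returns 7 Oct 2017 rather than the starting date.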

C#
/// <summary>
/// Calculates the next execution date for a monthly schedule.
/// </summary>
/// <param name="now"></param>
/// <param name="time"></param>
/// <returns></returns>
protected TimeSpan CalcNextMonthlyDay(DateTime now, TimeSpan time)
{
    // add the time
    DateTime result = now.AddSeconds(time.TotalSeconds);

    if (result.Day != now.Day)
    {
        if (this.Job.SchedOccurMonthlyTypeIsDay)
        {
            // advance the date by the month interval
            result = result.AddMonths(this.Job.SchedOccurMonthlyDayInterval);
            // Reset the day to the lowest of the actual number of days in the 
            // month, and the preferred day. This allows for months with fewer 
            // days than the specified day number. For instance, if you specified 
            // the 31st in the schedule form, and a month has fewer days, the last 
            // day of the month will be used instead.
            result = result.SetDay(Math.Min(this.Job.SchedOccurMonthlyDayNumber, result.DaysInMonth()));
        }
        else
        {
            // advance the date by the month interval
            result = result.AddMonths(this.Job.SchedOccurMonthlyDayInterval);
            // get the weekday we're interested in
            DayOfWeek dow = SQLXCommon.Globals.StringToEnum(this.Job.SchedOccurMonthlyTheNthDay, DayOfWeek.Sunday);
            // get the ordinal occurrence of the specified weekday
            result = result.GetDateByOrdinalDay(dow,  this.GetOrdinalDay());
        }
    }
    return (result - now);
}
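
Both monthly modes can be illustrated with framework calls alone. The sketch below shows the day clamping and one possible way to find "the Nth weekday" of a month. MonthlySketch, ClampToMonth and NthWeekday are hypothetical names, and falling back to the last occurrence when the ordinal overshoots the month is an assumption of this sketch, not necessarily what GetDateByOrdinalDay does.

```csharp
using System;

public static class MonthlySketch
{
    // Day-of-month schedule: clamp the preferred day to the month's length.
    public static DateTime ClampToMonth(int year, int month, int preferredDay)
    {
        int day = Math.Min(preferredDay, DateTime.DaysInMonth(year, month));
        return new DateTime(year, month, day);
    }

    // "The Nth weekday" schedule: 'ordinal' is 1-based. An ordinal that
    // overshoots the month falls back to the last occurrence.
    public static DateTime NthWeekday(int year, int month, DayOfWeek dow, int ordinal)
    {
        DateTime first = new DateTime(year, month, 1);
        // days from the 1st to the first occurrence of 'dow' (0..6)
        int offset = ((int)dow - (int)first.DayOfWeek + 7) % 7;
        DateTime result = first.AddDays(offset + 7 * (ordinal - 1));
        while (result.Month != month)
        {
            result = result.AddDays(-7);
        }
        return result;
    }
}
```

Asking for the 31st of February 2018 gives 28 Feb 2018, and the second Monday of October 2017 is 9 Oct 2017.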

The ExecuteJob() Method

This method is found in the JobThreadBase class. The reason it's in a base class is that the SQLXAgent application allows you to manually run jobs on demand, without consideration for their schedules.

C#
/// <summary>
/// Executes all of the steps for the job in (step.Position) sequential order.
/// </summary>
public virtual void ExecuteJob()
{
    if (!this.IsWorking)
    {
        DateTime jobStart = DateTime.Now;
        // send event (only used when job is run manually)
        this.OnJobStart(jobStart);
        this.IsWorking  = true;
        string status   = "SUCCESS";
        string reason   = string.Empty;
        long   execID   = 0;
        string debugMsg = string.Empty;
        foreach(StepItem step in this.Job.Steps)
        {
            if (step.StepIsEnabled)
            {
                DateTime start = DateTime.Now;
                // send event (only used when job is run manually)
                OnStepStart(start, step.ID, step.Name);
                // this is just for easing typing later on
                string stepName = string.Format("-Job {0}.[STEP {1}].[{2}].[{3}]", 
                                                this.DebugJobName, 
                                                step.Position, 
                                                step.StepType, 
                                                step.Name);
                switch (step.StepType)
                {
                    case "SQL" :
                        {
                            try
                            {
                                // this should never happen, but we check for it nonetheless.
                                if (string.IsNullOrEmpty(step.SqlQuery))
                                {
                                    status = "FAIL";
                                    reason = string.Format("{0}", 
                                                           SQLXExceptionCodes.
                                                           Codes[(int)SQLXExceptionEnum.
                                                                 QueryTextNullEmpty]);
                                }
                                else
                                {
                                    // execute the batch query. Remember, SQL Server throws an 
                                    // exception when it encounters a "GO" statement in batch 
                                    // queries.
                                    DBObject2.NonQuery(step.SqlQuery, 
                                                       null, 
                                                       CommandType.Text);
                                    status = "SUCCESS";
                                    reason = string.Empty;
                                }
                            }
                            catch (Exception ex)
                            {
                                status = "FAIL";
                                reason = ex.Message;
                            }
                        }
                        break;
                    case "PKG" :
                        {
                            try
                            {
                                // this should never happen, but we check nonetheless
                                if (string.IsNullOrEmpty(step.SsisFilePath))
                                {
                                    status = "FAIL";
                                    reason = SQLXExceptionCodes.Codes[(int)SQLXExceptionEnum.
                                                                      PkgPathNullEmpty];
                                }
                                else
                                {
                                    string pkgDLLFileName = step.SsisFilePath;
                                    string path = System.IO.Path.Combine(Globals.AppPath, 
                                                                         "SQLXPkgRunner.exe");
                                    
                                    string args = string.Format("-p\"{0}\" -s\"{1}\" -c\"{2}\"", 
                                                                pkgDLLFileName, 
                                                                step.ID, 
                                                                step.ConnectionString);
                                    Process app = new Process();
                                    ProcessStartInfo info = new ProcessStartInfo()
                                    {
                                        Arguments       = args,
                                        CreateNoWindow  = true,
                                        FileName        = path,
                                        UseShellExecute = true,
                                    };
                                    app.StartInfo = info;
                                    app.Start();
                                    app.WaitForExit();
                                    int result = app.ExitCode;
                                    if (result > 0)
                                    {
                                        status = "FAIL";
                                        SQLXExceptionEnum exception = Globals.IntToEnum(result, 
                                                                         SQLXExceptionEnum.Unknown);
                                        switch (exception)
                                        {
                                            case SQLXExceptionEnum.PkgFileNotFound  : 
                                                reason = string.Concat(SQLXExceptionCodes.Codes[(int)exception],  
                                                                       " - ", 
                                                                       pkgDLLFileName); 
                                                break;
                                            default : reason = SQLXExceptionCodes.
                                                                  Codes[(int)exception] ; break;
                                        }
                                    }
                                    else
                                    {
                                        status = "SUCCESS";
                                        reason = string.Empty;
                                    }
                                }
                            }
                            catch (Exception ex)
                            {
                                status = "FAIL";
                                reason = ex.Message;
                            }
                            // DebugMsgs...
                        }
                        break;
                }
                DateTime finish = DateTime.Now;
                // save the history item for this execution
                this.SaveHistory(ref execID, step, start, finish, status, reason);
                // send event (only used by manual job run form)
                this.OnStepFinish(start, finish, step.ID, step.Name, 
                                  (status=="SUCCESS"), reason);
            }
            else
            {
            }
        }
        DateTime jobFinish = DateTime.Now;
        // send event (only used by manual job run form)
        this.OnJobFinish(jobStart, jobFinish, (status=="SUCCESS"));
        this.IsWorking = false;
    }
}
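
The package branch turns the runner's exit code into an enum value with a fallback for codes it doesn't recognize. Globals.IntToEnum isn't shown in this article, so the following is only a guess at how such a helper might be written; ExitReasonSketch and its values are made up for the example.

```csharp
using System;

// Hypothetical exit codes, for illustration only.
public enum ExitReasonSketch
{
    Unknown = 0,
    FileNotFound = 1,
    PathEmpty = 2
}

public static class EnumSketch
{
    // Converts an int to the enum member with that value, or returns
    // 'fallback' when the enum defines no such member.
    public static T IntToEnum<T>(int value, T fallback) where T : struct
    {
        return Enum.IsDefined(typeof(T), value)
               ? (T)Enum.ToObject(typeof(T), value)
               : fallback;
    }
}
```

With this sketch, an exit code of 1 maps to ExitReasonSketch.FileNotFound, while an unexpected code such as 99 maps to the fallback value instead of producing an undefined enum value.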

Thread Testing App

During project development, I created an application to test the scheduling and actual thread execution functionality. The app is a nothing-fancy WPF utility, and is shown below.

ThreadTesterUI main window

The first pair of buttons allows you to visualize how the schedule would be executed.

Schedule Tester form

On the left is the list of jobs that are currently configured. This list includes all jobs whether they're enabled, or not.

On the top/right, there are radio buttons that allow you to configure how many projected dates you want to create. If the job has an end date, the Show all projections radio button will automatically be checked for you. Otherwise the Show "n" projections radio button will be checked automatically. When you click the Generate button, the list boxes below the button will be populated.

The Dates list box contains the unique dates for which one or more projections were generated. Clicking a date in the list will cause the Times list box to populate with the times that were generated for the selected date.

All dates/times are projected from the current date and time. Given its status as a mere test application, I felt this was a reasonable way to approach it.
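
Projecting a schedule amounts to feeding each calculated run time back into the "next run" calculation. The sketch below shows that loop, plus the grouping by date that the two list boxes imply. ProjectionSketch is a hypothetical class, and the 'next' delegate stands in for whatever schedule calculation is being tested.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ProjectionSketch
{
    // Repeatedly applies 'next' to project up to 'count' run times, stopping
    // early once a projected time passes 'endDate' (if one is given).
    public static List<DateTime> Project(DateTime from, int count,
                                         Func<DateTime, DateTime> next,
                                         DateTime? endDate = null)
    {
        List<DateTime> runs = new List<DateTime>();
        DateTime current = from;
        while (runs.Count < count)
        {
            current = next(current);
            if (endDate.HasValue && current > endDate.Value)
            {
                break;
            }
            runs.Add(current);
        }
        return runs;
    }

    // Groups projected run times by calendar date, similar to the Dates and
    // Times list boxes.
    public static Dictionary<DateTime, List<TimeSpan>> GroupByDate(IEnumerable<DateTime> runs)
    {
        return runs.GroupBy(x => x.Date)
                   .ToDictionary(g => g.Key, g => g.Select(x => x.TimeOfDay).ToList());
    }
}
```

Projecting four runs of an "every 30 minutes" schedule from 23:00 on 30 Sep 2017 produces one time on 30 Sep and three on 1 Oct.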

The second set of buttons actually runs the jobs according to their specified schedules, just like the Windows service would run them. Messages are sent to the output window as jobs are processed.

Sample output window content

History

  • 29 Sep 2017 - Initial publication.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)