Part 1 - Using SQLXAgent
Part 2 - Architecture and Design Decisions
Part 3 - The CSV and Excel Importer Code
Part 4 - Job Scheduling Code (this article)
Part 5 - How Packages Are Run
Part 6 - Interesting Coding
Introduction
This article is the 4th part of a larger article series describing the SQLXAgent utility. In this article, I'll be describing the job schedule handling code. This was, without a doubt, the most difficult part of the entire solution. I think I had to refactor it half a dozen times before I got it right.
Due to the almost limitless number of schedule permutations available, the most practical way to test the scheduling piece was to create a test app that generates the future job execution dates/times without actually running the jobs.
This article specifically discusses code found in the SQLXThreadManager assembly, and only code dealing with the jobs as they are processed by the SQLXAgentSvc application. In the code snippets below, most of the calls to DebugMsgs were removed in the interest of brevity, and formatting was modified in an attempt to eliminate horizontal scroll bars.
NOTE: Code snippets presented in the article may or may not reflect the absolutely latest and greatest version of the code. In the event that a snippet does not exactly match the actual code, it will be fairly close. Such is the nature of noticing issues while writing an article.
Welcome to System.Reactive (aka "Rx")
Coming up with a suitable model for running the scheduled jobs was a pain. At first, I tried using threads, and it quickly became a convoluted mess. Next I tried the TPL library, and in some ways, that was even worse. Then I stumbled on the System.Reactive (referred to as Rx in this article) library. With Rx, I was able to significantly reduce the amount and complexity of the code and the load on the CPU necessary to track the schedule and perform the work required by the jobs.
With Rx, I set up a timer, and then subscribed to what amounts to the "tick" event. I wanted to make each job responsible for its own timer because job schedules don't necessarily run on even intervals. Here's the basic flowchart that describes schedule processing.
In point of fact, I use Rx to perform the following duties:
- Keep the service alive. Despite having the JobThreadManager running, I wanted the service to be able to keep itself alive while everything else was at rest and waiting for the desired time. This timer runs every second. No actual work is performed.
- Watch for config file changes in the job manager. The job manager monitors the config file for changes so that it knows when job definitions have changed. When you use SQLXAgent to add/delete or otherwise change jobs and/or steps, that application saves the config file. This triggers a "job refresh" event to occur. Only jobs that were added, removed or changed are affected.
- Establish and wait for the next execution date/time for a given job. When the "tick" event occurs, the job is executed.
It is far outside the scope of this article to provide anything resembling a tutorial on Rx, so please use Google to find relevant info. After all, that's how I found out about it.
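For readers who have not seen Rx before, here is a minimal sketch of the two timer shapes described above: a periodic "heartbeat" and a one-shot timer that ticks once at a computed delay. It assumes the System.Reactive NuGet package is referenced. The class and method names (RxTimerSketch, StartHeartbeat, RunOnceAfter) are made up for illustration and are not part of SQLXAgent.

```csharp
using System;
using System.Reactive.Linq;   // requires the System.Reactive NuGet package

// Hypothetical helper for illustration only; not SQLXAgent code.
public static class RxTimerSketch
{
    // Periodic timer: the first tick arrives after dueTime,
    // then one tick arrives per period until the subscription is disposed.
    public static IDisposable StartHeartbeat(TimeSpan dueTime, TimeSpan period, Action onTick)
    {
        return Observable.Timer(dueTime, period)
                         .Subscribe(_ => onTick(),
                                    ex => Console.Error.WriteLine(ex.Message));
    }

    // One-shot timer: a single tick after delay, then the sequence completes.
    public static IDisposable RunOnceAfter(TimeSpan delay, Action work)
    {
        return Observable.Timer(delay)
                         .Subscribe(_ => work(),
                                    ex => Console.Error.WriteLine(ex.Message));
    }
}
```

Disposing the returned IDisposable cancels the timer, which is how a subscription can be torn down and re-created when a schedule changes.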
The JobThreadManager Class
The job thread manager class is responsible for managing the loading and updating of jobs. It performs no other functionality. When the job thread manager is instantiated, it loads all jobs from the database, and then creates a list of jobs that can run (they're enabled and haven't exceeded their end date). There really is no point in using up memory for jobs that can't run, right?
The following subsections discuss more important and interesting individual methods.
The Start() Method
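This method starts the job manager, which is where we subscribe to the timer. The first tick is aligned to the top of an upcoming minute, and the timer ticks once per second after that.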
public void Start()
{
    if (this.isValidJobList)
    {
        // Align the first tick with the top of the next minute. If we are
        // within the last few seconds of the current minute, skip ahead
        // one more minute.
        DateTime dueStart = DateTime.Now;
        dueStart = (dueStart.Second > 55)
                   ? dueStart.AddMinutes(2).ZeroSeconds()
                   : dueStart.AddMinutes(1).ZeroSeconds();
        // First tick at dueStart, then one tick per second.
        this.Timer = Observable.Timer(dueStart - DateTime.Now, TimeSpan.FromSeconds(1));
        this.JobsStarted = false;
        this.TimerSub = this.Timer.Subscribe(x => this.DoWork(), ex => this.WorkError());
    }
    else
    {
        DebugMsgs.Show(this,
                       "Found no jobs that were enabled, or no jobs with steps that were enabled.",
                       DebugLevel.Info);
    }
}
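The due-time arithmetic in Start() can be tried in isolation. The sketch below is an illustration, not the project's code: ZeroSeconds here is a hypothetical stand-in for the extension method of the same name used above (its real implementation is not shown in this article), and NextStart is a made-up name for the calculation.

```csharp
using System;

// Hypothetical stand-ins for illustration; the real ZeroSeconds()
// extension lives in the SQLXAgent code base and may differ.
public static class DueTimeSketch
{
    // Truncate a DateTime to the whole minute.
    public static DateTime ZeroSeconds(this DateTime value)
    {
        return new DateTime(value.Year, value.Month, value.Day,
                            value.Hour, value.Minute, 0, value.Kind);
    }

    // Top of the next minute, or the minute after that when fewer
    // than five seconds remain in the current minute.
    public static DateTime NextStart(DateTime now)
    {
        int minutesAhead = (now.Second > 55) ? 2 : 1;
        return now.AddMinutes(minutesAhead).ZeroSeconds();
    }
}
```

For example, a call at 10:15:30 yields 10:16:00, while a call at 10:15:58 yields 10:17:00, presumably so the first tick is never just a second or two away.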
The DoWork() Method
The DoWork method is called by the Rx timer every second. This method starts the jobs if they have not yet been started, checks the SQLXAgent.exe.config file to see if the jobs need to be updated, and, if the date has changed, removes expired history items.
private IObserver<long> DoWork()
{
    IObserver<long> result = null;
    try
    {
        if (!this.JobsStarted)
        {
            // First tick: start all of the job threads.
            this.JobsStarted = true;
            this.StartJobs();
        }
        else
        {
            // Later ticks: refresh the jobs if the config file was saved
            // since the last time we looked.
            if (File.Exists(this.configFileInfo.FullName))
            {
                this.configFileInfo.Refresh();
                if (configFileInfo.LastWriteTimeUtc > this.lastConfigFileDate)
                {
                    this.lastConfigFileDate = configFileInfo.LastWriteTimeUtc;
                    this.HistoryExpireDays = Convert.ToInt32(SQLXCommon.Globals.
                                                             AppConfig.AppSettings.
                                                             Settings["HistoryExpireDays"].
                                                             Value);
                    this.RefreshJobs();
                }
            }
        }
        // Purge expired history once the date has rolled over.
        if (DateTime.Now.Date > this.LastHistoryDelete.Date)
        {
            this.RemoveExpiredHistoryItems();
        }
    }
    catch (Exception ex)
    {
        AppLog.Error(ex);
    }
    return result;
}
The RefreshJobs() Method
public void RefreshJobs()
{
    // Mark every running job as "not refreshed".
    foreach (JobThread item in this)
    {
        item.Refreshed = false;
    }
    JobList newJobs = new JobList();
    newJobs.GetFromDatabase();
    foreach (JobItem newJob in newJobs)
    {
        JobItem oldJob = this.Jobs.FirstOrDefault(x => x.ID == newJob.ID);
        if (oldJob == null)
        {
            // New job: add it, and start it if the manager is already running.
            JobThread job = new JobThread(newJob);
            this.Add(job);
            if (this.JobsStarted)
            {
                job.Start();
            }
        }
        else
        {
            // Existing job: hand the new definition to its thread.
            JobThread jobThread = this.FirstOrDefault(x => x.Job.ID == oldJob.ID);
            if (jobThread != null)
            {
                jobThread.UpdateJob(newJob);
                DebugMsgs.Show(this,
                               string.Format("Job Manager - job [{0}] updated",
                                             oldJob.Name),
                               DebugLevel.Info);
            }
        }
    }
    // Anything still marked "not refreshed" is stopped and removed.
    // ToList() takes a snapshot first, because removing items from a
    // collection while enumerating it throws InvalidOperationException.
    var toRemove = this.Where(x => x.Refreshed == false).ToList();
    foreach (JobThread oldItem in toRemove)
    {
        oldItem.Stop();
        this.Remove(oldItem);
    }
}
The JobThread Class
The job thread is responsible for processing the schedule for the associated job. No other processing is performed.
The Start() Method
The Start() method creates/hooks into the Observable.Timer object.
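public void Start()
{
    this.UpdatePending = false;
    if (this.Job.CanRestart)
    {
        // A zero NextTime tells RunSchedule() that the first tick should
        // only calculate the schedule, not execute the job.
        this.NextTime = new DateTime(0);
        this.CreateAndSubscribe(TimeSpan.FromSeconds(1));
    }
}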
The RunSchedule() Method
This method is the actual worker in the class. It is the delegate method that responds to timer tick events.
public void RunSchedule()
{
    bool isRunning = true;
    // NextTime is zero only on the very first tick after Start().
    bool canWork = (this.NextTime.Ticks != 0);
    DateTime now = DateTime.Now;
    this.NextTime = (now + this.CalculateNextRunTime(now)).ZeroSeconds();
    if (canWork)
    {
        try
        {
            this.ExecuteJob();
        }
        catch (Exception ex)
        {
            AppLog.Error(ex);
        }
        finally
        {
            // Stop scheduling once the next run would fall on or after the end date.
            if (this.NextTime >= this.Job.SchedDurEndDate)
            {
                this.Stop();
                isRunning = false;
            }
        }
    }
    if (this.UpdatePending)
    {
        this.UpdateJob(this.PendingJob);
        this.PendingJob = null;
    }
    else if (isRunning)
    {
        this.ResetTimer();
    }
}
Calculating the Next Scheduled Job Execution
There are several methods involved in the calculation of job execution time, and which method it uses is determined by the job's schedule properties. Since I don't have a full-blown SQL Server instance at home, and because I can't install software (that can project a job's schedule) on my machine at work, there is no way for me to determine how close my scheduling code is to the way SQL Server does it. If someone has the ability and inclination to check this, I'd like to hear about how I'm not doing it right (my goal with all this code was to generally replicate the way SQL Server works).
Calculating the next scheduled job execution date/time starts out with the curiously named CalculateNextRunTime method.
protected TimeSpan CalculateNextRunTime(DateTime now)
{
    TimeSpan result = new TimeSpan(0);
    // Time until the next run based on the daily frequency alone.
    TimeSpan time = CalculateDailyFrequencyTime(now);
    // Adjust the date according to the occurrence period.
    switch (this.Job.SchedOccurPeriod)
    {
        case "Daily" :
            result = this.CalcNextDailyDay(now, time);
            break;
        case "Weekly" :
            result = this.CalcNextWeeklyDay(now, time);
            break;
        case "Monthly" :
            result = this.CalcNextMonthlyDay(now, time);
            break;
        default :
            result = new TimeSpan(0);
            break;
    }
    return result;
}
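From this method, we call two additional methods. The first is CalculateDailyFrequencyTime. This method works out how long it is until the next run based only on the job's daily frequency settings (either once at a given time, or every N minutes/hours within a start/end window).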
protected TimeSpan CalculateDailyFrequencyTime(DateTime now)
{
    DateTime result = now;
    switch (this.Job.SchedDailyFreq)
    {
        case true :
            {
                // Run at SchedFreqOnceAt; if that time has already passed
                // today, move to tomorrow.
                result = result.SetTime(this.Job.SchedFreqOnceAt.TimeOfDay);
                if (now >= result)
                {
                    result = result.AddDays(1);
                }
            }
            break;
        case false :
            {
                // Run every N minutes/hours between SchedFreqStart and SchedFreqEnd.
                int periodSeconds = 0;
                switch (this.Job.SchedFreqEveryPeriod)
                {
                    case "minute(s)" : periodSeconds = this.Job.SchedFreqEveryN * 60; break;
                    case "hour(s)"   : periodSeconds = this.Job.SchedFreqEveryN * 3600; break;
                }
                result = result.AddSeconds(periodSeconds);
                if (result.TimeOfDay < this.Job.SchedFreqStart.TimeOfDay)
                {
                    // Before the window opens: wait for the start time.
                    result = result.SetTime(this.Job.SchedFreqStart.TimeOfDay);
                }
                else if (result.TimeOfDay > this.Job.SchedFreqEnd.TimeOfDay)
                {
                    // After the window closes: start time on the following day.
                    result = result.AddDays(1).SetTime(this.Job.SchedFreqStart.TimeOfDay);
                }
            }
            break;
    }
    return (result - now);
}
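To make the "every N minutes within a window" branch easier to follow, here is a stand-alone sketch of the same arithmetic with the job properties replaced by plain parameters. The class name, method name, and parameters are hypothetical, and the SetTime extension is replaced with ordinary Date arithmetic.

```csharp
using System;

// Hypothetical, simplified restatement of the windowed-frequency branch.
public static class DailyWindowSketch
{
    public static TimeSpan NextInWindow(DateTime now, int periodSeconds,
                                        TimeSpan windowStart, TimeSpan windowEnd)
    {
        DateTime result = now.AddSeconds(periodSeconds);
        if (result.TimeOfDay < windowStart)
        {
            // Too early: wait for the window to open on that date.
            result = result.Date + windowStart;
        }
        else if (result.TimeOfDay > windowEnd)
        {
            // Too late: roll over to the window start on the next day.
            result = result.Date.AddDays(1) + windowStart;
        }
        return result - now;
    }
}
```

With a 15-minute period and an 08:00 to 17:00 window, a call at 10:00 returns 15 minutes, a call at 06:00 returns 2 hours (until 08:00), and a call at 16:50 returns 15 hours 10 minutes (until 08:00 the next day).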
After CalculateDailyFrequencyTime is called, one of the following methods is called to establish the date of the next execution.
protected TimeSpan CalcNextDailyDay(DateTime now, TimeSpan time)
{
    DateTime result = now;
    result = now.AddSeconds(time.TotalSeconds);
    if (result.Day != now.Day)
    {
        // NOTE: DateTime is immutable, so the value returned by AddDays is
        // discarded here and this line has no effect as written. Assign it
        // back to result if the day skip is intended.
        result.AddDays(this.Job.SchedFreqEveryN-1);
    }
    return (result - now);
}

protected TimeSpan CalcNextWeeklyDay(DateTime now, TimeSpan time)
{
    int weekInterval = this.Job.SchedOccurWeeklyInterval;
    List<DayOfWeek> weekDays = this.GetScheduledWeekdays();
    DateTime result = now.AddSeconds(time.TotalSeconds);
    if (result.Day != now.Day)
    {
        // The next run falls on another day; make sure it is a scheduled weekday.
        if (weekDays.IndexOf(result.DayOfWeek) < 0)
        {
            int index = weekDays.IndexOf(now.DayOfWeek);
            if (index < 0)
            {
                // Today isn't a scheduled day: find the next scheduled day
                // later this week, or wrap around to the first one.
                List<DayOfWeek> daysAvailable = weekDays.Where(x=>x > now.DayOfWeek).ToList();
                index = (daysAvailable != null && daysAvailable.Count >= 1)
                        ? weekDays.IndexOf(daysAvailable[0])
                        : 0;
            }
            else
            {
                index = (index+1 > weekDays.Count-1) ? 0 : index+1;
            }
            DayOfWeek newDOW = weekDays[index];
            if (index == 0)
            {
                // Wrapped around: jump ahead by the weekly interval.
                result = result.AddDays(weekInterval * 7);
                result = result.FirstDayOfWeek();
                result = result.AddDays(-1);
            }
            result = result.GetNextDayOfWeek(newDOW);
        }
    }
    return (result - now);
}

protected TimeSpan CalcNextMonthlyDay(DateTime now, TimeSpan time)
{
    DateTime result = now.AddSeconds(time.TotalSeconds);
    if (result.Day != now.Day)
    {
        if (this.Job.SchedOccurMonthlyTypeIsDay)
        {
            // "Day N of every M months", clamped to the length of the month.
            result = result.AddMonths(this.Job.SchedOccurMonthlyDayInterval);
            result = result.SetDay(Math.Min(this.Job.SchedOccurMonthlyDayNumber, result.DaysInMonth()));
        }
        else
        {
            // "The Nth <weekday> of every M months".
            result = result.AddMonths(this.Job.SchedOccurMonthlyDayInterval);
            DayOfWeek dow = SQLXCommon.Globals.StringToEnum(this.Job.SchedOccurMonthlyTheNthDay, DayOfWeek.Sunday);
            result = result.GetDateByOrdinalDay(dow, this.GetOrdinalDay());
        }
    }
    return (result - now);
}
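The monthly calculation relies on a GetDateByOrdinalDay extension that is not shown in this article. As a rough idea of what an "Nth weekday of the month" lookup can look like, here is a hypothetical sketch. The name, the signature, and the fall-back-to-last-occurrence behavior are all assumptions, and the real helper may behave differently.

```csharp
using System;

// Hypothetical illustration; not the article's GetDateByOrdinalDay().
public static class OrdinalDaySketch
{
    // Returns the n-th (1-based) occurrence of dow in the given month.
    // If the month has no n-th occurrence, the last occurrence is returned.
    public static DateTime NthWeekdayOfMonth(int year, int month, DayOfWeek dow, int n)
    {
        if (n < 1)
        {
            throw new ArgumentOutOfRangeException("n", "n must be 1 or greater");
        }
        DateTime first = new DateTime(year, month, 1);
        // Days from the 1st to the first occurrence of dow (0..6).
        int offset = ((int)dow - (int)first.DayOfWeek + 7) % 7;
        DateTime candidate = first.AddDays(offset + (7 * (n - 1)));
        // Step back a week at a time if we overshot into a later month.
        while (candidate.Month != month || candidate.Year != year)
        {
            candidate = candidate.AddDays(-7);
        }
        return candidate;
    }
}
```

For October 2017, which starts on a Sunday, the second Tuesday is 10 October and the fifth Monday is 30 October. February 2017 has only four Fridays, so asking for the fifth returns the last one, 24 February.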
The ExecuteJob() Method
This method is found in the JobThreadBase class. The reason it's in a base class is that the SQLXAgent application allows you to manually run jobs on demand, without consideration for their schedules.
public virtual void ExecuteJob()
{
    // Don't start the job again if it is still running.
    if (!this.IsWorking)
    {
        DateTime jobStart = DateTime.Now;
        this.OnJobStart(jobStart);
        this.IsWorking = true;
        string status = "SUCCESS";
        string reason = string.Empty;
        long execID = 0;
        string debugMsg = string.Empty;
        foreach (StepItem step in this.Job.Steps)
        {
            if (step.StepIsEnabled)
            {
                DateTime start = DateTime.Now;
                OnStepStart(start, step.ID, step.Name);
                string stepName = string.Format("-Job {0}.[STEP {1}].[{2}].[{3}]",
                                                this.DebugJobName,
                                                step.Position,
                                                step.StepType,
                                                step.Name);
                switch (step.StepType)
                {
                    case "SQL" :
                        {
                            // SQL step: run the query text directly.
                            try
                            {
                                if (string.IsNullOrEmpty(step.SqlQuery))
                                {
                                    status = "FAIL";
                                    reason = string.Format("{0}",
                                                           SQLXExceptionCodes.
                                                           Codes[(int)SQLXExceptionEnum.
                                                           QueryTextNullEmpty]);
                                }
                                else
                                {
                                    DBObject2.NonQuery(step.SqlQuery,
                                                       null,
                                                       CommandType.Text);
                                    status = "SUCCESS";
                                    reason = string.Empty;
                                }
                            }
                            catch (Exception ex)
                            {
                                status = "FAIL";
                                reason = ex.Message;
                            }
                        }
                        break;
                    case "PKG" :
                        {
                            // Package step: run SQLXPkgRunner.exe and wait for it.
                            try
                            {
                                if (string.IsNullOrEmpty(step.SsisFilePath))
                                {
                                    status = "FAIL";
                                    reason = SQLXExceptionCodes.Codes[(int)SQLXExceptionEnum.
                                                                      PkgPathNullEmpty];
                                }
                                else
                                {
                                    string pkgDLLFileName = step.SsisFilePath;
                                    string path = System.IO.Path.Combine(Globals.AppPath,
                                                                         "SQLXPkgRunner.exe");
                                    string args = string.Format("-p\"{0}\" -s\"{1}\" -c\"{2}\"",
                                                                pkgDLLFileName,
                                                                step.ID,
                                                                step.ConnectionString);
                                    Process app = new Process();
                                    ProcessStartInfo info = new ProcessStartInfo()
                                    {
                                        Arguments = args,
                                        CreateNoWindow = true,
                                        FileName = path,
                                        UseShellExecute = true,
                                    };
                                    app.StartInfo = info;
                                    app.Start();
                                    app.WaitForExit();
                                    // A positive exit code maps to a SQLXExceptionEnum value.
                                    int result = app.ExitCode;
                                    if (result > 0)
                                    {
                                        status = "FAIL";
                                        SQLXExceptionEnum exception = Globals.IntToEnum(result,
                                                                          SQLXExceptionEnum.Unknown);
                                        switch (exception)
                                        {
                                            case SQLXExceptionEnum.PkgFileNotFound :
                                                reason = string.Concat(SQLXExceptionCodes.Codes[(int)exception],
                                                                       " - ",
                                                                       pkgDLLFileName);
                                                break;
                                            default :
                                                reason = SQLXExceptionCodes.Codes[(int)exception];
                                                break;
                                        }
                                    }
                                    else
                                    {
                                        status = "SUCCESS";
                                        reason = string.Empty;
                                    }
                                }
                            }
                            catch (Exception ex)
                            {
                                status = "FAIL";
                                reason = ex.Message;
                            }
                        }
                        break;
                }
                DateTime finish = DateTime.Now;
                this.SaveHistory(ref execID, step, start, finish, status, reason);
                this.OnStepFinish(start, finish, step.ID, step.Name,
                                  (status=="SUCCESS"), reason);
            }
        }
        DateTime jobFinish = DateTime.Now;
        this.OnJobFinish(jobStart, jobFinish, (status=="SUCCESS"));
        this.IsWorking = false;
    }
}
Thread Testing App
During project development, I created an application to test the scheduling and actual thread execution functionality. The app is a nothing-fancy WPF utility, and is shown below.
The first pair of buttons allows you to visualize how the schedule would be executed.
On the left is the list of jobs that are currently configured. This list includes all jobs whether they're enabled, or not.
On the top/right, there are radio buttons that allow you to configure how many projected dates you want to create. If the job has an end date, the Show all projections radio button will automatically be checked for you. Otherwise the Show "n" projections radio button will be checked automatically. When you click the Generate button, the list boxes below the button will be populated.
The Dates list box contains the unique dates for which one or more projections were generated. Clicking a date in the list will cause the Times list box to populate with the times that were generated for the selected date.
All dates/times are projected from the current date and time. Given its status as a mere test application, I felt this was a reasonable way to approach it.
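The projection idea is simple enough to sketch: start from a date/time, repeatedly ask the schedule how long it is until the next run, and collect the results. The code below is an illustration under that assumption and is not the test app's code. ProjectionSketch, Project, and GroupByDate are hypothetical names, and the next delegate stands in for a schedule calculation such as CalculateNextRunTime.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical illustration of projecting future run times.
public static class ProjectionSketch
{
    // Collects up to count future run times, stopping early at endDate (if any).
    public static List<DateTime> Project(DateTime start, Func<DateTime, TimeSpan> next,
                                         int count, DateTime? endDate = null)
    {
        var results = new List<DateTime>();
        DateTime cursor = start;
        while (results.Count < count)
        {
            TimeSpan delta = next(cursor);
            if (delta <= TimeSpan.Zero)
            {
                break;  // guard: a schedule that never advances would loop forever
            }
            cursor = cursor + delta;
            if (endDate.HasValue && cursor >= endDate.Value)
            {
                break;
            }
            results.Add(cursor);
        }
        return results;
    }

    // Groups projected run times by calendar date, similar in spirit to
    // the Dates and Times list boxes.
    public static ILookup<DateTime, TimeSpan> GroupByDate(IEnumerable<DateTime> runs)
    {
        return runs.ToLookup(r => r.Date, r => r.TimeOfDay);
    }
}
```

For instance, projecting five runs of an every-six-hours schedule from 10:00 on 29 Sep 2017 gives two times on the 29th (16:00, 22:00) and three on the 30th (04:00, 10:00, 16:00).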
The second set of buttons actually runs the jobs according to their specified schedules, just like the Windows service would run them. Messages are sent to the output window as jobs are processed.
History
- 29 Sep 2017 - Initial publication.