Introduction
I have been working on an N-tier application recently, and one idea I wanted to explore was streaming a large result set from the server to the client and displaying the data in real time. The application uses ASP.NET Web API to stream a large result set to WPF clients on a background thread and displays each record on screen as soon as it is received. I will also show how to use Dapper, a simple ORM, with yield return so that the result comes back one record at a time. This article explains the steps to create the sample solution.
Background
I will demonstrate the idea using sample code that lets the user search for a person by first name and displays a list of persons matching the search criteria.
Using the Code
The solution has two projects, the Service project and the WpfClient project. The solution should contain everything you'll need to run the sample code. You just need to make sure you have .NET Framework 4.5 installed, as my code uses the new async and await keywords. For debugging purposes, start both projects at the same time.
Service Project
The Service project is a Web API MVC4 project. It has two controllers, the Home controller and the Name controller. The Home controller is the default controller generated with the project. It is not used in this sample, so we are going to ignore it. The Name controller contains the services that demonstrate the streaming capability of Web API.
There are two actions in the Name controller: the SimulateLargeResultSet action and the UsingDapper action. To route each request to the appropriate action, we need to add the following route to WebApiConfig.cs.
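public static void Register(HttpConfiguration config)
{
    // Route by controller and action name, e.g. api/Name/SimulateLargeResultSet.
    config.Routes.MapHttpRoute(
        name: "Api",
        routeTemplate: "api/{controller}/{action}",
        // "first" falls back to an empty string when the caller omits it.
        defaults: new { first = string.Empty }
    );
}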
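To make the route concrete, here is a minimal sketch of how a client could build a request URL for it. The helper name BuildSearchUri and the idea of passing "first" as a query-string value are assumptions for illustration. The article only shows that the route defines a default for "first".

```csharp
using System;

public static class NameServiceUrls
{
    // Hypothetical helper: builds api/Name/{action}?first={first} under the given base address.
    public static Uri BuildSearchUri(Uri baseAddress, string action, string first)
    {
        // Escape the user-supplied value so spaces and symbols survive the trip.
        string relative = "api/Name/" + action
            + "?first=" + Uri.EscapeDataString(first ?? string.Empty);
        return new Uri(baseAddress, relative);
    }
}
```

The base address depends on how the Service project is hosted on your machine, so it is left as a parameter.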
The SimulateLargeResultSet action demonstrates what happens when a large result set is returned from SQL Server. Retrieving a large result set takes time. To simulate the delay, Thread.Sleep(250) is called at the end of each iteration of the for loop. Each iteration creates a Person object with an Id and a Name to return to the client.
The action first creates an HttpResponseMessage, which is typical in a Web API action.
HttpResponseMessage response = Request.CreateResponse();
Then it sets HttpResponseMessage.Content to a PushStreamContent. The PushStreamContent is where all the magic occurs, as it enables scenarios where data producers want to write directly (either synchronously or asynchronously) using a stream.
response.Content = new PushStreamContent(....);
Finally, the action returns the response to establish the stream, which is again typical in a normal Web API action.
return response;
One of the overloaded constructors for PushStreamContent accepts an action and a mediaType, which we'll specify as "text/plain". The action writes a JSON-serialized Person object to the output stream once every quarter of a second.
for (int i = 0; i < 20000; i++)
{
    var name = new Person()
    {
        Id = i,
        Name = string.Format("Name for Id: {0}", i),
    };
    // Serialize one record and push it to the client right away.
    var str = await JsonConvert.SerializeObjectAsync(name);
    var buffer = UTF8Encoding.UTF8.GetBytes(str);
    await outputStream.WriteAsync(buffer, 0, buffer.Length);
    // Simulated delay. Note that Thread.Sleep blocks the current thread.
    Thread.Sleep(250);
}
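As a variation on the loop above, here is a minimal sketch that pauses with await Task.Delay instead of Thread.Sleep, so no thread is blocked while waiting, and that ends every record with a newline so the receiver can tell records apart. This is not the sample's code. The class name RecordWriter is hypothetical, and the JSON is built by hand only to keep the sketch free of dependencies (the sample uses Json.NET).

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;

public static class RecordWriter
{
    // Writes `count` records to the stream, one JSON object per line.
    public static async Task WriteRecordsAsync(Stream outputStream, int count, int delayMs)
    {
        for (int i = 0; i < count; i++)
        {
            // Hand-built JSON: safe here because the values contain nothing that needs escaping.
            string json = "{\"Id\":" + i + ",\"Name\":\"Name for Id: " + i + "\"}\n";
            byte[] buffer = Encoding.UTF8.GetBytes(json);

            await outputStream.WriteAsync(buffer, 0, buffer.Length);
            await outputStream.FlushAsync();

            // Non-blocking pause that stands in for slow data retrieval.
            await Task.Delay(delayMs);
        }
    }
}
```

The same method works against any Stream, which makes it easy to try out with a MemoryStream before wiring it into a service.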
The other action in the sample code is the UsingDapper action, which is not used by default because it requires a SQL Server database to be set up. The logic is similar to the SimulateLargeResultSet action, but it contains the code to connect to a SQL database and read from the Name table. You will have to create the database structure and change the connection string to make it work. I am using Dapper v1.12.0.0 to execute the SQL query and yield return a Person object per record. This way, the service does not have to wait for the entire result set to come back before pushing output to the client. I have set the buffered parameter to false and the commandTimeout to 0, which means the command never times out.
var persons = connection.Query<Person>(dynamicQuery,
    new { FirstName = first },
    buffered: false,
    commandTimeout: 0,
    commandType: CommandType.Text);
Each iteration of the foreach loop then reads one record from the database, serializes it to JSON and writes it to the output stream.
foreach (var person in persons)
{
    var str = await JsonConvert.SerializeObjectAsync(person);
    var buffer = UTF8Encoding.UTF8.GetBytes(str);
    await outputStream.WriteAsync(buffer, 0, buffer.Length);
}
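To illustrate why an unbuffered, yield-return based sequence lets the first record reach the client early, here is a small sketch with no database involved. It is not Dapper's implementation. LazyRows and RowsProduced are hypothetical names, and the loop simply stands in for a data reader advancing one row at a time.

```csharp
using System;
using System.Collections.Generic;

public class Person
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class LazyRows
{
    // Counts how many rows have actually been materialized so far.
    public static int RowsProduced;

    // Each row is created only when the consumer asks for the next item.
    public static IEnumerable<Person> ReadPersons(int total)
    {
        for (int i = 0; i < total; i++)
        {
            RowsProduced++;
            yield return new Person { Id = i, Name = "Name for Id: " + i };
        }
    }
}
```

If a consumer takes only the first two items of LazyRows.ReadPersons(20000), RowsProduced stays at 2. A buffered query, by contrast, would build the whole list before the foreach loop sees the first element.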
WPF Client Project
The WPF client project is built with Prism 4.1 and follows the MVVM approach. When you run the sample code, you will see a First Name textbox, where the user can enter the first name of the person to search for, and a DataGrid that shows all the results from the Service project.
MainWindow.xaml has a view model called MainWindowViewModel, which has this line of code in its constructor.
BindingOperations.EnableCollectionSynchronization(Persons, _personsLock);
This line of code allows an ObservableCollection to be modified from a background thread without WPF throwing an error. WPF requires all UI-changing code to run on the dispatcher thread, which is the UI thread. In newer versions of the .NET Framework, WPF automatically marshals INotifyPropertyChanged.PropertyChanged notifications to the UI thread, but it does not do so for INotifyCollectionChanged.CollectionChanged. You have to either call EnableCollectionSynchronization(...) or manually dispatch the add/remove operations back to the UI thread. In my experience, using BindingOperations is much faster than using the dispatcher.
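One detail worth keeping in mind: with the lock-object overload of EnableCollectionSynchronization, WPF takes the lock when it accesses the collection, and your own code is expected to take the same lock whenever it touches the collection. The sketch below shows only that locking discipline, using plain BCL types so it runs without WPF. SynchronizedAdds is a hypothetical name, and the itemsLock parameter plays the role of _personsLock.

```csharp
using System.Collections.ObjectModel;
using System.Threading.Tasks;

public static class SynchronizedAdds
{
    // Adds items from several background tasks, always under the shared lock.
    public static int AddFromBackgroundThreads(
        ObservableCollection<int> items, object itemsLock, int writers, int perWriter)
    {
        var tasks = new Task[writers];
        for (int w = 0; w < writers; w++)
        {
            tasks[w] = Task.Run(() =>
            {
                for (int i = 0; i < perWriter; i++)
                {
                    // Same lock object that would be passed to EnableCollectionSynchronization.
                    lock (itemsLock)
                    {
                        items.Add(i);
                    }
                }
            });
        }
        Task.WaitAll(tasks);
        return items.Count;
    }
}
```

Without the lock, concurrent Add calls on an ObservableCollection are not safe, so the lock matters even when only one background thread writes while the UI reads.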
The DataGrid is bound to the Persons property of the MainWindowViewModel class. The property is of type ObservableCollection<Person>, and we will add Person objects to it in real time.
public ObservableCollection<Person> Persons
{
    get
    {
        if (_persons == null)
        {
            _persons = new ObservableCollection<Person>();
        }
        return _persons;
    }
    set
    {
        _persons = value;
        RaisePropertyChanged(() => Persons);
    }
}
In the view model, there are two commands, the Search command and the Cancel command. The Search command starts a background thread, calls the Web API service and reads Person records from the response stream. It adds each Person object to the Persons ObservableCollection as it arrives, which is reflected in the UI in real time.
using (Stream stream = await response.Content.ReadAsStreamAsync())
{
    byte[] readBuffer = new byte[512];
    int bytesRead = 0;
    // Note: this assumes every Read returns exactly one complete JSON object
    // that fits in the 512-byte buffer.
    while ((bytesRead = stream.Read(readBuffer, 0, readBuffer.Length)) != 0)
    {
        ct.ThrowIfCancellationRequested();
        string personString = Encoding.UTF8.GetString(readBuffer, 0, bytesRead);
        var person = JsonConvert.DeserializeObject<Person>(personString);
        persons.Add(person);
    }
}
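The read loop above relies on each Read call returning exactly one serialized Person. A stream gives no such guarantee in general: a single read may return half a record or several records at once. One possible way to make the framing explicit, sketched below, is to read line by line. This assumes the server ends every record with a newline, which the sample's server code does not currently do, and LineFramedReader is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public static class LineFramedReader
{
    // Returns one string per record, assuming one JSON object per line.
    public static List<string> ReadRecords(Stream stream)
    {
        var records = new List<string>();
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            string line;
            // ReadLine buffers internally, so a record split across reads is reassembled.
            while ((line = reader.ReadLine()) != null)
            {
                if (line.Length == 0)
                {
                    continue; // skip blank lines between records
                }
                records.Add(line);
            }
        }
        return records;
    }
}
```

Each returned string could then be passed to JsonConvert.DeserializeObject<Person> exactly as in the loop above. Collecting into a list keeps the sketch short. A real client would hand each record to the UI collection as soon as it is read.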
At the beginning of each iteration of the while loop, the code checks whether cancellation has been requested:
ct.ThrowIfCancellationRequested()
If so, it stops reading from the service. The cancellation token is hooked up to the Cancel command, which in turn is hooked up to the Cancel button, so that when the user clicks the Cancel button, the stream stops.
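The cancellation flow can be sketched in isolation as follows. CancelDemo and the onItem callback are hypothetical stand-ins for the read loop and for whatever triggers the Cancel command. Only CancellationToken and OperationCanceledException come from the framework.

```csharp
using System;
using System.Threading;

public static class CancelDemo
{
    // Processes items until finished or cancelled, and returns how many completed.
    public static int Process(int total, CancellationToken ct, Action<int> onItem)
    {
        int processed = 0;
        try
        {
            for (int i = 0; i < total; i++)
            {
                // Throws OperationCanceledException once Cancel() has been called.
                ct.ThrowIfCancellationRequested();
                onItem(i);
                processed++;
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is an expected way to leave the loop, not an error.
        }
        return processed;
    }
}
```

A caller would create a CancellationTokenSource, pass its Token to Process, and call Cancel() from the Cancel command handler. The loop stops at the next check rather than in the middle of an item.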
If the Cancel button is clicked, the service side will receive an exception with ErrorCode == -2147023667 (HRESULT 0x800704CD), which indicates that the remote host closed the connection.
catch (HttpException ex)
{
    // 0x800704CD: the client closed the connection, so just stop writing.
    if (ex.ErrorCode == -2147023667)
    {
        return;
    }
}
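The magic number is easier to recognize in its hexadecimal form. A minimal sketch of giving it a name, where DisconnectCodes is a hypothetical helper and not part of the sample:

```csharp
public static class DisconnectCodes
{
    // 0x800704CD reinterpreted as a signed 32-bit value is -2147023667.
    public const int ClientDisconnected = unchecked((int)0x800704CD);

    public static bool IsClientDisconnect(int errorCode)
    {
        return errorCode == ClientDisconnected;
    }
}
```

The catch block could then test DisconnectCodes.IsClientDisconnect(ex.ErrorCode) instead of comparing against the literal.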
Conclusion
And that's it guys. I hope you guys enjoyed reading the article as much as I enjoyed writing it, and find the article helpful. Thanks.