Last time we talked a little about asynchrony and about the cpp_async_await project. The previous article is located at http://jrb-programming.blogspot.com/2013/04/c-style-asyncawait-in-c-part-1.html. All code for the project is located at https://github.com/jbandela/cpp_async_await/. We talked about how to use the library with Boost.Asio.
As mentioned before the other major C++ library is Microsoft PPL/PPLX (PPLX is the cross platform port of PPL by Microsoft Casablanca Project) You can obtain PPLX and the documentation at http://casablanca.codeplex.com/ along with a host of other really neat stuff such as an http client, json library, etc. From here on out, unless specified otherwise, you can take what I say about PPL and assume that it applies to PPLX.
While Boost.Asio uses a callback model, PPL/PPLX uses a continuation model. The key class is
template < typename _Type>
class task;
_Type specifies the type of value produced by the task and it can be void. Task is very similar to std::future with the addition of the .then method. Whereas std::future has a .get method which blocks until the future is complete, the .then method allows a lambda to be specified which will be called when the task is complete. You can read more about PPL tasks at http://msdn.microsoft.com/en-us/library/dd492427(v=vs.110).aspx.
Here is an example of how to use tasks and continuations taken from the above link
#include <ppltasks.h>
#include <iostream>
using namespace concurrency;
using namespace std;
int wmain()
{
auto t = create_task([]() -> int
{
return 42;
});
t.then([](int result)
{
wcout << result << endl;
}).wait();
}
This is actually pretty neat and it is easier to chain tasks than in Boost.Asio.
There is currently a proposal to add .then to std::future
. You can find the proposal at http://isocpp.org/files/papers/N3558.pdf
However, it gets hard to use once you need to do anything in a loop. Due to this, along with other reasons, there is a proposal to add resumable functions to the C++ standard. You can find the paper at http://isocpp.org/files/papers/N3564.pdf
Here is one of the motivating example from that paper. Note, they are using a future with the .then continuations just like a PPL task currently
auto write =
[&buf](future<int> size) -> future<bool>
{
return streamW.write(size.get(), buf).then(
[](future<int> op){ return op.get() > 0; });
};
auto flse = [](future<int> op){ return
future::make_ready_future(false);};
auto copy = do_while(
[&buf]() -> future<bool>
{
return streamR.read(512, buf)
.choice(
[](future<int> op){ return op.get() > 0; }, write, flse);
});
The code asynchronously reads a stream 512 bytes at a time until no more bytes are read, while asynchronously writing what was read. Here is how the code looks with the proposed C++ language additions. Note that resumable marks a function as resumable and await suspends the function and then resumes the function when the awaited future (task) is complete returning the value generated by the task that was awaited.
int cnt = 0;
do
{
cnt = await streamR.read(512, buf);
if ( cnt == 0 ) break;
cnt = await streamW.write(cnt, buf);
} while (cnt > 0);
Notice how much easier to follow the code is with the language additions. The downside is you will have to wait for the proposal to be approved, become part of a standard, and for your compiler to implement it.
The good news is you can have much of the same convenience using cpp_async_await
now. First a motivating example. In the Casablanca REST SDK that provides PPLX, there is an example of asynchronously searching a file for lines which contain a some string and writing them asynchronously to another file. You can find the code at http://casablanca.codeplex.com/SourceControl/changeset/view/ 040c323727ca7747beb254ecf2b8eac73632f3be#Release/collateral/Samples/SearchFile/searchfile.cpp. We are using PPLX because it is a bit easier to have a real example with a commandline app. You would use PPL tasks in the same way as PPLX tasks.
#include <filestream.h>
#include <containerstream.h>
#include <producerconsumerstream.h>
using namespace utility;
using namespace concurrency::streams;
pplx::task<bool> _do_while_iteration(std::function<pplx::task<bool>(void)> func)
{
pplx::task_completion_event<bool> ev;
func().then([=](bool guard)
{
ev.set(guard);
});
return pplx::create_task(ev);
}
pplx::task<bool> _do_while_impl(std::function<pplx::task<bool>(void)> func)
{
return _do_while_iteration(func).then([=](bool guard) -> pplx::task<bool>
{
if(guard)
{
return ::_do_while_impl(func);
}
else
{
return pplx::task_from_result(false);
}
});
}
pplx::task<void> do_while(std::function<pplx::task<bool>(void)> func)
{
return _do_while_impl(func).then([](bool){});
}
typedef std::vector<std::string> matched_lines;
namespace Concurrency { namespace streams {
template <typename CharType>
class _type_parser<CharType, matched_lines>
{
public:
static pplx::task<matched_lines> parse(streambuf<CharType> buffer)
{
basic_istream<CharType> in(buffer);
auto lines = std::make_shared<matched_lines>();
return do_while([=]()
{
container_buffer<std::string> line;
return in.read_line(line).then([=](const size_t bytesRead)
{
if(bytesRead == 0 && in.is_eof())
{
return false;
}
else
{
lines->push_back(std::move(line.collection()));
return true;
}
});
}).then([=]()
{
return matched_lines(std::move(*lines));
});
}
};
}}
static pplx::task<void> find_matches_in_file(const string_t &fileName,
const std::string &searchString, basic_ostream<char> results)
{
return file_stream<char>::open_istream(fileName).then([=](basic_istream<char> inFile)
{
auto lineNumber = std::make_shared<int>(1);
return ::do_while([=]()
{
container_buffer<std::string> inLine;
return inFile.read_line(inLine).then([=](size_t bytesRead)
{
if(bytesRead == 0 && inFile.is_eof())
{
return pplx::task_from_result(false);
}
else if(inLine.collection().find(searchString) != std::string::npos)
{
results.print("line ");
results.print((*lineNumber)++);
return results.print(":").then([=](size_t)
{
container_buffer<std::string> outLine(std::move(inLine.collection()));
return results.write(outLine, outLine.collection().size());
}).then([=](size_t)
{
return results.print("\r\n");
}).then([=](size_t)
{
return true;
});
}
else
{
++(*lineNumber);
return pplx::task_from_result(true);
}
});
}).then([=]()
{
return inFile.close() && results.close();
});
})
.then([](std::vector<bool>) {});
}
static pplx::task<void>
write_matches_to_file(const string_t &fileName, matched_lines results)
{
auto sharedResults = std::make_shared<matched_lines>(std::move(results));
return file_stream<char>::open_ostream(fileName,
std::ios::trunc).then([=](basic_ostream<char> outFile)
{
auto currentIndex = std::make_shared<size_t>(0);
return ::do_while([=]()
{
if(*currentIndex >= sharedResults->size())
{
return pplx::task_from_result(false);
}
container_buffer<std::string> lineData((*sharedResults)[(*currentIndex)++]);
outFile.write(lineData, lineData.collection().size());
return outFile.print("\r\n").then([](size_t)
{
return true;
});
}).then([=]()
{
return outFile.close();
});
})
.then([](bool) {});
}
#ifdef _MS_WINDOWS
int wmain(int argc, wchar_t *args[])
#else
int main(int argc, char *args[])
#endif
{
if(argc != 4)
{
printf("Usage: SearchFile.exe input_file search_string output_file\n");
return -1;
}
const string_t inFileName = args[1];
const std::string searchString = utility::conversions::to_utf8string(args[2]);
const string_t outFileName = args[3];
producer_consumer_buffer<char> lineResultsBuffer;
basic_ostream<char> outLineResults(lineResultsBuffer);
find_matches_in_file(inFileName, searchString, outLineResults)
.then([&]()
{
basic_istream<char> inLineResults(lineResultsBuffer);
return inLineResults.extract<matched_lines>();
})
.then([&](matched_lines lines)
{
return write_matches_to_file(outFileName, std::move(lines));
})
.wait();
return 0;
}
Notice how painful iteration is. Now here is the code using cpp_await_async
pplx_helper
. Just a quick note. The code above first copies the matching lines into a producer_consumer_buffer
and then into a vector and then to the output file. My code copies into the producer_consumer_buffer
and then uses that buffer to copy to output. I think, my code achieves the same level of concurrency as the example program. If I am incorrect in this, please let me know in the comments below. You can find the whole file at https://github.com/jbandela/cpp_async_await/blob/master/PplxExample2.cpp
#include "pplx_helper.hpp"
#include <filestream.h>
#include <containerstream.h>
#include <producerconsumerstream.h>
using namespace utility;
using namespace concurrency::streams;
#ifdef _MS_WINDOWS
int wmain(int argc, wchar_t *args[])
#else
int main(int argc, char *args[])
#endif
{
if(argc != 4)
{
printf("Usage: PplxExample2 input_file search_string output_file\n");
return -1;
}
const string_t inFileName = args[1];
const std::string searchString = utility::conversions::to_utf8string(args[2]);
const string_t outFileName = args[3];
producer_consumer_buffer<char> lineResultsBuffer;
basic_ostream<char> outLineResults(lineResultsBuffer);
auto reader = pplx_helper::do_async([&](pplx_helper::async_helper<void> helper){
auto inFile = helper.await(file_stream<char>::open_istream(inFileName));
int lineNumber = 1;
bool done = false;
while(!done){
container_buffer<std::string> inLine;
auto bytesRead = helper.await(inFile.read_line(inLine));
if(bytesRead==0 && inFile.is_eof()){
done = true;
}
else if(inLine.collection().find(searchString) != std::string::npos){
helper.await(outLineResults.print("line "));
helper.await(outLineResults.print(lineNumber++));
helper.await(outLineResults.print(":"));
container_buffer<std::string> outLine(std::move(inLine.collection()));
helper.await(outLineResults.write(outLine,outLine.collection().size()));
helper.await(outLineResults.print("\r\n"));
}
else{
++lineNumber;
}
}
helper.await(inFile.close() && outLineResults.close());
});
auto writer = pplx_helper::do_async([&](pplx_helper::async_helper<void> helper){
basic_istream<char> inLineResults(lineResultsBuffer);
auto outFile = helper.await(file_stream<char>::open_ostream(outFileName,std::ios::trunc));
auto currentIndex = 0;
bool done = false;
while(!done){
container_buffer<std::string> lineData;
auto bytesRead = helper.await(inLineResults.read_line(lineData));
if(bytesRead==0 && inLineResults.is_eof()){
done = true;
}
else{
container_buffer<std::string> lineDataOut(std::move(lineData.collection()));
helper.await(outFile.write(lineDataOut,lineDataOut.collection().size()));
helper.await(outFile.print("\r\n"));
}
}
helper.await(inLineResults.close() && outFile.close());
});
try{
(reader && writer).wait();
}
catch(std::exception& e){
std::cerr << e.what();
}
return 0;
}
Notice how we can easily do iteration. The library is pretty similar to what can be achieved with the language additions. Instead of of resumable to mark a function as resumable, we use pplx_helper::do_async
which takes a lambda. The lambda takes a single parameter of pplx_helper::async_helper<void>
. If the lambda were to return an int for example it would take pplx_helper::async_helper<int>
. In general a lambda return type T takes pplx_helper::async_helper<T>
. In the case of the example code the parameter is named helper
. In the language proposal you use the unary await
keyword to suspend the function until a task is complete and then resume the function returning the value generated by the task were were awaiting. In your code we call helper.await
on the task you want to await. helper.await
provides pretty much the same convenience as the language keyword await.
You can use the same syntax to work with PPL tasks by using namespace ppl_helper
. In summary for PPLX (Project Casablanca) use namespace pplx_helper
and for PPL (Shipped with Visual C++ on Windows) use ppl_helper
.
This functionality is packaged up for you at https://github.com/jbandela/cpp_async_await. It is licensed under the Boost Software License which allows usage for both open source and commercial applications. It is a header only library and does not need to be built, but it does depend on Boost.Coroutine and needs to be linked to the boost_context library. The library has been tested with Visual C++ 2012 on Windows, and G++ 4.7.2 on Fedora Linux.
I hope you have enjoyed this discussion. Download the code and try it out, and let me know what you think. If people are interested, I will talk in a future post about how the library actually works.
Thanks,
John Bandela