Introduction
The problem originated in one of our applications, where we (the implementing team) wanted to attach a traceable label to request processing. Anyone more or less familiar with reactive programming (multithreading, clouds, callbacks and so on) knows that processing which involves many systems is asynchronous by nature and therefore spans many threads. This article, or rather tip, shows how to pass the mapped diagnostic context (MDC) between threads in Scala, where threads are generally managed by an ExecutionContext, using AOP (annotation-style AspectJ written in Java).
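To make the problem concrete, here is a minimal sketch of how a per-thread label gets lost once work moves to a pool thread. It assumes nothing beyond the JDK: a plain ThreadLocal stands in for the per-thread storage behind the MDC, and the class and method names (LostContextDemo, labelSeenByWorker) are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; REQUEST_ID is a stand-in for the per-thread MDC storage.
final class LostContextDemo {
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Sets a label on the calling thread and returns what a pool thread sees.
    static String labelSeenByWorker(String label) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        REQUEST_ID.set(label);
        try {
            Callable<String> readOnWorker = REQUEST_ID::get;
            return pool.submit(readOnWorker).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Nothing propagated the label, so the worker sees no value.
        System.out.println(labelSeenByWorker("request-42")); // prints "null"
    }
}
```

The label is set on the caller but never reaches the worker, which is exactly the gap the rest of this tip closes.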
Implementation
The inspiration for my solution comes from the following article: http://yanns.github.io/blog/2014/05/04/slf4j-mapped-diagnostic-context-mdc-with-play-framework/. When I read it, I wondered whether there is a way to keep developers unaware of the changes made to the execution context, or, better still, to apply them to any supplied execution context. Fortunately, I could use AspectJ to define an aspect, and the result is shown below:
package mdc;

import mdc.ContextSwitcherExample;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import scala.Option;
import scala.concurrent.ExecutionContext;

@Aspect
public class MdcGuardianExample {

    // Matches prepare() executed on any ExecutionContext declared outside the mdc package.
    @Pointcut(value = "execution(* *.prepare()) && this(me) && !within(mdc.*)", argNames = "me")
    void prepareCall(ExecutionContext me) {
    }

    @Around(value = "prepareCall(me)", argNames = "pjp,me")
    public ExecutionContext around(ProceedingJoinPoint pjp, ExecutionContext me) throws Throwable {
        ExecutionContext delegate = (ExecutionContext) pjp.proceed(new Object[]{me});
        return prepare(delegate, me);
    }

    public ExecutionContext prepare(final ExecutionContext delegate, final ExecutionContext me) {
        // Captured on the calling thread, at the moment prepare() is invoked.
        final Option<String> callingThreadContext = ContextSwitcherExample.capture();
        return new ExecutionContext() {
            public void execute(final Runnable runnable) {
                delegate.execute(new Runnable() {
                    public void run() {
                        // Runs on the worker thread: reset its state, then apply the captured context.
                        ContextSwitcherExample.cleanContext();
                        ContextSwitcherExample
                                .withContext(callingThreadContext, new JFunction0<Void>() {
                                    @Override
                                    public Void apply() {
                                        runnable.run();
                                        return null;
                                    }
                                });
                    }
                });
            }

            public ExecutionContext prepare() {
                return me.prepare();
            }

            public void reportFailure(Throwable cause) {
                me.reportFailure(cause);
            }
        };
    }
}
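The wrapping performed by the aspect's prepare method can be sketched in plain Java, without AspectJ or Scala. This is only an illustration under stated assumptions: a ThreadLocal stands in for the MDC, java.util.concurrent.Executor stands in for ExecutionContext, and ContextPropagatingExecutor and labelSeenByWorker are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the prepared ExecutionContext:
// captures the caller's label once and replays it around every task.
final class ContextPropagatingExecutor implements Executor {
    // Stand-in for the MDC.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    private final Executor delegate;
    private final String captured;

    ContextPropagatingExecutor(Executor delegate) {
        this.delegate = delegate;
        this.captured = CONTEXT.get(); // read on the thread that creates the wrapper
    }

    @Override
    public void execute(Runnable task) {
        delegate.execute(() -> {
            CONTEXT.set(captured);     // now on the worker thread
            try {
                task.run();
            } finally {
                CONTEXT.remove();      // leave the worker thread clean
            }
        });
    }

    // Returns the label a task observes when it runs through the wrapper.
    static String labelSeenByWorker(String label) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CONTEXT.set(label);
        try {
            Executor prepared = new ContextPropagatingExecutor(pool);
            CONTEXT.remove(); // the caller moves on; the captured value is still used
            CompletableFuture<String> seen = new CompletableFuture<>();
            prepared.execute(() -> seen.complete(CONTEXT.get()));
            return seen.join();
        } finally {
            CONTEXT.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(labelSeenByWorker("request-42")); // prints "request-42"
    }
}
```

The aspect does the same thing, except that the wrapper is created transparently whenever prepare() is called on a woven ExecutionContext, so calling code does not have to construct it.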
ContextSwitcherExample is an object that encapsulates everything needed to set and remove the MDC in the currently running thread. The code (in Scala) is a simplified but fully functional version of Pawel Gosztyla's proposal (hail to you, Pawel!):
package mdc

import org.slf4j.MDC
import scala.util.DynamicVariable

object ContextSwitcherExample {
  val DefaultContext: Option[String] = None
  val ContextKey = "contextKey"
  val contextInfoVariable = new DynamicVariable[Option[String]](DefaultContext)

  // Context currently bound to the calling thread.
  def capture: Option[String] = contextInfoVariable.value

  // Runs f with the given context bound to the DynamicVariable and the MDC,
  // then puts the previous context back into the MDC.
  def withContext[R](context: Option[String])(f: => R): R = {
    val previousContext = contextInfoVariable.value
    try {
      contextInfoVariable.withValue(context) {
        initializeContext(context)
        f
      }
    } finally {
      initializeContext(previousContext)
    }
  }

  // Resets both the DynamicVariable and the MDC on the current thread.
  def cleanContext(): Unit = {
    contextInfoVariable.value = DefaultContext
    initializeContext(DefaultContext)
  }

  private def initializeContext(context: Option[String]): Unit = {
    context match {
      case Some(value) => MDC.put(ContextKey, value)
      case None => MDC.remove(ContextKey)
    }
  }
}
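The set-then-restore behaviour of withContext can be illustrated with a small Java sketch. It is not a port of the object above: a single ThreadLocal stands in for both the DynamicVariable and the MDC, null plays the role of None, and ScopedContext and nestedDemo are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical Java counterpart of withContext; CURRENT stands in for the MDC entry.
final class ScopedContext {
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Binds the context for the duration of body, then restores the previous one.
    static <R> R withContext(String context, Supplier<R> body) {
        String previous = CURRENT.get();
        apply(context);
        try {
            return body.get();
        } finally {
            apply(previous);
        }
    }

    // A null context means "no context": remove the entry instead of storing null.
    private static void apply(String context) {
        if (context == null) {
            CURRENT.remove();
        } else {
            CURRENT.set(context);
        }
    }

    // Nested scopes: the inner value is visible inside, the outer one is back afterwards.
    static String nestedDemo() {
        return withContext("outer",
                () -> withContext("inner", () -> CURRENT.get()) + "/" + CURRENT.get());
    }

    public static void main(String[] args) {
        System.out.println(nestedDemo()); // prints "inner/outer"
    }
}
```

Restoring in a finally block matters because the body may throw; without it a failed task would leave its label behind on a pooled thread.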
Additionally, I had to define a class that exposes Scala's Function0 to the Java world:
package mdc
abstract class JFunction0[+R] extends (() => R)
and an aop.xml file kept in the META-INF folder:
<aspectj>
    <aspects>
        <aspect name="mdc.MdcGuardianExample"/>
    </aspects>
    <weaver options="-verbose -showWeaveInfo">
        <include within="mdc..*"/>
        <include within="mdcaj..*"/>
        <include within="scala.concurrent.impl..*"/>
    </weaver>
</aspectj>
Finally, here is the test code (ScalaTest). Remember to run it with -javaagent:<path to aspectweaver.jar>.
package mdcaj

import mdc.ContextSwitcherExample
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import scala.concurrent.duration.DurationDouble
import scala.concurrent.{Await, ExecutionContext, Future, Promise}

class MdcGuardianExampleTest extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfter {
  implicit protected def testedExecutionContext: ExecutionContext = ExecutionContext.global

  private var passedRequestIdPromise: Promise[Option[String]] = _

  // A future created with no context bound to the calling thread.
  private def successfulFuture = ContextSwitcherExample.withContext(ContextSwitcherExample.DefaultContext) {
    Future("")
  }

  // Records the context visible on whichever thread runs the callback.
  private def captureCurrentRequestId(promise: Promise[Option[String]] = passedRequestIdPromise): Unit = {
    promise.success(ContextSwitcherExample.capture)
  }

  private def checkPassedRequestId(expectedRequestId: Option[String], promise: Promise[Option[String]] = passedRequestIdPromise) = {
    Await.result(promise.future, 1.second) should equal(expectedRequestId)
  }

  before {
    passedRequestIdPromise = Promise[Option[String]]()
  }

  it should "pass context when using Future.map" in {
    ContextSwitcherExample.withContext(Some("requestId.map")) {
      successfulFuture.map { value =>
        captureCurrentRequestId()
        value
      }
    }
    checkPassedRequestId(Some("requestId.map"))
  }

  it should "pass context when using Future.fold" in {
    val passedRequestId2Promise = Promise[Option[String]]()
    val passedRequestId3Promise = Promise[Option[String]]()
    ContextSwitcherExample.withContext(Some("requestId.fold")) {
      Future.fold(List(
        Future(passedRequestIdPromise),
        Future(passedRequestId2Promise),
        Future(passedRequestId3Promise)
      ))("") { (_, promise) =>
        captureCurrentRequestId(promise)
        ""
      }
    }
    checkPassedRequestId(Some("requestId.fold"), passedRequestIdPromise)
    checkPassedRequestId(Some("requestId.fold"), passedRequestId2Promise)
    checkPassedRequestId(Some("requestId.fold"), passedRequestId3Promise)
  }
}
And that's it!