MDC for scala's ExecutionContext

1 Mar 2015, CPOL

Introduction

The problem originated in one of our applications, where we (the implementing team) wanted to add a traceable label to request processing. Anyone more or less familiar with reactive programming (multiple threads, clouds, possibly callbacks and so on) knows that processing which involves many systems is asynchronous by nature, so it spans many threads. This article, or rather tip, shows how to pass the mapped diagnostic context (MDC) between threads in Scala, where threads are generally managed by an ExecutionContext, using AOP (annotation-style AspectJ written in Java).
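
To make the problem concrete, here is a minimal sketch of why a per-thread label gets lost. It does not use SLF4J: the `REQUEST_ID` thread-local is a hypothetical stand-in for the MDC (which common logging backends keep in thread-local storage), and the class and method names are illustrative only.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalLossDemo {
    // Hypothetical stand-in for the MDC: one label per thread.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Sets the label on the calling thread, then reads it from a pool thread.
    static String readOnWorker(String label) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            REQUEST_ID.set(label);
            Callable<String> task = REQUEST_ID::get; // runs on the worker thread
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The worker never saw the caller's label.
        System.out.println("worker sees: " + readOnWorker("req-1")); // prints "worker sees: null"
    }
}
```

The label set by the caller is invisible on the worker thread, which is exactly the gap the rest of this tip closes.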

Implementation

The inspiration for my solution comes from the following article: http://yanns.github.io/blog/2014/05/04/slf4j-mapped-diagnostic-context-mdc-with-play-framework/. When I read it, I wondered whether there was a way to keep coders unaware of the changes made to the execution context, or, better still, to apply them to any supplied execution context. Fortunately, I could use AspectJ to define an aspect, and the result is shown below:

Java
package mdc;

import mdc.ContextSwitcherExample;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import scala.Option;
import scala.concurrent.ExecutionContext;

@Aspect
public class MdcGuardianExample {
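    // Matches prepare() executed on any ExecutionContext, except code inside the mdc package itself.
    // A Java string literal cannot span lines, so the expression is concatenated.
    @Pointcut(value = "execution(* *.prepare()) && this(me) && "
      + "!within(mdc.*)", argNames = "me")
    void prepareCall(ExecutionContext me) {
    }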

    @Around(value = "prepareCall(me)", argNames = "pjp,me")
    public ExecutionContext around(ProceedingJoinPoint pjp, ExecutionContext me) throws Throwable {
        ExecutionContext delegate = (ExecutionContext) pjp.proceed(new Object[]{me});
        return prepare(delegate, me);
    }

    public ExecutionContext prepare(final ExecutionContext delegate, final ExecutionContext me) {
        final Option<String> callingThreadContext = ContextSwitcherExample.capture();
        return new ExecutionContext() {
            public void execute(final Runnable runnable) {
                delegate.execute(new Runnable() {
                    public void run() {
                        ContextSwitcherExample.cleanContext();
                        ContextSwitcherExample
                          .withContext(callingThreadContext, new JFunction0<Void>() {
                            @Override
                            public Void apply() {
                                runnable.run();
                                return null;
                            }
                        });
                    }
                });
            }

            public ExecutionContext prepare() {
                return me.prepare();
            }

            public void reportFailure(Throwable cause) {
                me.reportFailure(cause);
            }
        };
    }
}
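
The wrapping done in `prepare` above can also be sketched without AspectJ or Scala, as a plain `java.util.concurrent.Executor` decorator. This is only an illustration of the capture-then-reinstall idea under stated assumptions: `REQUEST_ID` is a hypothetical thread-local standing in for the MDC, and `PropagatingExecutorDemo`, `prepare`, `install` and `runWithLabel` are made-up names, not part of the article's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PropagatingExecutorDemo {
    // Hypothetical stand-in for the MDC.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    // Captures the caller's label now; re-installs it around every task later.
    static Executor prepare(Executor delegate) {
        final String captured = REQUEST_ID.get(); // read on the calling thread
        return task -> delegate.execute(() -> {
            install(captured);                    // runs on the worker thread
            try {
                task.run();
            } finally {
                REQUEST_ID.remove();              // leave the worker clean
            }
        });
    }

    private static void install(String label) {
        if (label != null) REQUEST_ID.set(label); else REQUEST_ID.remove();
    }

    // Returns the label that a task observes when run through the wrapper.
    static String runWithLabel(String label) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            REQUEST_ID.set(label);
            Executor prepared = prepare(pool);
            REQUEST_ID.remove(); // the caller moves on; the capture is kept
            CompletableFuture<String> seen = new CompletableFuture<>();
            prepared.execute(() -> seen.complete(REQUEST_ID.get()));
            return seen.get(1, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            REQUEST_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithLabel("req-2")); // prints "req-2"
    }
}
```

The aspect applies the same decoration automatically whenever `prepare()` is called on a woven ExecutionContext, so calling code does not need to wrap anything by hand.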

ContextSwitcherExample is a class that encapsulates everything needed to set and remove the MDC in the currently running thread. The code (in Scala) is a simplified but fully functional version of Pawel Gosztyla's proposal (hail to you, Pawel!):

Scala
package mdc

import org.slf4j.MDC

import scala.util.DynamicVariable

object ContextSwitcherExample {
  val DefaultContext: Option[String] = None
  val ContextKey = "contextKey"
  val contextInfoVariable = new DynamicVariable[Option[String]](DefaultContext)

  def capture = contextInfoVariable.value

  def withContext[R](context: Option[String])(f: => R): R = {
    val previousContext = contextInfoVariable.value
    try {
      contextInfoVariable.withValue(context) {
        initializeContext(context)
        f
      }
    } finally {
      initializeContext(previousContext)
    }
  }

  def cleanContext(): Unit = {
    contextInfoVariable.value = DefaultContext
    initializeContext(DefaultContext)
  }

  private def initializeContext(context: Option[String]): Unit = {
    if (context.isDefined) {
      MDC.put(ContextKey, context.get)
    } else {
      MDC.remove(ContextKey)
    }
  }
}
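
The set-and-restore behaviour of `withContext` may be easier to follow in a nested call. The sketch below mirrors it in plain Java under stated assumptions: a hypothetical `CONTEXT` thread-local replaces both the `DynamicVariable` and the MDC, `null` plays the role of `None`, and all names are illustrative.

```java
import java.util.function.Supplier;

public class WithContextDemo {
    // Hypothetical stand-in for the DynamicVariable plus MDC entry.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Installs the context, runs the body, then restores whatever was there before.
    static <R> R withContext(String context, Supplier<R> body) {
        String previous = CONTEXT.get();
        install(context);
        try {
            return body.get();
        } finally {
            install(previous);
        }
    }

    private static void install(String context) {
        if (context != null) CONTEXT.set(context); else CONTEXT.remove();
    }

    // Records the visible context before, inside, and after a nested call.
    static String trace() {
        StringBuilder seen = new StringBuilder();
        withContext("outer", () -> {
            seen.append(CONTEXT.get()).append(',');
            withContext("inner", () -> seen.append(CONTEXT.get()).append(','));
            return seen.append(CONTEXT.get()).append(',');
        });
        return seen.append(CONTEXT.get()).toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints "outer,inner,outer,null"
    }
}
```

Restoring in `finally` means the previous context comes back even if the body throws, which is the same guarantee the Scala version gives.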

Additionally, I had to define a class that exposes Scala's Function0 to the Java world:

Scala
package mdc

abstract class JFunction0[+R] extends (() => R)

The aspect is registered in aop.xml, which is kept in the META-INF folder:

XML
<aspectj>
    <aspects>
        <aspect name="mdc.MdcGuardianExample"/>
    </aspects>
    <weaver options="-verbose -showWeaveInfo "> <!-- add -debug to print every class [not] woven  -->
        <include within="mdc..*"/>
        <include within="mdcaj..*"/>
        <include within="scala.concurrent.impl..*"/>
    </weaver>
</aspectj>

Finally, here is the test code (ScalaTest). Remember to run it with -javaagent:<path to aspectweaver.jar>.

Scala
package mdcaj

import mdc.ContextSwitcherExample
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}

import scala.concurrent.duration.DurationDouble
import scala.concurrent.{Await, ExecutionContext, Future, Promise}

class MdcGuardianExampleTest extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfter {
  implicit protected def testedExecutionContext: ExecutionContext = ExecutionContext.global

  private var passedRequestIdPromise: Promise[Option[String]] = _

  private def successfulFuture = ContextSwitcherExample.withContext(ContextSwitcherExample.DefaultContext) {
    Future("")
  }

  private def captureCurrentRequestId(promise: Promise[Option[String]] = passedRequestIdPromise): Unit = {
    promise.success(ContextSwitcherExample.capture)
  }

  private def checkPassedRequestId(expectedRequestId: Option[String], promise: Promise[Option[String]] = passedRequestIdPromise) = {
    // need to block thread, so we are sure that tested future operation is executed in different worker
    // do not change to whenReady!!!
    Await.result(promise.future, 1.second) should equal(expectedRequestId)
  }

  before {
    passedRequestIdPromise = Promise[Option[String]]()
  }

  it should "pass context when using Future.map" in {
    ContextSwitcherExample.withContext(Some("requestId.map")) {
      successfulFuture.map { value =>
        captureCurrentRequestId()
        value
      }
    }
    checkPassedRequestId(Some("requestId.map"))
  }

  it should "pass context when using Future.fold" in {
    val passedRequestId2Promise = Promise[Option[String]]()
    val passedRequestId3Promise = Promise[Option[String]]()

    ContextSwitcherExample.withContext(Some("requestId.fold")) {
      Future.fold(List(
        Future(passedRequestIdPromise),
        Future(passedRequestId2Promise),
        Future(passedRequestId3Promise)
      ))("") { (_, promise) =>
        captureCurrentRequestId(promise)
        ""
      }
    }
    checkPassedRequestId(Some("requestId.fold"), passedRequestIdPromise)
    checkPassedRequestId(Some("requestId.fold"), passedRequestId2Promise)
    checkPassedRequestId(Some("requestId.fold"), passedRequestId3Promise)
  }
}

And that's it!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)