Pipeline and Filters Pattern using C#

19 Apr 2016
Implementing Pipeline and Filters pattern using C#

Introduction

Pipeline and Filters is a useful, tidy pattern for the case where an object must pass through a series of filtering (processing) steps, each of which moves it closer to a usable state, as described in the picture below.

The code in this article is a complete, generic implementation of the Pipeline and Filters pattern.
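Before looking at the classes, the core idea can be sketched as a list of functions applied one after another, each receiving the previous step's output. This is only an illustration of the concept, not the implementation developed in this article; the names `PipelineIdea`, `RunAll` and `Demo` are hypothetical.

```csharp
using System;
using System.Collections.Generic;

public static class PipelineIdea
{
    // Applies each step to the output of the previous one, in list order.
    public static T RunAll<T>(T input, IEnumerable<Func<T, T>> steps)
    {
        foreach (var step in steps)
        {
            input = step(input);
        }
        return input;
    }

    // Three small string transformations chained into one pipeline.
    public static string Demo()
    {
        var steps = new List<Func<string, string>>
        {
            s => s.Trim(),
            s => s.ToUpperInvariant(),
            s => s.Replace(' ', '_')
        };
        return RunAll("  pipeline and filters ", steps); // "PIPELINE_AND_FILTERS"
    }
}
```

The interface-based version in the rest of the article follows the same flow, but gives each step a name and a type of its own.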

Background

An understanding of messaging patterns is critical to understanding Enterprise Integration Patterns.

Using the Code

The code assumes an object repository, AgentStatusRepository, with a static property Agents that returns a List<Agent>. This list of Agent objects is the input to the pipeline, on which the various filters operate.

Agent class (Step 1)

using System;

// Status record for a single agent; this is the item type that flows through the pipeline.
public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

Possible Values of PresenceStatus:

  • Available
  • Busy
  • Research
  • Offline
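The article keeps PresenceStatus as a plain string. As an alternative, a closed set of values like this could be modelled as an enum so that a misspelled status is caught early. The sketch below is an assumption, not part of the article's code: the `PresenceStatus` enum and the `PresenceStatusParser` helper are hypothetical names.

```csharp
using System;

// Hypothetical enum covering the four values listed above.
public enum PresenceStatus { Available, Busy, Research, Offline }

public static class PresenceStatusParser
{
    // Returns true only when raw names one of the defined values (case-insensitive).
    // The IsDefined check rejects numeric strings such as "7", which
    // Enum.TryParse would otherwise accept.
    public static bool TryParse(string raw, out PresenceStatus status)
    {
        return Enum.TryParse(raw, true, out status)
               && Enum.IsDefined(typeof(PresenceStatus), status);
    }
}
```

With such a type, the availability check would compare against `PresenceStatus.Available` instead of the string literal.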

The following filters are applied, in order, to the List<Agent> object (Object1 in the picture above) retrieved from AgentStatusRepository:

  • AgentAvailabilityFilter (Filter1 in the picture above) - keeps the agents where Agent.PresenceStatus == "Available"
  • AgentWorkloadFilter (Filter2 in the picture above) - keeps, from the output of Filter1, the agents where Agent.CurrentWorkload < Agent.MaxWorkload
  • AgentPresenceUpdateDatetimeFilter (Filter3 in the picture above) - keeps, from the output of Filter2, the agents where (DateTime.UtcNow - Agent.StatusUpdateDatetime).TotalMinutes < 3
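To make the combined effect of the three conditions concrete, they can be written as a single chained LINQ query. This is a minimal sketch for comparison only, not the pipeline built in this article. The `Agent` class is repeated so the block compiles on its own; `CombinedQuerySketch`, `SelectAgentIds`, `Demo` and the `utcNow` parameter are hypothetical, and the clock is passed in (rather than read from DateTime.UtcNow) so the result is repeatable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Mirrors the article's agent type so the sketch is self-contained.
public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

public static class CombinedQuerySketch
{
    // Applies the three conditions in the same order as Filter1..Filter3.
    public static List<string> SelectAgentIds(IEnumerable<Agent> agents, DateTime utcNow)
    {
        return agents
            .Where(a => a.PresenceStatus == "Available")                    // Filter1
            .Where(a => a.CurrentWorkload < a.MaxWorkload)                  // Filter2
            .Where(a => (utcNow - a.StatusUpdateDatetime).TotalMinutes < 3) // Filter3
            .Select(a => a.AgentId)
            .ToList();
    }

    public static List<string> Demo()
    {
        var now = new DateTime(2016, 4, 19, 12, 0, 0, DateTimeKind.Utc);
        var agents = new List<Agent>
        {
            new Agent { AgentId = "agent_id_1", PresenceStatus = "Available",
                CurrentWorkload = 1, MaxWorkload = 2, StatusUpdateDatetime = now },
            new Agent { AgentId = "agent_id_2", PresenceStatus = "Busy",
                CurrentWorkload = 2, MaxWorkload = 2, StatusUpdateDatetime = now },
            new Agent { AgentId = "agent_id_3", PresenceStatus = "Available",
                CurrentWorkload = 2, MaxWorkload = 3, StatusUpdateDatetime = now.AddMinutes(-5) },
            new Agent { AgentId = "agent_id_6", PresenceStatus = "Available",
                CurrentWorkload = 5, MaxWorkload = 5, StatusUpdateDatetime = now }
        };
        // agent_id_2 fails Filter1, agent_id_6 fails Filter2, agent_id_3 fails Filter3.
        return SelectAgentIds(agents, now);
    }
}
```

The pipeline expresses the same logic, but each condition lives in its own class and can be registered, reordered, or left out independently.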

AgentStatusRepository code:

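using System;
using System.Collections.Generic;

// In-memory stand-in for a real agent status store; builds a fresh sample list on every access.
public class AgentStatusRepository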
   {
       public static List<Agent> Agents
       {
           get
           {
               return new List<Agent>
               {
                   new Agent {AgentId = "agent_id_1",
                   PresenceStatus = "Available", CurrentWorkload = 1,
                   MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                   new Agent {AgentId = "agent_id_2",
                   PresenceStatus = "Busy", CurrentWorkload = 2,
                   MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                   new Agent {AgentId = "agent_id_3",
                   PresenceStatus = "Available", CurrentWorkload = 2,
                   MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow.AddMinutes (-5) },
                   new Agent {AgentId = "agent_id_4",
                   PresenceStatus = "Research", CurrentWorkload = 2,
                   MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow},
                   new Agent {AgentId = "agent_id_5",
                   PresenceStatus = "Available", CurrentWorkload = 2,
                   MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                   new Agent {AgentId = "agent_id_6",
                   PresenceStatus = "Available", CurrentWorkload = 5,
                   MaxWorkload = 5, StatusUpdateDatetime = DateTime.UtcNow}
               };
           }
       }
   }

Pipeline and Filters Class Diagram:

IFilter code:

/// <summary>
/// A filter to be registered in the message processing pipeline
/// </summary>
/// <typeparam name="T">The type of object the filter processes</typeparam>
public interface IFilter<T>
{
    /// <summary>
    /// A filter implementing this method performs its processing on the input of type T
    /// </summary>
    /// <param name="input">The input to be processed by the filter</param>
    /// <returns>The processed object, which becomes the input of the next filter</returns>
    T Execute(T input);
}

Pipeline code:

using System.Collections.Generic;

/// <summary>
/// An abstract pipeline with a list of filters and an abstract Process method
/// </summary>
/// <typeparam name="T">The type of object passed through the filters</typeparam>
public abstract class Pipeline<T>
{
    /// <summary>
    /// List of filters in the pipeline, in registration order
    /// </summary>
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    /// <summary>
    /// Registers a filter in the pipeline
    /// </summary>
    /// <param name="filter">A filter object implementing the IFilter interface</param>
    /// <returns>This pipeline, so that Register calls can be chained</returns>
    public Pipeline<T> Register(IFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    /// <summary>
    /// Starts processing on the pipeline
    /// </summary>
    /// <param name="input">The input object on which the filters execute</param>
    /// <returns>The processed result</returns>
    public abstract T Process(T input);
}

AgentSelectionPipeline (ConcretePipeline) code:

using System.Collections.Generic;

/// <summary>
/// Pipeline that selects the final list of applicable agents
/// </summary>
public class AgentSelectionPipeline : Pipeline<IEnumerable<Agent>>
{
    /// <summary>
    /// Executes the registered filters, in order, on the given input
    /// </summary>
    /// <param name="input">The agents to filter, as implemented in the individual filters</param>
    /// <returns>The agents that passed every registered filter</returns>
    public override IEnumerable<Agent> Process(IEnumerable<Agent> input)
    {
        foreach (var filter in filters)
        {
            input = filter.Execute(input);
        }

        return input;
    }
}

AgentAvailabilityFilter code (ConcreteFilter 1):

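using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of all available agents
/// </summary>
public class AgentAvailabilityFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Pass a missing input through unchanged. An empty sequence needs no special
        // case (Where simply yields nothing), and skipping Count() avoids enumerating
        // the whole input just to test for emptiness.
        if (input == null)
        {
            return input;
        }

        return input.Where(agent => agent.PresenceStatus == "Available");
    }
}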

AgentWorkloadFilter code (ConcreteFilter 2):

using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of agents whose CurrentWorkload is less than MaxWorkload
/// </summary>
public class AgentWorkloadFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Pass a missing input through unchanged; an empty sequence is handled by Where.
        if (input == null)
        {
            return input;
        }

        return input.Where(agent => agent.CurrentWorkload < agent.MaxWorkload);
    }
}

AgentPresenceUpdateDatetimeFilter code (ConcreteFilter 3):

using System;
using System.Collections.Generic;
using System.Linq;

/// <summary>
/// The output of this filter is the list of all agents
/// whose StatusUpdateDatetime is less than 3 minutes old
/// </summary>
public class AgentPresenceUpdateDatetimeFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Pass a missing input through unchanged; an empty sequence is handled by Where.
        if (input == null)
        {
            return input;
        }

        return input.Where(agent =>
            (DateTime.UtcNow - agent.StatusUpdateDatetime).TotalMinutes < 3);
    }
}
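The three concrete filters differ only in the condition passed to Where. One possible way to cut that repetition, sketched here as an assumption rather than as part of the article's design, is a single filter that takes the condition as a delegate. `PredicateFilter<TItem>` and `PredicateFilterDemo` are hypothetical names; the `IFilter<T>` interface is repeated so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Same shape as the article's IFilter<T>, repeated for self-containment.
public interface IFilter<T>
{
    T Execute(T input);
}

// Hypothetical reusable filter: keeps the items for which the predicate is true.
public class PredicateFilter<TItem> : IFilter<IEnumerable<TItem>>
{
    private readonly Func<TItem, bool> predicate;

    public PredicateFilter(Func<TItem, bool> predicate)
    {
        if (predicate == null) throw new ArgumentNullException(nameof(predicate));
        this.predicate = predicate;
    }

    public IEnumerable<TItem> Execute(IEnumerable<TItem> input)
    {
        // A null input is passed through, matching the behaviour of the concrete filters.
        return input == null ? null : input.Where(predicate);
    }
}

public static class PredicateFilterDemo
{
    public static List<int> EvenNumbers()
    {
        IFilter<IEnumerable<int>> evens = new PredicateFilter<int>(n => n % 2 == 0);
        return evens.Execute(new[] { 1, 2, 3, 4 }).ToList(); // 2, 4
    }
}
```

Named filter classes remain the better choice when a filter needs its own configuration, dependencies, or tests; the delegate form is mainly a convenience for one-line conditions.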

How to use the pipeline and filters:

using System;

public class Program
{
    static void Main(string[] args)
    {
        // Get the agents from the repository
        var agentsStatus = AgentStatusRepository.Agents;

        // Construct the pipeline object
        AgentSelectionPipeline agentStatusPipeline = new AgentSelectionPipeline();

        // Register the filters in the order they should execute
        agentStatusPipeline.Register(new AgentAvailabilityFilter())
            .Register(new AgentWorkloadFilter())
            .Register(new AgentPresenceUpdateDatetimeFilter());

        // Start pipeline processing
        var selectedAgents = agentStatusPipeline.Process(agentsStatus);

        // With the sample repository data, this prints agent_id_1 and agent_id_5
        foreach (var agent in selectedAgents)
        {
            Console.WriteLine(agent.AgentId);
        }
    }
}
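One detail worth keeping in mind: because each filter returns the result of Where, the value returned by Process is a deferred LINQ query. The filter conditions, including the time-based one, run when the result is enumerated, not when Process is called. The small sketch below shows that behaviour on plain integers; `DeferredExecutionDemo` is a hypothetical name used only for this illustration.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class DeferredExecutionDemo
{
    public static List<int> Run()
    {
        var source = new List<int> { 1, 2, 3 };

        // Building the query does not evaluate the condition yet.
        IEnumerable<int> query = source.Where(n => n > 1);

        // A change made before enumeration is still seen by the query.
        source.Add(4);

        // ToList() enumerates the query and fixes the result: 2, 3, 4.
        return query.ToList();
    }
}
```

If the selection should reflect the moment Process was called, calling ToList() on its result right away is one simple way to pin it down.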

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
