Introduction
Pipeline and Filters is a useful pattern when an object must pass through a series of filtering (processing) steps that transform it into a usable state, as shown in the picture below.
The code in this article is a complete, generic implementation of the Pipeline and Filters pattern.
Background
An understanding of messaging patterns is essential to understanding Enterprise Integration Patterns.
Using the Code
The code here assumes an object repository, AgentStatusRepository, which has a static property Agents that returns a List<Agent>. This list of agent statuses is the input to the pipeline, to which the filters are applied.
Agent class (Step 1)
using System;

// Status record for one agent. Named Agent to match the repository and filter code below.
public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}
Possible values of PresenceStatus:
Available
Busy
Research
Offline
Below is the list of filters that need to be applied to the List<Agent> object (Object1 in the picture above) retrieved from the AgentStatusRepository:
AgentAvailabilityFilter (Filter1 in the picture above): filters the List<Agent>, keeping agents where Agent.PresenceStatus == "Available"
AgentWorkloadFilter (Filter2 in the picture above): filters the output of Filter1, keeping agents where Agent.CurrentWorkload < Agent.MaxWorkload
AgentPresenceUpdateDatetimeFilter (Filter3 in the picture above): filters the output of Filter2, keeping agents where (DateTime.UtcNow - Agent.StatusUpdateDatetime).TotalMinutes < 3
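For comparison, the combined effect of the three conditions above can be sketched as a single chained LINQ query. This is not the pipeline implementation from this article, only a minimal illustration of what the filters compute together. The Agent class is repeated so that the snippet stands alone; the InlineSelection class, its SelectIds method, the explicit now parameter and the sample data are assumptions made for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Agent
{
    public string AgentId { get; set; }
    public string PresenceStatus { get; set; }
    public int CurrentWorkload { get; set; }
    public int MaxWorkload { get; set; }
    public DateTime StatusUpdateDatetime { get; set; }
}

// Hypothetical helper: applies the three conditions in the order Filter1, Filter2, Filter3.
public static class InlineSelection
{
    public static List<string> SelectIds(IEnumerable<Agent> agents, DateTime now)
    {
        return agents
            .Where(a => a.PresenceStatus == "Available")                  // Filter1
            .Where(a => a.CurrentWorkload < a.MaxWorkload)                // Filter2
            .Where(a => (now - a.StatusUpdateDatetime).TotalMinutes < 3)  // Filter3
            .Select(a => a.AgentId)
            .ToList();
    }

    // Illustrative sample data (assumed, not taken from the repository in this article).
    public static List<Agent> Sample(DateTime now)
    {
        return new List<Agent>
        {
            new Agent { AgentId = "a1", PresenceStatus = "Available", CurrentWorkload = 1, MaxWorkload = 2, StatusUpdateDatetime = now },
            new Agent { AgentId = "a2", PresenceStatus = "Busy", CurrentWorkload = 0, MaxWorkload = 2, StatusUpdateDatetime = now },
            new Agent { AgentId = "a3", PresenceStatus = "Available", CurrentWorkload = 2, MaxWorkload = 2, StatusUpdateDatetime = now },
            new Agent { AgentId = "a4", PresenceStatus = "Available", CurrentWorkload = 1, MaxWorkload = 2, StatusUpdateDatetime = now.AddMinutes(-5) }
        };
    }
}

public static class Program
{
    public static void Main()
    {
        var now = DateTime.UtcNow;
        // Only a1 satisfies all three conditions.
        Console.WriteLine(string.Join(", ", InlineSelection.SelectIds(InlineSelection.Sample(now), now)));
    }
}
```

The single query is shorter, but every condition is hard-wired into one method. The pattern trades that brevity for filters that can be registered, reordered or replaced independently.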
AgentStatusRepository code:
using System;
using System.Collections.Generic;

public class AgentStatusRepository
{
    // Sample data; a new list is built on every access.
    public static List<Agent> Agents
    {
        get
        {
            return new List<Agent>
            {
                new Agent { AgentId = "agent_id_1",
                    PresenceStatus = "Available", CurrentWorkload = 1,
                    MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_2",
                    PresenceStatus = "Busy", CurrentWorkload = 2,
                    MaxWorkload = 2, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_3",
                    PresenceStatus = "Available", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow.AddMinutes(-5) },
                new Agent { AgentId = "agent_id_4",
                    PresenceStatus = "Research", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_5",
                    PresenceStatus = "Available", CurrentWorkload = 2,
                    MaxWorkload = 3, StatusUpdateDatetime = DateTime.UtcNow },
                new Agent { AgentId = "agent_id_6",
                    PresenceStatus = "Available", CurrentWorkload = 5,
                    MaxWorkload = 5, StatusUpdateDatetime = DateTime.UtcNow }
            };
        }
    }
}
Pipeline and Filters Class Diagram:
IFilter code:
public interface IFilter<T>
{
    T Execute(T input);
}
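Pipeline code:
using System.Collections.Generic;

public abstract class Pipeline<T>
{
    // Filters are kept in registration order.
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    // Returns this so that registrations can be chained.
    public Pipeline<T> Register(IFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    public abstract T Process(T input);
}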
AgentSelectionPipeline (ConcretePipeline) code:
using System.Collections.Generic;

public class AgentSelectionPipeline : Pipeline<IEnumerable<Agent>>
{
    public override IEnumerable<Agent> Process(IEnumerable<Agent> input)
    {
        // Feed the output of each filter into the next one.
        foreach (var filter in filters)
        {
            input = filter.Execute(input);
        }
        return input;
    }
}
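Because IFilter<T> and Pipeline<T> are generic, the same shape is not tied to agent lists. The sketch below applies it to plain strings. IFilter<T> and Pipeline<T> are repeated from above so that the snippet stands alone; TrimFilter, UpperCaseFilter and TextPipeline are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;

public interface IFilter<T>
{
    T Execute(T input);
}

public abstract class Pipeline<T>
{
    protected readonly List<IFilter<T>> filters = new List<IFilter<T>>();

    public Pipeline<T> Register(IFilter<T> filter)
    {
        filters.Add(filter);
        return this;
    }

    public abstract T Process(T input);
}

// Hypothetical filter: removes leading and trailing whitespace.
public class TrimFilter : IFilter<string>
{
    public string Execute(string input) => input.Trim();
}

// Hypothetical filter: converts the text to upper case.
public class UpperCaseFilter : IFilter<string>
{
    public string Execute(string input) => input.ToUpperInvariant();
}

// Hypothetical concrete pipeline: runs the filters in registration order.
public class TextPipeline : Pipeline<string>
{
    public override string Process(string input)
    {
        foreach (var filter in filters)
        {
            input = filter.Execute(input);
        }
        return input;
    }
}

public static class Program
{
    public static void Main()
    {
        var pipeline = new TextPipeline()
            .Register(new TrimFilter())
            .Register(new UpperCaseFilter());

        Console.WriteLine(pipeline.Process("  hello  ")); // prints HELLO
    }
}
```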
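AgentAvailabilityFilter code (ConcreteFilter 1):
using System.Collections.Generic;
using System.Linq;

public class AgentAvailabilityFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Only null needs a guard: Where already yields an empty sequence for empty input,
        // and calling Count() here would enumerate the whole sequence an extra time.
        if (input == null)
        {
            return input;
        }
        return input.Where(agent => agent.PresenceStatus == "Available");
    }
}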
AgentWorkloadFilter code (ConcreteFilter 2):
using System.Collections.Generic;
using System.Linq;

public class AgentWorkloadFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Only null needs a guard: Where already yields an empty sequence for empty input.
        if (input == null)
        {
            return input;
        }
        return input.Where(agent => agent.CurrentWorkload < agent.MaxWorkload);
    }
}
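AgentPresenceUpdateDatetimeFilter code (ConcreteFilter 3):
using System;
using System.Collections.Generic;
using System.Linq;

public class AgentPresenceUpdateDatetimeFilter : IFilter<IEnumerable<Agent>>
{
    public IEnumerable<Agent> Execute(IEnumerable<Agent> input)
    {
        // Only null needs a guard: Where already yields an empty sequence for empty input.
        if (input == null)
        {
            return input;
        }
        // Keep agents whose status was updated less than 3 minutes ago.
        return input.Where(agent =>
            (DateTime.UtcNow - agent.StatusUpdateDatetime).TotalMinutes < 3);
    }
}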
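The three concrete filters above differ only in the condition they apply. One possible variation, sketched here as an assumption rather than as part of this article's implementation, is a single filter that takes the condition as a delegate. PredicateFilter<TItem> is a hypothetical name, and IFilter<T> is repeated so that the snippet stands alone.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IFilter<T>
{
    T Execute(T input);
}

// Hypothetical helper: wraps any predicate as a filter over a sequence.
public class PredicateFilter<TItem> : IFilter<IEnumerable<TItem>>
{
    private readonly Func<TItem, bool> predicate;

    public PredicateFilter(Func<TItem, bool> predicate)
    {
        this.predicate = predicate ?? throw new ArgumentNullException(nameof(predicate));
    }

    public IEnumerable<TItem> Execute(IEnumerable<TItem> input)
    {
        // Mirrors the concrete filters: null passes through unchanged.
        return input == null ? null : input.Where(predicate);
    }
}

public static class Program
{
    public static void Main()
    {
        IFilter<IEnumerable<int>> evens = new PredicateFilter<int>(n => n % 2 == 0);
        Console.WriteLine(string.Join(", ", evens.Execute(new[] { 1, 2, 3, 4 }))); // prints 2, 4
    }
}
```

Named filter classes remain the better fit when a filter carries its own configuration or dependencies. The delegate form mainly saves boilerplate for one-line conditions.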
Using the Pipeline and Filters:
public class Program
{
    static void Main(string[] args)
    {
        var agents = AgentStatusRepository.Agents;

        // Filters run in the order in which they are registered.
        AgentSelectionPipeline agentStatusPipeline = new AgentSelectionPipeline();
        agentStatusPipeline.Register(new AgentAvailabilityFilter())
            .Register(new AgentWorkloadFilter())
            .Register(new AgentPresenceUpdateDatetimeFilter());

        // With the sample data, agent_id_1 and agent_id_5 pass all three filters.
        var selectedAgents = agentStatusPipeline.Process(agents);
    }
}
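One detail worth keeping in mind: the filters are built on LINQ's Where, which is evaluated lazily, so Process returns a deferred query rather than a finished list. The conditions, including the DateTime.UtcNow comparison, only run when the result is enumerated. The sketch below demonstrates this behavior with plain integers; DeferredDemo and its Run method are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class DeferredDemo
{
    // Returns how many times the predicate ran before and after materializing,
    // plus the number of items kept.
    public static (int before, int after, int kept) Run()
    {
        int evaluations = 0;
        IEnumerable<int> source = new[] { 1, 2, 3, 4 };

        // Building the query does not run the predicate.
        IEnumerable<int> filtered = source.Where(n => { evaluations++; return n > 2; });
        int before = evaluations;

        // ToList enumerates the query once and stores the result.
        List<int> snapshot = filtered.ToList();
        return (before, evaluations, snapshot.Count);
    }
}

public static class Program
{
    public static void Main()
    {
        var result = DeferredDemo.Run();
        Console.WriteLine($"{result.before} {result.after} {result.kept}"); // prints 0 4 2
    }
}
```

If the selected agents are going to be read more than once, calling ToList() on the value returned by Process fixes the result at a single point in time.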