Overview
When facing a complex data-processing procedure, programmers often divide it into several small, simple processing steps and assemble those steps in a pipeline-like structure to make the code more maintainable. Figure 1 shows the pipeline structure that will be discussed in this article.

In figure 1, the pipeline structure consists of two pipelines: the main pipeline and the sub-pipeline. The main pipeline contains steps 1 through 5, and the sub-pipeline contains steps 3-1, 3-2, and 3-3. Notably, step 3 in the main pipeline has two roles: it is a normal step in the main pipeline, and it is also a sub-pipeline that contains sub-steps.
The purpose of a pipeline is to produce result data after the input data flows through the pipeline. Each step in a pipeline is a sequential, homogeneous container that holds a number of 'process units'. A so-called process unit is an object of the step's type. In a nutshell, you can think of a pipeline step as a vector from the C++ Standard Template Library (STL) that contains zero or more process units. Different pipeline steps do different jobs, and hence are implemented as different classes.
A pipeline can have an arbitrary number of process steps. Although a pipeline structure can have only one main pipeline, it can have an arbitrary level of nested sub-pipelines, and each process step can have an arbitrary number of process units. This strategy can drastically improve code quality in the following areas:
1) Reduced complexity: all the process steps can be very simple classes, each doing only one simple job.
2) Building a new pipeline to handle a new problem is extremely simple: in fact, it is only a matter of changing one line of code.
3) It helps the programmer avoid building a complicated architecture: because pipelines are easy to build, and because the pipeline is decoupled from its process steps, the programmer can concentrate on developing the process steps.
4) Simplified architecture: a data-processing procedure often consists of complex "if-then-else" logic. For example, a typical data process is often in the following style:
for( each data )
{
    if (data satisfies condition_1)
    {
        do_something_1();
        if (data satisfies condition_2)
            do_something_2();
    }
}
"if-then-else" has two major drawbacks. First, it makes your code unnecessarily rigid: every change to the "if-then-else" logic forces you to recompile the pipeline. Second, as the number of branches grows, your code becomes less and less maintainable. This library lets the compiler automatically generate the "if-then-else" code for different pipelines, which solves the issue nicely.
5) High reusability: all process units are standardized nuts and bolts; therefore they can easily be reused in other pipelines.
Although the pipeline has many benefits, when developing such a structure programmers often find themselves facing several difficulties:
1) The interface of the pipeline is not flexible enough; therefore, handling a new requirement often requires changing the pipeline code itself.
2) C++ (at least its standard library) lacks a heterogeneous class container, which makes developing a pipeline difficult. Different steps are different classes, and there is no easy way to put them into an STL-vector-like container.
3) A pipeline often consists of many nested "sub-pipelines", so when a pipeline becomes complicated, it is often difficult to access the process units in the pipeline. For example, it is not uncommon to see ugly code such as:
pipeline->sub_pipeline1->sub_sub_pipeline->member1
4) Generating the "if-then-else" code automatically is hard.
Besides the above items, a generic pipeline should also address the following requirements:
1. Easy to build a new pipeline: ideally, building a new pipeline should be as easy as passing it a new list of process-unit class names, just like the following pseudo code:

typedef unit_type_list(class1, class2, class3, ...) pipeline_step_typelist;
pipeline<pipeline_step_typelist> new_pipeline;
2. The pipeline should automatically process the data flowing through it. Therefore the pipeline should have the following abilities:
- The data should be looped through all the process units in the pipeline. This has two meanings: 1) the data can reach all the steps inside the pipeline; 2) in each step, the data can be iterated through all the process units of the step.
- While the data is being looped, the pipeline must call a certain function in each step or process unit to process the data. As a policy, this function must be implemented as operator (). In other words, all process units and process steps must implement operator ().
- In addition, the pipeline should interrupt the looping when it encounters a unit or step whose operator () returns "false". In this way, the pipeline avoids the "if-then-else" structure by moving the logic into each process unit. Thus each unit's operator () must have "bool" as its return type: "true" means the looping should continue, while "false" means the looping should be interrupted. A process unit should look like the following pseudo code:
class AUnit
{
public:
    AUnit(const AUnit& source);
    // omit the other constructors
    bool operator() (...);
};
3. Pipelines can be nested without limit: a process unit in a pipeline can be another pipeline (a sub-pipeline), a sub-pipeline can have a "sub-sub-pipeline", and so on. Obviously, sub-pipelines should follow the same rules as pipelines.
4. The pipeline should be type safe: errors such as adding a unit of step/type A into the position of step/type B should be reported at compile time.
5. Easy access to ALL units in the pipeline: when accessing a unit or step, users shouldn't care which sub-pipeline the unit is in. In other words, instead of the ugly code listed in difficulty 3), the following code should be used:
pipeline.GetStep<Step_Type>().member1
This article discusses all these issues and provides a generic pipeline library that addresses all these requirements.
An example of pipeline
To make our discussion easier, let's see an example first. Say that in a TV survey market, we want to know how many people watch certain TV shows at a certain time in a specific geographic area. This can be translated into the following questions: how many respondents watched TV at a certain time (8:00am or 3:12pm) on certain TV station(s) (BBC or CNN) in a given area (Toronto or Vancouver) for how long (1 hour or 1 minute)? To answer these questions, we need to introduce some data structures first.
struct TuningEvent
{
    int startTime; //tuning event beginning time
    int endTime;   //tuning event ending time
    int station;   //the station the respondent watched in the above time period
    char ext;      //unused
};

struct RespondentDailyRec
{
    int weekDay; //which week day (0-Sunday; 1-Monday; ...)
    vector<TuningEvent> tunings; //all tuning events on that weekDay
};

struct Respondent
{
    int rspID;    //respondent id
    int area;     //geographic area this respondent belongs to
    float weight; //how many people this respondent represents in this survey
    int week;     //which week this respondent belongs to
    vector<RespondentDailyRec> rspDailyRecs; //all 7 days' tuning info of this respondent
};
The above structures are quite straightforward; however, some terms should be explained first. A person in a survey is called a 'respondent'. The action of a respondent tuning a TV channel is called a "tune", and the period between two "tune"s is called a "tuning event" or "tuning". A tuning event records the station the respondent watched, the start time, and the end time. A respondent can exist in a survey for only one week, so each respondent has a vector of RespondentDailyRec to record all seven days' tuning events. Each RespondentDailyRec structure stores all tuning events of that day in a vector called "tunings". Each respondent in a survey has its own attributes: 'weight' means how many people this respondent represents in that 'area', and 'week' means to which week the survey belongs.
Let's try to calculate the population in a certain area and a certain week. Without the pipeline structure, the code would look like this:
float population = 0;
for( each respondent )
{
    if (respondent.area == the_city)
    {
        if (respondent.week == the_week)
            population += respondent.weight;
    }
}
Build process units
Now let's use the pipeline strategy. The first thing we want to do is develop the process units we need. Keep in mind that process units should be simple and should do only simple jobs. We don't want super-complicated process units, because a complicated unit defeats the whole purpose of the pipeline strategy.
First, as shown in requirement 2, all units in the pipeline must implement operator ().
Second, we will use two terms in this example: filter and calculator. Units that decide whether the data can go further in the pipeline are called "filters". A filter has a data member as its criteria, and filters data based on the return value of its operator () function: if the data satisfies the criteria, operator () returns true and the data passes the filter and flows to the next step; otherwise it returns false and the data is filtered out. Process units other than filters are called "calculators". A calculator's operator () function always returns true.
Let's start with AreaFilter. Following is the code for AreaFilter (the copy constructor is omitted):
class AreaFilter
{
public:
    typedef Respondent& ParamType;
    static const bool isPipeline = false;
    AreaFilter(const set<int>& criteria) : area_criteria(criteria) {}
    bool operator () (Respondent& rsp)
    {
        if( area_criteria.empty() ) return true;
        return area_criteria.count(rsp.area) != 0;
    }
private:
    set<int> area_criteria;
    AreaFilter() {}
};
The purpose of AreaFilter is to tell us whether a respondent is in a certain area. The criteria is the data member:
set<int> area_criteria;
AreaFilter simply searches area_criteria to see whether the respondent's area id is in the set. If it is, operator () returns true; otherwise it returns false. Please notice that an empty area_criteria means we are not interested in filtering by area, so the filter returns true in this case.
Similar to AreaFilter, WeekFilter checks whether a respondent exists in a certain week. You can get the source code of WeekFilter from:
http://codecentral.borland.com/codecentral/ccweb.exe/listing?id=20455
PopulationCalculator calculates the population in the area and week we are interested in. Because a calculator doesn't filter data, it always returns true. Instead of having criteria, PopulationCalculator has a data member, 'population'. The calculation is very simple: it just accumulates each respondent's weight into the population variable.
class PopulationCalculator
{
public:
    typedef Respondent& ParamType;
    static const bool isPipeline = false;
    float population;
    PopulationCalculator() : population(0) {}
    bool operator() (Respondent& rsp)
    {
        population += rsp.weight;
        return true;
    }
};
Build the pipeline library
After building the process units (filters and a calculator), we can start building the pipeline. The pipeline that calculates the population in certain areas and within certain weeks would look like this:
typedef type_list(AreaFilter, WeekFilter, PopulationCalculator) unit_typelist;
Pipeline<unit_typelist> population_pipeline;
for( each respondent )
{
    population_pipeline.Process(respondent);
}
cout << population_pipeline.GetUnit<PopulationCalculator>().population;
In the above code, 'Pipeline' is the key class. It has all the power we discussed previously. However, implementing this class is not easy.
A natural thought would be to implement Pipeline as a std::vector. However, a std::vector cannot hold the three process units, because WeekFilter, AreaFilter, and PopulationCalculator are different classes. In fact, it is quite frustrating: the C++ standard library doesn't provide a heterogeneous container.
The Typelist technique resolves the difficulty
A C++ technique called Typelist can resolve the heterogeneous container issue. Before we start to build our pipeline, we need some background information about typelists. If you already know typelists, you can skip this section.
Basically, a Typelist is a nested, recursive structure, terminated with an empty structure, NullType. Here is the definition of Typelist:
template <class T, class U>
struct Typelist
{
    typedef T Head;
    typedef U Tail;
};
Some macros are used to create Typelists:
#define TYPELIST_1(T1) Typelist<T1, NullType>
#define TYPELIST_2(T1, T2) Typelist<T1, TYPELIST_1(T2) >
In theory, you can define as many Typelist nodes as you want, but in the real world, hundreds of typelist nodes will most likely reach the limits of your compiler.
Some functions help you manipulate typelists. The one relevant to our topic is IndexOf, which searches for a certain type in the typelist. Following is the format of IndexOf:
IndexOf<type_list, type>::value
'value' is the index of 'type' in the typelist, starting from 0. If value is -1, 'type' is not in 'type_list'. One thing worth mentioning is that IndexOf is a compile-time function: the index value is determined at compile time.
Using the Typelist structure, the compiler can generate a class hierarchy that can hold heterogeneous classes in a vector-like structure. The GenScatterHierarchy class is used to generate the heterogeneous container. Following is an example of generating such a container:
template <class T>
struct Holder
{
    T _value;
};
typedef GenScatterHierarchy<TYPELIST_3(int, float, double), Holder> Element3D;
Element3D triple_types;
Field<int>( triple_types ) = 0;      //access the int field
Field<float>( triple_types ) = 1.5;  //access the float field
Field<double>( triple_types ) = 3.6; //access the double field
The above example first defines a heterogeneous container by using:
GenScatterHierarchy<TYPELIST_3(int, float, double), Holder> e;
Here 'e' is a heterogeneous container that can hold three types: int, float, and double. The container requires a wrapper class for the data types it holds; class 'Holder' is the wrapper class in this example. In fact, we will later replace "Holder" with a structure called "StepArray", which gives each calculation step the ability to hold more than one homogeneous process unit.
Field<>() is a helper function to access the elements inside the container. Everything works similarly to an STL vector, except that this is a compile-time container.
To learn more about Typelists, please refer to ["Modern C++ Design"].
Iterate through the pipeline
Now that we have a heterogeneous container, we can start to build the pipeline. Let's focus on requirement 2 first. To hold heterogeneous classes, our pipeline class should derive from class GenScatterHierarchy.
Following is part of the Pipeline class:
////////////////////////////////////////////////////////////////////////////
//class Pipeline
//Pipeline is the interface of the pipeline library. Users use this class directly.
//template parameters:
//TList: the typelist which specifies the calculation steps in the pipeline
//StepHolder: the helper type that holds the actual calculation steps
//Param: the parameter type of the calculation, used by the 'Process' function
//member functions:
//Process(data): loops over each element in the pipeline and calls its operator()
//  to do the calculation on the data specified by the parameter 'data'
////////////////////////////////////////////////////////////////////////////
template <class TList, typename Param, template <class> class StepHolder>
struct Pipeline : public GenScatterHierarchy<TList, StepHolder>
{
    typedef TList StepList;
    typedef GenScatterHierarchy<TList, StepHolder> ProcessSteps;
    static const bool isPipeline = true;
    Pipeline() {}
    bool Process(Param data)
    {
        return ProcessHelper<ProcessSteps, TList, Param, bool>::Do(*this, data);
    }
    ...
};
The Pipeline class uses the same template parameters as GenScatterHierarchy. The first template parameter is the typelist of step types the pipeline holds. The second parameter, 'Param', is the type passed to the 'Process' function. The third one, StepHolder, is the 'wrapper' class.
By default, the pipeline library passes class StepArray as the third template parameter. Besides being an STL vector, StepArray is both a holder of step types and a functor: its operator () sequentially calls the operator () of all the units inside it and returns a result according to the logic type ('and' or 'or') it holds. Following is the source code of StepArray:
enum OpTypes{ opAnd, opOr }; //'and' and 'or' are reserved words in standard C++

//////////////////////////////////////////////////////////////////////////////////////
//class StepArray
//StepArray holds calculators of the same type in a std::vector
//member function:
//operator() (param): Sequentially calls the operator () of each calculator in the vector.
//  The result returned by this function depends on OpTypes op (opAnd, opOr).
//  In order to return true,
//  1) all calculators in the vector must return true when op == opAnd;
//  2) at least one of the calculators must return true when op == opOr
//////////////////////////////////////////////////////////////////////////////////////
template < class StepType >
struct StepArray : public vector<StepType>
{
    typedef typename StepType::ParamType ParamType;
private:
    OpTypes op;
public:
    StepArray( OpTypes sourceOp ) : op(sourceOp) {}
    StepArray( ) : op(opAnd) {}
    StepArray(const StepArray& sourceArray) : vector<StepType>(sourceArray), op(sourceArray.op)
    {}
    void SetLogicalOp(OpTypes sourceOp) { op = sourceOp; }
    bool operator()(ParamType param)
    {
        if( this->empty() ) return true;
        typename vector<StepType>::iterator i;
        bool result;
        if( op == opAnd )
        {
            result = true;
            for( i = this->begin(); i != this->end(); ++i)
                if ( !(*i)(param) )
                {
                    result = false;
                    break;
                }
        }
        else // op == opOr
        {
            result = false;
            for( i = this->begin(); i != this->end(); ++i)
                if ( (*i)(param) )
                {
                    result = true;
                    break;
                }
        }
        return result;
    }
};
Let's go back to the pipeline class. The member function "Process" calls 'ProcessHelper::Do(…)' to loop through all the steps inside the container and let them process the data. In other words, Process() is a 'foreach'-like function. Because all the step types in Pipeline are fixed at compile time, we have to do the loop at compile time as well (that is, we can't use a normal runtime 'foreach' iteration). The only weapons we can use are partial template specialization and recursion. The structure ProcessHelper is designed to achieve this goal.
template<class H, class TList, typename Param, typename R>
struct ProcessHelper;

template<class H, class TList, typename Param>
struct ProcessHelper<H, TList, Param, bool>
{
    typedef typename TList::Head Head;
    static bool Do(H& obj, Param p)
    {
        if( (Field<Head>(obj))(p) )
            return ProcessHelper<H, typename TList::Tail, Param, bool>::Do(obj, p);
        else
            return false;
    }
};

template <class H, typename Param>
struct ProcessHelper<H, NullType, Param, bool>
{
    static bool Do(H&, Param)
    { return true; }
};
ProcessHelper recursively calls itself to iterate through the Typelist until it encounters 'NullType', the last node of the Typelist. Thanks to partial template specialization, the compiler can figure out whether an element in a typelist is NullType or not. The following pseudo code shows the rationale of the algorithm and should help you understand it:
if TList is NullType, return true; // reached the end of the list
Get the object of TList::Head;
result = call the process function of the object;
if (result == true)
    recursively call itself, passing the next node (TList::Tail) as parameter;
else
    return false;
Build the population calculation pipeline
Since we now have a pipeline class, let's build a very simple pipeline to calculate the population:
typedef TYPELIST_3(AreaFilter, WeekFilter, PopulationCalculator) CalculatePopulationTypelist;
typedef Pipeline<CalculatePopulationTypelist, Respondent&, StepArray> PopulationCalculatePipeline;

vector<Respondent> respondents;
InitRespondents(); //initialize respondents
PopulationCalculatePipeline pipeline; //create a pipeline instance
InitPipeline(); //initialize the pipeline by setting the correct area and week criteria
//let every respondent's data go through the pipeline
for (vector<Respondent>::iterator respondent = respondents.begin(); respondent != respondents.end(); ++respondent)
{
    pipeline.Process(*respondent);
}
cout << Field<PopulationCalculator>(pipeline)[0].population;
As you can see, building a pipeline to calculate the population is very simple. First, define a type list, 'CalculatePopulationTypelist', which contains three types: AreaFilter, WeekFilter, and PopulationCalculator. Then define the pipeline type using this type list; the new pipeline thus has the three process steps defined in the type list. After initializing the pipeline (we will discuss pipeline initialization later), loop through all respondents and push them into the pipeline. After all the data flows through the pipeline, we get the population.
In fact, PopulationCalculatePipeline does the same thing as the following code:
class PopulationCalculatePipeline
{
public:
    //StepArray is a vector<T> which implements 'operator ()'
    StepArray<AreaFilter> area_filters;
    StepArray<WeekFilter> week_filters;
    StepArray<PopulationCalculator> population_calculators;
    //omit some code
    ...
    bool Process(...)
    {
        //call all AreaFilters in StepArray<AreaFilter>
        //similar to:
        //for( i = 0; i < area_filters.size(); ++i )
        //    area_filters[i](...)
        if( area_filters(...) )
            if( week_filters(...) )
                population_calculators(...);
    }
};
The difference is that, by using the Pipeline class, the compiler generates all this code for you.
Access Process Units
You may have noticed that we didn't discuss how to initialize the pipeline. The reason is that, to initialize the pipeline, we must access the filters and calculators inside it. Since a pipeline can nest sub-pipelines, accessing a process unit or step can be very inconvenient. For example, to access the process units inside step 3 in figure 1, we would have to use the following ugly code:
Field<step3-1>( Field<Step3>(pipeline)[0] )[index]
Imagine if the pipeline becomes more complicated: accessing a process unit or step inside the pipeline could become a nightmare. Therefore, a mechanism that simplifies access to process units and steps is important.
Ideally, accessing a process step should be as straightforward as this:
pipeline.GetStep<step3-1>()
The template member function GetStep returns the process step specified by the template parameter (step3-1). No matter where the step is (in a nested pipeline or not), GetStep should return a proper reference to the process step.
There are two difficulties in developing GetStep:
1) A step can be a nested pipeline or a simple process step.
2) A nested pipeline can be anywhere in the type list; therefore, searching for a process step in a pipeline becomes searching in a multi-dimensional matrix.
Following is the algorithm for searching for a step in an arbitrarily nested pipeline:
1) Search the type list of the main pipeline; if the step is found, return its address.
2) Otherwise, if the first node in the type list is a nested pipeline, recursively call the search function on that node.
3) If the step is not in the first node, or if the first node is not a nested pipeline (i.e. a simple step), search the next node in the type list. Repeat this procedure until the type is found or the last type of the type list, NullType, is reached.
The algorithm is a depth-first search. Following is the rationale of this algorithm:
StepType* GetStep(pipeline, step)
{
    get pipeline typelist;
    if (step is NullType) return 0;
    if (pipeline is not a pipeline) return 0;
    address = 0;
    if (step is in the typelist)
        return the address of the step;
    else
    {
        Get the first node in the typelist;
        if ( the first node is a nested pipeline )
        {
            address = GetStep(first_node, step);
            if (address != 0)
                return address;
        }
        return GetStep(the_rest_of_the_nodes_in_the_pipeline, step);
    }
}
Although the algorithm is simple, difficulties remain: because everything about the type list lives in the static world, we have to rely on a compile-time algorithm. Again, partial template specialization is our only weapon.
To access a step in the pipeline, we need a helper class called GetStepHelper. GetStepHelper has four template parameters: the first one, H, is the pipeline type; the second, TList, is the step type list of the pipeline; the third, T, is the step type you want to access; the last one is the return type of GetStep(). GetStepHelper has a partially specialized form, GetStepHelper<…NullType…>, which gives the search an exit: when the type list is NullType, it returns 0.
GetStepHelper has only one public static function: GetStep. Depending on whether the step type T appears in the pipeline's type list, GetStep calls one of two overloaded private functions: GetField(H&, Int2Type<true>) when type T is found in the pipeline's type list, or GetField(H&, Int2Type<false>) when it is not (the reason for using Int2Type<> is that member template functions cannot be partially specialized). The latter calls another private template function, GetStep_(…), to find out whether the step of type T is inside the first node of the type list. Because any node in the type list can be either a nested pipeline (whose isPipeline is true) or a simple process step (isPipeline is false), GetStep_ has two overloaded forms: when the node is a simple process step (isPipeline = false), GetStep_(…, Int2Type<false>) is called and always returns 0; otherwise, GetStep_(…, Int2Type<true>) is called, which recursively applies GetStepHelper to the head node's own type list. Thus GetStep_ can search all child pipelines of the first node. After searching the first node and all its child pipelines (if any), GetField tests the address returned by GetStep_: if it is 0 (invalid), GetField searches the rest of the pipeline's type list by recursively calling GetStepHelper with the 'Tail' of the type list.
The source code is listed here:
//////////////////////////////////////////////////////
//GetStepHelper helps the pipeline to get a step in it
//template parameters:
//H: the pipeline type
//TList: the typelist in the pipeline
//T: the type of the element you want to access
//RType: the return type of GetStep()
//////////////////////////////////////////////////////
template <class H, class TList, class T, class RType>
struct GetStepHelper
{
    typedef typename TList::Head Head;
    typedef typename TList::Tail Tail;
private:
    template <class R>
    static RType* GetStep_(R& obj, Int2Type<true>)
    {
        return GetStepHelper<Head, typename Head::StepList, T, RType>::GetStep(obj);
    }
    template <class R>
    static RType* GetStep_(R& obj, Int2Type<false>)
    {
        return 0;
    }
    static RType* GetField(H& obj, Int2Type<true>)
    {
        return &(Field<T>(obj));
    }
    static RType* GetField(H& obj, Int2Type<false>)
    {
        RType* result = GetStep_<Head>( Field<Head>(obj)[0], Int2Type< Head::isPipeline >() );
        if( result != 0 )
            return result;
        else
            return GetStepHelper<H, Tail, T, RType>::GetStep(obj);
    }
public:
    static RType* GetStep(H& obj)
    {
        return GetField(obj, Int2Type< IndexOf<TList, T>::value != -1 >());
    }
};

template <class H, class T, class RType>
struct GetStepHelper<H, NullType, T, RType>
{
    static RType* GetStep(H&)
    {
        return 0;
    }
};
Calculate duration
Now that we have everything we need to build a pipeline, let's build a more complex calculation. Say we want to calculate how long the people 1) in certain area(s), 2) in certain week(s), and 3) on certain week day(s) watch certain TV station(s). Please notice that these criteria (area, week, week day, and station) live in different data structures; therefore, we need to build multiple pipelines, each handling one data structure.
Before building a pipeline, we need some more filters and calculators.
- WeekDayFilter filters RespondentDailyRec records to find out whether the daily respondent record falls on certain weekday(s).
- StationFilter filters tuning events that tuned into certain station(s). It works on the TuningEvent structure.
- DurationCalculator accumulates the duration the respondents watched. It works on the TuningEvent structure.
You can find the source code in:
http://codecentral.borland.com/codecentral/ccweb.exe/listing?id=20455
After implementing the new filters and calculators, we can start to build the new pipeline. Before building it, let's look at the logic of the duration calculation:
for (each respondent)
{
    if (respondent passes AreaFilter) //if the respondent is in the areas
    {
        Accumulate population using PopulationCalculator;
        if (respondent passes WeekFilter) //if the respondent is in the weeks
        {
            for (each RespondentDailyRec of the respondent) //each week day
            {
                if (the RespondentDailyRec passes WeekDayFilter) //if the rec is on a week day we want
                {
                    for (each tuning event in the RespondentDailyRec)
                    {
                        if (the event passes StationFilter) //if the event is watching a station we want
                        {
                            Accumulate duration using DurationCalculator;
                        }
                    }
                }
            }
        }
    }
}
By the way, you can see how nasty the if-then-else structure is here. It is not only error prone but also very rigid: even if you just want to change the order of the filters or calculators, you have to pay special attention not to break the code. In fact, the calculation shown in this example is drastically simplified; in the real world, the calculation logic is much more complicated, so the if-then-else structure would cause even more maintenance difficulties.
Expressed as a pipeline structure, the above logic includes the following steps:
Step 1: Check if the respondent is in the areas we want.
Step 2: Accumulate the population.
Step 3: Check if the respondent is in the weeks we want.
Step 4: For each RespondentDailyRec in the respondent, check if it is on the week days we want.
Step 5: For each tuning event in the RespondentDailyRec structure, check if the event is watching the station we want.
Step 6: Accumulate the duration.
If a respondent and its data reach a certain step, it means the respondent satisfies all the conditions it has passed. For example, if a respondent reaches step 6, it means that the respondent is in the correct area and the correct week, and watched the correct station(s) on the correct week day(s).
Building the pipeline to calculate the duration means building the pipeline to hold the six steps above. Because area and week live in the Respondent data structure, let's first build the respondent pipeline to handle this data structure (the respondent pipeline is also the main pipeline).
typedef TYPELIST_4(AreaFilter, PopulationCalculator, WeekFilter, DailyRspPipeline) Mainlist;
class RspPipeLine : public Pipeline<Mainlist, Respondent&, StepArray>
{
public:
    RspPipeLine()
    {
        GetStep<DailyRspPipeline>().push_back(DailyRspPipeline());
    }
};
The first three steps are the first three types in the pipeline's type list. The last step of RspPipeLine is a nested pipeline: DailyRspPipeline. DailyRspPipeline has two roles: process unit and sub-pipeline. As a process unit, DailyRspPipeline implements operator (); at the same time, as a pipeline, DailyRspPipeline processes data in the RespondentDailyRec structure. The operator () links the two roles together.
Following is the source code of DailyRspPipeline:
typedef TYPELIST_2(WeekDayFilter, EventPipeline) Sublist;
class DailyRspPipeline : public Pipeline<Sublist, RespondentDailyRec&, StepArray>
{
public:
    typedef Respondent& ParamType;
    DailyRspPipeline()
    {
        GetStep<EventPipeline>().push_back(EventPipeline());
    }
    bool operator()(Respondent& rsp)
    {
        vector<RespondentDailyRec>::iterator dailyRsp;
        for (dailyRsp = rsp.rspDailyRecs.begin(); dailyRsp != rsp.rspDailyRecs.end(); ++dailyRsp)
        {
            Process(*dailyRsp);
        }
        return true;
    }
};
Step 4 is the first type in DailyRspPipeline's type list. The second type of DailyRspPipeline is another nested pipeline: EventPipeline, which processes the TuningEvent data structure in RespondentDailyRec.
typedef TYPELIST_2(StationFilter, DurationCalculator) EventCalculatorList;
class EventPipeline : public Pipeline<EventCalculatorList, TuningEvent&, StepArray>
{
public:
    typedef RespondentDailyRec& ParamType;
    EventPipeline() {}
    bool operator() (RespondentDailyRec& dailyRsp)
    {
        vector<TuningEvent>::iterator event;
        for (event = dailyRsp.tunings.begin(); event != dailyRsp.tunings.end(); ++event)
        {
            Process(*event);
        }
        return true;
    }
};
Now we have built everything. Let's take a look at how to use our new pipeline to calculate the duration that satisfies all the criteria.
int main(int argc, char* argv[])
{
    TestData testdata;     // build test data
    RspPipeLine pipeline;  // build pipeline

    // Initialize areaFilter and add it into the pipeline
    set<int> areas;
    areas.insert(1);
    areas.insert(101);
    pipeline.GetStep<AreaFilter>().push_back(AreaFilter(areas));

    // Add populationCalculator into the pipeline
    pipeline.GetStep<PopulationCalculator>().push_back(PopulationCalculator());

    // Initialize weekDayFilter and add it into the pipeline
    set<int> weeks;
    weeks.insert(1);
    weeks.insert(2);
    pipeline.GetStep<WeekDayFilter>().push_back(WeekDayFilter(weeks));

    // Initialize stationFilter and add it into the pipeline
    set<int> stations;
    stations.insert(1);
    pipeline.GetStep<StationFilter>().push_back(StationFilter(stations));

    // Add durationCalculator into the pipeline
    pipeline.GetStep<DurationCalculator>().push_back(DurationCalculator());

    // Let all data go through the pipeline
    vector<Respondent>::iterator rsp;
    for (rsp = testdata.respondents.begin(); rsp != testdata.respondents.end(); ++rsp)
    {
        pipeline.Process(*rsp);
    }

    cout << "\npopulation: " << pipeline.GetUnit<PopulationCalculator>(0).population;
    cout << "\nDuration: " << pipeline.GetUnit<DurationCalculator>(0).duration;
    getchar();
    return 0;
}
Summary
The pipeline strategy encourages developers to write highly reusable and maintainable code. Developing a pipeline application usually takes two steps:
1) Develop the process units. Appendix A describes how to develop a process unit.
2) Build the pipeline by inheriting from the 'Pipeline' class.
Pros and Cons
Of course, there is more than one way to implement the pipeline strategy. The implementations normally fall into two categories: static pipelines and dynamic pipelines. Compared with the dynamic strategy, a static pipeline has some pros and cons.
Pros:
Because the pipeline is defined at compile time, the compiler can catch type mistakes for you. In other words, if you mistakenly insert a unit of step/type A into step/type B, you don't have to wait until run time, or, even worse, for your users to find the mistake: the compiler will tell you.
All pipeline steps and process units in a pipeline can be accessed in a unified way. Users do not need to care whether the pipeline is nested, or in which sub-pipeline a step or unit lives.
Cons:
All steps of the pipeline can only be built at compile time. That is, you cannot change the pipeline steps dynamically; all you can do is add process-unit instances to the predefined steps.
Because the pipeline library relies on compile-time code generation, the list of steps shouldn't be too long. A pipeline with hundreds of steps will most likely exceed the limits of the compiler.
Appendix A: Process unit and user-defined pipeline specification
1. A process unit must be a 'functor'. In other words, it must implement operator(). By default, operator() should return bool. If you need to return another type, you have to implement a "ProcessHelper" partial template specialization for that type. The parameter type of operator() must be consistent with the pipeline's parameter type (the second template parameter). In addition, a process unit must typedef its ParamType. For example:
typedef Respondent& ParamType;
2. A process unit must have a static const data member 'isPipeline'. This member tells the pipeline whether a unit inside it is a nested pipeline or a pure process unit. Setting 'isPipeline' to 'true' means the unit is a nested pipeline; otherwise, it is a simple unit.
3. All user-defined pipelines must inherit from the Pipeline class.
4. If a step is a nested pipeline, that step should contain only one instance (unit): the first one.
Appendix B: Example files explanation
You can download the library source code and the example source code from the following address:
http://codecentral.borland.com/codecentral/ccweb.exe/listing?id=20455
1. pipeline.h
This is the only file of the pipeline library. To use the library, all you need to do is include this file (it relies on some Loki library header files). However, if you don't want to download Loki, you can include 'LokiPipelinePart.h' instead (this is the default). 'LokiPipelinePart.h' contains all the Loki code you need to compile the pipeline library. If you do want to use Loki, include "loki/HierarchyGenerators.h" and remember to make it searchable in your project's include path.
2. LokiPipelinePart.h
This file contains everything you need from Loki. I prepared it for those who do not want to download the Loki library; 'Pipeline.h' includes it in case you do not have Loki. However, this file was modified for the Borland C++ compiler and cannot be compiled by GCC. If you want to try the pipeline library on compilers other than Borland C++, use the Loki library directly.
3. Loki library
You can download the library from the following address:
http://sourceforge.net/projects/loki-lib/
You can also find versions ported to specific compilers, such as Borland C++, Visual C++, and g++, on the Internet.
4. Calculator.h and Calculator.cpp
All the process unit classes for the example.
5. ExamplePipeline.h
All the pipeline classes for the example.
6. testdata.h and testdata.cpp
A class that produces some test data.
7. testtypes.h
The data structures for the example.
8. maintest.cpp
The main function for the example.
Appendix C: Books and Web Sites
[Modern C++ Design]
Andrei Alexandrescu,
Modern C++ Design: Generic Programming and Design Patterns Applied,
Addison-Wesley, Reading, MA, 2001
[C++ Templates]
David Vandevoorde, Nicolai M. Josuttis,
C++ Templates: The Complete Guide,
Addison-Wesley, 2002, ISBN 0-201-73484-2
[Boost]
The Boost repository for free, peer-reviewed C++ libraries:
http://www.boost.org
[Loki]
A free C++ library providing all the features described in [Modern C++ Design]:
http://sourceforge.net/projects/loki-exp
Appendix D: Contact information
For further discussion, you can reach me at the following email address:
Danny Wang
Toronto
Canada
Wxg250@yahoo.com