A Generic Data Process Pipeline Library

By: Danny Wang

Abstract: This article introduces a generic data process pipeline library based on the typelist technique from the Loki library.


Overview

When facing a complex data process procedure, programmers often divide it into several small, simple process steps and assemble these steps into a pipeline-like structure to make the code more maintainable. Figure 1 shows the pipeline structure discussed in this article.

[Figure 1: pipeline structure]

In Figure 1, the pipeline structure consists of two pipelines: the main pipeline and the sub-pipeline. The main pipeline contains step 1 to step 5, and the sub-pipeline contains step3-1, step3-2, and step3-3. Note that step 3 in the main pipeline plays two roles: it is a normal step in the main pipeline, and it is also a sub-pipeline which contains sub-steps.

The purpose of a pipeline is to produce the result data once the input data has flowed through it.

Each step in a pipeline is a sequential homogeneous container that holds a number of ‘process units’. A process unit is an object of the step’s type. In a nutshell, you can think of a pipeline step as a C++ Standard Template Library (STL) vector which contains zero or more process units. Different pipeline steps do different jobs and are therefore implemented as different classes.

A pipeline can have an arbitrary number of process steps. Although a pipeline structure can have only one main pipeline, it can have an arbitrary level of nested sub-pipelines. Each process step can have an arbitrary number of process units.

This strategy can improve code quality in the following areas:

1) Reduced complexity: all the process steps can be very simple classes, each doing only one simple job.

2) New pipelines are extremely simple to build: handling a new issue is often only a matter of changing one line of code.

3) No need for a complicated architecture: because pipelines are easy to build and are decoupled from the process steps, the programmer can concentrate on developing the process steps.

4) Simpler architecture: a data process procedure often consists of complex “if-then-else” logic. For example, a typical data process often looks like this:



for( each data )
{
    if (data satisfies condition_1)
    {
        do_something_1();
        if (data satisfies condition_2)
            do_something_2();
    }
}

“if-then-else” has two major drawbacks. First, it makes your code unnecessarily rigid: every change to the “if-then-else” logic forces you to recompile the pipeline. Second, as the number of branches grows, your code becomes less and less maintainable.

This library lets the compiler generate the “if-then-else” code automatically for different pipelines, which solves the issue nicely.

5) High reusability: all process units are standardized nuts and bolts, so they can easily be reused in other pipelines.

Although a pipeline has many benefits, programmers often face several difficulties when developing such a structure:

1) The interface of the pipeline is not flexible enough, so handling a new requirement often means changing the pipeline code itself.

2) C++ (at least its standard library) lacks a heterogeneous class container, which makes developing a pipeline difficult. Different steps are different classes, and there is no easy way to put them into an STL vector-like container.

3) A pipeline often consists of many nested “sub-pipelines”, so when a pipeline becomes complicated, it is difficult to access the process units in it. For example, it is not uncommon to see ugly code such as:


pipeline->sub_pipeline1->sub_sub_pipeline->member1
4) There is no obvious way to generate the “if-then-else” code automatically.

Besides the items above, a generic pipeline should also address the following requirements:

1. Easy to build a new pipeline: ideally, building a new pipeline should be as easy as passing a new list of process unit class names to it, just like the following pseudo code:


typedef unit_type_list(class1,class2,class3....) pipeline_step_typelist;
pipeline<pipeline_step_typelist> new_pipeline;

2. When data flows through the pipeline, the pipeline should process it automatically. Therefore the pipeline should have the following abilities:

- The data should be looped through all the process units in the pipeline. This has two meanings: 1) the data can reach every step inside the pipeline; 2) within each step, the data is passed to every process unit in the step.

- While the data is being looped, the pipeline must call a certain function in each step or process unit to process the data. As a policy, this function must be operator (). In other words, all process units and process steps must implement operator ().

- In addition, the pipeline should interrupt the loop when it encounters a unit or step whose operator () returns “false”. In this way, the pipeline avoids the “if-then-else” structure by moving the logic into each process unit. Thus each unit’s operator () must have “bool” as its return type: “true” means the loop should continue, while “false” means the loop should be interrupted. A process unit should look like the following pseudo code:


class AUnit
{
public:
    AUnit(const AUnit& source);
    // omit the other constructors
    bool operator() (...);
};


3. Pipelines can be nested without limit: a process unit in a pipeline can be another pipeline (a sub-pipeline), a sub-pipeline can have a “sub-sub-pipeline”, and so on. Obviously, a sub-pipeline should follow the same rules as a pipeline.

4. The pipeline should be type safe: an error such as adding a unit of step type A at the position of step type B should be reported at compile time.

5. Easy access to ALL units in the pipeline: when accessing a unit or step, users shouldn’t have to care which sub-pipeline the unit is in. In other words, instead of the ugly code listed under difficulty 3), the following code should be used:

pipeline.GetStep<Step_Type>().member1

This article discusses all these issues and provides a generic pipeline library to address all these requirements.
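The return-value convention from requirement 2 can be illustrated with a minimal hand-written sketch. The names Sample, NonNegativeUnit, CountUnit, and run_two_steps are hypothetical and exist only for this illustration; they are not part of the library.

```cpp
#include <cassert>

// Hypothetical data record used only for this illustration.
struct Sample
{
    int value;  // the data being examined
    int seen;   // how many times a calculator-like unit saw this record
};

// A filter-like unit: returns false to stop the flow for negative values.
struct NonNegativeUnit
{
    bool operator()(Sample& s) const { return s.value >= 0; }
};

// A calculator-like unit: does its work and never stops the flow.
struct CountUnit
{
    bool operator()(Sample& s) const { ++s.seen; return true; }
};

// Hand-written equivalent of a two-step pipeline: the second unit
// runs only if the first one returned true.
inline bool run_two_steps(Sample& s)
{
    NonNegativeUnit first;
    CountUnit second;
    return first(s) && second(s);
}

// Small usage example: one record passes, one is stopped by the filter.
inline void demo_two_steps()
{
    Sample ok = {5, 0};
    Sample rejected = {-1, 0};
    assert(run_two_steps(ok) && ok.seen == 1);
    assert(!run_two_steps(rejected) && rejected.seen == 0);
}
```

The short-circuit in run_two_steps is exactly the “if-then-else” nesting that the pipeline is meant to generate on the programmer's behalf.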

An example of pipeline

To make our discussion easier, let’s look at an example first.

Say that, in a TV audience survey, we want to know how many people watch certain TV shows at a certain time in a specific geographic area. This can be translated into the following questions: how many respondents watched TV at a certain time (8:00am or 3:12pm) on certain TV station(s) (BBC or CNN) in a given area (Toronto or Vancouver), and for how long (1 hour or 1 minute)? To answer these questions, we need to introduce some data structures first.


struct TuningEvent
{
    int startTime; //tuning event beginning time
    int endTime; //tuning event ending time
    int station; //the station the respondent watched in above time period
    char ext; //unused
};

struct RespondentDailyRec
{
    int weekDay; // which week day (0-Sunday; 1-Monday....)
    vector< TuningEvent > tunings; //all tuning events in the weekDay
};

struct Respondent
{
    int rspID; //respondent id
    int area; //geographic area this respondent belongs to
    float weight; //how many people this respondent represents in this survey
    int week; //which week this respondent belongs to
    vector< RespondentDailyRec > rspDailyRecs; //all 7 days' tuning info of this respondent
};

The structures above are quite straightforward. However, some terms should be explained first.

A person in a survey is called a ‘respondent’. The action of a respondent changing the TV channel is called a “tune”, and the period between two “tune”s is called a “tuning event” or “tuning”. A tuning event records the station the respondent watched, the start time, and the end time. A respondent exists in a survey for only one week, so each respondent has a vector of RespondentDailyRec to record all seven days’ tuning events. Each RespondentDailyRec structure stores all tuning events of that day in a vector called “tunings”. A respondent also has attributes of its own: ‘weight’ is the number of people this respondent represents in that ‘area’, and ‘week’ is the week of the survey to which the respondent belongs.

Let’s try to calculate the population in a certain area and in a certain week. Without the pipeline structure, the code would look like this:


float population=0;
for( each respondent)
{
    if(respondent.area == the_city)
    {
        if(respondent.week == the_week) //comparison, not assignment
            population+=respondent.weight;
    }
}

Build process units

Now let’s use the pipeline strategy. The first thing we want to do is develop the process units we need. Keep in mind that process units should be simple and should do only one simple job. We don’t want super-complicated process units, because a complicated unit defeats the whole purpose of the pipeline strategy.

First, as shown in requirement 2, all units in the pipeline must implement operator ().

Second, we will use two terms in this example: filter and calculator. Units which decide whether the data can go further in the pipeline are called “filters”. A filter has a data member that holds its criteria, and it filters data through the return value of its operator () function: if the data satisfies the criteria, operator () returns true and the data flows to the next step; otherwise, operator () returns false and the data is filtered out. Process units other than filters are called “calculators”. A calculator’s operator () function always returns true.

Let’s start with AreaFilter. Following is the code of AreaFilter (the copy constructor is omitted):



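class AreaFilter
{
public:
    typedef Respondent& ParamType;
    static const bool isPipeline = false;
    AreaFilter(const set<int>& criteria): area_criteria( criteria ) {}
    bool operator () (Respondent& rsp)
    {
        //an empty criteria set means the filter is disabled
        if( area_criteria.empty() ) return true;
        //count() returns 0 or 1 for a set; compare explicitly to get a bool
        return area_criteria.count(rsp.area) != 0;
    }
private:
    set<int> area_criteria;
    AreaFilter(){} //no default construction: a filter always needs criteria
};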

The purpose of AreaFilter is to tell us whether a respondent is in certain areas. The criteria are held in the data member:


set<int> area_criteria;


AreaFilter simply searches area_criteria to see whether the respondent’s area id is in the set. If it is, operator () returns true; otherwise, it returns false. Please notice that an empty area_criteria means we are not interested in filtering by area, so the filter returns true in this case.

Similar to AreaFilter, WeekFilter checks whether a respondent belongs to certain weeks.

You can get the source code of WeekFilter from:

http://codecentral.borland.com/codecentral/ccweb.exe/listing?id=20455
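Since the article does not list WeekFilter, here is a hedged sketch of what it might look like, assuming it mirrors AreaFilter and tests Respondent::week against a set of week numbers. The member name week_criteria and the reduced Respondent structure are assumptions made for this sketch; the downloadable source may differ.

```cpp
#include <cassert>
#include <set>

// Reduced copy of the article's Respondent, limited to the fields used here
// (the daily records are left out to keep the sketch short).
struct Respondent
{
    int rspID;
    int area;
    float weight;
    int week;
};

// Assumed shape of WeekFilter, modeled on AreaFilter.
class WeekFilter
{
public:
    typedef Respondent& ParamType;
    static const bool isPipeline = false;

    explicit WeekFilter(const std::set<int>& criteria) : week_criteria(criteria) {}

    bool operator()(Respondent& rsp) const
    {
        if (week_criteria.empty()) return true;     // empty criteria: filter disabled
        return week_criteria.count(rsp.week) != 0;  // pass only the listed weeks
    }

private:
    std::set<int> week_criteria;  // hypothetical member name
};

// Usage: a respondent from week 2 passes a filter configured for week 2.
inline bool demo_week_filter()
{
    std::set<int> weeks;
    weeks.insert(2);
    WeekFilter filter(weeks);
    Respondent rsp = {1, 10, 1.0f, 2};
    assert(filter(rsp));
    return filter(rsp);
}
```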

PopulationCalculator calculates the population in the areas and weeks we are interested in. Because a calculator doesn’t filter data, it always returns true. Instead of criteria, PopulationCalculator has a data member ‘population’. The calculation is very simple: it just accumulates the respondent’s weight into the population variable.

class PopulationCalculator
{
public:
    //ParamType must be public because StepArray reads StepType::ParamType
    typedef Respondent& ParamType;
    float population;
    static const bool isPipeline = false;
public:
    PopulationCalculator():population(0){}
    bool operator() ( Respondent& rsp)
    {
        population += rsp.weight;
        return true;
    }
};

Build the pipeline library

After building the process units (the filters and the calculator), we can start to build the pipeline.

The pipeline which calculates the population in certain areas and within certain weeks would look like this:

typedef type_list( AreaFilter,WeekFilter,PopulationCalculator) unit_typelist;
Pipeline<unit_typelist> population_pipeline;
for(each respondent)
{
    population_pipeline.Process(respondent);
}
cout<<population_pipeline.GetUnit<PopulationCalculator>().population;

In the code above, ‘Pipeline’ is the key class. It has all the power we discussed previously. However, implementing this class is not easy.

A natural first thought for Pipeline would be a std::vector. However, a std::vector cannot hold the three process units, because WeekFilter, AreaFilter, and PopulationCalculator are different classes. This is quite frustrating: the C++ standard library doesn’t provide a heterogeneous container.

Typelist technique resolves the difficulty

A C++ technique called Typelist can resolve the heterogeneous container issue. Before we start to build our pipeline, we need some background information about typelists. If you already know typelists, you can skip this section.

Basically, Typelist is a nested, recursive structure terminated with an empty structure, NullType. Here is the definition of Typelist:

template <class T, class U>
struct Typelist
{
    typedef T Head;
    typedef U Tail;
};

Some macros are used to create Typelist:

#define TYPELIST_1(T1) Typelist<T1, NullType>
#define TYPELIST_2(T1, T2) Typelist<T1, TYPELIST_1(T2) >

In theory, you can define as many Typelist nodes as you want. In the real world, however, hundreds of typelist nodes will most likely reach the limit of your compiler.

Some compile-time functions help you manipulate a typelist. The one related to our topic is IndexOf, which searches for a certain type in the typelist. Following is the format of IndexOf:

IndexOf<type_list, type>::value

‘value’ is the index of ‘type’ in the typelist. The index starts from 0. If value is -1, ‘type’ is not in “type_list”.

It is worth mentioning that IndexOf is a compile-time function: the index value is decided at compile time.
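To make the behavior of IndexOf concrete, the following is a minimal self-contained sketch written in the style of the Loki implementation. It is a reconstruction for illustration, not a copy of the library header, and it assumes a C++11 compiler for static_assert. The typedef Numbers is a hypothetical example list.

```cpp
#include <cassert>

struct NullType {};

template <class T, class U>
struct Typelist
{
    typedef T Head;
    typedef U Tail;
};

// Primary template: only declared, the specializations below do the work.
template <class TList, class T> struct IndexOf;

// End of the list reached without a match: report -1.
template <class T>
struct IndexOf<NullType, T>
{
    enum { value = -1 };
};

// The head of the list is the type we are looking for: index 0.
template <class T, class Tail>
struct IndexOf<Typelist<T, Tail>, T>
{
    enum { value = 0 };
};

// The head differs: search the tail and add one if the type was found there.
template <class Head, class Tail, class T>
struct IndexOf<Typelist<Head, Tail>, T>
{
private:
    enum { temp = IndexOf<Tail, T>::value };
public:
    enum { value = (temp == -1 ? -1 : 1 + temp) };
};

// Hypothetical example list: int, float, double.
typedef Typelist<int, Typelist<float, Typelist<double, NullType> > > Numbers;

// All of these checks are evaluated by the compiler, not at run time.
static_assert(IndexOf<Numbers, int>::value == 0, "int is the first type");
static_assert(IndexOf<Numbers, double>::value == 2, "double is the third type");
static_assert(IndexOf<Numbers, char>::value == -1, "char is not in the list");
```

Because the result is a compile-time constant, it can be used as a template argument, which is how the library later selects between overloads.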

Using the Typelist structure, the compiler can generate a class hierarchy which holds heterogeneous classes in a vector-like structure. The GenScatterHierarchy class is used to generate this heterogeneous container. Following is an example of generating such a container:



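template <class T>
struct Holder
{
    T _value;
};
//TYPELIST_3 is a macro, so it takes parentheses rather than angle brackets
typedef GenScatterHierarchy<TYPELIST_3(int, float, double), Holder> Element3D;
Element3D triple_types;
//Field<T>() returns the Holder<T> sub-object, so the value is reached via _value
Field<int>( triple_types )._value = 0; //access the int field
Field<float>( triple_types )._value = 1.5f; //access the float field
Field<double>( triple_types )._value = 3.6; //access the double field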

The example above first defines a heterogeneous container by using:

GenScatterHierarchy<TYPELIST_3(int, float, double), Holder> e;


Here ‘e’ is a heterogeneous container which can hold three types: int, float, and double. The container requires a wrapper class for the data types it holds. Class ‘Holder’ is the wrapper class in this example. In fact, we will replace “Holder” with a structure called “StepArray” later. “StepArray” makes it possible to hold more than one homogeneous process unit in a calculation step.

Field<>() is a helper function for accessing an element inside the container. It works much like element access on an STL vector, except that the set of element types is fixed at compile time.
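The idea behind the generated hierarchy can be sketched in a few lines. The following is a simplified stand-in, not Loki's actual GenScatterHierarchy or Field: the names ScatterSketch and FieldSketch are hypothetical, and the sketch only works when every type appears once in the list.

```cpp
#include <cassert>

struct NullType {};
template <class T, class U> struct Typelist { typedef T Head; typedef U Tail; };

// Wrapper class, as in the article's Holder example.
template <class T>
struct Holder
{
    T value_;
    Holder() : value_() {}
};

// Simplified stand-in for GenScatterHierarchy: the generated class
// inherits one Unit<T> for every type T in the typelist.
template <class TList, template <class> class Unit> struct ScatterSketch;

template <class Head, class Tail, template <class> class Unit>
struct ScatterSketch<Typelist<Head, Tail>, Unit>
    : public Unit<Head>, public ScatterSketch<Tail, Unit>
{
};

template <template <class> class Unit>
struct ScatterSketch<NullType, Unit>
{
};

// Access by type: the implicit derived-to-base conversion selects the
// Unit<T> sub-object, so no run-time lookup is involved.
template <class T, class TList, template <class> class Unit>
Unit<T>& FieldSketch(ScatterSketch<TList, Unit>& obj)
{
    return obj;
}

typedef Typelist<int, Typelist<float, Typelist<double, NullType> > > Numbers;
typedef ScatterSketch<Numbers, Holder> Element3D;

// Usage: write each field by type, then read them back.
inline double sum_fields()
{
    Element3D e;
    FieldSketch<int>(e).value_ = 1;
    FieldSketch<float>(e).value_ = 1.5f;
    FieldSketch<double>(e).value_ = 3.5;
    assert(FieldSketch<int>(e).value_ == 1);
    return FieldSketch<int>(e).value_ + FieldSketch<float>(e).value_
         + FieldSketch<double>(e).value_;
}
```

Replacing Holder with a vector-like wrapper is what later turns each slot into a step that can contain several process units.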

To learn more about Typelist, please refer to [“Modern C++ Design”].

Iterate through the pipeline

Now that we have a heterogeneous container, we can start to build the pipeline. Let’s focus on difficulty 2 first (the heterogeneous class container). Our pipeline class should derive from class GenScatterHierarchy so that it can hold heterogeneous classes.

Following is part of the Pipeline class:

////////////////////////////////////////////////////////////////////////////
//class Pipeline
//Pipeline is the interface of the pipeline library. Users use this class directly.
//template parameters:
//TList: the typelist which specifies the calculation steps in the pipeline
//Param: the parameter type of the calculation, used by the 'Process' function
//StepHolder: the helper type to hold the actual calculation steps
//member functions:
//Process(data): loop over each element in the pipeline and call its operator() to do the
// calculation on the data, which is specified by the parameter 'data'
////////////////////////////////////////////////////////////////////////////
template <class TList, typename Param, template <class> class StepHolder>
struct Pipeline : public GenScatterHierarchy<TList, StepHolder>
{
    typedef TList StepList;
    typedef GenScatterHierarchy<TList, StepHolder> ProcessSteps;
    static const bool isPipeline = true;
    Pipeline(){ }
    bool Process(Param data)
    {
        //'typename' is required because ProcessSteps::TList is a dependent type
        return ProcessHelper<ProcessSteps, typename ProcessSteps::TList, Param, bool>::Do(*this, data);
    }
    ...
};

The Pipeline class takes the two template parameters of GenScatterHierarchy plus one more. The first template parameter is the typelist of the steps the pipeline holds. The second parameter, ‘Param’, is the type passed to the ‘Process’ function. The third one, StepHolder, is the ‘wrapper’ class.

The pipeline library passes class StepArray as the third template parameter. Besides being a std::vector, StepArray is both a holder of step types and a functor. Its operator () sequentially calls the operator () of all the units inside it and returns a result according to the logical operation (‘and’ or ‘or’) it is configured with.

Following is the source code of StepArray:

//'and' and 'or' are reserved alternative tokens in standard C++,
//so the enumerators carry an 'op' prefix here
enum OpTypes{ opAnd, opOr };
//////////////////////////////////////////////////////////////////////////////////////
//class StepArray
//StepArray holds calculators of the same type in a std::vector
//member function:
//operator() (param) : Sequentially call the operator () of each calculator in the vector.
// The result returned by this function depends on OpTypes op (opAnd, opOr)
// In order to return true,
// 1)all calculators in the vector must return true when op == opAnd;
// 2)at least one of the calculators in the vector must return true when op == opOr
//////////////////////////////////////////////////////////////////////////////////////
template < class StepType >
struct StepArray:public vector<StepType>
{
    typedef typename StepType::ParamType ParamType;
private:
    OpTypes op;
public:
    StepArray( OpTypes sourceOp):op(sourceOp){}
    StepArray( ):op(opAnd){}
    StepArray(const StepArray& sourceArray):vector<StepType> (sourceArray), op(sourceArray.op)
    {}
    void SetLogicalOp(OpTypes sourceOp){op=sourceOp;}
    bool operator()(ParamType param)
    {
        //'this->' and 'typename' are needed because the base class is dependent
        if(this->empty()) return true; //an empty step lets the data pass
        typename vector<StepType>::iterator i;
        bool result;
        if(op == opAnd)
        {
            result = true;
            for(i = this->begin(); i != this->end(); ++i)
                if (!(*i)(param))
                {
                    result = false;
                    break;
                }
        }
        else // op == opOr
        {
            result = false;
            for(i = this->begin(); i != this->end(); ++i)
                if( (*i)(param) )
                {
                    result = true;
                    break;
                }
        }
        return result;
    }
};

Let’s go back to the Pipeline class. The member function “Process” calls ‘ProcessHelper::Do(…)’ to loop through all the steps inside the container and let them process the data. In other words, Process() is a ‘foreach’-like function.

Because all step types in a Pipeline are fixed at compile time, we have to do the loop at compile time as well (that is, we can’t use a normal run-time ‘foreach’ iteration). The only weapons we have are partial template specialization and recursion.

The ProcessHelper structure is designed to achieve this goal.


template<class H, class TList, typename Param, typename R>
struct ProcessHelper;

template<class H, class TList, typename Param>
struct ProcessHelper<H, TList, Param, bool>
{
    typedef typename TList::Head Head;
    static bool Do(H& obj, Param p)
    {
        //run the step for Head; continue with the tail only if it returned true
        if((Field<Head>(obj))(p))
            return ProcessHelper<H, typename TList::Tail, Param, bool>::Do(obj, p);
        else
            return false;
    }
};

//end of the typelist: every step returned true
template <class H, typename Param>
struct ProcessHelper<H, NullType, Param, bool>
{
    static bool Do(H&, Param)
    { return true; }
};

ProcessHelper recursively calls itself to iterate through the Typelist until it encounters ‘NullType’, which terminates the Typelist. Thanks to partial template specialization, the compiler can figure out whether the remaining typelist is NullType or not.

The following pseudo code shows the rationale of the algorithm:


Process(TList)
{
    if (TList is NullType)
        return true;
    Get the object of TList::Head;
    result = call the process function of the object
    if (result == true)
    {
        recursively call itself by passing the next node in the TList as parameter
    }
    else
        return false;
}
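The same recursion can be tried out in isolation. The sketch below is a reduced illustration, not the library's ProcessHelper: the steps are hypothetical stateless types with a static Apply function, and the data is a plain int, so that no container is needed.

```cpp
#include <cassert>

struct NullType {};
template <class T, class U> struct Typelist { typedef T Head; typedef U Tail; };

// Hypothetical stateless steps; each returns false to stop the flow.
struct RejectNegative { static bool Apply(int& x) { return x >= 0; } };
struct TimesTwo       { static bool Apply(int& x) { x *= 2; return true; } };
struct AddOne         { static bool Apply(int& x) { x += 1; return true; } };

// General case: run the head step, then recurse into the tail.
template <class TList>
struct ProcessSketch
{
    static bool Do(int& x)
    {
        if (!TList::Head::Apply(x))
            return false;  // interrupt the loop
        return ProcessSketch<typename TList::Tail>::Do(x);
    }
};

// Exit of the recursion: the typelist is exhausted.
template <>
struct ProcessSketch<NullType>
{
    static bool Do(int&) { return true; }
};

typedef Typelist<RejectNegative, Typelist<TimesTwo, Typelist<AddOne, NullType> > > Steps;

// Usage: 3 passes the filter and becomes 3 * 2 + 1.
inline int demo_process()
{
    int x = 3;
    bool finished = ProcessSketch<Steps>::Do(x);
    assert(finished);
    return x;
}
```

For this list the compiler effectively expands the recursion into three nested calls, which is the generated equivalent of the hand-written nested “if” statements.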

Build population calculation pipeline

Now that we have a pipeline class, let’s build a very simple pipeline to calculate the population:


typedef TYPELIST_3(AreaFilter, WeekFilter, PopulationCalculator) CalculatePopulationTypelist;
typedef Pipeline<CalculatePopulationTypelist, Respondent&, StepArray> PopulationCalculatePipeline;
vector< Respondent > respondents;
InitRespondents(); //initialize respondents
PopulationCalculatePipeline pipeline; //create a pipeline instance
InitPipeline(); //initialize the pipeline by setting the area criteria and week criteria
for (vector<Respondent>::iterator respondent = respondents.begin(); respondent != respondents.end(); ++respondent)
{ //let every respondent flow through the pipeline
    pipeline.Process(*respondent);
}
//Field<>() returns the StepArray; [0] selects its first calculator
cout << Field<PopulationCalculator>(pipeline)[0].population;


As you can see, building a pipeline to calculate the population is very simple. First, define a typelist ‘CalculatePopulationTypelist’ which contains three types: AreaFilter, WeekFilter, and PopulationCalculator. Then define the pipeline type using this typelist. The new pipeline thus has the three process steps defined in the typelist. After initializing the pipeline (we will discuss pipeline initialization later), loop through all respondents and push them into the pipeline. After all the data has flowed through the pipeline, we get the population.

In fact, PopulationCalculatePipeline does the same thing as the following code:


class PopulationCalculatePipeline
{
public:
    //StepArray is a vector<T> which implements 'operator ()'
    StepArray<AreaFilter> area_filters;
    StepArray<WeekFilter> week_filters;
    StepArray<PopulationCalculator> population_calculators;
    //omit some code
    ...
    bool Process(...)
    {
        //call all AreaFilters in StepArray<AreaFilter>
        //similar to:
        //for( i=0;i<area_filters.size();++i)
        //    area_filters[i](...)
        if( area_filters(...) )
            if( week_filters(...) )
                return population_calculators(...);
        return false;
    }
};


The difference is that, by using the Pipeline class, the compiler generates all this code for you.

Access Process Units

You may have noticed that we didn’t discuss how to initialize the pipeline. The reason is that, to initialize the pipeline, we must access the filters and calculators in it. Since a pipeline can contain nested sub-pipelines, accessing a process unit or step can be very inconvenient. For example, to access the process units inside step 3 in Figure 1, we have to use the following ugly code:

Field<step3-1>( Field<Step3>(pipeline)[0]) [index]

Imagine the pipeline becoming more complicated: accessing a process unit or step inside it could be a nightmare. Therefore a mechanism that simplifies access to process units and steps is important.

Ideally, accessing a process step should be as straightforward as this:



Pipeline.GetStep<step3-1>()

The template member function GetStep returns the process step specified by the template parameter (step3-1). No matter where the step is (in a nested pipeline or not), GetStep should return a proper reference to the process step.

There are two difficulties in developing GetStep:

1) A step can be a nested pipeline or a simple process step.

2) A nested pipeline can be anywhere in the typelist. Therefore searching for a process step in a pipeline becomes a search in a tree of typelists.

Following is the algorithm for searching for a step in an arbitrarily nested pipeline:

1) Search the typelist of the main pipeline; if the step is found, return the address of this step.

2) Otherwise, if the first node in the typelist is a nested pipeline, recursively call the search function to search that node.

3) If the step is not in the first node, or if the first node is not a nested pipeline (that is, it is a simple step), search the next node in the typelist. Repeat the procedure until the type is found or the end of the typelist, NullType, is reached.

The algorithm is a depth-first search. Following is the rationale of this algorithm:

StepType * GetStep(pipeline, step)
{
    get pipeline typelist;
    if (typelist is NullType) return 0;
    if (pipeline is not a pipeline) return 0;
    address = 0;
    if (step in the typelist)
        return the address of the step;
    else
    {
        Get the first node in the typelist;
        if( the first node is a nested pipeline)
        {
            address = GetStep(first_node,step);
            if (address!=0)
                return address;
        }
        return GetStep(the_rest_nodes_listed_in_pipeline, step);
    }
}
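The depth-first search can also be expressed as a compact, self-contained sketch. It follows the same idea as the pseudo code but is deliberately simplified: each slot holds a single object instead of a vector of units, and the names Steps, Contains, FindStep, GetStepSketch, StepA, StepB, StepC, Inner, and Outer are hypothetical. It assumes every step type defines isPipeline and every pipeline defines StepList.

```cpp
#include <cassert>

struct NullType {};
template <class T, class U> struct Typelist { typedef T Head; typedef U Tail; };
template <int v> struct Int2Type { enum { value = v }; };

template <class T> struct Holder { T value; };

// Scatter container: one Holder<T> base class per type in the list.
template <class TList> struct Steps;
template <class H, class T>
struct Steps<Typelist<H, T> > : Holder<H>, Steps<T> {};
template <> struct Steps<NullType> {};

// Compile-time test: is T one of the types in TList?
template <class TList, class T> struct Contains;
template <class T> struct Contains<NullType, T> { enum { value = 0 }; };
template <class T, class Tail>
struct Contains<Typelist<T, Tail>, T> { enum { value = 1 }; };
template <class H, class Tail, class T>
struct Contains<Typelist<H, Tail>, T> { enum { value = Contains<Tail, T>::value }; };

// Depth-first search for a step of type T inside pipeline P.
template <class P, class TList, class T>
struct FindStep
{
    typedef typename TList::Head Head;
    typedef typename TList::Tail Tail;
    static T* In(P& p) { return Direct(p, Int2Type<Contains<TList, T>::value>()); }
private:
    // T is listed at this level: take its address directly.
    static T* Direct(P& p, Int2Type<1>) { return &static_cast<Holder<T>&>(p).value; }
    // Otherwise look inside the head (if it is a pipeline), then try the tail.
    static T* Direct(P& p, Int2Type<0>)
    {
        T* r = Nested(static_cast<Holder<Head>&>(p).value, Int2Type<Head::isPipeline>());
        return r ? r : FindStep<P, Tail, T>::In(p);
    }
    static T* Nested(Head& h, Int2Type<1>)
    { return FindStep<Head, typename Head::StepList, T>::In(h); }
    static T* Nested(Head&, Int2Type<0>) { return 0; }
};
// Typelist exhausted: the step is not at this level.
template <class P, class T>
struct FindStep<P, NullType, T> { static T* In(P&) { return 0; } };

template <class T, class P>
T* GetStepSketch(P& p) { return FindStep<P, typename P::StepList, T>::In(p); }

// Hypothetical steps and a two-level pipeline.
struct StepA { static const bool isPipeline = false; int id; };
struct StepB { static const bool isPipeline = false; int id; };
struct StepC { static const bool isPipeline = false; int id; };
typedef Typelist<StepB, Typelist<StepC, NullType> > InnerList;
struct Inner : Steps<InnerList> { typedef InnerList StepList; static const bool isPipeline = true; };
typedef Typelist<StepA, Typelist<Inner, NullType> > OuterList;
struct Outer : Steps<OuterList> { typedef OuterList StepList; static const bool isPipeline = true; };

// Usage: StepC is reached without spelling out the path through Inner.
inline int demo_lookup()
{
    Outer p = Outer();
    assert(GetStepSketch<StepA>(p) != 0);
    GetStepSketch<StepC>(p)->id = 42;
    // Read the value back through the explicit path to confirm it is the same object.
    return static_cast<Holder<StepC>&>(static_cast<Holder<Inner>&>(p).value).value.id;
}
```

The Int2Type overloads play the role of a compile-time “if”: only the branch that is actually selected gets instantiated, so a simple step is never asked for a StepList it does not have.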

Although the algorithm is simple, difficulties remain: because everything about a typelist lives in the static world, we have to rely on a compile-time algorithm. Again, partial template specialization is our only weapon.

To access a step in a pipeline, we need a helper class called GetStepHelper. GetStepHelper has four template parameters. The first one, H, is the pipeline type. The second parameter, TList, is the step typelist of the pipeline. The third parameter, T, is the step type you want to access. The last one is the return type of GetStep(). GetStepHelper has a partially specialized form, GetStepHelper<…NullType…>, for the case where the typelist is NullType, which gives the search an exit: when the typelist is exhausted, it returns 0.

GetStepHelper has only one static public function: GetStep. Depending on whether the step type T is found in the typelist of the pipeline, GetStep calls one of two overloaded private functions: GetField(H&, Int2Type<true>) when type T is found in the pipeline typelist, or GetField(H&, Int2Type<false>) when it is not (Int2Type<> is used because a member template function cannot be partially specialized).

The latter calls another private template function, GetStep_(…), to find out whether the step of type T is inside the first node of the typelist. Because any node in the typelist can be either a nested pipeline (isPipeline = true) or a simple process step (isPipeline = false), GetStep_ has two overloaded forms. When the node is a simple process step, GetStep_(…, Int2Type<false>) is called and always returns 0. Otherwise, GetStep_(…, Int2Type<true>) is called, which passes the Head’s typelist to GetStepHelper and so searches all child pipelines of the first node recursively. After the first node and all its child pipelines (if any) have been searched, GetField tests the address returned by GetStep_. If the address is 0 (invalid), GetField searches the rest of the typelist by recursively calling GetStepHelper with “Tail” as the second template parameter.

The source code is listed here:

//////////////////////////////////////////////////////
//GetStepHelper helps pipeline to get a step in it
//template parameters:
//H: the pipeline type
//TList: the typelist in the pipeline
//T: the type of the element you want to access
//RType: the return type of GetStep()
//////////////////////////////////////////////////////
template <class H, class TList, class T, class RType>
struct GetStepHelper
{
    typedef typename TList::Head Head;
    typedef typename TList::Tail Tail;
private:
    //the head is a nested pipeline: search its own typelist
    template <class R>
    static RType* GetStep_(R& obj, Int2Type<true> )
    {
        //'typename' is required because Head::StepList is a dependent type
        return GetStepHelper<Head, typename Head::StepList, T, RType>::GetStep( obj );
    }
    //the head is a simple step: nothing to search inside it
    template <class R>
    static RType* GetStep_(R& obj, Int2Type<false> )
    {
        return 0;
    }
    //T is in this typelist: return the address of its step
    static RType* GetField(H& obj, Int2Type<true>)
    {
        return &(Field<T>( obj ));
    }
    //T is not in this typelist: search inside the head, then the tail
    static RType* GetField(H& obj, Int2Type<false>)
    {
        RType* result = GetStep_<Head>( Field<Head>(obj)[0], Int2Type< Head::isPipeline >() );
        if( result != 0 )
            return result;
        else
            return GetStepHelper<H, Tail, T, RType>::GetStep( obj );
    }
public:
    static RType* GetStep(H& obj)
    {
        return GetField(obj, Int2Type< (IndexOf<TList,T>::value != -1) >());
    }
};
//end of the typelist: the step was not found at this level
template <class H, class T, class RType>
struct GetStepHelper<H, NullType, T, RType>
{
    static RType* GetStep(H&)
    {
        return 0;
    }
};

Calculate duration

Now that we have everything we need to build a pipeline, let’s build a complex calculation. Say we want to calculate “how long do those people 1) in certain area(s) 2) in certain week(s) 3) on certain week day(s) watch certain TV station(s)”. Please notice that these criteria (area, week, week day, and station) live in different data structures. Therefore, we need to build multiple pipelines, each of which handles one data structure.

Before building the pipelines, we need some more filters and calculators.

  • WeekDayFilter filters RespondentDailyRec to find out if the daily respondent record is in certain weekday(s).
  • StationFilter filters tuning event which tuned into certain station(s). It works on TuningEvent structure. 
  • DurationCalculator accumulates the duration respondents watched. It works on TuningEvent structure. 

You can find the source code in: 
http://codecentral.borland.com/codecentral/ccweb.exe/listing?id=20455
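As the article does not list these units, the following is a hedged sketch of how StationFilter and DurationCalculator might look. It assumes that both mirror the earlier units, that the duration of a tuning event is endTime minus startTime in the same time unit, and that the result is not weighted; the member name station_criteria is hypothetical, and the downloadable source may differ.

```cpp
#include <cassert>
#include <set>

// Copy of the article's TuningEvent structure.
struct TuningEvent
{
    int startTime;
    int endTime;
    int station;
    char ext;
};

// Assumed shape of StationFilter, modeled on AreaFilter.
class StationFilter
{
public:
    typedef TuningEvent& ParamType;
    static const bool isPipeline = false;

    explicit StationFilter(const std::set<int>& criteria) : station_criteria(criteria) {}

    bool operator()(TuningEvent& event) const
    {
        if (station_criteria.empty()) return true;           // filter disabled
        return station_criteria.count(event.station) != 0;   // pass listed stations only
    }

private:
    std::set<int> station_criteria;  // hypothetical member name
};

// Assumed shape of DurationCalculator, modeled on PopulationCalculator.
class DurationCalculator
{
public:
    typedef TuningEvent& ParamType;
    static const bool isPipeline = false;

    int duration;  // accumulated viewing time

    DurationCalculator() : duration(0) {}

    bool operator()(TuningEvent& event)
    {
        duration += event.endTime - event.startTime;  // assumption: unweighted duration
        return true;                                  // a calculator never stops the flow
    }
};

// Usage: only the event on station 1 contributes to the duration.
inline int demo_duration()
{
    std::set<int> stations;
    stations.insert(1);
    StationFilter filter(stations);
    DurationCalculator calculator;
    TuningEvent onStation1 = {100, 160, 1, 0};
    TuningEvent onStation2 = {200, 230, 2, 0};
    if (filter(onStation1)) calculator(onStation1);
    if (filter(onStation2)) calculator(onStation2);
    assert(calculator.duration == 60);
    return calculator.duration;
}
```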

After implementing the new filters and calculators, we can start to build the new pipeline.

Before building the pipeline, let’s see the logic of duration calculation:



for (each respondent)
{
    if ( respondent passes AreaFilter) // if the respondent is in the areas
    {
        Accumulate population using PopulationCalculator;
        if (respondent passes WeekFilter) //if the respondent is in the weeks
        {
            for (each RespondentDailyRec of the respondent) //each week day
            {
                if(the RespondentDailyRec passes WeekDayFilter) //if the rec is on the week day we want
                {
                    for( each tuning event in the RespondentDailyRec)
                    {
                        if(the event passes StationFilter) //if the event is watching the station we want
                        {
                            Accumulate duration using DurationCalculator;
                        }
                    }
                }
            }
        }
    }
}

By the way, you can see how nasty the if-then-else structure is here. It is not only error prone, but also very rigid: even if you just want to change the order of the filters or calculators, you have to pay special attention not to break the code. In fact, the calculation shown in this example is drastically simplified. In the real world, the calculation logic is much more complicated, so the if-then-else structure would be even harder to maintain.

Expressed as a pipeline structure, the logic above includes the following steps:

Step 1: Check if the respondent is in the areas we want

Step 2: Accumulate the population

Step 3: Check if the respondent is in the weeks we want

Step 4: For each RespondentDailyRec in the respondent, check if it is on the week days we want

Step 5: For each tuning event in the RespondentDailyRec structure, check if the event is on the station we want

Step 6: Accumulate duration

If a respondent and its data reach a certain step, the respondent satisfies all the conditions it has passed. For example, if a respondent reaches step 6, the respondent is in the correct area and the correct week, and watched the correct station(s) on the correct week day(s).

Building the pipeline to calculate the duration means building a pipeline that holds the six steps above. Because area and week are in the Respondent data structure, let’s start by building the respondent pipeline to handle this data structure (the respondent pipeline is also the main pipeline).


typedef TYPELIST_4(AreaFilter, PopulationCalculator, WeekFilter, DailyRspPipeline) Mainlist;
class RspPipeLine: public Pipeline<Mainlist,Respondent&,StepArray>
{
public:
    RspPipeLine()
    {
        GetStep<DailyRspPipeline>().push_back(DailyRspPipeline());
    }
};

The first three steps are the first three types in the pipeline typelist. The last step of RspPipeLine is a nested pipeline: DailyRspPipeline. DailyRspPipeline has two roles: process unit and sub-pipeline. As a process unit, DailyRspPipeline implements operator (); at the same time, DailyRspPipeline is also a pipeline. As a pipeline, DailyRspPipeline processes the data in the RespondentDailyRec structure. The operator () links the two roles together.

Following is the source code of DailyRspPipeline:

typedef TYPELIST_2(WeekDayFilter, EventPipeline) Sublist;
class DailyRspPipeline : public Pipeline<Sublist,RespondentDailyRec&, StepArray>
{
public:
    typedef Respondent& ParamType;
    DailyRspPipeline()
    {
        GetStep<EventPipeline>().push_back(EventPipeline());
    }
    bool operator()(Respondent& rsp)
    {
        vector< RespondentDailyRec >::iterator dailyRsp;
        for(dailyRsp = rsp.rspDailyRecs.begin(); dailyRsp != rsp.rspDailyRecs.end(); ++dailyRsp)
        {
            Process(*dailyRsp);
        }
        return true;
    }
};

Step 4 is the first type in DailyRspPipeline’s typelist. The second type in DailyRspPipeline’s typelist is another nested pipeline: EventPipeline. EventPipeline processes the TuningEvent structures in RespondentDailyRec.



typedef TYPELIST_2(StationFilter, DurationCalculator) EventCalcualtorList;
class EventPipeline : public Pipeline<EventCalcualtorList, TuningEvent&, StepArray>
{
public:
    typedef RespondentDailyRec& ParamType;
    EventPipeline() {}
    bool operator()(RespondentDailyRec& dailyRsp)
    {
        vector<TuningEvent>::iterator event;
        for (event = dailyRsp.tunings.begin(); event != dailyRsp.tunings.end(); ++event)
        {
            Process(*event);
        }
        return true;
    }
};
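
The nesting pattern shared by DailyRspPipeline and EventPipeline can be sketched without the library. The code below is a simplified stand-in under stated assumptions: Event, Day, Person, DurationSum, DayStage, PersonStage, and totalDuration are hypothetical names, and each stage holds the next stage by reference instead of using a typelist. What it shows is the job of operator(): it accepts the parent record type, iterates over the child records, and forwards each one to the inner stage.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical three-level data structure, loosely modelled on the example.
struct Event  { int station; int duration; };
struct Day    { int weekDay; std::vector<Event> tunings; };
struct Person { int area; std::vector<Day> days; };

// Innermost unit: accumulates the duration of every event it receives.
struct DurationSum {
    int total;
    DurationSum() : total(0) {}
    bool operator()(Event& e) {
        assert(e.duration >= 0);  // sanity check for this sketch only
        total += e.duration;
        return true;
    }
};

// Receives a Day (parent type) and forwards each Event (child type).
struct DayStage {
    DurationSum& inner;
    explicit DayStage(DurationSum& s) : inner(s) {}
    bool operator()(Day& d) {
        for (std::size_t i = 0; i < d.tunings.size(); ++i)
            inner(d.tunings[i]);
        return true;
    }
};

// Receives a Person (parent type) and forwards each Day (child type).
struct PersonStage {
    DayStage& inner;
    explicit PersonStage(DayStage& s) : inner(s) {}
    bool operator()(Person& p) {
        for (std::size_t i = 0; i < p.days.size(); ++i)
            inner(p.days[i]);
        return true;
    }
};

// Wires the three stages together and runs every person through them.
int totalDuration(std::vector<Person>& people) {
    DurationSum sum;
    DayStage dayStage(sum);
    PersonStage personStage(dayStage);
    for (std::size_t i = 0; i < people.size(); ++i)
        personStage(people[i]);
    return sum.total;
}
```

The caller only ever hands over the outermost record type; each level of nesting unpacks one level of the data structure.
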
Now we have built everything. Let’s look at how to use the new pipeline to calculate the duration that satisfies all the criteria.

int main(int argc, char* argv[])
{
    TestData testdata;    //build test data
    RspPipeLine pipeline; //build pipeline

    //initialize areaFilter
    set<int> areas;
    areas.insert(1);
    areas.insert(101);
    //add areaFilter into the pipeline
    pipeline.GetStep<AreaFilter>().push_back(AreaFilter(areas));
    //add populationCalculator into the pipeline
    pipeline.GetStep<PopulationCalculator>().push_back(PopulationCalculator());
    //initialize weekFilter
    set<int> weeks;
    weeks.insert(1);
    weeks.insert(2);
    //add weekFilter into the pipeline
    pipeline.GetStep<WeekFilter>().push_back(WeekFilter(weeks));
    //initialize stationFilter
    set<int> stations;
    stations.insert(1);
    //add stationFilter into the pipeline
    pipeline.GetStep<StationFilter>().push_back(StationFilter(stations));
    //add durationCalculator into the pipeline
    pipeline.GetStep<DurationCalculator>().push_back(DurationCalculator());

    vector<Respondent>::iterator rsp;
    //let all data go through the pipeline
    for (rsp = testdata.respondents.begin(); rsp != testdata.respondents.end(); ++rsp)
    {
        pipeline.Process(*rsp);
    }
    cout << "\n" << "population: " << pipeline.GetUnit<PopulationCalculator>(0).population;
    cout << "\n Duration:" << pipeline.GetUnit<DurationCalculator>(0).duration;

    getchar();
    return 0;
}

Summary

The pipeline strategy encourages developers to write highly reusable and maintainable code. Developing a pipeline application usually takes two steps:

1) Develop the process units. Appendix A describes the requirements a process unit must meet.

2) Build the pipeline by inheriting from the ‘Pipeline’ class.

Pros and Cons

Of course, there is more than one way to implement the pipeline strategy. Implementations normally fall into two categories: static pipelines and dynamic pipelines. Compared with the dynamic strategy, the static pipeline described here has some pros and cons.

Pros:

Because the pipeline is defined at compile time, the compiler can check type mistakes for you. In other words, if you mistakenly insert a unit of step/type A into step/type B, you don’t need to wait until run time or, even worse, for your users to find the mistake; the compiler will tell you.

All pipeline steps and process units in a pipeline can be accessed in a unified way. Users do not need to care whether the pipeline is nested, or in which sub-pipeline a step or unit resides.
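
The type-checking advantage can be shown with a much smaller stand-in than the library. The sketch below is an illustration under stated assumptions, not the library’s implementation: MiniPipeline, EvenFilter, Doubler, and configure are hypothetical names, and the typelist is replaced by a std::tuple of vectors (this needs C++14 or later for std::get by type). Each step is identified by its type, so pushing a unit into the wrong step is a compile-time error.

```cpp
#include <cassert>
#include <cstddef>
#include <tuple>
#include <vector>

// Two hypothetical process units that operate on a plain int.
struct EvenFilter { bool operator()(int& v) const { return v % 2 == 0; } };
struct Doubler    { bool operator()(int& v) const { v *= 2; return true; } };

// Hypothetical miniature static pipeline: one std::vector per step type.
template <class... Steps>
class MiniPipeline {
public:
    // Look up a step by its type; asking for a type that is not part of
    // the pipeline is rejected by the compiler.
    template <class Step>
    std::vector<Step>& GetStep() {
        return std::get<std::vector<Step> >(steps_);
    }
private:
    std::tuple<std::vector<Steps>...> steps_;
};

// Adds one unit to each step and returns the total number of units.
std::size_t configure(MiniPipeline<EvenFilter, Doubler>& p) {
    p.GetStep<EvenFilter>().push_back(EvenFilter());
    p.GetStep<Doubler>().push_back(Doubler());
    // p.GetStep<Doubler>().push_back(EvenFilter()); // does not compile: wrong unit type
    std::size_t total = p.GetStep<EvenFilter>().size() + p.GetStep<Doubler>().size();
    assert(total >= 2);
    return total;
}
```

Uncommenting the marked line makes the build fail, because an EvenFilter cannot be converted to a Doubler; the mistake never reaches run time.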

Cons:

All steps of the pipeline can only be defined at compile time. That is, you cannot change the pipeline steps dynamically. All you can do at run time is add instances of process units to the predefined steps.

Because the pipeline library relies on the compiler to generate code from the type list, a pipeline shouldn’t have too many steps. A pipeline with hundreds of steps will most likely exceed the compiler’s limits.
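
For contrast, here is a minimal sketch of the dynamic alternative. It is not part of the library: DynamicPipeline and its member names are hypothetical, and the steps are type-erased with std::function (C++11 or later). The step list can grow and shrink while the program runs, at the cost of an indirect call per step and of no longer being able to reach a step through its concrete type.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical dynamic pipeline: steps are stored and ordered at run time.
class DynamicPipeline {
public:
    typedef std::function<bool(int&)> Step;

    void AddStep(Step step) { steps_.push_back(std::move(step)); }

    void RemoveLastStep() {
        assert(!steps_.empty());
        steps_.pop_back();
    }

    std::size_t StepCount() const { return steps_.size(); }

    // Runs the steps in order and stops at the first one that returns false.
    bool Process(int& value) const {
        for (std::size_t i = 0; i < steps_.size(); ++i) {
            if (!steps_[i](value)) return false;
        }
        return true;
    }
private:
    std::vector<Step> steps_;
};
```

A caller could, for example, add a step with `p.AddStep([](int& v) { v += 10; return true; });` and remove it again later, which the static pipeline cannot do.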

Appendix A: Process unit and user defined pipeline specification

1. A process unit must be a ‘functor’; in other words, it must implement operator(). By default, operator() should return bool. If you need to return another type, you have to implement a partial template specialization of “ProcessHelper” for that type. The parameter type of operator() must be consistent with the parameter type of the pipeline that holds the unit (the second template parameter, for example Respondent& in Pipeline<Mainlist,Respondent&,StepArray>). In addition, a process unit must typedef ParamType. Following is an example:



typedef Respondent& ParamType;

2. A process unit must have a static const data member ‘isPipeline’. This member lets the pipeline determine whether a unit is a nested pipeline or a pure process unit. Setting ‘isPipeline’ to ‘true’ means the unit is a nested pipeline; otherwise, it is a simple unit.

3. All user-defined pipelines must inherit from the Pipeline class.

4. If a step is a nested pipeline, the step should contain only one instance (unit), the first one.
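
Put together, a simple process unit that follows rules 1 and 2 might look like the sketch below. It is an illustration, not the code from Calculator.h: the one-field Respondent struct, AreaFilterSketch, and checkAreaFilterSketch are hypothetical stand-ins, and the meaning given to the return value (false rejects the respondent) is an assumption.

```cpp
#include <cassert>
#include <set>

// Hypothetical, simplified stand-in for the article's Respondent structure.
struct Respondent {
    int area;
};

// Sketch of a process unit that follows the rules above.
class AreaFilterSketch {
public:
    typedef Respondent& ParamType;         // rule 1: publish the parameter type
    static const bool isPipeline = false;  // rule 2: a simple unit, not a nested pipeline

    explicit AreaFilterSketch(const std::set<int>& areas) : areas_(areas) {}

    // rule 1: a functor that returns bool.
    // Assumption: returning false means the respondent is rejected.
    bool operator()(Respondent& rsp) const {
        return areas_.count(rsp.area) != 0;
    }
private:
    std::set<int> areas_;
};

// Small self-check: area 101 is accepted, area 7 is rejected.
void checkAreaFilterSketch() {
    std::set<int> areas;
    areas.insert(1);
    areas.insert(101);
    AreaFilterSketch filter(areas);
    Respondent inside = {101};
    Respondent outside = {7};
    assert(filter(inside));
    assert(!filter(outside));
}
```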

Appendix B: Example files explanation

 

You can download the library source code and the example source code from the following address:

http://codecentral.borland.com/codecentral/ccweb.exe/listing?id=20455

1.       pipeline.h

This is the only file of the pipeline library. To use the library, all you need to do is include this file (it relies on some Loki library header files). If you don’t want to download Loki, you can have it include ‘LokiPipelinePart.h’ instead (this is the default). ‘LokiPipelinePart.h’ contains all the Loki code needed to compile the pipeline library. If you do want to use Loki, include "loki/HierarchyGenerators.h" and remember to add it to your project’s include search path.

2.       LokiPipelinePart.h

This file contains everything the pipeline library needs from Loki. I prepared it for people who would rather not download the Loki library; have ‘Pipeline.h’ include it if you do not have Loki. However, the file has been modified for the Borland C++ compiler and cannot be compiled by GCC. If you want to try the pipeline library on a compiler other than Borland C++, use the Loki library directly.

3.       Loki Library

You can download the library from the following address:

http://sourceforge.net/projects/loki-lib/

You can also find versions ported to specific compilers, such as Borland C++, Visual C++, and g++, on the Internet.

4.       Calculator.h and Calculator.cpp

All the process unit classes for the example.

5.       ExamplePipeline.h

All the pipeline classes for the example.

6.       testdata.h and testdata.cpp

A class to produce some test data.

7.       testtypes.h

Data structure for the example.

8.       maintest.cpp

The main function for the example.


Appendix C:  Books and Web Sites

[Modern C++ Design]

Andrei Alexandrescu. Modern C++ Design: Generic Programming and Design Patterns Applied. Addison-Wesley, Reading, MA, 2001.

[C++ Templates]

David Vandevoorde, Nicolai M. Josuttis. C++ Templates: The Complete Guide. Addison-Wesley, 2002. ISBN: 0-201-73484-2

[Boost]

The Boost Repository for Free, Peer-Reviewed C++ Libraries. http://www.boost.org

[Loki]

A free C++ library that provides the features described in [Modern C++ Design]. http://sourceforge.net/projects/loki-exp


Appendix D:  Contact information

For further discussion, you can reach me at the following email address:

Danny Wang

Toronto Canada

Wxg250@yahoo.com



