===== Sample pipeline applications in FastFlow =====
The code appearing on this page is commented in the course notes PDF, Appendix A: FastFlow.
This is the code of a single concurrent activity printing an "Hello World". We use a single stage pipeline without any kind of stream.
#include
#include
using namespace ff;
class Stage1: public ff_node {
public:
void * svc(void * task) {
std::cout << "Hello world" << std::endl;
return NULL;
}
};
int main(int argc, char * argv[]) {
ff_pipeline pipe;
pipe.add_stage(new Stage1());
ffTime(START_TIME);
if (pipe.run_and_wait_end()<0) {
error("running pipeline\n");
return -1;
}
ffTime(STOP_TIME);
std::cerr << "DONE, pipe time= " << pipe.ffTime() << " (ms)\n";
std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n";
pipe.ffStats(std::cerr);
return 0;
}
The second version following uses a two stage pipeline: the first stage prints "Hello" then send "world" to be printed by the second stage. The program does not terminate.
#include
#include
using namespace ff;
class Stage1: public ff_node {
public:
void * svc(void * task) {
std::cout << "Hello " << std::endl;
char * p = (char *) calloc(sizeof(char),10);
strcpy(p,"World");
sleep(1);
return ((void *)p);
}
};
class Stage2: public ff_node {
public:
void * svc(void * task) {
std::cout << ((char *)task) << std::endl;
free(task);
return GO_ON;
}
};
int main(int argc, char * argv[]) {
ff_pipeline pipe;
pipe.add_stage(new Stage1());
pipe.add_stage(new Stage2());
if (pipe.run_and_wait_end()<0) {
error("running pipeline\n");
return -1;
}
return 0;
}
We add termination here:
#include
#include
using namespace ff;
class Stage1: public ff_node {
public:
Stage1() { first = (1==1); }
void * svc(void * task) {
if(first) {
std::cout << "Hello " << std::endl;
char * p = (char *) calloc(sizeof(char),10);
strcpy(p,"World");
sleep(1);
first = 0;
return ((void *)p);
} else {
return NULL;
}
}
private:
int first;
};
class Stage2: public ff_node {
public:
void * svc(void * task) {
std::cout << ((char *)task) << std::endl;
free(task);
return GO_ON;
}
};
int main(int argc, char * argv[]) {
ff_pipeline pipe;
pipe.add_stage(new Stage1());
pipe.add_stage(new Stage2());
if (pipe.run_and_wait_end()<0) {
error("running pipeline\n");
return -1;
}
return 0;
}