===== Sample pipeline applications in FastFlow ===== The code appearing on this page is explained in the course notes PDF, Appendix A: FastFlow. This is the code of a single concurrent activity printing "Hello world". We use a single-stage pipeline without any kind of stream. #include <iostream> #include <ff/pipeline.hpp> using namespace ff; class Stage1: public ff_node { public: void * svc(void * task) { std::cout << "Hello world" << std::endl; return NULL; } }; int main(int argc, char * argv[]) { ff_pipeline pipe; pipe.add_stage(new Stage1()); ffTime(START_TIME); if (pipe.run_and_wait_end()<0) { error("running pipeline\n"); return -1; } ffTime(STOP_TIME); std::cerr << "DONE, pipe time= " << pipe.ffTime() << " (ms)\n"; std::cerr << "DONE, total time= " << ffTime(GET_TIME) << " (ms)\n"; pipe.ffStats(std::cerr); return 0; } The second version, below, uses a two-stage pipeline: the first stage prints "Hello" and then sends "World" to be printed by the second stage. This program does not terminate, because the first stage never returns NULL to signal the end of the stream and so keeps producing tasks. #include <iostream> #include <ff/pipeline.hpp> using namespace ff; class Stage1: public ff_node { public: void * svc(void * task) { std::cout << "Hello " << std::endl; char * p = (char *) calloc(sizeof(char),10); strcpy(p,"World"); sleep(1); return ((void *)p); } }; class Stage2: public ff_node { public: void * svc(void * task) { std::cout << ((char *)task) << std::endl; free(task); return GO_ON; } }; int main(int argc, char * argv[]) { ff_pipeline pipe; pipe.add_stage(new Stage1()); pipe.add_stage(new Stage2()); if (pipe.run_and_wait_end()<0) { error("running pipeline\n"); return -1; } return 0; } Here we add termination: the first stage sends a single task and returns NULL on its next invocation. #include <iostream> #include <ff/pipeline.hpp> using namespace ff; class Stage1: public ff_node { public: Stage1() { first = (1==1); } void * svc(void * task) { if(first) { std::cout << "Hello " << std::endl; char * p = (char *) calloc(sizeof(char),10); strcpy(p,"World"); sleep(1); first = 0; return ((void *)p); } else { return NULL; } } private: int first; }; class Stage2: public ff_node { public: void * svc(void * task) { std::cout << 
((char *)task) << std::endl; free(task); return GO_ON; } }; int main(int argc, char * argv[]) { ff_pipeline pipe; pipe.add_stage(new Stage1()); pipe.add_stage(new Stage2()); if (pipe.run_and_wait_end()<0) { error("running pipeline\n"); return -1; } return 0; }