These are the programs used in the course notes. Comments may be found in the course notes book.
#include <vector> #include <iostream> #include <ff/farm.hpp> using namespace ff; class Worker: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Worker got " << *t << std::endl; (*t)++; std::cout << "Worker emitting " << *t << std::endl; return t; } }; class Collector: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Collector received " << *t << "\n"; if (*t == -1) return NULL; return task; } }; // the load-balancer filter class Emitter: public ff_node { public: Emitter(int max_task):ntask(max_task) {}; void * svc(void *) { int * task = new int(ntask); --ntask; if (ntask<0) return NULL; std::cout << "Emitting task " << ntask << std::endl;; return task; } private: int ntask; }; int main(int argc, char * argv[]) { if (argc<3) { std::cerr << "use: " << argv[0] << " nworkers streamlen\n"; return -1; } int nworkers=atoi(argv[1]); int streamlen=atoi(argv[2]); if (!nworkers || !streamlen) { std::cerr << "Wrong parameters values\n"; return -1; } ff_farm<> farm; Emitter E(streamlen); farm.add_emitter(&E); std::vector<ff_node *> w; for(int i=0;i<nworkers;++i) w.push_back(new Worker); farm.add_workers(w); Collector C; farm.add_collector(&C); if (farm.run_and_wait_end()<0) { error("running farm\n"); return -1; } std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n"; farm.ffStats(std::cerr); return 0; }
#include <vector> #include <iostream> #include <ff/farm.hpp> using namespace ff; class Worker: public ff_node { public: void * svc(void * task) { int * t = (int *)task; (*t)++; return task; } }; class Collector: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Collector got " << *t << std::endl; return GO_ON; } }; class Emitter: public ff_node { public: Emitter(int n) { streamlen = n; task = 0; }; void * svc(void *) { sleep(1); task++; int * t = new int(task); if (task<streamlen) return t; else return NULL; } private: int streamlen; int task; }; int main(int argc, char * argv[]) { if (argc<3) { std::cerr << "use: " << argv[0] << " nworkers streamlen\n"; return -1; } int nworkers=atoi(argv[1]); int streamlen=atoi(argv[2]); if (!nworkers || !streamlen) { std::cerr << "Wrong parameters values\n"; return -1; } ff_farm<> farm; // farm object Emitter E(streamlen); farm.add_emitter(&E); std::vector<ff_node *> w; for(int i=0;i<nworkers;++i) w.push_back(new Worker); farm.add_workers(w); // add all workers to the farm Collector C; farm.add_collector(&C); if (farm.run_and_wait_end()<0) { error("running farm\n"); return -1; } return 0; }
#include <vector> #include <iostream> #include <ff/farm.hpp> static int * results; typedef struct task_t { int i; int t; } TASK; using namespace ff; class Worker: public ff_node { public: void * svc(void * task) { TASK * t = (TASK *) task; results[t->i] = ++(t->t); return GO_ON; } }; class Emitter: public ff_node { public: Emitter(int n) { streamlen = n; task = 0; }; void * svc(void *) { task++; TASK * t = (TASK *) calloc(1,sizeof(TASK)); t->i = task; t->t = task*task; if (task<streamlen) return t; else return NULL; } private: int streamlen; int task; }; int main(int argc, char * argv[]) { int nworkers=atoi(argv[1]); int streamlen=atoi(argv[2]); results = (int *) calloc(streamlen,sizeof(int)); ff_farm<> farm; Emitter E(streamlen); farm.add_emitter(&E); std::vector<ff_node *> w; for(int i=0;i<nworkers;++i) w.push_back(new Worker); farm.add_workers(w); std::cout << "Before starting computation" << std::endl; for(int i=0; i<streamlen; i++) std::cout << i << " : " << results[i] << std::endl; if (farm.run_and_wait_end()<0) { error("running farm\n"); return -1; } std::cout << "After computation" << std::endl; for(int i=0; i<streamlen; i++) std::cout << i << " : " << results[i] << std::endl; return 0; }