magistraleinformaticanetworking:spm:samplefastflowcode
FastFlow sample code
This is the code used during the FastFlow lesson. All code should be compiled with a command such as
g++ -Idir-where-ff-dir-has-been-saved -lpthread -o file file.cpp
- pipe.cpp
// we include farm, pipeline and allocator hpp files // as we use only this three features of FastFlow #include <ff/node.hpp> #include <ff/pipeline.hpp> #include <ff/allocator.hpp> // we use the FastFlow namespace using namespace ff; // we use the fastflow mem allocator // should be initialized (main) before actually using it static ff_allocator allocator; // we implement a pipeline application here: // // Pipeline // // +------+ +------+ +------+ +------+ // |Stage1| -> |Stage2|->|Stage3|->|Stage4| // +------+ +------+ +------+ +------+ // // Stage1 -> generate an integer stream // StageN -> prints the input stream // StageK -> increases integers // // definition of the generic inc stage // subclass ff_node (seq code wrapper) class IntIncStage:public ff_node { // method called on initialization (optional) int svc_init() { std::cout << "IntIncStage " << ff_node::get_my_id() << " inited !" << std::endl; } // method wrapping "task computation code" // for each task on the input stream, this method is called // and it returns the data to be placed onto the output stream // in and out data are pointers, as usual void * svc (void * task) { int * i = (int *) task; std::cout << "Stage " << ff_node::get_my_id() << " got task " << *i ; (*i)++; std::cout << " computed " << *i << std::endl; return task; } // this is called before termination // (optional) void svc_end() { std::cout << "Stage " << ff_node::get_my_id() << " terminating " << std::endl; } }; // last stage: print in stream contents // it's sequential => subclass ff_node class DrainStage:public ff_node { // print initialization message int svc_init() { std::cout << "DrainStage " << ff_node::get_my_id() << " inited !" << std::endl; } // stage body: actual wrapping the sequential code void * svc (void * task) { int * i = (int *) task; std::cout << "Stage " << ff_node::get_my_id() << " got result " << *i << std::endl; allocator.free(task); return task; } }; // stage generating the stream // seq => subclass ff_node // to output data items on the output stream uses ff_send_out() // to terminate the stream, outputs a FF_EOS // to terminate, returns a NULL class GenerateStream:public ff_node { private: int ntasks; public: // constructor: used to pass the stream "size" GenerateStream(int ntasks):ntasks(ntasks) { } int svc_init() { std::cout << "GenerateStream(" << ntasks << ") " << ff_node::get_my_id() << " inited !" << std::endl; } // Stage body: output a stream with ntasks, ntasks-1, ntasks-2, ... // down to 0 // then output the FF_EOS (end of stream) // and terminate (return NULL) void * svc(void * task) { while(ntasks != 0) { int * tt = (int *) allocator.malloc(sizeof(int)); *tt = ntasks; ff::ff_node::ff_send_out((void *) tt); ntasks--; std::cout << "GenerateStream(" << ntasks << ")" << std::endl; } ff_send_out(EOS); return NULL; } }; int main(int argc, char * argv[]) { int tasks = 10; // dummy, should be taken from the input line // allocator must be initialized before using it allocator.init(); // declare a pipeline object ff_pipeline pipe_a; // add stages in order: // the first stage added is the first pipeline stage // the i-th stage added is the i-th pipeline stage pipe_a.add_stage(new GenerateStream(tasks)); pipe_a.add_stage(new IntIncStage); pipe_a.add_stage(new IntIncStage); pipe_a.add_stage(new DrainStage); // now we ca run the application: std::cout << "Starting application ..." << std::endl; pipe_a.run(); std::cout << "Application started" << std::endl; // here more (unrelated) work can be performed ... // when results are needed we should wait for application termination pipe_a.wait(); // alternatively, if we have nothing else to do, we can issue a single // call such as: pipe_a.run_and_wait_end(); // in this case the pipeline is started and its termination awaited // synchronously std::cout << "Application terminated" << std::endl; return(0); }
In case we want to parallelize the increase integers stage with a farm, we can keep the same code and use the following main code:
- altmain.cpp
int main(int argc, char * argv[]) { int tasks = 10; // dummy, should be taken from the input line // allocator must be initialized before using it allocator.init(); // declare a pipeline object ff_pipeline pipe_a; // add stages in order: // the first stage added is the first pipeline stage // the i-th stage added is the i-th pipeline stage pipe_a.add_stage(new GenerateStream(tasks)); // we create a farm to process increments in the second stage // declare a ff_farm object ff_farm<> farm; // default collector farm.add_collector(NULL); // declare worker vector (string) std::vector<ff_node *> workers; // two workers workers.push_back(new IntIncStage); workers.push_back(new IntIncStage); // add workers to farm farm.add_workers(workers); // second pipeline stage is the farm pipe_a.add_stage(&farm); // the third one is the print stream pipe_a.add_stage(new DrainStage); // now we ca run the application: std::cout << "Starting application ..." << std::endl; pipe_a.run(); std::cout << "Application started" << std::endl; // here more (unrelated) work can be performed ... // when results are needed we should wait for application termination pipe_a.wait(); // alternatively, if we have nothing else to do, we can issue a single // call such as: pipe_a.run_and_wait_end(); // in this case the pipeline is started and its termination awaited // synchronously std::cout << "Application terminated" << std::endl; return(0); }
Sample farm code (taken from FF tests/ directory):
- farm.cpp
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ /* *************************************************************************** * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License version 2 as * published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * * As a special exception, you may use this file as part of a free software * library without restriction. Specifically, if other files instantiate * templates or use macros or inline functions from this file, or you compile * this file and link it with other files to produce an executable, this * file does not by itself cause the resulting executable to be covered by * the GNU General Public License. This exception does not however * invalidate any other reasons why the executable file might be covered by * the GNU General Public License. * **************************************************************************** */ /** Very basic test for the FastFlow farm. */ #include <vector> #include <iostream> #include <ff/farm.hpp> using namespace ff; // generic worker class Worker: public ff_node { public: void * svc(void * task) { int * t = (int *)task; std::cout << "Worker " << ff_node::get_my_id() << " received task " << *t << "\n"; return task; } // I don't need the following functions for this test //int svc_init() { return 0; } //void svc_end() {} }; // the gatherer filter class Collector: public ff_node { public: void * svc(void * task) { int * t = (int *)task; if (*t == -1) return NULL; return task; } }; // the load-balancer filter class Emitter: public ff_node { public: Emitter(int max_task):ntask(max_task) {}; void * svc(void *) { int * task = new int(ntask); --ntask; if (ntask<0) return NULL; return task; } private: int ntask; }; int main(int argc, char * argv[]) { if (argc<3) { std::cerr << "use: " << argv[0] << " nworkers streamlen\n"; return -1; } int nworkers=atoi(argv[1]); int streamlen=atoi(argv[2]); if (!nworkers || !streamlen) { std::cerr << "Wrong parameters values\n"; return -1; } ff_farm<> farm; // farm object Emitter E(streamlen); farm.add_emitter(&E); std::vector<ff_node *> w; for(int i=0;i<nworkers;++i) w.push_back(new Worker); farm.add_workers(w); // add all workers to the farm Collector C; farm.add_collector(&C); if (farm.run_and_wait_end()<0) { error("running farm\n"); return -1; } std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n"; farm.ffStats(std::cerr); return 0; }
Sample accelerator code:
- accelerator.cpp
#include <iostream> #include <math.h> #include <ff/farm.hpp> #include <ff/node.hpp> using namespace std; using namespace ff; // should be global to be accessible from workers #define MAX 1024 double x[MAX]; double y[MAX]; #define ITERNO 100000 int iterno = ITERNO; class Worker: public ff_node { int svc_init() { cout << "Worker initialized" << endl; return 0; } void * svc(void * in) { int i = * ((int *) in); // cout << "Computing " << i << endl; for(int j=0; j<iterno; j++) x[i] = sin(x[i]); y[i] += x[i]; return in; } }; #define NW 2 int main(int argc, char * argv[]) { ffTime(START_TIME); cout << "init " << argc << endl; int nw = (argc==1 ? NW : atoi(argv[1])); iterno = (argc<=2 ? ITERNO : atoi(argv[2])); cout << "using " << nw << " workers iterating " << iterno << " times "<< endl; // init input (fake) for(int i=0; i<MAX; i++) { x[i] = (double)(i*10); y[i] = (double)(-i*13); } cout << "Setting up farm" << endl; ff_farm<> farm(true,nw); // true for offloading, NW for the number of workers ... std::vector<ff_node *> w; // prepare workers for(int i=0;i<nw;++i) w.push_back(new Worker); farm.add_workers(w); // add them to the farm farm.run_then_freeze(); // run farm asynchronously cout << "Sending tasks ..." << endl; int tasks[MAX]; for(int i=0; i<MAX; i++) { tasks[i]=i; farm.offload((void *) &tasks[i]); } farm.offload((void *) FF_EOS); cout << "Waiting termination" << endl; farm.wait(); cout << "Farm terminated after computing for " << farm.ffTime() << endl; ffTime(STOP_TIME); cout << "Spent overall " << ffTime(GET_TIME) << endl; }
magistraleinformaticanetworking/spm/samplefastflowcode.txt · Ultima modifica: 19/05/2011 alle 14:33 (14 anni fa) da Marco Danelutto