===== FastFlow sample code =====
This is the code used during the FastFlow lesson.
All code should be compiled with a command such as
g++ -Idir-where-ff-dir-has-been-saved -lpthread -o file file.cpp
// we include farm, pipeline and allocator hpp files
// as we use only this three features of FastFlow
#include
#include
#include
// we use the FastFlow namespace
using namespace ff;
// we use the fastflow mem allocator
// should be initialized (main) before actually using it
static ff_allocator allocator;
// we implement a pipeline application here:
//
// Pipeline
//
// +------+ +------+ +------+ +------+
// |Stage1| -> |Stage2|->|Stage3|->|Stage4|
// +------+ +------+ +------+ +------+
//
// Stage1 -> generate an integer stream
// StageN -> prints the input stream
// StageK -> increases integers
//
// definition of the generic inc stage
// subclass ff_node (seq code wrapper)
class IntIncStage:public ff_node {
// method called on initialization (optional)
int svc_init() {
std::cout << "IntIncStage " << ff_node::get_my_id()
<< " inited !" << std::endl;
}
// method wrapping "task computation code"
// for each task on the input stream, this method is called
// and it returns the data to be placed onto the output stream
// in and out data are pointers, as usual
void * svc (void * task) {
int * i = (int *) task;
std::cout << "Stage " << ff_node::get_my_id()
<< " got task " << *i ;
(*i)++;
std::cout << " computed " << *i << std::endl;
return task;
}
// this is called before termination
// (optional)
void svc_end() {
std::cout << "Stage " << ff_node::get_my_id()
<< " terminating " << std::endl;
}
};
// last stage: print in stream contents
// it's sequential => subclass ff_node
class DrainStage:public ff_node {
// print initialization message
int svc_init() {
std::cout << "DrainStage " << ff_node::get_my_id()
<< " inited !" << std::endl;
}
// stage body: actual wrapping the sequential code
void * svc (void * task) {
int * i = (int *) task;
std::cout << "Stage " << ff_node::get_my_id()
<< " got result " << *i << std::endl;
allocator.free(task);
return task;
}
};
// stage generating the stream
// seq => subclass ff_node
// to output data items on the output stream uses ff_send_out()
// to terminate the stream, outputs a FF_EOS
// to terminate, returns a NULL
class GenerateStream:public ff_node {
private:
int ntasks;
public:
// constructor: used to pass the stream "size"
GenerateStream(int ntasks):ntasks(ntasks) { }
int svc_init() {
std::cout << "GenerateStream(" << ntasks << ") "
<< ff_node::get_my_id() << " inited !" << std::endl;
}
// Stage body: output a stream with ntasks, ntasks-1, ntasks-2, ...
// down to 0
// then output the FF_EOS (end of stream)
// and terminate (return NULL)
void * svc(void * task) {
while(ntasks != 0) {
int * tt = (int *) allocator.malloc(sizeof(int));
*tt = ntasks;
ff::ff_node::ff_send_out((void *) tt);
ntasks--;
std::cout << "GenerateStream(" << ntasks << ")" << std::endl;
}
ff_send_out(EOS);
return NULL;
}
};
int main(int argc, char * argv[]) {
int tasks = 10; // dummy, should be taken from the input line
// allocator must be initialized before using it
allocator.init();
// declare a pipeline object
ff_pipeline pipe_a;
// add stages in order:
// the first stage added is the first pipeline stage
// the i-th stage added is the i-th pipeline stage
pipe_a.add_stage(new GenerateStream(tasks));
pipe_a.add_stage(new IntIncStage);
pipe_a.add_stage(new IntIncStage);
pipe_a.add_stage(new DrainStage);
// now we ca run the application:
std::cout << "Starting application ..." << std::endl;
pipe_a.run();
std::cout << "Application started" << std::endl;
// here more (unrelated) work can be performed ...
// when results are needed we should wait for application termination
pipe_a.wait();
// alternatively, if we have nothing else to do, we can issue a single
// call such as: pipe_a.run_and_wait_end();
// in this case the pipeline is started and its termination awaited
// synchronously
std::cout << "Application terminated" << std::endl;
return(0);
}
In case we want to parallelize the increase integers stage with a farm, we can keep the same code and use the following main code:
int main(int argc, char * argv[]) {
int tasks = 10; // dummy, should be taken from the input line
// allocator must be initialized before using it
allocator.init();
// declare a pipeline object
ff_pipeline pipe_a;
// add stages in order:
// the first stage added is the first pipeline stage
// the i-th stage added is the i-th pipeline stage
pipe_a.add_stage(new GenerateStream(tasks));
// we create a farm to process increments in the second stage
// declare a ff_farm object
ff_farm<> farm;
// default collector
farm.add_collector(NULL);
// declare worker vector (string)
std::vector workers;
// two workers
workers.push_back(new IntIncStage);
workers.push_back(new IntIncStage);
// add workers to farm
farm.add_workers(workers);
// second pipeline stage is the farm
pipe_a.add_stage(&farm);
// the third one is the print stream
pipe_a.add_stage(new DrainStage);
// now we ca run the application:
std::cout << "Starting application ..." << std::endl;
pipe_a.run();
std::cout << "Application started" << std::endl;
// here more (unrelated) work can be performed ...
// when results are needed we should wait for application termination
pipe_a.wait();
// alternatively, if we have nothing else to do, we can issue a single
// call such as: pipe_a.run_and_wait_end();
// in this case the pipeline is started and its termination awaited
// synchronously
std::cout << "Application terminated" << std::endl;
return(0);
}
Sample farm code (taken from FF tests/ directory):
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/* ***************************************************************************
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 as
* published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
* As a special exception, you may use this file as part of a free software
* library without restriction. Specifically, if other files instantiate
* templates or use macros or inline functions from this file, or you compile
* this file and link it with other files to produce an executable, this
* file does not by itself cause the resulting executable to be covered by
* the GNU General Public License. This exception does not however
* invalidate any other reasons why the executable file might be covered by
* the GNU General Public License.
*
****************************************************************************
*/
/**
Very basic test for the FastFlow farm.
*/
#include
#include
#include
using namespace ff;
// generic worker
class Worker: public ff_node {
public:
void * svc(void * task) {
int * t = (int *)task;
std::cout << "Worker " << ff_node::get_my_id()
<< " received task " << *t << "\n";
return task;
}
// I don't need the following functions for this test
//int svc_init() { return 0; }
//void svc_end() {}
};
// the gatherer filter
class Collector: public ff_node {
public:
void * svc(void * task) {
int * t = (int *)task;
if (*t == -1) return NULL;
return task;
}
};
// the load-balancer filter
class Emitter: public ff_node {
public:
Emitter(int max_task):ntask(max_task) {};
void * svc(void *) {
int * task = new int(ntask);
--ntask;
if (ntask<0) return NULL;
return task;
}
private:
int ntask;
};
int main(int argc, char * argv[]) {
if (argc<3) {
std::cerr << "use: "
<< argv[0]
<< " nworkers streamlen\n";
return -1;
}
int nworkers=atoi(argv[1]);
int streamlen=atoi(argv[2]);
if (!nworkers || !streamlen) {
std::cerr << "Wrong parameters values\n";
return -1;
}
ff_farm<> farm; // farm object
Emitter E(streamlen);
farm.add_emitter(&E);
std::vector w;
for(int i=0;i
Sample accelerator code:
#include
#include
#include
#include
using namespace std;
using namespace ff;
// should be global to be accessible from workers
#define MAX 1024
double x[MAX];
double y[MAX];
#define ITERNO 100000
int iterno = ITERNO;
class Worker: public ff_node {
int svc_init() {
cout << "Worker initialized" << endl;
return 0;
}
void * svc(void * in) {
int i = * ((int *) in);
// cout << "Computing " << i << endl;
for(int j=0; j farm(true,nw); // true for offloading, NW for the number of workers ...
std::vector w; // prepare workers
for(int i=0;i