Sample FastFlow farms

These are the programs used in the course notes. Comments may be found in the course notes book.

farm1.cpp
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
 
 
using namespace ff;
 
class Worker: public ff_node {
public:
 
    void * svc(void * task) {
        int * t = (int *)task;
        std::cout << "Worker got " << *t << std::endl;
        (*t)++;
      	std::cout << "Worker emitting " << *t << std::endl;
        return t;
    }
};
 
class Collector: public ff_node {
public:
    void * svc(void * task) {
        int * t = (int *)task;
        std::cout << "Collector received " << *t << "\n";
        if (*t == -1) return NULL;
        return task;
    }
};
 
// the load-balancer filter
class Emitter: public ff_node {
public:
    Emitter(int max_task):ntask(max_task) {};
 
    void * svc(void *) {	
        int * task = new int(ntask);
        --ntask;
        if (ntask<0) return NULL;
        std::cout << "Emitting task " << ntask << std::endl;;
        return task;
    }
private:
    int ntask;
};
 
 
int main(int argc, char * argv[]) {
 
    if (argc<3) {
        std::cerr << "use: " 
                  << argv[0] 
                  << " nworkers streamlen\n";
        return -1;
    }
 
    int nworkers=atoi(argv[1]);
    int streamlen=atoi(argv[2]);
 
    if (!nworkers || !streamlen) {
        std::cerr << "Wrong parameters values\n";
        return -1;
    }
 
    ff_farm<> farm;
 
    Emitter E(streamlen);
    farm.add_emitter(&E);
 
    std::vector<ff_node *> w;
    for(int i=0;i<nworkers;++i) w.push_back(new Worker);
    farm.add_workers(w); 
 
    Collector C;
    farm.add_collector(&C);
 
    if (farm.run_and_wait_end()<0) {
        error("running farm\n");
        return -1;
    }
    std::cerr << "DONE, time= " << farm.ffTime() << " (ms)\n";
    farm.ffStats(std::cerr);
 
    return 0;
}
farme.cpp
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
 
 
using namespace ff;
 
class Worker: public ff_node {
public:
    void * svc(void * task) {
        int * t = (int *)task;
        (*t)++;
        return task;
    }
};
 
 
class Collector: public ff_node {
public:
    void * svc(void * task) {
        int * t = (int *)task;
        std::cout << "Collector got " << *t << std::endl;
        return GO_ON;
    }
};
 
class Emitter: public ff_node {
public:
    Emitter(int n) {
        streamlen = n; 
        task = 0; 
    };
 
    void * svc(void *) {	
        sleep(1);
        task++;
        int * t = new int(task);
        if (task<streamlen) 
            return t;
        else 
            return NULL; 
    }
 
private:
    int streamlen;
    int task; 
};
 
 
int main(int argc, char * argv[]) {
 
    if (argc<3) {
        std::cerr << "use: " 
                  << argv[0] 
                  << " nworkers streamlen\n";
        return -1;
    }
 
    int nworkers=atoi(argv[1]);
    int streamlen=atoi(argv[2]);
 
    if (!nworkers || !streamlen) {
        std::cerr << "Wrong parameters values\n";
        return -1;
    }
 
    ff_farm<> farm; // farm object
 
    Emitter E(streamlen);
    farm.add_emitter(&E);
 
    std::vector<ff_node *> w;
    for(int i=0;i<nworkers;++i) w.push_back(new Worker);
    farm.add_workers(w); // add all workers to the farm
 
    Collector C;
    farm.add_collector(&C);
 
    if (farm.run_and_wait_end()<0) {
        error("running farm\n");
        return -1;
    }
    return 0;
}
farmNoC.cpp
#include <vector>
#include <iostream>
#include <ff/farm.hpp>
 
static int * results; 
 
typedef struct task_t {
    int i; 
    int t; 
} TASK; 
 
using namespace ff;
 
class Worker: public ff_node {
public:
    void * svc(void * task) {
        TASK * t = (TASK *) task; 
        results[t->i] = ++(t->t);
        return GO_ON;
    }
};
 
 
class Emitter: public ff_node {
public:
    Emitter(int n) {
        streamlen = n; 
        task = 0; 
    };
 
    void * svc(void *) {	
        task++;
        TASK * t = (TASK *) calloc(1,sizeof(TASK));
        t->i = task;
        t->t = task*task;
        if (task<streamlen) 
            return t;
        else 
            return NULL; 
    }
 
private:
    int streamlen;
    int task; 
};
 
 
int main(int argc, char * argv[]) {
 
    int nworkers=atoi(argv[1]);
    int streamlen=atoi(argv[2]);
    results = (int *) calloc(streamlen,sizeof(int));
 
    ff_farm<> farm;
 
    Emitter E(streamlen);
    farm.add_emitter(&E);
 
    std::vector<ff_node *> w;
    for(int i=0;i<nworkers;++i)
        w.push_back(new Worker);
    farm.add_workers(w);
 
    std::cout << "Before starting computation" << std::endl;
    for(int i=0; i<streamlen; i++) 
        std::cout << i << " : " << results[i] << std::endl;
    if (farm.run_and_wait_end()<0) {
        error("running farm\n");
        return -1;
    }
    std::cout << "After computation" << std::endl;
    for(int i=0; i<streamlen; i++) 
        std::cout << i << " : " << results[i] << std::endl;
    return 0;
}