===== Pipeline with Pthreads, sample code =====
Sample code relative to
* implementation of an unbounded channel with standard C++ and Pthread mechanisms
* implementation of the main forking the three stages and computing the pipeline
* implementation of a bounded channel
* implementation of the main using the bounded channel
* sample makefile
#include
#include
template class Channel {
private:
std::queue chan;
pthread_mutex_t mutex;
pthread_cond_t cond;
public:
Channel() {
mutex = PTHREAD_MUTEX_INITIALIZER;
cond = PTHREAD_COND_INITIALIZER;
}
void send(Task t) {
pthread_mutex_lock(&mutex);
chan.push(t);
pthread_cond_signal(&cond); // for waiting receives
pthread_mutex_unlock(&mutex);
return;
}
Task receive() {
pthread_mutex_lock(&mutex);
while(chan.empty()) {
pthread_cond_wait(&cond, &mutex);
}
Task t = chan.front();
chan.pop();
pthread_mutex_unlock(&mutex);
return(t);
}
};
#include
#include
#include
#include
using namespace std;
#include "Channel.hpp"
typedef enum {EOS, TASK} Task_t;
class Task {
private:
float x;
Task_t tag;
public:
Task(float x, Task_t tag):x(x),tag(tag) {}
Task(Task_t tag):tag(tag) {}
bool isEos() {
return(tag == EOS);
}
float get() {
return(x);
}
void set(float y) {
x = y;
return;
}
};
typedef struct __iochans {
Channel *in;
Channel *out;
} IO_Channels;
int ntasks = 0;
void * source(IO_Channels * chans) {
// produce as many tasks as required
for(int i=0; iout)->send(t);
#ifdef TRACEMSG
cout << "Source: output task " << i << endl;
#endif
}
cout << "Source: sending EOS" << endl;
(chans->out)->send(*(new Task(EOS)));
#ifdef TRACEMSG
cout << "Source: EOS Sent\n terminating ... " << endl;
#endif
return(NULL);
}
void * square(IO_Channels * chans) {
Task t = (chans->in)->receive();
while(!t.isEos()) {
#ifdef TRACEMSG
cout << "Stage1 received task " << t.get() << endl;
#endif
float x = t.get();
t.set(x*x);
(chans->out)->send(t);
t = (chans->in)->receive();
}
#ifdef TRACEMSG
cout << "Square: propagating EOS" << endl;
#endif
(chans->out)->send(t); //eos
return(NULL);
}
void * drain(IO_Channels * chans) {
float sum = 0.0;
int taskno = 0;
Task t = (chans->in)->receive();
while(!t.isEos()) {
#ifdef TRACEMSG
cout << "Drain received task " << t.get() << endl;
#endif
sum += t.get();
taskno++;
t = (chans->in)->receive();
}
#ifdef TRACEMSG
cout << "Drain: got EOS, terminating" << endl;
#endif
cout << "Drain computed " << sum << " from " << taskno << " tasks " << endl;
return(NULL);
}
int main(int argc, char * argv[]) {
// usage: a.out ntasks
Channel ch12, ch23;
IO_Channels stage1, stage2, stage3;
pthread_t tid1, tid2, tid3;
stage1.in = NULL; stage1.out = &ch12;
stage2.in = &ch12; stage2.out = &ch23;
stage3.in = &ch23; stage3.out = NULL;
ntasks = atoi(argv[1]);
// start third stage
if(pthread_create(&tid3, NULL, (void *(*)(void *))drain,
(void *) &stage3) != 0) {
cout << "Error while creating stage3 " << endl;
return(-1);
}
cout << "Drain started" << endl;
// start second stage
if(pthread_create(&tid2, NULL, (void *(*)(void *))square,
(void *) &stage2) != 0) {
cout << "Error while creating stage2 " << endl;
return(-1);
}
cout << "Square started!" << endl;
// start first stage
if(pthread_create(&tid1, NULL, (void *(*)(void *))source,
(void *) &stage1) != 0) {
cout << "Error while creating stage1 " << endl;
return(-1);
}
cout << "Source started " << endl;
void * retval;
pthread_join(tid3, &retval);
return(0);
}
#include
#include
template class Channel {
private:
std::queue chan;
pthread_mutex_t mutex;
pthread_cond_t cond_r, cond_w;
int max;
int msgno;
public:
Channel() {
mutex = PTHREAD_MUTEX_INITIALIZER;
cond_r = PTHREAD_COND_INITIALIZER;
cond_w = PTHREAD_COND_INITIALIZER;
msgno = 0;
max = 4;
}
Channel(int bound):max(bound) {
mutex = PTHREAD_MUTEX_INITIALIZER;
cond_r = PTHREAD_COND_INITIALIZER;
cond_w = PTHREAD_COND_INITIALIZER;
msgno = 0;
}
void send(Task t) {
pthread_mutex_lock(&mutex);
while(msgno == max)
pthread_cond_wait(&cond_w, &mutex);
chan.push(t);
msgno++;
pthread_cond_signal(&cond_r); // for waiting receives
pthread_mutex_unlock(&mutex);
return;
}
Task receive() {
pthread_mutex_lock(&mutex);
while(chan.empty()) {
pthread_cond_wait(&cond_r, &mutex);
}
Task t = chan.front();
chan.pop();
msgno--;
pthread_cond_signal(&cond_w);
pthread_mutex_unlock(&mutex);
return(t);
}
};
#include
#include
#include
#include
using namespace std;
#include "BChannel.hpp"
typedef enum {EOS, TASK} Task_t;
class Task {
private:
float x;
Task_t tag;
public:
Task(float x, Task_t tag):x(x),tag(tag) {}
Task(Task_t tag):tag(tag) {}
bool isEos() {
return(tag == EOS);
}
float get() {
return(x);
}
void set(float y) {
x = y;
return;
}
};
typedef struct __iochans {
Channel *in;
Channel *out;
} IO_Channels;
int ntasks = 0;
void * source(IO_Channels * chans) {
// produce as many tasks as required
for(int i=0; iout)->send(t);
#ifdef TRACEMSG
cout << "Source: output task " << i << endl;
#endif
}
cout << "Source: sending EOS" << endl;
(chans->out)->send(*(new Task(EOS)));
#ifdef TRACEMSG
cout << "Source: EOS Sent\n terminating ... " << endl;
#endif
return(NULL);
}
void * square(IO_Channels * chans) {
Task t = (chans->in)->receive();
while(!t.isEos()) {
#ifdef TRACEMSG
cout << "Stage1 received task " << t.get() << endl;
#endif
float x = t.get();
t.set(x*x);
(chans->out)->send(t);
t = (chans->in)->receive();
}
#ifdef TRACEMSG
cout << "Square: propagating EOS" << endl;
#endif
(chans->out)->send(t); //eos
return(NULL);
}
void * drain(IO_Channels * chans) {
float sum = 0.0;
int taskno = 0;
Task t = (chans->in)->receive();
while(!t.isEos()) {
#ifdef TRACEMSG
cout << "Drain received task " << t.get() << endl;
#endif
sum += t.get();
taskno++;
t = (chans->in)->receive();
}
#ifdef TRACEMSG
cout << "Drain: got EOS, terminating" << endl;
#endif
cout << "Drain computed " << sum << " from " << taskno << " tasks " << endl;
return(NULL);
}
int main(int argc, char * argv[]) {
// usage: a.out ntasks
Channel ch12, ch23;
IO_Channels stage1, stage2, stage3;
pthread_t tid1, tid2, tid3;
stage1.in = NULL; stage1.out = &ch12;
stage2.in = &ch12; stage2.out = &ch23;
stage3.in = &ch23; stage3.out = NULL;
ntasks = atoi(argv[1]);
// start third stage
if(pthread_create(&tid3, NULL, (void *(*)(void *))drain,
(void *) &stage3) != 0) {
cout << "Error while creating stage3 " << endl;
return(-1);
}
cout << "Drain started" << endl;
// start second stage
if(pthread_create(&tid2, NULL, (void *(*)(void *))square,
(void *) &stage2) != 0) {
cout << "Error while creating stage2 " << endl;
return(-1);
}
cout << "Square started!" << endl;
// start first stage
if(pthread_create(&tid1, NULL, (void *(*)(void *))source,
(void *) &stage1) != 0) {
cout << "Error while creating stage1 " << endl;
return(-1);
}
cout << "Source started " << endl;
void * retval;
pthread_join(tid3, &retval);
return(0);
}
CFLAGS = -DTRACEMSG -pthread -std=c++11
CC = g++
all: pipe bpipe
pipe: pipe.cpp Channel.hpp
$(CC) $(CFLAGS) pipe.cpp -o pipe
bpipe: bpipe.cpp BChannel.hpp
$(CC) $(CFLAGS) bpipe.cpp -o bpipe
clean:
rm -f pipe bpipe a.out *~