C version with timings (compile with gcc -pthread -lm; add -lrt with glibc older than 2.17, where clock_gettime lives in librt)
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #include <math.h> #include <time.h> pthread_mutex_t mutex; typedef struct __task { long n; float x; struct __task * next; } TASK; TASK * tasks = NULL; typedef unsigned long long ticks; static __inline__ ticks getticks(void) { unsigned a, d; asm volatile("rdtsc" : "=a" (a), "=d" (d)); return ((ticks)a) | (((ticks)d) << 32); } void * worker(void * x) { int wno = *((int *) x); int * taskno = (int *) calloc(1, sizeof(int)); *taskno = 0; #ifdef DEBUG printf("Worker no %d created\n", wno); #endif while(1==1) { // get a task from the list pthread_mutex_lock(&mutex); TASK * t = tasks; if(tasks!=NULL) { tasks = tasks->next; } pthread_mutex_unlock(&mutex); // process if(t == NULL) break; // nothing else to process #ifdef DEBUG printf("Thread %ld processing task <%ld,%f>\n",pthread_self(), t->n, t->x); #endif { long l; for(l=0; l<t->n; l++) t->x = sin(t->x); } // write results #ifdef DEBUG printf("Thread %ld computed %f\n",pthread_self(),t->x); #endif (*taskno)++; } return ((void *) taskno); } int main(int argc, char * argv[]) { int opt, nw, m, i ; float ta,tb,elapsed; pthread_t *w; struct timeval t0, t1; clock_t before, after; struct timespec c0,c1,p0,p1; ticks tck0, tck1; while((opt = getopt(argc, argv, "n:m:"))!= (-1)) { switch(opt) { case 'n': { nw = atoi(optarg); break; } case 'm': { m = atoi(optarg); break; } default: { printf("Usage is:\n%s -n pardegree -m streamlenght\n", argv[0]); return(0); } } } #ifdef DEBUG printf("%d workers and %d tasks\n", nw, m); #endif w = (pthread_t *) calloc(nw, sizeof(pthread_t)); if((pthread_mutex_init(&mutex,NULL)!=0)) { printf("Failed to initialize the mutex\n"); return(-1); } for(i=0; i<m; i++) { TASK * t = (TASK *) calloc(1, sizeof(TASK)); t->x = (float)i ; t->n = (long) i*(100000L); pthread_mutex_lock(&mutex); // may be in principle concurrent with task consuming t->next = tasks; tasks = t; pthread_mutex_unlock(&mutex); } before = clock(); 
clock_gettime(CLOCK_THREAD_CPUTIME_ID,&c0); clock_gettime(CLOCK_PROCESS_CPUTIME_ID,&p0); gettimeofday(&t0,NULL); tck0 = getticks(); #ifdef DEBUG printf("Start %ld %ld \n", t0.tv_sec, t0.tv_usec); #endif for(i=0; i<nw; i++) { // create the workers: they'll start working immediately if(pthread_create(&w[i],NULL, worker, &i)!=0) { printf("Error creating workers\n"); return(-1); } } // workers running: wait termination for(i=0; i<nw; i++) { void * retcode; pthread_join(w[i],&retcode); printf("Worker %d terminated computing %d tasks\n", i, *((int *) retcode)); fflush(stdout); } gettimeofday(&t1,NULL); clock_gettime(CLOCK_THREAD_CPUTIME_ID,&c1); clock_gettime(CLOCK_PROCESS_CPUTIME_ID,&p1); after = clock(); tck1 = getticks(); #ifdef DEBUG printf("Start %ld %ld \n", t1.tv_sec, t1.tv_usec); #endif printf("Elapsed %f msecs\n", (t1.tv_sec-t0.tv_sec)*1000.0 + ((float) (t1.tv_usec - t0.tv_usec))/1000.0); printf("Clock cycles %ld (%f msecs)\n", after-before, ((after-before)/ CLOCKS_PER_SEC)); { float tproc = ((float) (p1.tv_sec - p0.tv_sec))*1000.0 + ((float) (p1.tv_nsec-p0.tv_nsec))/1000000.0; float tthread = ((float) (c1.tv_sec - c0.tv_sec))*1000.0 + ((float) (c1.tv_nsec-c0.tv_nsec))/1000000.0; printf("clock_gettime (per process) %f msecs\n", tproc); printf("clock_gettime (per thread) %f msecs\n", tthread); printf("clock_gettime: average per worker %f\n", (tproc - tthread)/nw); } printf("Ticks %lld\n", tck1-tck0); printf("Workers terminated\n"); return(0); }
C++ version with timings (compile with g++ -std=c++0x -pthread -lm -lrt)
#include <cmath>
#include <cstdio>
#include <cstdlib>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
#include <vector>

#include <sys/time.h>
#include <unistd.h>

/// One unit of work: sin() is applied to x, n times in a row.
struct farm_task_t {
    float x;
    long n;
};

/// Protects `tasks`.
std::mutex locche;

/// Pending tasks, consumed from the front by the workers. Stored by value so
/// nothing has to be freed by hand.
std::list<farm_task_t> tasks;

/// Worker thread body.
///
/// Repeatedly takes the task at the front of `tasks` (under `locche`) and
/// applies sin() to its value `n` times; returns when the list is empty.
///
/// @param no  worker number, used only in DEBUG output.
void worker(int no)
{
    int taskno = 0;
#ifdef DEBUG
    std::thread::id mythreadid = std::this_thread::get_id();
    std::cout << "Worker No. " << no << " started (on thread " << mythreadid << ")" << std::endl;
#endif
    while (true) {
        farm_task_t t;
        {
            // The guard releases the mutex on every way out of this scope,
            // including the break below.
            std::lock_guard<std::mutex> guard(locche);
            if (tasks.empty())
                break;
            t = tasks.front();
            tasks.pop_front();
        }
#ifdef DEBUG
        std::cout << "Worker no. " << no << " got task " << t.x << " " << t.n << std::endl;
#endif
        taskno++;
        // NOTE(review): the result is discarded; an optimizing compiler may
        // be able to elide this loop - confirm timings at the optimization
        // level actually used.
        for (long i = 0; i < t.n; i++) {
            t.x = std::sin(t.x);
        }
    }
#ifdef DEBUG
    std::cout << "Worker " << no << " computed " << taskno << " tasks " << std::endl;
#else
    (void)no;
    (void)taskno;
#endif
}

/// Builds the task list, runs it on `nw` worker threads and prints the
/// wall-clock time taken from thread creation to the last join.
///
/// Options: -n <number of workers, >= 1>   -m <number of tasks, >= 0>
/// @return 0 on success (and on an unknown option, after printing the usage),
///         1 when an option is missing or out of range.
int main(int argc, char *argv[])
{
    int opt;
    int nw = 0;       // stays invalid until -n is given
    int ntasks = -1;  // stays invalid until -m is given
    struct timeval t0, t1;

    while ((opt = getopt(argc, argv, "n:m:")) != -1) {
        switch (opt) {
        case 'n':
            nw = std::atoi(optarg);
            break;
        case 'm':
            ntasks = std::atoi(optarg);
            break;
        default:
            std::printf("Usage is:\n%s -n pardegree -m streamlenght\n", argv[0]);
            return 0;
        }
    }
    if (nw < 1 || ntasks < 0) {
        std::printf("Usage is:\n%s -n pardegree -m streamlenght\n", argv[0]);
        return 1;
    }

    std::cout << "Initializing ..." << std::endl;
    for (int i = 0; i < ntasks; i++) {
        farm_task_t t;
        t.x = static_cast<float>(i);
        t.n = i * 100000L;
        tasks.push_back(t);
    }
    std::cout << "Tasks created ..." << std::endl;

    std::vector<std::thread> workers;
    workers.reserve(static_cast<std::size_t>(nw));

    gettimeofday(&t0, nullptr);
    for (int i = 0; i < nw; i++) {
        // Constructs the thread in place; it starts running worker(i) immediately.
        workers.emplace_back(worker, i);
    }
#ifdef DEBUG
    std::cout << "Workers created ..." << std::endl;
    std::cout << "Awaiting worker termination ..." << std::endl;
#endif
    for (auto &t : workers) {
        t.join();
    }
    gettimeofday(&t1, nullptr);

    std::cout << "All workers terminated " << std::endl;
    std::cout << "Elapsed time "
              << (t1.tv_sec - t0.tv_sec) * 1000.0 + ((float)(t1.tv_usec - t0.tv_usec)) / 1000.0
              << " msecs " << std::endl;
    return 0;
}
C++/Pthread version with the synchronization wrapped into a new Queue<T> type:
#include <cstdlib>
#include <iostream>
#include <queue>
#include <vector>

#include <pthread.h>

/// A task: a length `n` and a pointer `v` to its float data.
/// A negative length is used by body() as the end-of-stream (EOS) marker.
/// The pointer is only stored and handed back; it is never dereferenced or
/// freed by this class.
class Task {
private:
    int n;
    float *v;

public:
    Task(int n, float *v) : n(n), v(v) {}
    float *get() const { return v; }
    int length() const { return n; }
};

/// FIFO channel between threads, synchronized with a pthread mutex and a
/// condition variable.
///
/// send() never blocks on capacity; receive() blocks until an item is
/// available. The queue is not copyable, because pthread synchronization
/// objects must not be copied.
template <class T>
class Queue {
private:
    std::queue<T> tasks;
    pthread_mutex_t lock;
    pthread_cond_t nonempty;  // signalled by send() after every push

    /// Holds `lock` for the lifetime of the object, so the mutex is released
    /// even if copying a T throws.
    struct Guard {
        pthread_mutex_t &m;
        explicit Guard(pthread_mutex_t &mtx) : m(mtx) { pthread_mutex_lock(&m); }
        ~Guard() { pthread_mutex_unlock(&m); }
        Guard(const Guard &) = delete;
        Guard &operator=(const Guard &) = delete;
    };

public:
    Queue()
    {
        // PTHREAD_MUTEX_INITIALIZER is meant for static initialization only,
        // so the members are initialized with the dynamic API instead.
        pthread_mutex_init(&lock, nullptr);
        pthread_cond_init(&nonempty, nullptr);
    }

    ~Queue()
    {
        pthread_cond_destroy(&nonempty);
        pthread_mutex_destroy(&lock);
    }

    Queue(const Queue &) = delete;
    Queue &operator=(const Queue &) = delete;

    /// Appends `m` to the queue and wakes one waiting receiver.
    void send(T m)
    {
        Guard g(lock);
        tasks.push(m);
        pthread_cond_signal(&nonempty);
    }

    /// Removes and returns the oldest item, blocking while the queue is empty.
    T receive()
    {
        Guard g(lock);
        // Loop rather than `if`: condition waits may wake up spuriously, and
        // another receiver may have taken the item first.
        while (tasks.empty())
            pthread_cond_wait(&nonempty, &lock);
        T t = tasks.front();
        tasks.pop();
        return t;
    }
};

/// Input and output channels handed to every worker thread.
struct COMMS {
    Queue<Task> *in;
    Queue<Task> *out;
};

/// The function computed on each task; currently the identity.
Task f(Task x)
{
    return x;
}

/// Worker thread body.
///
/// `x` points to a COMMS. The worker receives tasks from `in`, applies f()
/// and sends the result to `out`, until it receives a task whose length is
/// negative (EOS).
///
/// @return a heap-allocated int with the number of tasks processed; the
///         joiner owns it and must `delete` it.
void *body(void *x)
{
    COMMS *q = static_cast<COMMS *>(x);
    int *i = new int(0);

    while (true) {
        Task t = q->in->receive();
        if (t.length() < 0)
            break;  // EOS
        Task r = f(t);
        (*i)++;
        q->out->send(r);
    }
    return i;
}

/// Starts argv[1] worker threads on a shared input/output queue pair, then
/// shuts them down cleanly.
///
/// No real tasks are produced here; one EOS task per started worker is sent
/// so that every worker leaves its receive loop, and all workers are joined
/// before the queues (which live on this stack frame) are destroyed.
///
/// @return 0 normally (a failed thread creation is reported on stderr and
///         skipped); 1 when the worker count is missing or negative.
int main(int argc, char *argv[])
{
    if (argc < 2) {
        std::cerr << "Usage: " << argv[0] << " nworkers" << std::endl;
        return 1;
    }
    const int nw = std::atoi(argv[1]);
    if (nw < 0) {
        std::cerr << "Usage: " << argv[0] << " nworkers" << std::endl;
        return 1;
    }

    Queue<Task> tasks;
    Queue<Task> ress;
    COMMS c;
    c.in = &tasks;
    c.out = &ress;

    // Only the threads that were actually started are recorded here.
    std::vector<pthread_t> tid;
    tid.reserve(static_cast<std::size_t>(nw));
    for (int i = 0; i < nw; i++) {
        pthread_t t;
        int ret = pthread_create(&t, nullptr, body, &c);
        if (ret != 0) {
            std::cerr << "error creating thread " << i << std::endl;
            continue;
        }
        tid.push_back(t);
    }

    // One EOS per running worker: each worker consumes exactly one and stops.
    for (std::size_t k = 0; k < tid.size(); k++)
        tasks.send(Task(-1, nullptr));

    for (std::size_t k = 0; k < tid.size(); k++) {
        void *ret = nullptr;
        if (pthread_join(tid[k], &ret) == 0)
            delete static_cast<int *>(ret);  // counter allocated by body()
    }
    return 0;
}