diff --git a/BoundedBuffer.cpp b/BoundedBuffer.cpp index 236e76f..1d52b88 100644 --- a/BoundedBuffer.cpp +++ b/BoundedBuffer.cpp @@ -13,6 +13,7 @@ void BoundedBuffer::push(string item){ } string BoundedBuffer::pop(){ +<<<<<<< HEAD if(data.size() > 0){ full->P(); mutex->P(); @@ -25,5 +26,14 @@ string BoundedBuffer::pop(){ return "quit"; } +======= + full->P(); + mutex->P(); + string item = data.back(); + data.pop_back(); + mutex->V(); + empty->V(); + return item; +>>>>>>> 05aaecdb17812b161db2168b80a19fc2c783800b } \ No newline at end of file diff --git a/BoundedBuffer.h b/BoundedBuffer.h index 855b21e..cf70b37 100644 --- a/BoundedBuffer.h +++ b/BoundedBuffer.h @@ -26,7 +26,18 @@ public: empty = new Semaphore(b); } +<<<<<<< HEAD void set_empty(int e){empty = new Semaphore(e);} +======= + void set_empty(int e){ + delete empty; + empty = new Semaphore(e); + } + + bool is_empty() { + return (data.size() == 0); + } +>>>>>>> 05aaecdb17812b161db2168b80a19fc2c783800b void push(string item); @@ -36,4 +47,4 @@ public: //b = user input }; -#endif +#endif diff --git a/dataserver.cpp b/dataserver.cpp old mode 100755 new mode 100644 index 2f91bd8..ace6ec8 --- a/dataserver.cpp +++ b/dataserver.cpp @@ -152,13 +152,12 @@ void handle_process_loop(RequestChannel & _channel) { for(;;) { - cout << "Reading next request from channel (" << _channel.name() << ") ..." << flush; + //cout << "Reading next request from channel (" << _channel.name() << ") ..." << flush; string request = _channel.cread(); - cout << " done (" << _channel.name() << ")." << endl; - cout << "New request is " << request << endl; + //cout << " done (" << _channel.name() << ")." << endl; + //cout << "New request is " << request << endl; if (request.compare("quit") == 0) { - cout << "\nquit\n"; _channel.cwrite("bye"); usleep(10000); // give the other end a bit of time. break; // break out of the loop; diff --git a/makefile b/makefile old mode 100755 new mode 100644 index 106521b..b2832a9 --- a/makefile +++ b/makefile @@ -1,15 +1,15 @@ # makefile -all: dataserver simpleclient +all: dataserver client reqchannel.o: reqchannel.h reqchannel.cpp - g++ -std=c++11 -c -g reqchannel.cpp + g++ -std=c++11 -lpthread -c -g reqchannel.cpp dataserver: dataserver.cpp reqchannel.o - g++ -std=c++11 -g -o dataserver dataserver.cpp reqchannel.o -lpthread + g++ -std=c++11 -lpthread -g -o dataserver dataserver.cpp reqchannel.o -simpleclient: simpleclient.cpp reqchannel.o BoundedBuffer.cpp - g++ -std=c++11 -g -o simpleclient simpleclient.cpp reqchannel.o BoundedBuffer.cpp +client: simpleclient.cpp reqchannel.o BoundedBuffer.cpp + g++ -std=c++11 -lpthread -g -o client simpleclient.cpp reqchannel.o BoundedBuffer.cpp clean: $(RM) *.o diff --git a/reqchannel.cpp b/reqchannel.cpp old mode 100755 new mode 100644 index 1e363ad..c6372b5 --- a/reqchannel.cpp +++ b/reqchannel.cpp @@ -139,11 +139,11 @@ RequestChannel::RequestChannel(const string _name, const Side _side) : my_name(_ } RequestChannel::~RequestChannel() { - cout << "close requests channel " << my_name << endl; + //cout << "close requests channel " << my_name << endl; close(wfd); close(rfd); if (my_side == SERVER_SIDE) { - cout << "close IPC mechanisms on server side for channel " << my_name << endl; + //cout << "close IPC mechanisms on server side for channel " << my_name << endl; /* Destruct the underlying IPC mechanisms. */ if (remove(pipe_name(READ_MODE)) != 0) { perror(string("Request Channel (" + my_name + ") : Error deleting pipe for reading").c_str()); diff --git a/reqchannel.h b/reqchannel.h old mode 100755 new mode 100644 diff --git a/semaphore.h b/semaphore.h old mode 100755 new mode 100644 diff --git a/simpleclient.cpp b/simpleclient.cpp new file mode 100644 index 0000000..70e9ad8 --- /dev/null +++ b/simpleclient.cpp @@ -0,0 +1,310 @@ +/* + File: simpleclient.C + + Author: R. Bettati + Department of Computer Science + Texas A&M University + Date : 2013/01/31 + + Simple client main program for MP3 in CSCE 313 +*/ + +/*--------------------------------------------------------------------------*/ +/* DEFINES */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* INCLUDES */ +/*--------------------------------------------------------------------------*/ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "reqchannel.h" +#include "semaphore.h" +#include "BoundedBuffer.h" + +using namespace std; + + +/*--------------------------------------------------------------------------*/ +/* DATA STRUCTURES */ +/*--------------------------------------------------------------------------*/ + + /* none */ + +/*--------------------------------------------------------------------------*/ +/* CONSTANTS */ +/*--------------------------------------------------------------------------*/ + + +BoundedBuffer Request_Buffer; +BoundedBuffer Response_Buffer[3]; +BoundedBuffer RB1; + +struct stat_struct { + vector* statt; + int tid; +}; + +int n = 1000, b = 100, w = 5, c = 0;//set defaults + +Semaphore mutex(1); + +/*--------------------------------------------------------------------------*/ +/* FORWARDS */ +/*--------------------------------------------------------------------------*/ + +void* Req_Thread(void* args); +void* Worker_Thread(void* args); +void* Stat_Thread(void* args); +long get_time_diff(struct timeval *start, struct timeval *end); + +/*--------------------------------------------------------------------------*/ +/* MAIN FUNCTION */ +/*--------------------------------------------------------------------------*/ + +int main(int argc, char * argv[]) { + + int option = -1; + + while ((option = getopt(argc, argv, "n:b:w:")) != -1){ + switch (option){ + case 'n' : + n = atoi(optarg); + break; + case 'b' : + b = atoi(optarg); + break; + case 'w' : + w = atoi(optarg); + break; + } + } + //cout << "n: " << n << "b: " << b << "w: " << w << "\n\n"; + pid_t pid = fork(); + + if(pid == 0){ + + struct timeval tp_start; + struct timeval tp_end; + long time_diff; + assert(gettimeofday(&tp_start, 0) == 0); + + Request_Buffer.set_empty(b); + Response_Buffer[0].set_empty(n); + Response_Buffer[1].set_empty(n); + Response_Buffer[2].set_empty(n); + + vector John_vec(100); + stat_struct John_struct; + John_struct.statt = &John_vec; + John_struct.tid = 0; + vector Jane_vec(100); + stat_struct Jane_struct; + Jane_struct.statt = &Jane_vec; + Jane_struct.tid = 1; + vector Joe_vec(100); + stat_struct Joe_struct; + Joe_struct.statt = &Joe_vec; + Joe_struct.tid = 2; + + RequestChannel chan("control", RequestChannel::CLIENT_SIDE); + + pthread_t request_thread_ids[3]; + pthread_t worker_thread_ids[w]; + pthread_t statistic_thread_ids[3]; + RequestChannel* worker_return[w]; + + + + + //--------------------------------------------- + //------- pthread_create ------- + //--------------------------------------------- + int index[3]; + for(int i=0; i < 3; i++){ + index[i] = i; + pthread_create(&request_thread_ids[i], NULL, Req_Thread, (void *)&index[i]); + } + + for(int i=0; i < w; i++){ + string channel_name = chan.send_request("newthread"); + worker_return[i] = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE); + pthread_create(&worker_thread_ids[i], NULL, Worker_Thread, worker_return[i]); + } + // + for(int i=0; i < 3; i++){ + if(i == 0) { + pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&John_struct); + } + else if(i == 1) { + pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&Jane_struct); + } + else if(i == 2) { + pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&Joe_struct); + } + } + // + + //--------------------------------------------- + //-------- pthread_join -------- + //--------------------------------------------- + + for(int i=0; i < 3; i++){ + pthread_join(request_thread_ids[i], NULL); + } + + for(int i=0; i < w; i++){ + pthread_join(worker_thread_ids[i], NULL); + } + // + for(int i=0; i < 3; i++){ + pthread_join(statistic_thread_ids[i], NULL); + } + // + + assert(gettimeofday(&tp_end, 0) == 0); + + for(int i=0; i < w; i++){ + worker_return[i]->send_request("quit"); + } + + cout << "\n\n JOHN HISTOGRAM\n"; + //int ct = 0; + for(int i=0; i < 100; i++){ + // + cout << "Num[" << i << "]: "; + for(int j=0; j < John_vec[i]; j++) { + cout << "*"; + } + // + //cout << John_vec[i]; + cout << "\n"; + } + cout << "\n\n JANE HISTOGRAM\n"; + //int ct = 0; + for(int i=0; i < 100; i++){ + // + cout << "Num[" << i << "]: "; + for(int j=0; j < Jane_vec[i]; j++) { + cout << "*"; + } + // + //cout << Jane_vec[i]; + cout << "\n"; + } + cout << "\n\n JOE HISTOGRAM\n"; + //int ct = 0; + for(int i=0; i < 100; i++){ + // + cout << "Num[" << i << "]: "; + for(int j=0; j < Joe_vec[i]; j++) { + cout << "*"; + } + // + //cout << Joe_vec[i]; + cout << "\n"; + } + //STILL NEED TO OUTPUT THE HISTOGRAM SOMEHOW.... + + chan.send_request("quit"); + + time_diff = get_time_diff(&tp_start, &tp_end); + cout << "time_diff: " << time_diff << "musec, " << (time_diff/1000000) << "sec" << endl; + }else{ + execl("dataserver", 0); + } + usleep(1000000); + +} + +void* Worker_Thread(void* arg){ + RequestChannel* chann = (RequestChannel*)arg; + + string request; + while(true){ + mutex.P(); + if(!Request_Buffer.is_empty()) { + request = Request_Buffer.pop(); + c++; + } + else if(c < (3*n)){ + mutex.V(); + continue; + } + else { + mutex.V(); + break; + } + mutex.V(); + string response = chann->send_request(request); + //cout << "\nc: " << c << "\n\n"; + if(request == "data Joe Smith"){ + Response_Buffer[0].push(response); + }else if(request == "data Jane Smith"){ + Response_Buffer[1].push(response); + }else if(request == "data John Doe"){ + Response_Buffer[2].push(response); + } + } +} + + +void* Req_Thread(void* arg){ + int person = *(int*)arg; + //cout << "PERSON: " << person << endl; + switch(person){ + case 0: + for(int i=0; i < n; i++){ Request_Buffer.push("data Joe Smith"); } + break; + case 1: + for(int i=0; i < n; i++){ Request_Buffer.push("data Jane Smith"); } + break; + case 2: + for(int i=0; i < n; i++){ Request_Buffer.push("data John Doe"); } + break; + } +} + +void* Stat_Thread(void* arg){ + //send to proper response buffer based on which name is found! + stat_struct &sts = *(stat_struct*)arg; + vector *stat_vec = sts.statt; + int person = sts.tid; + + string num; + for(int i=0; i < n; i++){ + num = Response_Buffer[person].pop(); + //cout << "\nPERSON0: " << num << endl; + int v = atoi(num.c_str()); + (*stat_vec)[v] = (*stat_vec)[v] + 1; + } +} + + + +long get_time_diff(struct timeval * tp1, struct timeval * tp2) { + /* Prints to stdout the difference, in seconds and museconds, between two + timevals. */ + + //long sec = tp2->tv_sec - tp1->tv_sec; + long musec = tp2->tv_usec - tp1->tv_usec; + if (musec < 0) { + musec += 1000000; + } + return musec; + +} + +