diff --git a/BoundedBuffer.cpp b/BoundedBuffer.cpp index 1b191c1..236e76f 100755 --- a/BoundedBuffer.cpp +++ b/BoundedBuffer.cpp @@ -5,7 +5,6 @@ #include void BoundedBuffer::push(string item){ - empty->P(); mutex->P(); data.push_back(item); @@ -14,12 +13,17 @@ void BoundedBuffer::push(string item){ } string BoundedBuffer::pop(){ - string item = data.back(); - full->P(); - mutex->P(); - data.pop_back(); - mutex->V(); - empty->V(); - return item; + if(data.size() > 0){ + full->P(); + mutex->P(); + string item = data.back(); + data.pop_back(); + mutex->V(); + empty->V(); + return item; + }else{ + return "quit"; + } + } \ No newline at end of file diff --git a/BoundedBuffer.h b/BoundedBuffer.h index 7c8a06c..855b21e 100755 --- a/BoundedBuffer.h +++ b/BoundedBuffer.h @@ -12,8 +12,6 @@ using namespace std; //extern unsigned int n, b, w; class BoundedBuffer{ - - int b_val; Semaphore *full = new Semaphore(0); //initialized to 0, Since there are 0 full slots Semaphore *empty;// = new Semaphore(b); //initialized to b, Since all the slots are empty @@ -21,18 +19,14 @@ class BoundedBuffer{ vector data; - public: - BoundedBuffer(){empty = new Semaphore(100);}//default b value + BoundedBuffer(){empty = new Semaphore(200);}//default b value BoundedBuffer(int b){ - b_val = b; - empty = new Semaphore(b_val); + empty = new Semaphore(b); } - int get_val(){return b_val;} - - void set_b(int b){ b_val = b;} + void set_empty(int e){empty = new Semaphore(e);} void push(string item); diff --git a/simpleclient.cpp b/simpleclient.cpp index 0e9a59a..3a39e7d 100755 --- a/simpleclient.cpp +++ b/simpleclient.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -39,24 +40,30 @@ using namespace std; /* DATA STRUCTURES */ /*--------------------------------------------------------------------------*/ - /* -- (none) -- */ + /* none */ /*--------------------------------------------------------------------------*/ /* CONSTANTS */ /*--------------------------------------------------------------------------*/ -//RequestChannel* control; -BoundedBuffer* Request_Buffer; -BoundedBuffer* Response_Buffers[3]; + +BoundedBuffer Request_Buffer; +BoundedBuffer Response_Buffer[3]; +BoundedBuffer RB1; + +vector John_vec(100); +vector Jane_vec(100); +vector Joe_vec(100); + int n = 1000, b = 100, w = 5;//set defaults /*--------------------------------------------------------------------------*/ /* FORWARDS */ /*--------------------------------------------------------------------------*/ -void* Req_Thread(string name); -void* Worker_Thread(char* arg[], RequestChannel* chan); -void* local_send_request(string name, string response); +void* Req_Thread(void* args); +void* Worker_Thread(void* args); +void* Stat_Thread(void* args); /*--------------------------------------------------------------------------*/ /* MAIN FUNCTION */ @@ -73,7 +80,6 @@ int main(int argc, char * argv[]) { break; case 'b' : b = atoi(optarg); - Request_Buffer = new BoundedBuffer(b); break; case 'w' : w = atoi(optarg); @@ -83,107 +89,139 @@ int main(int argc, char * argv[]) { pid_t pid = fork(); - if(pid == 0){ + if(pid != 0){ + + Request_Buffer.set_empty(b); + Response_Buffer[0].set_empty(b); + Response_Buffer[1].set_empty(b); + Response_Buffer[2].set_empty(b); + + - cout << "CLIENT STARTED:" << endl; - cout << "Establishing control channel... " << flush; RequestChannel chan("control", RequestChannel::CLIENT_SIDE); - cout << "done." << endl; - - for(int i = 0; i < 3; i++){ - pid = fork(); - if(pid){ //if in the parent still - if(i == 0){ - Req_Thread(" Joe Smith"); - }else if(i == 1){ - Req_Thread(" Jane Smith"); - }else if(i == 2){ - Req_Thread(" John Doe"); - } - continue; - }else if (pid == 0){ - break; - } else { - exit(1); - } + + pthread_t request_thread_ids[3]; + pthread_t worker_thread_ids[w]; + pthread_t statistic_thread_ids[3]; + RequestChannel* worker_return[w]; + + + + + //--------------------------------------------- + //------- pthread_create ------- + //--------------------------------------------- + int index[3]; + for(int i=0; i < 3; i++){ + index[i] = i; + pthread_create(&request_thread_ids[i], NULL, Req_Thread, (void *)&index[i]); } - - - - RequestChannel* worker_ret; - string channel_name; - - if(pid){ - RequestChannel chan("control", RequestChannel::CLIENT_SIDE); - cout << "done." << endl; - for(int i=0; i < w; i++){ - pid = fork(); - if(pid) { - pid = fork(); - channel_name = chan.send_request("newthread"); - cout << "Chan: " << channel_name << endl; - worker_ret = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE); - Worker_Thread(argv, worker_ret); - continue; - }else if(pid == 0){ - break; - }else{ - exit(1); - } - } - wait(); - //chan.send_request("quit"); - } else { - //this is where the child threads from the - //req_thread threads end up. + for(int i=0; i < w; i++){ + string channel_name = chan.send_request("newthread"); + worker_return[i] = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE); + pthread_create(&worker_thread_ids[i], NULL, Worker_Thread, worker_return[i]); + } + + for(int i=0; i < 3; i++){ + pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&i); + } + + //--------------------------------------------- + //-------- pthread_join -------- + //--------------------------------------------- + + for(int i=0; i < 3; i++){ + pthread_join(request_thread_ids[i], NULL); } + for(int i=0; i < w; i++){ + pthread_join(worker_thread_ids[i], NULL); + } for(int i=0; i < 3; i++){ - //stat thread - - //pid = fork(); - if(pid != 0){ - - break; - } - } + pthread_join(statistic_thread_ids[i], NULL); + } + for(int i=0; i < w; i++){ + worker_return[i]->send_request("quit"); + } + + cout << "\n\n HISTOGRAM \n"; + int ct = 0; + for(int i=0; i < 100; i++){ + cout << "Num: " << i << "\tTimes: " << John_vec[i] << endl; + + } + //STILL NEED TO OUTPUT THE HISTOGRAM SOMEHOW.... + + chan.send_request("quit"); }else{ - execve("dataserver", argv, argv); + execl("dataserver", 0); } usleep(1000000); + } -void* Worker_Thread(char* arg[], RequestChannel* chan){ +void* Worker_Thread(void* arg){ + RequestChannel* chann = (RequestChannel*)arg; while(true){ - string req = Request_Buffer->pop(); - cout << "Request: " << req << endl; - string response = chan->send_request(req); - cout << "Response: " << response << endl; - //local_send_request(req, response); - } - //chan->send_request("quit"); -} - -void* Req_Thread(string name){ - for(int i = 0; i < n; i++){ - cout << "data " << name << endl; - Request_Buffer->push("data" + name); + string request = Request_Buffer.pop(); + string response = chann->send_request(request); + if(request == "data Joe Smith"){ + Response_Buffer[0].push(response); + }else if(request == "data Jane Smith"){ + Response_Buffer[1].push(response); + }else if(request == "data John Doe"){ + Response_Buffer[2].push(response); + }else if(request == "quit"){ //Break out of the thread if quit response is found + break; + } } } -void* local_send_request(string name, string response){ + +void* Req_Thread(void* arg){ + int person = *(int*)arg; + cout << "PERSON: " << person << endl; + switch(person){ + case 0: + for(int i=0; i < n; i++){ Request_Buffer.push("data Joe Smith"); cout << "JOE";} + break; + case 1: + for(int i=0; i < n; i++){ Request_Buffer.push("data Jane Smith"); cout << "JANE";} + break; + case 2: + for(int i=0; i < n; i++){ Request_Buffer.push("data John Doe"); cout << "JOHN";} + break; + } +} + +void* Stat_Thread(void* arg){ //send to proper response buffer based on which name is found! - if(name.compare("data Joe Smith") == 0){ - Response_Buffers[0]->push(response); - }else if(name.compare("data Jane Smith") == 0){ - Response_Buffers[1]->push(response); - }else if(name.compare("data John Doe") == 0){ - Response_Buffers[2]->push(response); - }else{ - cout << "ERROR: Invalid push into Response Buffers\n"; + int person = *(int*)arg; + + string num; + if(person == 0){ + for(int i=0; i < n; i++){ + num = Response_Buffer[0].pop(); + int v = atoi(num.c_str()); + John_vec[v] = John_vec[v] + 1; + } + }else if(person == 1){ + for(int i=0; i < n; i++){ + num = Response_Buffer[1].pop(); + cout << "\nPERSON1: " << num << endl; + int v = atoi(num.c_str()); + + Jane_vec[v] = Jane_vec[v] + 1; + } + }else if(person == 2){ + for(int i=0; i < n; i++){ + num = Response_Buffer[2].pop(); + int v = atoi(num.c_str()); + John_vec[v] = John_vec[v] + 1; + } } }