Almost Working

This commit is contained in:
Eric Buxkemper 2015-11-11 04:56:41 -06:00
parent 7f3859db68
commit 8b75f52d44
3 changed files with 143 additions and 107 deletions

View file

@ -5,7 +5,6 @@
#include <vector> #include <vector>
void BoundedBuffer::push(string item){ void BoundedBuffer::push(string item){
empty->P(); empty->P();
mutex->P(); mutex->P();
data.push_back(item); data.push_back(item);
@ -14,12 +13,17 @@ void BoundedBuffer::push(string item){
} }
string BoundedBuffer::pop(){ string BoundedBuffer::pop(){
string item = data.back(); if(data.size() > 0){
full->P(); full->P();
mutex->P(); mutex->P();
string item = data.back();
data.pop_back(); data.pop_back();
mutex->V(); mutex->V();
empty->V(); empty->V();
return item; return item;
}else{
return "quit";
}
} }

View file

@ -13,26 +13,20 @@ using namespace std;
class BoundedBuffer{ class BoundedBuffer{
int b_val;
Semaphore *full = new Semaphore(0); //initialized to 0, Since there are 0 full slots Semaphore *full = new Semaphore(0); //initialized to 0, Since there are 0 full slots
Semaphore *empty;// = new Semaphore(b); //initialized to b, Since all the slots are empty Semaphore *empty;// = new Semaphore(b); //initialized to b, Since all the slots are empty
Semaphore *mutex = new Semaphore(1); Semaphore *mutex = new Semaphore(1);
vector<string> data; vector<string> data;
public: public:
BoundedBuffer(){empty = new Semaphore(100);}//default b value BoundedBuffer(){empty = new Semaphore(200);}//default b value
BoundedBuffer(int b){ BoundedBuffer(int b){
b_val = b; empty = new Semaphore(b);
empty = new Semaphore(b_val);
} }
int get_val(){return b_val;} void set_empty(int e){empty = new Semaphore(e);}
void set_b(int b){ b_val = b;}
void push(string item); void push(string item);

View file

@ -24,6 +24,7 @@
#include <iostream> #include <iostream>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <pthread.h>
#include <errno.h> #include <errno.h>
#include <unistd.h> #include <unistd.h>
@ -39,24 +40,30 @@ using namespace std;
/* DATA STRUCTURES */ /* DATA STRUCTURES */
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* -- (none) -- */ /* none */
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* CONSTANTS */ /* CONSTANTS */
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
//RequestChannel* control;
BoundedBuffer* Request_Buffer; BoundedBuffer Request_Buffer;
BoundedBuffer* Response_Buffers[3]; BoundedBuffer Response_Buffer[3];
BoundedBuffer RB1;
vector<int> John_vec(100);
vector<int> Jane_vec(100);
vector<int> Joe_vec(100);
int n = 1000, b = 100, w = 5;//set defaults int n = 1000, b = 100, w = 5;//set defaults
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* FORWARDS */ /* FORWARDS */
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
void* Req_Thread(string name); void* Req_Thread(void* args);
void* Worker_Thread(char* arg[], RequestChannel* chan); void* Worker_Thread(void* args);
void* local_send_request(string name, string response); void* Stat_Thread(void* args);
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* MAIN FUNCTION */ /* MAIN FUNCTION */
@ -73,7 +80,6 @@ int main(int argc, char * argv[]) {
break; break;
case 'b' : case 'b' :
b = atoi(optarg); b = atoi(optarg);
Request_Buffer = new BoundedBuffer(b);
break; break;
case 'w' : case 'w' :
w = atoi(optarg); w = atoi(optarg);
@ -83,107 +89,139 @@ int main(int argc, char * argv[]) {
pid_t pid = fork(); pid_t pid = fork();
if(pid == 0){
cout << "CLIENT STARTED:" << endl;
cout << "Establishing control channel... " << flush;
RequestChannel chan("control", RequestChannel::CLIENT_SIDE);
cout << "done." << endl;
for(int i = 0; i < 3; i++){
pid = fork();
if(pid){ //if in the parent still
if(i == 0){
Req_Thread(" Joe Smith");
}else if(i == 1){
Req_Thread(" Jane Smith");
}else if(i == 2){
Req_Thread(" John Doe");
}
continue;
}else if (pid == 0){
break;
} else {
exit(1);
}
}
RequestChannel* worker_ret;
string channel_name;
if(pid){
RequestChannel chan("control", RequestChannel::CLIENT_SIDE);
cout << "done." << endl;
for(int i=0; i < w; i++){
pid = fork();
if(pid) {
pid = fork();
channel_name = chan.send_request("newthread");
cout << "Chan: " << channel_name << endl;
worker_ret = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE);
Worker_Thread(argv, worker_ret);
continue;
}else if(pid == 0){
break;
}else{
exit(1);
}
}
wait();
//chan.send_request("quit");
} else {
//this is where the child threads from the
//req_thread threads end up.
}
for(int i=0; i < 3; i++){
//stat thread
//pid = fork();
if(pid != 0){ if(pid != 0){
Request_Buffer.set_empty(b);
Response_Buffer[0].set_empty(b);
Response_Buffer[1].set_empty(b);
Response_Buffer[2].set_empty(b);
RequestChannel chan("control", RequestChannel::CLIENT_SIDE);
pthread_t request_thread_ids[3];
pthread_t worker_thread_ids[w];
pthread_t statistic_thread_ids[3];
RequestChannel* worker_return[w];
//---------------------------------------------
//------- pthread_create -------
//---------------------------------------------
int index[3];
for(int i=0; i < 3; i++){
index[i] = i;
pthread_create(&request_thread_ids[i], NULL, Req_Thread, (void *)&index[i]);
}
for(int i=0; i < w; i++){
string channel_name = chan.send_request("newthread");
worker_return[i] = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE);
pthread_create(&worker_thread_ids[i], NULL, Worker_Thread, worker_return[i]);
}
for(int i=0; i < 3; i++){
pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&i);
}
//---------------------------------------------
//-------- pthread_join --------
//---------------------------------------------
for(int i=0; i < 3; i++){
pthread_join(request_thread_ids[i], NULL);
}
for(int i=0; i < w; i++){
pthread_join(worker_thread_ids[i], NULL);
}
for(int i=0; i < 3; i++){
pthread_join(statistic_thread_ids[i], NULL);
}
for(int i=0; i < w; i++){
worker_return[i]->send_request("quit");
}
cout << "\n\n HISTOGRAM \n";
int ct = 0;
for(int i=0; i < 100; i++){
cout << "Num: " << i << "\tTimes: " << John_vec[i] << endl;
}
//STILL NEED TO OUTPUT THE HISTOGRAM SOMEHOW....
chan.send_request("quit");
}else{
execl("dataserver", 0);
}
usleep(1000000);
}
void* Worker_Thread(void* arg){
RequestChannel* chann = (RequestChannel*)arg;
while(true){
string request = Request_Buffer.pop();
string response = chann->send_request(request);
if(request == "data Joe Smith"){
Response_Buffer[0].push(response);
}else if(request == "data Jane Smith"){
Response_Buffer[1].push(response);
}else if(request == "data John Doe"){
Response_Buffer[2].push(response);
}else if(request == "quit"){ //Break out of the thread if quit response is found
break;
}
}
}
void* Req_Thread(void* arg){
int person = *(int*)arg;
cout << "PERSON: " << person << endl;
switch(person){
case 0:
for(int i=0; i < n; i++){ Request_Buffer.push("data Joe Smith"); cout << "JOE";}
break;
case 1:
for(int i=0; i < n; i++){ Request_Buffer.push("data Jane Smith"); cout << "JANE";}
break;
case 2:
for(int i=0; i < n; i++){ Request_Buffer.push("data John Doe"); cout << "JOHN";}
break; break;
} }
} }
}else{ void* Stat_Thread(void* arg){
execve("dataserver", argv, argv);
}
usleep(1000000);
}
void* Worker_Thread(char* arg[], RequestChannel* chan){
while(true){
string req = Request_Buffer->pop();
cout << "Request: " << req << endl;
string response = chan->send_request(req);
cout << "Response: " << response << endl;
//local_send_request(req, response);
}
//chan->send_request("quit");
}
void* Req_Thread(string name){
for(int i = 0; i < n; i++){
cout << "data " << name << endl;
Request_Buffer->push("data" + name);
}
}
void* local_send_request(string name, string response){
//send to proper response buffer based on which name is found! //send to proper response buffer based on which name is found!
if(name.compare("data Joe Smith") == 0){ int person = *(int*)arg;
Response_Buffers[0]->push(response);
}else if(name.compare("data Jane Smith") == 0){ string num;
Response_Buffers[1]->push(response); if(person == 0){
}else if(name.compare("data John Doe") == 0){ for(int i=0; i < n; i++){
Response_Buffers[2]->push(response); num = Response_Buffer[0].pop();
}else{ int v = atoi(num.c_str());
cout << "ERROR: Invalid push into Response Buffers\n"; John_vec[v] = John_vec[v] + 1;
}
}else if(person == 1){
for(int i=0; i < n; i++){
num = Response_Buffer[1].pop();
cout << "\nPERSON1: " << num << endl;
int v = atoi(num.c_str());
Jane_vec[v] = Jane_vec[v] + 1;
}
}else if(person == 2){
for(int i=0; i < n; i++){
num = Response_Buffer[2].pop();
int v = atoi(num.c_str());
John_vec[v] = John_vec[v] + 1;
}
} }
} }