This commit is contained in:
Eric Buxkemper 2015-11-10 15:47:13 -06:00
parent a89c689ccb
commit 7f3859db68
3 changed files with 77 additions and 45 deletions

View file

@ -7,15 +7,18 @@
void BoundedBuffer::push(string item){ void BoundedBuffer::push(string item){
empty->P(); empty->P();
mutex->P();
data.push_back(item); data.push_back(item);
mutex->V();
full->V();//increment the number of full slots now. full->V();//increment the number of full slots now.
} }
string BoundedBuffer::pop(){ string BoundedBuffer::pop(){
string item = data.back(); string item = data.back();
full->P(); full->P();
mutex->P();
data.pop_back(); data.pop_back();
mutex->V();
empty->V(); empty->V();
return item; return item;
} }

View file

@ -21,7 +21,7 @@ class BoundedBuffer{
vector<string> data; vector<string> data;
//BoundedBuffer(
public: public:
BoundedBuffer(){empty = new Semaphore(100);}//default b value BoundedBuffer(){empty = new Semaphore(100);}//default b value

View file

@ -56,6 +56,7 @@ int n = 1000, b = 100, w = 5;//set defaults
void* Req_Thread(string name); void* Req_Thread(string name);
void* Worker_Thread(char* arg[], RequestChannel* chan); void* Worker_Thread(char* arg[], RequestChannel* chan);
void* local_send_request(string name, string response);
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* MAIN FUNCTION */ /* MAIN FUNCTION */
@ -82,64 +83,75 @@ int main(int argc, char * argv[]) {
pid_t pid = fork(); pid_t pid = fork();
if(pid != 0){ if(pid == 0){
execve("dataserver", argv, argv);
}else{
cout << "CLIENT STARTED:" << endl; cout << "CLIENT STARTED:" << endl;
cout << "Establishing control channel... " << flush; cout << "Establishing control channel... " << flush;
RequestChannel chan("control", RequestChannel::CLIENT_SIDE); RequestChannel chan("control", RequestChannel::CLIENT_SIDE);
cout << "done." << endl; cout << "done." << endl;
for(int i=0; i < n*3; i++){ for(int i = 0; i < 3; i++){
switch(i%3){ pid = fork();
case 0: if(pid){ //if in the parent still
if(i == 0){
Req_Thread(" Joe Smith"); Req_Thread(" Joe Smith");
cout << "Joe\n"; }else if(i == 1){
break;
case 1:
Req_Thread(" Jane Smith"); Req_Thread(" Jane Smith");
cout << "Jane\n"; }else if(i == 2){
break;
case 2:
Req_Thread(" John Doe"); Req_Thread(" John Doe");
cout << "John\n"; }
continue;
}else if (pid == 0){
break; break;
} else {
exit(1);
} }
} }
// 0 is the parent.
RequestChannel* worker_ret;
string channel_name;
if(pid){
RequestChannel chan("control", RequestChannel::CLIENT_SIDE);
cout << "done." << endl;
for(int i=0; i < w; i++){
pid = fork();
if(pid) {
pid = fork();
channel_name = chan.send_request("newthread");
cout << "Chan: " << channel_name << endl;
worker_ret = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE);
Worker_Thread(argv, worker_ret);
continue;
}else if(pid == 0){
break;
}else{
exit(1);
}
}
wait();
//chan.send_request("quit");
} else {
//this is where the child threads from the
//req_thread threads end up.
}
for(int i=0; i < 3; i++){ for(int i=0; i < 3; i++){
//stat thread //stat thread
pid = fork(); //pid = fork();
if(pid != 0){ if(pid != 0){
break; break;
} }
} }
RequestChannel* worker_ret;
string channel_name;
if(pid == 0){
for(int i=0; i < w; i++){
channel_name = chan.send_request("newthread");
cout << "Chan: " << channel_name << endl;
worker_ret = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE);
pid = fork();
if(pid != 0) {
Worker_Thread(argv, worker_ret);
break;
}
}
chan.send_request("quit");
}else{ }else{
execve("dataserver", argv, argv);
}
} }
usleep(1000000); usleep(1000000);
} }
@ -150,14 +162,31 @@ void* Worker_Thread(char* arg[], RequestChannel* chan){
cout << "Request: " << req << endl; cout << "Request: " << req << endl;
string response = chan->send_request(req); string response = chan->send_request(req);
cout << "Response: " << response << endl; cout << "Response: " << response << endl;
//local_send_request(req, response);
} }
chan->send_request("quit"); //chan->send_request("quit");
} }
void* Req_Thread(string name){ void* Req_Thread(string name){
for(int i = 0; i < n; i++){
cout << "data " << name << endl;
Request_Buffer->push("data" + name); Request_Buffer->push("data" + name);
} }
void* local_send_request(){
} }
void* local_send_request(string name, string response){
//send to proper response buffer based on which name is found!
if(name.compare("data Joe Smith") == 0){
Response_Buffers[0]->push(response);
}else if(name.compare("data Jane Smith") == 0){
Response_Buffers[1]->push(response);
}else if(name.compare("data John Doe") == 0){
Response_Buffers[2]->push(response);
}else{
cout << "ERROR: Invalid push into Response Buffers\n";
}
}