This commit is contained in:
shadow8t4 2015-11-13 10:02:10 -06:00
parent 0030b63b19
commit 05aaecdb17
6 changed files with 150 additions and 68 deletions

View file

@ -13,7 +13,6 @@ void BoundedBuffer::push(string item){
} }
string BoundedBuffer::pop(){ string BoundedBuffer::pop(){
if(data.size() > 0){
full->P(); full->P();
mutex->P(); mutex->P();
string item = data.back(); string item = data.back();
@ -21,9 +20,5 @@ string BoundedBuffer::pop(){
mutex->V(); mutex->V();
empty->V(); empty->V();
return item; return item;
}else{
return "quit";
}
} }

View file

@ -26,7 +26,14 @@ public:
empty = new Semaphore(b); empty = new Semaphore(b);
} }
void set_empty(int e){empty = new Semaphore(e);} void set_empty(int e){
delete empty;
empty = new Semaphore(e);
}
bool is_empty() {
return (data.size() == 0);
}
void push(string item); void push(string item);

View file

@ -152,10 +152,10 @@ void handle_process_loop(RequestChannel & _channel) {
for(;;) { for(;;) {
cout << "Reading next request from channel (" << _channel.name() << ") ..." << flush; //cout << "Reading next request from channel (" << _channel.name() << ") ..." << flush;
string request = _channel.cread(); string request = _channel.cread();
cout << " done (" << _channel.name() << ")." << endl; //cout << " done (" << _channel.name() << ")." << endl;
cout << "New request is " << request << endl; //cout << "New request is " << request << endl;
if (request.compare("quit") == 0) { if (request.compare("quit") == 0) {
_channel.cwrite("bye"); _channel.cwrite("bye");

View file

@ -1,6 +1,6 @@
# makefile # makefile
all: dataserver simpleclient all: dataserver client
reqchannel.o: reqchannel.h reqchannel.cpp reqchannel.o: reqchannel.h reqchannel.cpp
g++ -std=c++11 -lpthread -c -g reqchannel.cpp g++ -std=c++11 -lpthread -c -g reqchannel.cpp
@ -8,8 +8,8 @@ reqchannel.o: reqchannel.h reqchannel.cpp
dataserver: dataserver.cpp reqchannel.o dataserver: dataserver.cpp reqchannel.o
g++ -std=c++11 -lpthread -g -o dataserver dataserver.cpp reqchannel.o g++ -std=c++11 -lpthread -g -o dataserver dataserver.cpp reqchannel.o
simpleclient: simpleclient.cpp reqchannel.o BoundedBuffer.cpp client: simpleclient.cpp reqchannel.o BoundedBuffer.cpp
g++ -std=c++11 -lpthread -g -o simpleclient simpleclient.cpp reqchannel.o BoundedBuffer.cpp g++ -std=c++11 -lpthread -g -o client simpleclient.cpp reqchannel.o BoundedBuffer.cpp
clean: clean:
$(RM) *.o $(RM) *.o

View file

@ -139,11 +139,11 @@ RequestChannel::RequestChannel(const string _name, const Side _side) : my_name(_
} }
RequestChannel::~RequestChannel() { RequestChannel::~RequestChannel() {
cout << "close requests channel " << my_name << endl; //cout << "close requests channel " << my_name << endl;
close(wfd); close(wfd);
close(rfd); close(rfd);
if (my_side == SERVER_SIDE) { if (my_side == SERVER_SIDE) {
cout << "close IPC mechanisms on server side for channel " << my_name << endl; //cout << "close IPC mechanisms on server side for channel " << my_name << endl;
/* Destruct the underlying IPC mechanisms. */ /* Destruct the underlying IPC mechanisms. */
if (remove(pipe_name(READ_MODE)) != 0) { if (remove(pipe_name(READ_MODE)) != 0) {
perror(string("Request Channel (" + my_name + ") : Error deleting pipe for reading").c_str()); perror(string("Request Channel (" + my_name + ") : Error deleting pipe for reading").c_str());

View file

@ -24,6 +24,7 @@
#include <iostream> #include <iostream>
#include <sys/types.h> #include <sys/types.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/time.h>
#include <pthread.h> #include <pthread.h>
#include <errno.h> #include <errno.h>
@ -51,11 +52,14 @@ BoundedBuffer Request_Buffer;
BoundedBuffer Response_Buffer[3]; BoundedBuffer Response_Buffer[3];
BoundedBuffer RB1; BoundedBuffer RB1;
vector<int> John_vec(100); struct stat_struct {
vector<int> Jane_vec(100); vector<int>* statt;
vector<int> Joe_vec(100); int tid;
};
int n = 1000, b = 100, w = 5;//set defaults int n = 1000, b = 100, w = 5, c = 0;//set defaults
Semaphore mutex(1);
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* FORWARDS */ /* FORWARDS */
@ -64,6 +68,7 @@ int n = 1000, b = 100, w = 5;//set defaults
void* Req_Thread(void* args); void* Req_Thread(void* args);
void* Worker_Thread(void* args); void* Worker_Thread(void* args);
void* Stat_Thread(void* args); void* Stat_Thread(void* args);
long get_time_diff(struct timeval *start, struct timeval *end);
/*--------------------------------------------------------------------------*/ /*--------------------------------------------------------------------------*/
/* MAIN FUNCTION */ /* MAIN FUNCTION */
@ -86,17 +91,33 @@ int main(int argc, char * argv[]) {
break; break;
} }
} }
//cout << "n: " << n << "b: " << b << "w: " << w << "\n\n";
pid_t pid = fork(); pid_t pid = fork();
if(pid != 0){ if(pid == 0){
struct timeval tp_start;
struct timeval tp_end;
long time_diff;
assert(gettimeofday(&tp_start, 0) == 0);
Request_Buffer.set_empty(b); Request_Buffer.set_empty(b);
Response_Buffer[0].set_empty(b); Response_Buffer[0].set_empty(n);
Response_Buffer[1].set_empty(b); Response_Buffer[1].set_empty(n);
Response_Buffer[2].set_empty(b); Response_Buffer[2].set_empty(n);
vector<int> John_vec(100);
stat_struct John_struct;
John_struct.statt = &John_vec;
John_struct.tid = 0;
vector<int> Jane_vec(100);
stat_struct Jane_struct;
Jane_struct.statt = &Jane_vec;
Jane_struct.tid = 1;
vector<int> Joe_vec(100);
stat_struct Joe_struct;
Joe_struct.statt = &Joe_vec;
Joe_struct.tid = 2;
RequestChannel chan("control", RequestChannel::CLIENT_SIDE); RequestChannel chan("control", RequestChannel::CLIENT_SIDE);
@ -122,10 +143,19 @@ int main(int argc, char * argv[]) {
worker_return[i] = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE); worker_return[i] = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE);
pthread_create(&worker_thread_ids[i], NULL, Worker_Thread, worker_return[i]); pthread_create(&worker_thread_ids[i], NULL, Worker_Thread, worker_return[i]);
} }
//
for(int i=0; i < 3; i++){ for(int i=0; i < 3; i++){
pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&i); if(i == 0) {
pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&John_struct);
} }
else if(i == 1) {
pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&Jane_struct);
}
else if(i == 2) {
pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&Joe_struct);
}
}
//
//--------------------------------------------- //---------------------------------------------
//-------- pthread_join -------- //-------- pthread_join --------
@ -138,24 +168,60 @@ int main(int argc, char * argv[]) {
for(int i=0; i < w; i++){ for(int i=0; i < w; i++){
pthread_join(worker_thread_ids[i], NULL); pthread_join(worker_thread_ids[i], NULL);
} }
//
for(int i=0; i < 3; i++){ for(int i=0; i < 3; i++){
pthread_join(statistic_thread_ids[i], NULL); pthread_join(statistic_thread_ids[i], NULL);
} }
//
assert(gettimeofday(&tp_end, 0) == 0);
for(int i=0; i < w; i++){ for(int i=0; i < w; i++){
worker_return[i]->send_request("quit"); worker_return[i]->send_request("quit");
} }
cout << "\n\n HISTOGRAM \n"; cout << "\n\n JOHN HISTOGRAM\n";
int ct = 0; //int ct = 0;
for(int i=0; i < 100; i++){ for(int i=0; i < 100; i++){
cout << "Num: " << i << "\tTimes: " << John_vec[i] << endl; //
cout << "Num[" << i << "]: ";
for(int j=0; j < John_vec[i]; j++) {
cout << "*";
}
//
//cout << John_vec[i];
cout << "\n";
}
cout << "\n\n JANE HISTOGRAM\n";
//int ct = 0;
for(int i=0; i < 100; i++){
//
cout << "Num[" << i << "]: ";
for(int j=0; j < Jane_vec[i]; j++) {
cout << "*";
}
//
//cout << Jane_vec[i];
cout << "\n";
}
cout << "\n\n JOE HISTOGRAM\n";
//int ct = 0;
for(int i=0; i < 100; i++){
//
cout << "Num[" << i << "]: ";
for(int j=0; j < Joe_vec[i]; j++) {
cout << "*";
}
//
//cout << Joe_vec[i];
cout << "\n";
} }
//STILL NEED TO OUTPUT THE HISTOGRAM SOMEHOW.... //STILL NEED TO OUTPUT THE HISTOGRAM SOMEHOW....
chan.send_request("quit"); chan.send_request("quit");
time_diff = get_time_diff(&tp_start, &tp_end);
cout << "time_diff: " << time_diff << "musec, " << (time_diff/1000000) << "sec" << endl;
}else{ }else{
execl("dataserver", 0); execl("dataserver", 0);
} }
@ -165,17 +231,31 @@ int main(int argc, char * argv[]) {
void* Worker_Thread(void* arg){ void* Worker_Thread(void* arg){
RequestChannel* chann = (RequestChannel*)arg; RequestChannel* chann = (RequestChannel*)arg;
string request;
while(true){ while(true){
string request = Request_Buffer.pop(); mutex.P();
if(!Request_Buffer.is_empty()) {
request = Request_Buffer.pop();
c++;
}
else if(c < (3*n)){
mutex.V();
continue;
}
else {
mutex.V();
break;
}
mutex.V();
string response = chann->send_request(request); string response = chann->send_request(request);
//cout << "\nc: " << c << "\n\n";
if(request == "data Joe Smith"){ if(request == "data Joe Smith"){
Response_Buffer[0].push(response); Response_Buffer[0].push(response);
}else if(request == "data Jane Smith"){ }else if(request == "data Jane Smith"){
Response_Buffer[1].push(response); Response_Buffer[1].push(response);
}else if(request == "data John Doe"){ }else if(request == "data John Doe"){
Response_Buffer[2].push(response); Response_Buffer[2].push(response);
}else if(request == "quit"){ //Break out of the thread if quit response is found
break;
} }
} }
} }
@ -183,48 +263,48 @@ void* Worker_Thread(void* arg){
void* Req_Thread(void* arg){ void* Req_Thread(void* arg){
int person = *(int*)arg; int person = *(int*)arg;
cout << "PERSON: " << person << endl; //cout << "PERSON: " << person << endl;
switch(person){ switch(person){
case 0: case 0:
for(int i=0; i < n; i++){ Request_Buffer.push("data Joe Smith"); cout << "JOE";} for(int i=0; i < n; i++){ Request_Buffer.push("data Joe Smith"); }
break; break;
case 1: case 1:
for(int i=0; i < n; i++){ Request_Buffer.push("data Jane Smith"); cout << "JANE";} for(int i=0; i < n; i++){ Request_Buffer.push("data Jane Smith"); }
break; break;
case 2: case 2:
for(int i=0; i < n; i++){ Request_Buffer.push("data John Doe"); cout << "JOHN";} for(int i=0; i < n; i++){ Request_Buffer.push("data John Doe"); }
break; break;
} }
} }
void* Stat_Thread(void* arg){ void* Stat_Thread(void* arg){
//send to proper response buffer based on which name is found! //send to proper response buffer based on which name is found!
int person = *(int*)arg; stat_struct &sts = *(stat_struct*)arg;
vector<int> *stat_vec = sts.statt;
int person = sts.tid;
string num; string num;
if(person == 0){
for(int i=0; i < n; i++){ for(int i=0; i < n; i++){
num = Response_Buffer[0].pop(); num = Response_Buffer[person].pop();
//cout << "\nPERSON0: " << num << endl;
int v = atoi(num.c_str()); int v = atoi(num.c_str());
John_vec[v] = John_vec[v] + 1; (*stat_vec)[v] = (*stat_vec)[v] + 1;
}
}else if(person == 1){
for(int i=0; i < n; i++){
num = Response_Buffer[1].pop();
cout << "\nPERSON1: " << num << endl;
int v = atoi(num.c_str());
Jane_vec[v] = Jane_vec[v] + 1;
}
}else if(person == 2){
for(int i=0; i < n; i++){
num = Response_Buffer[2].pop();
int v = atoi(num.c_str());
John_vec[v] = John_vec[v] + 1;
}
} }
} }
long get_time_diff(struct timeval * tp1, struct timeval * tp2) {
/* Prints to stdout the difference, in seconds and museconds, between two
timevals. */
//long sec = tp2->tv_sec - tp1->tv_sec;
long musec = tp2->tv_usec - tp1->tv_usec;
if (musec < 0) {
musec += 1000000;
}
return musec;
}