s
This commit is contained in:
commit
b9476e9db1
8 changed files with 342 additions and 12 deletions
|
@ -13,6 +13,7 @@ void BoundedBuffer::push(string item){
|
||||||
}
|
}
|
||||||
|
|
||||||
string BoundedBuffer::pop(){
|
string BoundedBuffer::pop(){
|
||||||
|
<<<<<<< HEAD
|
||||||
if(data.size() > 0){
|
if(data.size() > 0){
|
||||||
full->P();
|
full->P();
|
||||||
mutex->P();
|
mutex->P();
|
||||||
|
@ -25,5 +26,14 @@ string BoundedBuffer::pop(){
|
||||||
return "quit";
|
return "quit";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
=======
|
||||||
|
full->P();
|
||||||
|
mutex->P();
|
||||||
|
string item = data.back();
|
||||||
|
data.pop_back();
|
||||||
|
mutex->V();
|
||||||
|
empty->V();
|
||||||
|
return item;
|
||||||
|
>>>>>>> 05aaecdb17812b161db2168b80a19fc2c783800b
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,18 @@ public:
|
||||||
empty = new Semaphore(b);
|
empty = new Semaphore(b);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< HEAD
|
||||||
void set_empty(int e){empty = new Semaphore(e);}
|
void set_empty(int e){empty = new Semaphore(e);}
|
||||||
|
=======
|
||||||
|
void set_empty(int e){
|
||||||
|
delete empty;
|
||||||
|
empty = new Semaphore(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool is_empty() {
|
||||||
|
return (data.size() == 0);
|
||||||
|
}
|
||||||
|
>>>>>>> 05aaecdb17812b161db2168b80a19fc2c783800b
|
||||||
|
|
||||||
void push(string item);
|
void push(string item);
|
||||||
|
|
||||||
|
|
7
dataserver.cpp
Executable file → Normal file
7
dataserver.cpp
Executable file → Normal file
|
@ -152,13 +152,12 @@ void handle_process_loop(RequestChannel & _channel) {
|
||||||
|
|
||||||
for(;;) {
|
for(;;) {
|
||||||
|
|
||||||
cout << "Reading next request from channel (" << _channel.name() << ") ..." << flush;
|
//cout << "Reading next request from channel (" << _channel.name() << ") ..." << flush;
|
||||||
string request = _channel.cread();
|
string request = _channel.cread();
|
||||||
cout << " done (" << _channel.name() << ")." << endl;
|
//cout << " done (" << _channel.name() << ")." << endl;
|
||||||
cout << "New request is " << request << endl;
|
//cout << "New request is " << request << endl;
|
||||||
|
|
||||||
if (request.compare("quit") == 0) {
|
if (request.compare("quit") == 0) {
|
||||||
cout << "\nquit\n";
|
|
||||||
_channel.cwrite("bye");
|
_channel.cwrite("bye");
|
||||||
usleep(10000); // give the other end a bit of time.
|
usleep(10000); // give the other end a bit of time.
|
||||||
break; // break out of the loop;
|
break; // break out of the loop;
|
||||||
|
|
10
makefile
Executable file → Normal file
10
makefile
Executable file → Normal file
|
@ -1,15 +1,15 @@
|
||||||
# makefile
|
# makefile
|
||||||
|
|
||||||
all: dataserver simpleclient
|
all: dataserver client
|
||||||
|
|
||||||
reqchannel.o: reqchannel.h reqchannel.cpp
|
reqchannel.o: reqchannel.h reqchannel.cpp
|
||||||
g++ -std=c++11 -c -g reqchannel.cpp
|
g++ -std=c++11 -lpthread -c -g reqchannel.cpp
|
||||||
|
|
||||||
dataserver: dataserver.cpp reqchannel.o
|
dataserver: dataserver.cpp reqchannel.o
|
||||||
g++ -std=c++11 -g -o dataserver dataserver.cpp reqchannel.o -lpthread
|
g++ -std=c++11 -lpthread -g -o dataserver dataserver.cpp reqchannel.o
|
||||||
|
|
||||||
simpleclient: simpleclient.cpp reqchannel.o BoundedBuffer.cpp
|
client: simpleclient.cpp reqchannel.o BoundedBuffer.cpp
|
||||||
g++ -std=c++11 -g -o simpleclient simpleclient.cpp reqchannel.o BoundedBuffer.cpp
|
g++ -std=c++11 -lpthread -g -o client simpleclient.cpp reqchannel.o BoundedBuffer.cpp
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
$(RM) *.o
|
$(RM) *.o
|
||||||
|
|
4
reqchannel.cpp
Executable file → Normal file
4
reqchannel.cpp
Executable file → Normal file
|
@ -139,11 +139,11 @@ RequestChannel::RequestChannel(const string _name, const Side _side) : my_name(_
|
||||||
}
|
}
|
||||||
|
|
||||||
RequestChannel::~RequestChannel() {
|
RequestChannel::~RequestChannel() {
|
||||||
cout << "close requests channel " << my_name << endl;
|
//cout << "close requests channel " << my_name << endl;
|
||||||
close(wfd);
|
close(wfd);
|
||||||
close(rfd);
|
close(rfd);
|
||||||
if (my_side == SERVER_SIDE) {
|
if (my_side == SERVER_SIDE) {
|
||||||
cout << "close IPC mechanisms on server side for channel " << my_name << endl;
|
//cout << "close IPC mechanisms on server side for channel " << my_name << endl;
|
||||||
/* Destruct the underlying IPC mechanisms. */
|
/* Destruct the underlying IPC mechanisms. */
|
||||||
if (remove(pipe_name(READ_MODE)) != 0) {
|
if (remove(pipe_name(READ_MODE)) != 0) {
|
||||||
perror(string("Request Channel (" + my_name + ") : Error deleting pipe for reading").c_str());
|
perror(string("Request Channel (" + my_name + ") : Error deleting pipe for reading").c_str());
|
||||||
|
|
0
reqchannel.h
Executable file → Normal file
0
reqchannel.h
Executable file → Normal file
0
semaphore.h
Executable file → Normal file
0
semaphore.h
Executable file → Normal file
310
simpleclient.cpp
Normal file
310
simpleclient.cpp
Normal file
|
@ -0,0 +1,310 @@
|
||||||
|
/*
|
||||||
|
File: simpleclient.C
|
||||||
|
|
||||||
|
Author: R. Bettati
|
||||||
|
Department of Computer Science
|
||||||
|
Texas A&M University
|
||||||
|
Date : 2013/01/31
|
||||||
|
|
||||||
|
Simple client main program for MP3 in CSCE 313
|
||||||
|
*/
|
||||||
|
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
/* DEFINES */
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
/* -- (none) -- */
|
||||||
|
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
/* INCLUDES */
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
#include <cassert>
|
||||||
|
#include <cstring>
|
||||||
|
#include <iostream>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
#include <sys/time.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
#include <errno.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include "reqchannel.h"
|
||||||
|
#include "semaphore.h"
|
||||||
|
#include "BoundedBuffer.h"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
/* DATA STRUCTURES */
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
/* none */
|
||||||
|
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
/* CONSTANTS */
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
|
||||||
|
BoundedBuffer Request_Buffer;
|
||||||
|
BoundedBuffer Response_Buffer[3];
|
||||||
|
BoundedBuffer RB1;
|
||||||
|
|
||||||
|
struct stat_struct {
|
||||||
|
vector<int>* statt;
|
||||||
|
int tid;
|
||||||
|
};
|
||||||
|
|
||||||
|
int n = 1000, b = 100, w = 5, c = 0;//set defaults
|
||||||
|
|
||||||
|
Semaphore mutex(1);
|
||||||
|
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
/* FORWARDS */
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
void* Req_Thread(void* args);
|
||||||
|
void* Worker_Thread(void* args);
|
||||||
|
void* Stat_Thread(void* args);
|
||||||
|
long get_time_diff(struct timeval *start, struct timeval *end);
|
||||||
|
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
/* MAIN FUNCTION */
|
||||||
|
/*--------------------------------------------------------------------------*/
|
||||||
|
|
||||||
|
int main(int argc, char * argv[]) {
|
||||||
|
|
||||||
|
int option = -1;
|
||||||
|
|
||||||
|
while ((option = getopt(argc, argv, "n:b:w:")) != -1){
|
||||||
|
switch (option){
|
||||||
|
case 'n' :
|
||||||
|
n = atoi(optarg);
|
||||||
|
break;
|
||||||
|
case 'b' :
|
||||||
|
b = atoi(optarg);
|
||||||
|
break;
|
||||||
|
case 'w' :
|
||||||
|
w = atoi(optarg);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//cout << "n: " << n << "b: " << b << "w: " << w << "\n\n";
|
||||||
|
pid_t pid = fork();
|
||||||
|
|
||||||
|
if(pid == 0){
|
||||||
|
|
||||||
|
struct timeval tp_start;
|
||||||
|
struct timeval tp_end;
|
||||||
|
long time_diff;
|
||||||
|
assert(gettimeofday(&tp_start, 0) == 0);
|
||||||
|
|
||||||
|
Request_Buffer.set_empty(b);
|
||||||
|
Response_Buffer[0].set_empty(n);
|
||||||
|
Response_Buffer[1].set_empty(n);
|
||||||
|
Response_Buffer[2].set_empty(n);
|
||||||
|
|
||||||
|
vector<int> John_vec(100);
|
||||||
|
stat_struct John_struct;
|
||||||
|
John_struct.statt = &John_vec;
|
||||||
|
John_struct.tid = 0;
|
||||||
|
vector<int> Jane_vec(100);
|
||||||
|
stat_struct Jane_struct;
|
||||||
|
Jane_struct.statt = &Jane_vec;
|
||||||
|
Jane_struct.tid = 1;
|
||||||
|
vector<int> Joe_vec(100);
|
||||||
|
stat_struct Joe_struct;
|
||||||
|
Joe_struct.statt = &Joe_vec;
|
||||||
|
Joe_struct.tid = 2;
|
||||||
|
|
||||||
|
RequestChannel chan("control", RequestChannel::CLIENT_SIDE);
|
||||||
|
|
||||||
|
pthread_t request_thread_ids[3];
|
||||||
|
pthread_t worker_thread_ids[w];
|
||||||
|
pthread_t statistic_thread_ids[3];
|
||||||
|
RequestChannel* worker_return[w];
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
//---------------------------------------------
|
||||||
|
//------- pthread_create -------
|
||||||
|
//---------------------------------------------
|
||||||
|
int index[3];
|
||||||
|
for(int i=0; i < 3; i++){
|
||||||
|
index[i] = i;
|
||||||
|
pthread_create(&request_thread_ids[i], NULL, Req_Thread, (void *)&index[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i=0; i < w; i++){
|
||||||
|
string channel_name = chan.send_request("newthread");
|
||||||
|
worker_return[i] = new RequestChannel(channel_name, RequestChannel::CLIENT_SIDE);
|
||||||
|
pthread_create(&worker_thread_ids[i], NULL, Worker_Thread, worker_return[i]);
|
||||||
|
}
|
||||||
|
//
|
||||||
|
for(int i=0; i < 3; i++){
|
||||||
|
if(i == 0) {
|
||||||
|
pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&John_struct);
|
||||||
|
}
|
||||||
|
else if(i == 1) {
|
||||||
|
pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&Jane_struct);
|
||||||
|
}
|
||||||
|
else if(i == 2) {
|
||||||
|
pthread_create(&statistic_thread_ids[i], NULL, Stat_Thread, (void *)&Joe_struct);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//
|
||||||
|
|
||||||
|
//---------------------------------------------
|
||||||
|
//-------- pthread_join --------
|
||||||
|
//---------------------------------------------
|
||||||
|
|
||||||
|
for(int i=0; i < 3; i++){
|
||||||
|
pthread_join(request_thread_ids[i], NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i=0; i < w; i++){
|
||||||
|
pthread_join(worker_thread_ids[i], NULL);
|
||||||
|
}
|
||||||
|
//
|
||||||
|
for(int i=0; i < 3; i++){
|
||||||
|
pthread_join(statistic_thread_ids[i], NULL);
|
||||||
|
}
|
||||||
|
//
|
||||||
|
|
||||||
|
assert(gettimeofday(&tp_end, 0) == 0);
|
||||||
|
|
||||||
|
for(int i=0; i < w; i++){
|
||||||
|
worker_return[i]->send_request("quit");
|
||||||
|
}
|
||||||
|
|
||||||
|
cout << "\n\n JOHN HISTOGRAM\n";
|
||||||
|
//int ct = 0;
|
||||||
|
for(int i=0; i < 100; i++){
|
||||||
|
//
|
||||||
|
cout << "Num[" << i << "]: ";
|
||||||
|
for(int j=0; j < John_vec[i]; j++) {
|
||||||
|
cout << "*";
|
||||||
|
}
|
||||||
|
//
|
||||||
|
//cout << John_vec[i];
|
||||||
|
cout << "\n";
|
||||||
|
}
|
||||||
|
cout << "\n\n JANE HISTOGRAM\n";
|
||||||
|
//int ct = 0;
|
||||||
|
for(int i=0; i < 100; i++){
|
||||||
|
//
|
||||||
|
cout << "Num[" << i << "]: ";
|
||||||
|
for(int j=0; j < Jane_vec[i]; j++) {
|
||||||
|
cout << "*";
|
||||||
|
}
|
||||||
|
//
|
||||||
|
//cout << Jane_vec[i];
|
||||||
|
cout << "\n";
|
||||||
|
}
|
||||||
|
cout << "\n\n JOE HISTOGRAM\n";
|
||||||
|
//int ct = 0;
|
||||||
|
for(int i=0; i < 100; i++){
|
||||||
|
//
|
||||||
|
cout << "Num[" << i << "]: ";
|
||||||
|
for(int j=0; j < Joe_vec[i]; j++) {
|
||||||
|
cout << "*";
|
||||||
|
}
|
||||||
|
//
|
||||||
|
//cout << Joe_vec[i];
|
||||||
|
cout << "\n";
|
||||||
|
}
|
||||||
|
//STILL NEED TO OUTPUT THE HISTOGRAM SOMEHOW....
|
||||||
|
|
||||||
|
chan.send_request("quit");
|
||||||
|
|
||||||
|
time_diff = get_time_diff(&tp_start, &tp_end);
|
||||||
|
cout << "time_diff: " << time_diff << "musec, " << (time_diff/1000000) << "sec" << endl;
|
||||||
|
}else{
|
||||||
|
execl("dataserver", 0);
|
||||||
|
}
|
||||||
|
usleep(1000000);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
void* Worker_Thread(void* arg){
|
||||||
|
RequestChannel* chann = (RequestChannel*)arg;
|
||||||
|
|
||||||
|
string request;
|
||||||
|
while(true){
|
||||||
|
mutex.P();
|
||||||
|
if(!Request_Buffer.is_empty()) {
|
||||||
|
request = Request_Buffer.pop();
|
||||||
|
c++;
|
||||||
|
}
|
||||||
|
else if(c < (3*n)){
|
||||||
|
mutex.V();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
mutex.V();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
mutex.V();
|
||||||
|
string response = chann->send_request(request);
|
||||||
|
//cout << "\nc: " << c << "\n\n";
|
||||||
|
if(request == "data Joe Smith"){
|
||||||
|
Response_Buffer[0].push(response);
|
||||||
|
}else if(request == "data Jane Smith"){
|
||||||
|
Response_Buffer[1].push(response);
|
||||||
|
}else if(request == "data John Doe"){
|
||||||
|
Response_Buffer[2].push(response);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void* Req_Thread(void* arg){
|
||||||
|
int person = *(int*)arg;
|
||||||
|
//cout << "PERSON: " << person << endl;
|
||||||
|
switch(person){
|
||||||
|
case 0:
|
||||||
|
for(int i=0; i < n; i++){ Request_Buffer.push("data Joe Smith"); }
|
||||||
|
break;
|
||||||
|
case 1:
|
||||||
|
for(int i=0; i < n; i++){ Request_Buffer.push("data Jane Smith"); }
|
||||||
|
break;
|
||||||
|
case 2:
|
||||||
|
for(int i=0; i < n; i++){ Request_Buffer.push("data John Doe"); }
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void* Stat_Thread(void* arg){
|
||||||
|
//send to proper response buffer based on which name is found!
|
||||||
|
stat_struct &sts = *(stat_struct*)arg;
|
||||||
|
vector<int> *stat_vec = sts.statt;
|
||||||
|
int person = sts.tid;
|
||||||
|
|
||||||
|
string num;
|
||||||
|
for(int i=0; i < n; i++){
|
||||||
|
num = Response_Buffer[person].pop();
|
||||||
|
//cout << "\nPERSON0: " << num << endl;
|
||||||
|
int v = atoi(num.c_str());
|
||||||
|
(*stat_vec)[v] = (*stat_vec)[v] + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
long get_time_diff(struct timeval * tp1, struct timeval * tp2) {
|
||||||
|
/* Prints to stdout the difference, in seconds and museconds, between two
|
||||||
|
timevals. */
|
||||||
|
|
||||||
|
//long sec = tp2->tv_sec - tp1->tv_sec;
|
||||||
|
long musec = tp2->tv_usec - tp1->tv_usec;
|
||||||
|
if (musec < 0) {
|
||||||
|
musec += 1000000;
|
||||||
|
}
|
||||||
|
return musec;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Reference in a new issue