diff --git a/BoundedBuffer.h b/BoundedBuffer.h new file mode 100755 index 0000000..225328c --- /dev/null +++ b/BoundedBuffer.h @@ -0,0 +1,8 @@ + + +class BoundBuffer{ + Semaphore full(n); + Semaphore empty(n); + Semaphore mutex(n); +} + diff --git a/dataserver.cpp b/dataserver.cpp new file mode 100755 index 0000000..df2a709 --- /dev/null +++ b/dataserver.cpp @@ -0,0 +1,184 @@ +/* + File: dataserver.C + + Author: R. Bettati + Department of Computer Science + Texas A&M University + Date : 2012/07/16 + + Dataserver main program for MP3 in CSCE 313 +*/ + +/*--------------------------------------------------------------------------*/ +/* DEFINES */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* INCLUDES */ +/*--------------------------------------------------------------------------*/ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "reqchannel.h" + +using namespace std; + +/*--------------------------------------------------------------------------*/ +/* DATA STRUCTURES */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* CONSTANTS */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* VARIABLES */ +/*--------------------------------------------------------------------------*/ + +static int nthreads = 0; + +/*--------------------------------------------------------------------------*/ +/* FORWARDS */ +/*--------------------------------------------------------------------------*/ + +void handle_process_loop(RequestChannel & _channel); + +/*--------------------------------------------------------------------------*/ +/* LOCAL FUNCTIONS -- SUPPORT FUNCTIONS */ +/*--------------------------------------------------------------------------*/ + +string int2string(int number) { + stringstream ss;//create a stringstream + ss << number;//add number to the stream + return ss.str();//return a string with the contents of the stream +} + +/*--------------------------------------------------------------------------*/ +/* LOCAL FUNCTIONS -- THREAD FUNCTIONS */ +/*--------------------------------------------------------------------------*/ + +void * handle_data_requests(void * args) { + + RequestChannel * data_channel = (RequestChannel*)args; + + // -- Handle client requests on this channel. + + handle_process_loop(*data_channel); + + // -- Client has quit. We remove channel. + + delete data_channel; +} + +/*--------------------------------------------------------------------------*/ +/* LOCAL FUNCTIONS -- INDIVIDUAL REQUESTS */ +/*--------------------------------------------------------------------------*/ + +void process_hello(RequestChannel & _channel, const string & _request) { + _channel.cwrite("hello to you too"); +} + +void process_data(RequestChannel & _channel, const string & _request) { + usleep(1000 + (rand() % 5000)); + //_channel.cwrite("here comes data about " + _request.substr(4) + ": " + int2string(random() % 100)); + _channel.cwrite(int2string(rand() % 100)); +} + +void process_newthread(RequestChannel & _channel, const string & _request) { + int error; + nthreads ++; + + // -- Name new data channel + + string new_channel_name = "data" + int2string(nthreads) + "_"; + // cout << "new channel name = " << new_channel_name << endl; + + // -- Pass new channel name back to client + + _channel.cwrite(new_channel_name); + + // -- Construct new data channel (pointer to be passed to thread function) + + RequestChannel * data_channel = new RequestChannel(new_channel_name, RequestChannel::SERVER_SIDE); + + // -- Create new thread to handle request channel + + pthread_t thread_id; + // cout << "starting new thread " << nthreads << endl; + if (error = pthread_create(& thread_id, NULL, handle_data_requests, data_channel)) { + fprintf(stderr, "p_create failed: %s\n", strerror(error)); + } + +} + +/*--------------------------------------------------------------------------*/ +/* LOCAL FUNCTIONS -- THE PROCESS REQUEST LOOP */ +/*--------------------------------------------------------------------------*/ + +void process_request(RequestChannel & _channel, const string & _request) { + + if (_request.compare(0, 5, "hello") == 0) { + process_hello(_channel, _request); + } + else if (_request.compare(0, 4, "data") == 0) { + process_data(_channel, _request); + } + else if (_request.compare(0, 9, "newthread") == 0) { + process_newthread(_channel, _request); + } + else { + _channel.cwrite("unknown request"); + } + +} + +void handle_process_loop(RequestChannel & _channel) { + + for(;;) { + + cout << "Reading next request from channel (" << _channel.name() << ") ..." << flush; + string request = _channel.cread(); + cout << " done (" << _channel.name() << ")." << endl; + cout << "New request is " << request << endl; + + if (request.compare("quit") == 0) { + _channel.cwrite("bye"); + usleep(10000); // give the other end a bit of time. + break; // break out of the loop; + } + + process_request(_channel, request); + } + +} + +/*--------------------------------------------------------------------------*/ +/* MAIN FUNCTION */ +/*--------------------------------------------------------------------------*/ + +int main(int argc, char * argv[]) { + + // cout << "Establishing control channel... " << flush; + RequestChannel control_channel("control", RequestChannel::SERVER_SIDE); + // cout << "done.\n" << flush; + + handle_process_loop(control_channel); + +} + diff --git a/reqchannel.cpp b/reqchannel.cpp new file mode 100755 index 0000000..1e363ad --- /dev/null +++ b/reqchannel.cpp @@ -0,0 +1,225 @@ +/* + File: requestchannel.C + + Author: R. Bettati + Department of Computer Science + Texas A&M University + Date : 2012/07/11 + +*/ + +/*--------------------------------------------------------------------------*/ +/* DEFINES */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* INCLUDES */ +/*--------------------------------------------------------------------------*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "reqchannel.h" + +using namespace std; + +/*--------------------------------------------------------------------------*/ +/* DATA STRUCTURES */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* CONSTANTS */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* FORWARDS */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* PRIVATE METHODS FOR CLASS R e q u e s t C h a n n e l */ +/*--------------------------------------------------------------------------*/ + +char * RequestChannel::pipe_name(Mode _mode) { + string pname = "fifo_" + my_name; + + if (my_side == CLIENT_SIDE) { + if (_mode == READ_MODE) + pname += "1"; + else + pname += "2"; + } else { + /* SERVER_SIDE */ + if (_mode == READ_MODE) + pname += "2"; + else + pname += "1"; + } + char * result = new char[pname.size()+1]; + strncpy(result, pname.c_str(), pname.size()+1); + return result; +} + +void RequestChannel::open_write_pipe(char * _pipe_name) { + + // cout << "mkfifo write pipe\n" << flush; + + if (mkfifo(_pipe_name, 0600) < 0) { + if (errno != EEXIST) { + perror("Error creating pipe for writing; exit program"); + exit(1); + } + } + + // cout << "open write pipe\n" << flush; + + wfd = open(_pipe_name, O_WRONLY); + if (wfd < 0) { + perror("Error opening pipe for writing; exit program"); + exit(1); + } + + // cout << "done opening write pipe\n" << flush; + +} + +void RequestChannel::open_read_pipe(char * _pipe_name) { + + // cout << "mkfifo read pipe\n" << flush; + + if (mkfifo(_pipe_name, 0600) < 0) { + if (errno != EEXIST) { + perror("Error creating pipe for writing; exit program"); + exit(1); + } + } + + // cout << "open read pipe\n" << flush; + + rfd = open(_pipe_name, O_RDONLY); + if (rfd < 0) { + perror("Error opening pipe for reading; exit program"); + exit(1); + } + + // cout << "done opening read pipe\n" << flush; + +} + +/*--------------------------------------------------------------------------*/ +/* CONSTRUCTOR/DESTRUCTOR FOR CLASS R e q u e s t C h a n n e l */ +/*--------------------------------------------------------------------------*/ + +RequestChannel::RequestChannel(const string _name, const Side _side) : my_name(_name), my_side(_side) { + + if (_side == SERVER_SIDE) { + open_write_pipe(pipe_name(WRITE_MODE)); + open_read_pipe(pipe_name(READ_MODE)); + } else { + open_read_pipe(pipe_name(READ_MODE)); + open_write_pipe(pipe_name(WRITE_MODE)); + } + +} + +RequestChannel::~RequestChannel() { + cout << "close requests channel " << my_name << endl; + close(wfd); + close(rfd); + if (my_side == SERVER_SIDE) { + cout << "close IPC mechanisms on server side for channel " << my_name << endl; + /* Destruct the underlying IPC mechanisms. */ + if (remove(pipe_name(READ_MODE)) != 0) { + perror(string("Request Channel (" + my_name + ") : Error deleting pipe for reading").c_str()); + } + + if (remove(pipe_name(WRITE_MODE)) != 0) { + perror(string("Request Channel (" + my_name + ") : Error deleting pipe for writing").c_str()); + } + } +} + +/*--------------------------------------------------------------------------*/ +/* READ/WRITE FROM/TO REQUEST CHANNELS */ +/*--------------------------------------------------------------------------*/ + +const int MAX_MESSAGE = 255; + +string RequestChannel::send_request(string _request) { + cwrite(_request); + string s = cread(); + return s; +} + +string RequestChannel::cread() { + + char buf[MAX_MESSAGE]; + + if (read(rfd, buf, MAX_MESSAGE) < 0) { + perror(string("Request Channel (" + my_name + "): Error reading from pipe!").c_str()); + } + + string s = buf; + + // cout << "Request Channel (" << my_name << ") reads [" << buf << "]\n"; + + return s; + +} + +int RequestChannel::cwrite(string _msg) { + + if (_msg.length() >= MAX_MESSAGE) { + cerr << "Message too long for Channel!\n"; + return -1; + } + + // cout << "Request Channel (" << my_name << ") writing [" << _msg << "]"; + + const char * s = _msg.c_str(); + + if (write(wfd, s, strlen(s)+1) < 0) { + perror(string("Request Channel (" + my_name + ") : Error writing to pipe!").c_str()); + } + + // cout << "(" << my_name << ") done writing." << endl; +} + +/*--------------------------------------------------------------------------*/ +/* ACCESS THE NAME OF REQUEST CHANNEL */ +/*--------------------------------------------------------------------------*/ + +string RequestChannel::name() { + return my_name; +} + +/*--------------------------------------------------------------------------*/ +/* ACCESS FILE DESCRIPTORS OF REQUEST CHANNEL */ +/*--------------------------------------------------------------------------*/ + +int RequestChannel::read_fd() { + return rfd; +} + +int RequestChannel::write_fd() { + return wfd; +} + + + diff --git a/reqchannel.h b/reqchannel.h new file mode 100755 index 0000000..022a8f4 --- /dev/null +++ b/reqchannel.h @@ -0,0 +1,119 @@ +/* + File: reqchannel.H + + Author: R. Bettati + Department of Computer Science + Texas A&M University + Date : 2012/07/11 + +*/ + +#ifndef _reqchannel_H_ // include file only once +#define _reqchannel_H_ + +/*--------------------------------------------------------------------------*/ +/* DEFINES */ +/*--------------------------------------------------------------------------*/ + +/* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* INCLUDES */ +/*--------------------------------------------------------------------------*/ + +#include +#include + +#include + +using namespace std; + +/*--------------------------------------------------------------------------*/ +/* DATA STRUCTURES */ +/*--------------------------------------------------------------------------*/ + +/* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* FORWARDS */ +/*--------------------------------------------------------------------------*/ + +/* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* CLASS R e q u e s t C h a n n e l */ +/*--------------------------------------------------------------------------*/ + +class RequestChannel { + +public: + + typedef enum {SERVER_SIDE, CLIENT_SIDE} Side; + + typedef enum {READ_MODE, WRITE_MODE} Mode; + +private: + + string my_name; + + Side my_side; + + /* The current implementation uses named pipes. */ + + int wfd; + int rfd; + + char * pipe_name(Mode _mode); + void open_read_pipe(char * _pipe_name); + void open_write_pipe(char * _pipe_name); + +public: + + /* -- CONSTRUCTOR/DESTRUCTOR */ + + RequestChannel(const string _name, const Side _side); + /* Creates a "local copy" of the channel specified by the given name. + If the channel does not exist, the associated IPC mechanisms are + created. If the channel exists already, this object is associated with the channel. + The channel has two ends, which are conveniently called "SERVER_SIDE" and "CLIENT_SIDE". + If two processes connect through a channel, one has to connect on the server side + and the other on the client side. Otherwise the results are unpredictable. + + NOTE: If the creation of the request channel fails (typically happens when too many + request channels are being created) and error message is displayed, and the program + unceremoniously exits. + + NOTE: It is easy to open too many request channels in parallel. In most systems, + limits on the number of open files per process limit the number of established + request channels to 125. + */ + + ~RequestChannel(); + /* Destructor of the local copy of the bus. By default, the Server Side deletes any IPC + mechanisms associated with the channel. */ + + string send_request(string _request); + /* Send a string over the channel and wait for a reply. */ + + string cread(); + /* Blocking read of data from the channel. Returns a string of characters + read from the channel. Returns NULL if read failed. */ + + int cwrite(string _msg); + /* Write the data to the channel. The function returns the number of characters written + to the channel. */ + + string name(); + /* Returns the name of the request channel. */ + + int read_fd(); + /* Returns the file descriptor used to read from the channel. */ + + int write_fd(); + /* Returns the file descriptor used to write to the channel. */ +}; + + +#endif + + diff --git a/semaphore.h b/semaphore.h new file mode 100755 index 0000000..255f7df --- /dev/null +++ b/semaphore.h @@ -0,0 +1,80 @@ +/* + File: semaphore.H + + Author: R. Bettati + Department of Computer Science + Texas A&M University + Date : 08/02/11 + +*/ + +#ifndef _semaphore_H_ // include file only once +#define _semaphore_H_ + +/*--------------------------------------------------------------------------*/ +/* DEFINES */ +/*--------------------------------------------------------------------------*/ + +/* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* INCLUDES */ +/*--------------------------------------------------------------------------*/ + +#include +#include + +/*--------------------------------------------------------------------------*/ +/* DATA STRUCTURES */ +/*--------------------------------------------------------------------------*/ + +/* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* FORWARDS */ +/*--------------------------------------------------------------------------*/ + +/* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* CLASS S e m a p h o r e */ +/*--------------------------------------------------------------------------*/ + +class Semaphore { +private: + /* -- INTERNAL DATA STRUCTURES + You may need to change them to fit your implementation. */ + + int value; + pthread_mutex_t m; + pthread_cond_t c; + +public: + + /* -- CONSTRUCTOR/DESTRUCTOR */ + + Semaphore(int _val){value = _val;} + + ~Semaphore(){} + + /* -- SEMAPHORE OPERATIONS */ + + int P(){ + Lock(); + + + } + //; + int V(); +}; + +/* +class BoundBuffer{ + Semaphore full(n); + Semaphore empty(n); + Semaphore mutex(n); +}; +*/ +#endif + + diff --git a/simpleclient.cpp b/simpleclient.cpp new file mode 100755 index 0000000..6188ac0 --- /dev/null +++ b/simpleclient.cpp @@ -0,0 +1,100 @@ +/* + File: simpleclient.C + + Author: R. Bettati + Department of Computer Science + Texas A&M University + Date : 2013/01/31 + + Simple client main program for MP3 in CSCE 313 +*/ + +/*--------------------------------------------------------------------------*/ +/* DEFINES */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* INCLUDES */ +/*--------------------------------------------------------------------------*/ + +#include +#include +#include +#include +#include + +#include +#include + +#include "reqchannel.h" +#include "semaphore.h" + +using namespace std; + +/*--------------------------------------------------------------------------*/ +/* DATA STRUCTURES */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* CONSTANTS */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* FORWARDS */ +/*--------------------------------------------------------------------------*/ + + /* -- (none) -- */ + +/*--------------------------------------------------------------------------*/ +/* MAIN FUNCTION */ +/*--------------------------------------------------------------------------*/ + +int main(int argc, char * argv[]) { + + + + Semaphore sema(5); + cout << "CLIENT STARTED:" << endl; + + pid_t pid = fork(); + + if(pid == 0){ + cout << "Establishing control channel... " << flush; + RequestChannel chan("control", RequestChannel::CLIENT_SIDE); + cout << "done." << endl; + + /* -- Start sending a sequence of requests */ + + string reply1 = chan.send_request("hello"); + cout << "Reply to request 'hello' is '" << reply1 << "'" << endl; + + string reply2 = chan.send_request("data Joe Smith"); + cout << "Reply to request 'data Joe Smith' is '" << reply2 << "'" << endl; + + string reply3 = chan.send_request("data Jane Smith"); + cout << "Reply to request 'data Jane Smith' is '" << reply3 << "'" << endl; + + string reply5 = chan.send_request("newthread"); + cout << "Reply to request 'newthread' is " << reply5 << "'" << endl; + RequestChannel chan2(reply5, RequestChannel::CLIENT_SIDE); + + string reply6 = chan2.send_request("data John Doe"); + cout << "Reply to request 'data John Doe' is '" << reply6 << "'" << endl; + + string reply7 = chan2.send_request("quit"); + cout << "Reply to request 'quit' is '" << reply7 << "'" << endl; + + string reply4 = chan.send_request("quit"); + cout << "Reply to request 'quit' is '" << reply4 << "'" << endl; + }else{ + execve("dataserver", argv, argv); + } + + usleep(1000000); +}