This commit is contained in:
Eric Buxkemper 2015-11-05 18:33:21 -06:00
parent 0e6a35f250
commit 0c2059d598
6 changed files with 716 additions and 0 deletions

8
BoundedBuffer.h Executable file
View file

@ -0,0 +1,8 @@
class BoundBuffer{
Semaphore full(n);
Semaphore empty(n);
Semaphore mutex(n);
}

184
dataserver.cpp Executable file
View file

@ -0,0 +1,184 @@
/*
File: dataserver.C
Author: R. Bettati
Department of Computer Science
Texas A&M University
Date : 2012/07/16
Dataserver main program for MP3 in CSCE 313
*/
/*--------------------------------------------------------------------------*/
/* DEFINES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* INCLUDES */
/*--------------------------------------------------------------------------*/
#include <cassert>
#include <cstring>
#include <sstream>
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <pthread.h>
#include <errno.h>
#include <unistd.h>
#include <stdlib.h>
#include "reqchannel.h"
using namespace std;
/*--------------------------------------------------------------------------*/
/* DATA STRUCTURES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* CONSTANTS */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* VARIABLES */
/*--------------------------------------------------------------------------*/
static int nthreads = 0;
/*--------------------------------------------------------------------------*/
/* FORWARDS */
/*--------------------------------------------------------------------------*/
void handle_process_loop(RequestChannel & _channel);
/*--------------------------------------------------------------------------*/
/* LOCAL FUNCTIONS -- SUPPORT FUNCTIONS */
/*--------------------------------------------------------------------------*/
string int2string(int number) {
stringstream ss;//create a stringstream
ss << number;//add number to the stream
return ss.str();//return a string with the contents of the stream
}
/*--------------------------------------------------------------------------*/
/* LOCAL FUNCTIONS -- THREAD FUNCTIONS */
/*--------------------------------------------------------------------------*/
void * handle_data_requests(void * args) {
RequestChannel * data_channel = (RequestChannel*)args;
// -- Handle client requests on this channel.
handle_process_loop(*data_channel);
// -- Client has quit. We remove channel.
delete data_channel;
}
/*--------------------------------------------------------------------------*/
/* LOCAL FUNCTIONS -- INDIVIDUAL REQUESTS */
/*--------------------------------------------------------------------------*/
void process_hello(RequestChannel & _channel, const string & _request) {
_channel.cwrite("hello to you too");
}
void process_data(RequestChannel & _channel, const string & _request) {
usleep(1000 + (rand() % 5000));
//_channel.cwrite("here comes data about " + _request.substr(4) + ": " + int2string(random() % 100));
_channel.cwrite(int2string(rand() % 100));
}
void process_newthread(RequestChannel & _channel, const string & _request) {
int error;
nthreads ++;
// -- Name new data channel
string new_channel_name = "data" + int2string(nthreads) + "_";
// cout << "new channel name = " << new_channel_name << endl;
// -- Pass new channel name back to client
_channel.cwrite(new_channel_name);
// -- Construct new data channel (pointer to be passed to thread function)
RequestChannel * data_channel = new RequestChannel(new_channel_name, RequestChannel::SERVER_SIDE);
// -- Create new thread to handle request channel
pthread_t thread_id;
// cout << "starting new thread " << nthreads << endl;
if (error = pthread_create(& thread_id, NULL, handle_data_requests, data_channel)) {
fprintf(stderr, "p_create failed: %s\n", strerror(error));
}
}
/*--------------------------------------------------------------------------*/
/* LOCAL FUNCTIONS -- THE PROCESS REQUEST LOOP */
/*--------------------------------------------------------------------------*/
void process_request(RequestChannel & _channel, const string & _request) {
if (_request.compare(0, 5, "hello") == 0) {
process_hello(_channel, _request);
}
else if (_request.compare(0, 4, "data") == 0) {
process_data(_channel, _request);
}
else if (_request.compare(0, 9, "newthread") == 0) {
process_newthread(_channel, _request);
}
else {
_channel.cwrite("unknown request");
}
}
void handle_process_loop(RequestChannel & _channel) {
for(;;) {
cout << "Reading next request from channel (" << _channel.name() << ") ..." << flush;
string request = _channel.cread();
cout << " done (" << _channel.name() << ")." << endl;
cout << "New request is " << request << endl;
if (request.compare("quit") == 0) {
_channel.cwrite("bye");
usleep(10000); // give the other end a bit of time.
break; // break out of the loop;
}
process_request(_channel, request);
}
}
/*--------------------------------------------------------------------------*/
/* MAIN FUNCTION */
/*--------------------------------------------------------------------------*/
int main(int argc, char * argv[]) {
// cout << "Establishing control channel... " << flush;
RequestChannel control_channel("control", RequestChannel::SERVER_SIDE);
// cout << "done.\n" << flush;
handle_process_loop(control_channel);
}

225
reqchannel.cpp Executable file
View file

@ -0,0 +1,225 @@
/*
File: requestchannel.C
Author: R. Bettati
Department of Computer Science
Texas A&M University
Date : 2012/07/11
*/
/*--------------------------------------------------------------------------*/
/* DEFINES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* INCLUDES */
/*--------------------------------------------------------------------------*/
#include <cassert>
#include <cstring>
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include "reqchannel.h"
using namespace std;
/*--------------------------------------------------------------------------*/
/* DATA STRUCTURES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* CONSTANTS */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* FORWARDS */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* PRIVATE METHODS FOR CLASS R e q u e s t C h a n n e l */
/*--------------------------------------------------------------------------*/
char * RequestChannel::pipe_name(Mode _mode) {
string pname = "fifo_" + my_name;
if (my_side == CLIENT_SIDE) {
if (_mode == READ_MODE)
pname += "1";
else
pname += "2";
} else {
/* SERVER_SIDE */
if (_mode == READ_MODE)
pname += "2";
else
pname += "1";
}
char * result = new char[pname.size()+1];
strncpy(result, pname.c_str(), pname.size()+1);
return result;
}
void RequestChannel::open_write_pipe(char * _pipe_name) {
// cout << "mkfifo write pipe\n" << flush;
if (mkfifo(_pipe_name, 0600) < 0) {
if (errno != EEXIST) {
perror("Error creating pipe for writing; exit program");
exit(1);
}
}
// cout << "open write pipe\n" << flush;
wfd = open(_pipe_name, O_WRONLY);
if (wfd < 0) {
perror("Error opening pipe for writing; exit program");
exit(1);
}
// cout << "done opening write pipe\n" << flush;
}
void RequestChannel::open_read_pipe(char * _pipe_name) {
// cout << "mkfifo read pipe\n" << flush;
if (mkfifo(_pipe_name, 0600) < 0) {
if (errno != EEXIST) {
perror("Error creating pipe for writing; exit program");
exit(1);
}
}
// cout << "open read pipe\n" << flush;
rfd = open(_pipe_name, O_RDONLY);
if (rfd < 0) {
perror("Error opening pipe for reading; exit program");
exit(1);
}
// cout << "done opening read pipe\n" << flush;
}
/*--------------------------------------------------------------------------*/
/* CONSTRUCTOR/DESTRUCTOR FOR CLASS R e q u e s t C h a n n e l */
/*--------------------------------------------------------------------------*/
RequestChannel::RequestChannel(const string _name, const Side _side) : my_name(_name), my_side(_side) {
if (_side == SERVER_SIDE) {
open_write_pipe(pipe_name(WRITE_MODE));
open_read_pipe(pipe_name(READ_MODE));
} else {
open_read_pipe(pipe_name(READ_MODE));
open_write_pipe(pipe_name(WRITE_MODE));
}
}
RequestChannel::~RequestChannel() {
cout << "close requests channel " << my_name << endl;
close(wfd);
close(rfd);
if (my_side == SERVER_SIDE) {
cout << "close IPC mechanisms on server side for channel " << my_name << endl;
/* Destruct the underlying IPC mechanisms. */
if (remove(pipe_name(READ_MODE)) != 0) {
perror(string("Request Channel (" + my_name + ") : Error deleting pipe for reading").c_str());
}
if (remove(pipe_name(WRITE_MODE)) != 0) {
perror(string("Request Channel (" + my_name + ") : Error deleting pipe for writing").c_str());
}
}
}
/*--------------------------------------------------------------------------*/
/* READ/WRITE FROM/TO REQUEST CHANNELS */
/*--------------------------------------------------------------------------*/
const int MAX_MESSAGE = 255;
string RequestChannel::send_request(string _request) {
cwrite(_request);
string s = cread();
return s;
}
string RequestChannel::cread() {
char buf[MAX_MESSAGE];
if (read(rfd, buf, MAX_MESSAGE) < 0) {
perror(string("Request Channel (" + my_name + "): Error reading from pipe!").c_str());
}
string s = buf;
// cout << "Request Channel (" << my_name << ") reads [" << buf << "]\n";
return s;
}
int RequestChannel::cwrite(string _msg) {
if (_msg.length() >= MAX_MESSAGE) {
cerr << "Message too long for Channel!\n";
return -1;
}
// cout << "Request Channel (" << my_name << ") writing [" << _msg << "]";
const char * s = _msg.c_str();
if (write(wfd, s, strlen(s)+1) < 0) {
perror(string("Request Channel (" + my_name + ") : Error writing to pipe!").c_str());
}
// cout << "(" << my_name << ") done writing." << endl;
}
/*--------------------------------------------------------------------------*/
/* ACCESS THE NAME OF REQUEST CHANNEL */
/*--------------------------------------------------------------------------*/
string RequestChannel::name() {
return my_name;
}
/*--------------------------------------------------------------------------*/
/* ACCESS FILE DESCRIPTORS OF REQUEST CHANNEL */
/*--------------------------------------------------------------------------*/
int RequestChannel::read_fd() {
return rfd;
}
int RequestChannel::write_fd() {
return wfd;
}

119
reqchannel.h Executable file
View file

@ -0,0 +1,119 @@
/*
File: reqchannel.H
Author: R. Bettati
Department of Computer Science
Texas A&M University
Date : 2012/07/11
*/
#ifndef _reqchannel_H_ // include file only once
#define _reqchannel_H_
/*--------------------------------------------------------------------------*/
/* DEFINES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* INCLUDES */
/*--------------------------------------------------------------------------*/
#include <iostream>
#include <fstream>
#include <string>
using namespace std;
/*--------------------------------------------------------------------------*/
/* DATA STRUCTURES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* FORWARDS */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* CLASS R e q u e s t C h a n n e l */
/*--------------------------------------------------------------------------*/
class RequestChannel {
public:
typedef enum {SERVER_SIDE, CLIENT_SIDE} Side;
typedef enum {READ_MODE, WRITE_MODE} Mode;
private:
string my_name;
Side my_side;
/* The current implementation uses named pipes. */
int wfd;
int rfd;
char * pipe_name(Mode _mode);
void open_read_pipe(char * _pipe_name);
void open_write_pipe(char * _pipe_name);
public:
/* -- CONSTRUCTOR/DESTRUCTOR */
RequestChannel(const string _name, const Side _side);
/* Creates a "local copy" of the channel specified by the given name.
If the channel does not exist, the associated IPC mechanisms are
created. If the channel exists already, this object is associated with the channel.
The channel has two ends, which are conveniently called "SERVER_SIDE" and "CLIENT_SIDE".
If two processes connect through a channel, one has to connect on the server side
and the other on the client side. Otherwise the results are unpredictable.
NOTE: If the creation of the request channel fails (typically happens when too many
request channels are being created) and error message is displayed, and the program
unceremoniously exits.
NOTE: It is easy to open too many request channels in parallel. In most systems,
limits on the number of open files per process limit the number of established
request channels to 125.
*/
~RequestChannel();
/* Destructor of the local copy of the bus. By default, the Server Side deletes any IPC
mechanisms associated with the channel. */
string send_request(string _request);
/* Send a string over the channel and wait for a reply. */
string cread();
/* Blocking read of data from the channel. Returns a string of characters
read from the channel. Returns NULL if read failed. */
int cwrite(string _msg);
/* Write the data to the channel. The function returns the number of characters written
to the channel. */
string name();
/* Returns the name of the request channel. */
int read_fd();
/* Returns the file descriptor used to read from the channel. */
int write_fd();
/* Returns the file descriptor used to write to the channel. */
};
#endif

80
semaphore.h Executable file
View file

@ -0,0 +1,80 @@
/*
File: semaphore.H
Author: R. Bettati
Department of Computer Science
Texas A&M University
Date : 08/02/11
*/
#ifndef _semaphore_H_ // include file only once
#define _semaphore_H_
/*--------------------------------------------------------------------------*/
/* DEFINES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* INCLUDES */
/*--------------------------------------------------------------------------*/
#include <pthread.h>
#include <mutex>
/*--------------------------------------------------------------------------*/
/* DATA STRUCTURES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* FORWARDS */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* CLASS S e m a p h o r e */
/*--------------------------------------------------------------------------*/
class Semaphore {
private:
/* -- INTERNAL DATA STRUCTURES
You may need to change them to fit your implementation. */
int value;
pthread_mutex_t m;
pthread_cond_t c;
public:
/* -- CONSTRUCTOR/DESTRUCTOR */
Semaphore(int _val){value = _val;}
~Semaphore(){}
/* -- SEMAPHORE OPERATIONS */
int P(){
Lock();
}
//;
int V();
};
/*
class BoundBuffer{
Semaphore full(n);
Semaphore empty(n);
Semaphore mutex(n);
};
*/
#endif

100
simpleclient.cpp Executable file
View file

@ -0,0 +1,100 @@
/*
File: simpleclient.C
Author: R. Bettati
Department of Computer Science
Texas A&M University
Date : 2013/01/31
Simple client main program for MP3 in CSCE 313
*/
/*--------------------------------------------------------------------------*/
/* DEFINES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* INCLUDES */
/*--------------------------------------------------------------------------*/
#include <cassert>
#include <cstring>
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>
#include "reqchannel.h"
#include "semaphore.h"
using namespace std;
/*--------------------------------------------------------------------------*/
/* DATA STRUCTURES */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* CONSTANTS */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* FORWARDS */
/*--------------------------------------------------------------------------*/
/* -- (none) -- */
/*--------------------------------------------------------------------------*/
/* MAIN FUNCTION */
/*--------------------------------------------------------------------------*/
int main(int argc, char * argv[]) {
Semaphore sema(5);
cout << "CLIENT STARTED:" << endl;
pid_t pid = fork();
if(pid == 0){
cout << "Establishing control channel... " << flush;
RequestChannel chan("control", RequestChannel::CLIENT_SIDE);
cout << "done." << endl;
/* -- Start sending a sequence of requests */
string reply1 = chan.send_request("hello");
cout << "Reply to request 'hello' is '" << reply1 << "'" << endl;
string reply2 = chan.send_request("data Joe Smith");
cout << "Reply to request 'data Joe Smith' is '" << reply2 << "'" << endl;
string reply3 = chan.send_request("data Jane Smith");
cout << "Reply to request 'data Jane Smith' is '" << reply3 << "'" << endl;
string reply5 = chan.send_request("newthread");
cout << "Reply to request 'newthread' is " << reply5 << "'" << endl;
RequestChannel chan2(reply5, RequestChannel::CLIENT_SIDE);
string reply6 = chan2.send_request("data John Doe");
cout << "Reply to request 'data John Doe' is '" << reply6 << "'" << endl;
string reply7 = chan2.send_request("quit");
cout << "Reply to request 'quit' is '" << reply7 << "'" << endl;
string reply4 = chan.send_request("quit");
cout << "Reply to request 'quit' is '" << reply4 << "'" << endl;
}else{
execve("dataserver", argv, argv);
}
usleep(1000000);
}