from functools import reduce
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from os.path import join

# Used to format our input binary state: one-hot encode the low and
# high nibbles of each byte into 16 bits apiece.
def format_input(acc, elem):
    hex_elem = elem % 16  # low nibble
    for x in range(16):
        if x == hex_elem:
            acc.append(1)
        else:
            acc.append(0)
    hex_elem = (elem >> 4) % 16  # high nibble
    for x in range(16):
        if x == hex_elem:
            acc.append(1)
        else:
            acc.append(0)
    return acc

# Calculate the Manhattan distance between two points.
def man_dist(x, y):
    (a_one, a_two), (b_one, b_two) = x, y
    return abs(a_one - b_one) + abs(a_two - b_two)

# Calculate the summed Manhattan distance over each pair of points
# across two lists of positions.
def man_dist_state(x, y):
    return sum(man_dist(a, b) for a, b in zip(x, y))

# Used to format the positions we parsed from our binary input.
# Each byte holds two tiles; the blank (0) always maps to (3, 3).
def format_pos(acc, elem):
    hex_elem = elem[1] % 16
    if hex_elem == 0:
        acc.append((hex_elem, (3, 3)))
    else:
        acc.append((hex_elem, ((15 - elem[0] * 2) % 4, (15 - elem[0] * 2) // 4)))
    hex_elem = (elem[1] >> 4) % 16
    if hex_elem == 0:
        acc.append((hex_elem, (3, 3)))
    else:
        acc.append((hex_elem, ((15 - (elem[0] * 2 + 1)) % 4, (15 - (elem[0] * 2 + 1)) // 4)))
    return acc

# The title of this function is slightly misleading.
# I'm simply generating a list of positions that each
# puzzle piece in the current parsed state SHOULD be at.
# I organize this in the order the pieces were parsed,
# so the two lists line up perfectly.
def generate_pos(acc, elem):
    if elem[0] == 0:
        acc.append((3, 3))
    else:
        acc.append(((elem[0] - 1) % 4, (elem[0] - 1) // 4))
    return acc

# Used to format our final Manhattan distance into a one-hot vector
# that can be compared with our 29 output neurons. The hot bit for a
# distance d lands at index 28 - d.
def format_man_dist(elem):
    acc = []
    for x in range(28, -1, -1):
        if x == elem:
            acc.append(1)
        else:
            acc.append(0)
    return acc

# Every state parsed from "<i>states.bin" gets the same one-hot target, i.
target = []
for i in range(29):
    filename = join('/pub/faculty_share/daugher/datafiles/data/', str(i) + 'states.bin')
    # Debugging to print the current file from which states are being parsed.
    #print(i)
    temp = []
    with open(filename, 'rb') as f:
        data = f.read(8)
        counter = 0
        while data and counter < 2000:
            temp.append(format_man_dist(i))
            data = f.read(8)
            counter += 1
    target.append(temp)

#print(target[28][500])

# Set up a Sequential model; Sequential is all we should need for this
# project, since it only deals with a linear stack of layers of neurons.
model = Sequential()

# Add layers to the model.
model.add(Dense(units=240, activation='tanh', input_dim=240))
model.add(Dense(units=120, activation='tanh'))
model.add(Dense(units=60, activation='tanh'))
model.add(Dense(units=29, activation='sigmoid'))

# Configure the learning process.
model.compile(optimizer='sgd', loss='mean_squared_error', metrics=['accuracy'])

for i in range(29):
    filename = join('/pub/faculty_share/daugher/datafiles/data/', str(i) + 'states.bin')
    # Debugging to print the current file from which states are being parsed.
    print(i)
    with open(filename, 'rb') as f:
        data = f.read(8)
        counter = 0
        training = []
        while data and counter < 2000:
            # 16 nibbles one-hot encoded at 16 bits each gives 256 bits;
            # dropping the first 16 after the reversal leaves the 240
            # inputs the first Dense layer expects.
            bin_data = reduce(format_input, list(data), [])
            bin_data.reverse()
            bin_data = bin_data[16:]
            training.append(bin_data)
            data = f.read(8)
            counter += 1
    #print(training[0])

    # Train the network.
    model.fit(np.array(training), np.array(target[i]), epochs=8, batch_size=2000)
    #model.train_on_batch(np.array(temp), np.array(target))
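
# A small helper sketch (my addition, not part of the original pipeline):
# format_man_dist puts the hot bit at index 28 - elem, so the inverse
# mapping is simply 28 - argmax; e.g. decode_man_dist(format_man_dist(7))
# returns 7. The evaluation below leans on the same inverse when it
# computes 28 - np.argmax(predictions[p]).
def decode_man_dist(vec):
    return 28 - int(np.argmax(vec))
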
# Used for testing data: files 11states.bin through 28states.bin.
for i in range(11, 29):
    filename = join('/pub/faculty_share/daugher/datafiles/data/', str(i) + 'states.bin')
    print(i)
    with open(filename, 'rb') as f:
        # Skip past the first 2000 states, which were used for training.
        for _ in range(2000):
            data = f.read(8)
        data = f.read(8)
        counter = 0
        testing = []
        testing_target = []
        while data and counter < 10000:
            bin_data = reduce(format_input, list(data), [])
            bin_data.reverse()
            bin_data = bin_data[16:]
            testing.append(bin_data)

            pos_data = reduce(format_pos, enumerate(list(data)), [])
            pos_data.reverse()
            pos_data = pos_data[1:]

            state_pos = [p[1] for p in pos_data]
            testing_target_pos = reduce(generate_pos, pos_data, [])

            # The target here is the state's actual Manhattan distance.
            testing_target.append(format_man_dist(man_dist_state(state_pos, testing_target_pos)))
            counter += 1
            data = f.read(8)

    # Evaluate accuracy.
    loss_and_metrics = model.evaluate(np.array(testing), np.array(testing_target), batch_size=1000)

    # Generate predictions.
    predictions = model.predict(np.array(testing), batch_size=1000)

    # Recover distances as 28 - argmax and score the possible improvement
    # over the Manhattan distance heuristic.
    output = []
    for p in range(len(predictions)):
        if np.argmax(testing_target[p]) < 18:
            output.append(100 * ((18 - (28 - np.argmax(predictions[p]))) / (18 - np.argmax(testing_target[p]))))
        else:
            output.append(0)

    #for i in range(len(output)):
    #    print(output[i])

    print("Percentage possible improvement: ", np.array(output).mean())
    print(model.metrics_names[0], loss_and_metrics[0])
    print(model.metrics_names[1], loss_and_metrics[1])
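
# A closing usage sketch (my addition; assumes the 'testing' batch from the
# final loop iteration is still in scope): map one prediction back to a
# scalar distance with the decode_man_dist helper defined above.
single = model.predict(np.array(testing[:1]))
print("Estimated distance for one test state:", decode_man_dist(single[0]))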