97 lines
3.7 KiB
Python
97 lines
3.7 KiB
Python
![]() |
from sqlalchemy import create_engine
|
||
|
from sqlalchemy.orm import sessionmaker
|
||
|
from inspect import getmembers, isroutine
|
||
|
from lib.covidData import covidData, Base
|
||
|
|
||
|
|
||
|
def init_database(config_data):
    """Open a SQLAlchemy session on the Postgres database named in ``config_data``.

    Args:
        config_data: Mapping providing ``postgres_pass`` (path of a file whose
            first line holds the password), ``postgres_user``,
            ``postgres_host``, ``postgres_port`` and ``postgres_db``.

    Returns:
        A new session created by a ``sessionmaker`` bound to the engine.

    Raises:
        SystemExit: With status 1, after printing the underlying error, when
            the password cannot be read or the engine/session cannot be set up.
    """
    # Step 1: read the password. The file is only needed for this one line,
    # so it is closed again before any database object is built.
    try:
        with open(config_data['postgres_pass']) as pgdb_pass:
            password = pgdb_pass.readline().strip()
    except Exception as e:
        print(
            f'There was an issue opening the config file for the postgres password.\n{e}')
        # SystemExit is raised directly: the exit() helper is injected by the
        # ``site`` module and is not guaranteed to exist in every interpreter.
        raise SystemExit(1)

    # Step 2: build the engine and session. Failures here (missing config key,
    # malformed URL, missing driver, ...) are reported as such instead of
    # being blamed on the password file.
    try:
        # NOTE(review): the password is interpolated verbatim, so characters
        # such as '@', ':' or '/' in it would corrupt the URL unless the
        # password file already stores them URL-encoded.
        engine = create_engine(
            f"postgresql+psycopg2://{config_data['postgres_user']}:{password}@{config_data['postgres_host']}:{config_data['postgres_port']}/{config_data['postgres_db']}")
        Base.metadata.bind = engine
        covidDataSession = sessionmaker(bind=engine)
        return covidDataSession()
    except Exception as e:
        print(f'There was an issue setting up the database session.\n{e}')
        raise SystemExit(1)
|
||
|
|
||
|
|
||
|
def set_data(session, selection, import_data):
    """Insert or update the stored ``covidData`` row for ``selection``.

    Args:
        session: SQLAlchemy session used for the lookup and the write.
        selection: Name identifying the row; it is stored upper-cased.
        import_data: Indexable sequence of strings. Items 1-7 are the counts
            (total cases, new cases, total deaths, new deaths, total
            recovered, active cases, serious/critical) and item 8 is the
            cases-per-million value, which is stored as stripped text.

    Raises:
        IndexError: If ``import_data`` has fewer than nine items.
        ValueError: If a non-empty count is not an integer once surrounding
            whitespace, ``,`` and ``+`` have been removed.

    Database errors are not raised: the transaction is rolled back and the
    error is printed.
    """
    # The row is written under the upper-cased name, so the existence check
    # below must use the same key; filtering on the raw ``selection`` would
    # miss an existing row whenever the caller passes lower/mixed case.
    key = selection.upper()
    new_data = covidData(selection=key)

    # Model attribute for each count, in import_data order (positions 1..7).
    count_fields = (
        'total_cases',
        'new_cases',
        'total_deaths',
        'new_deaths',
        'total_recovered',
        'active_cases',
        'serious_critical',
    )
    for position, field in enumerate(count_fields, start=1):
        # Drop thousands separators and the '+' prefix before parsing.
        raw = import_data[position].strip().replace(',', '').replace('+', '')
        # A blank cell is stored as 0.
        setattr(new_data, field, int(raw) if raw else 0)
    new_data.total_cases_per_one_mil = import_data[8].strip()

    try:
        existing = session.query(covidData).filter(
            covidData.selection == key).first()
        if existing is not None:
            # A row for this selection already exists: update it.
            session.merge(new_data)
        else:
            session.add(new_data)
        session.commit()
    except Exception as e:
        session.rollback()
        print(f'There was an issue trying to add new data:\n{e}')
|
||
|
|
||
|
|
||
|
def get_formatted_data(session, selection):
    """Build a printable report of every stored row matching ``selection``.

    Args:
        session: SQLAlchemy session used to run the query.
        selection: Value compared for equality with ``covidData.selection``.

    Returns:
        A string with one ``"<Label>: <value>"`` line per reported attribute
        for each matching row, or ``''`` when no row matches.

    Raises:
        IndexError: If rows match but the model exposes fewer than nine
            public attributes.
    """
    print('Formatting data.')
    rows = session.query(covidData).filter(
        covidData.selection == selection).all()
    if not rows:
        return ''

    # Public attribute names declared on the model, sorted alphabetically.
    field_names = sorted(
        name for name in covidData.__dict__ if not name.startswith('_'))

    # Positions in ``field_names`` in the order they are printed.
    # NOTE(review): assumes the model's public attributes are exactly
    # active_cases, new_cases, new_deaths, selection, serious_critical,
    # total_cases, total_cases_per_one_mil, total_deaths, total_recovered,
    # so this prints selection first and cases-per-million last -- confirm
    # against lib.covidData.
    display_order = (3, 5, 1, 7, 2, 8, 0, 4, 6)
    ordered_names = [field_names[i] for i in display_order]

    # "total_cases_per_one_mil" -> "Total Cases / One Mil"
    labels = {
        name: ' '.join(
            word.capitalize() for word in name.replace('per', '/').split('_'))
        for name in ordered_names
    }

    # Each value is read under the same name its label was derived from.
    # Pairing by position with a separately built member list goes out of
    # step as soon as the instance has any additional public attribute
    # (for example ``registry`` on SQLAlchemy 1.4+ declarative classes).
    return ''.join(
        f'{labels[name]}: {getattr(row, name)}\n'
        for row in rows
        for name in ordered_names)
|