import sys

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from lib.covidData import covidData, Base

# Integer-valued covidData attributes in the order they occur in an imported
# row: import_data[1] feeds the first name, import_data[7] the last.
_COUNT_FIELDS = (
    'total_cases',
    'new_cases',
    'total_deaths',
    'new_deaths',
    'total_recovered',
    'active_cases',
    'serious_critical',
)


def init_database(config_data):
    """Build a PostgreSQL engine from ``config_data`` and return a new session.

    ``config_data`` is indexed with the keys ``postgres_pass`` (path of a file
    whose first line holds the password), ``postgres_user``,
    ``postgres_host``, ``postgres_port`` and ``postgres_db``.

    The engine is also assigned to ``Base.metadata.bind``.

    Returns:
        A session created by a ``sessionmaker`` bound to the new engine.

    Raises:
        SystemExit: with status 1 when any step fails; the error is printed
            first.
    """
    try:
        # Only the read needs the file handle, so close it before connecting.
        with open(config_data['postgres_pass']) as pgdb_pass:
            password = pgdb_pass.readline().strip()

        # NOTE(review): the password is interpolated verbatim. Characters
        # such as '@' or '/' would corrupt the URL unless the file already
        # stores a URL-encoded value -- confirm before adding escaping.
        url = (
            f"postgresql+psycopg2://{config_data['postgres_user']}:{password}"
            f"@{config_data['postgres_host']}:{config_data['postgres_port']}"
            f"/{config_data['postgres_db']}"
        )
        engine = create_engine(url)
        Base.metadata.bind = engine
        covidDataSession = sessionmaker(bind=engine)
        return covidDataSession()
    except Exception as e:
        # Every failure in the setup above (missing key, unreadable file,
        # engine creation) is reported with this same message.
        print(
            f'There was an issue opening the config file for the postgres password.\n{e}')
        # sys.exit instead of the site-provided exit(), which is not
        # guaranteed to exist (e.g. python -S, frozen executables).
        sys.exit(1)


def set_data(session, selection, import_data):
    """Store one imported row as a ``covidData`` record and commit it.

    Args:
        session: session used to merge and commit the record.
        selection: name of the row; stored upper-cased in ``selection`` and
            unchanged in ``selection_original``.
        import_data: indexable row of strings. Items 1-7 are the counts named
            in ``_COUNT_FIELDS``; item 8 is stored, stripped, in
            ``total_cases_per_one_mil``.

    Raises:
        ValueError: if a non-empty count is not an integer once whitespace,
            ',' and '+' have been removed.
        Exception: whatever ``merge``/``commit`` raised, re-raised after the
            session has been rolled back.
    """
    new_data = covidData(selection=selection.upper(),
                         selection_original=selection)

    for index, field in enumerate(_COUNT_FIELDS, start=1):
        digits = import_data[index].strip().replace(',', '').replace('+', '')
        # A blank cell is stored as 0.
        setattr(new_data, field, int(digits) if digits else 0)

    new_data.total_cases_per_one_mil = import_data[8].strip()

    try:
        session.merge(new_data)
        session.commit()
    except Exception:
        # Leave the shared session usable for the next row after a failure.
        session.rollback()
        raise


def get_formatted_data(session, selection):
    """Return display column titles and the rows matching ``selection``.

    Titles come from the names in ``covidData.__dict__`` that do not start
    with an underscore, sorted alphabetically. In each name 'per' is replaced
    by '/', the name is split on '_', and every piece is passed through
    ``str.capitalize`` before being joined with spaces.

    Returns:
        A ``(columns, rows)`` tuple: the list of titles and the list of
        ``covidData`` rows whose ``selection`` equals the argument.
    """
    print('Formatting data.')

    attribute_names = sorted(
        name for name in covidData.__dict__ if not name.startswith('_'))
    columns = [
        ' '.join(word.capitalize()
                 for word in name.replace('per', '/').split('_'))
        for name in attribute_names
    ]

    # NOTE(review): set_data stores selection.upper(), while this filter
    # compares against ``selection`` exactly as given -- confirm callers pass
    # the upper-cased form.
    all_data_query = session.query(covidData).filter(
        covidData.selection == selection).all()
    return (columns, all_data_query)


def get_top_n_rows(session, num):
    """Return up to ``num`` ``covidData`` rows with the highest total_cases."""
    print(f'Getting top {num} rows.')
    by_cases = session.query(covidData).order_by(
        covidData.total_cases.desc())
    return by_cases.limit(num).all()