This repository has been archived on 2025-04-11. You can view files and clone it, but cannot push or open issues or pull requests.
uwu-me/uwume/lib/databaseMethods.py
2020-03-14 21:45:30 -05:00

99 lines
No EOL
3.2 KiB
Python

from sqlalchemy import create_engine, insert, update, select, MetaData, Table, Column, Integer, String, Boolean
from os import environ
pg_pass_file = open(environ['POSTGRES_PASSWORD_FILE'])
pg_pass = pg_pass_file.readline().strip('\n')
engine = create_engine(
f"postgresql+psycopg2://{environ['POSTGRES_USER']}:{pg_pass}@{environ['POSTGRES_HOST']}:{environ['POSTGRES_PORT']}/{environ['POSTGRES_DB']}")
conn = engine.connect()
md = MetaData()
def initialize_database():
if(not engine.dialect.has_table(engine, 'users')):
users = Table('users', md,
Column('id', Integer()),
Column('name', String(255)),
Column('password', String(255)),
Column('count', Integer()))
try:
md.create_all(engine)
except Exception as e:
print(f'{e}')
create_user(0, 'admin', 'admin', 0)
else:
print('Users table already created, skipping initialization.')
def create_user(idn, name, password, count):
users = Table('users', md, autoload=True, autoload_with=engine)
query = insert(users)
values = [{
'id': idn,
'name': name,
'password': password,
'count': count
}]
output = conn.execute(query, values)
def check_password(user, password):
users = Table('users', md, autoload=True, autoload_with=engine)
query = select([users.columns.password]).where(users.columns.name == user)
output = conn.execute(query)
return output.fetchone()[0] == password
def get_table():
users = Table('users', md, autoload=True, autoload_with=engine)
return users.columns.keys()
def get_count(user):
users = Table('users', md, autoload=True, autoload_with=engine)
query = select([users.columns.count]).where(users.columns.name == user)
output = conn.execute(query)
return output.fetchone()[0]
def update_count(user, count):
if(get_count(user) < count):
users = Table('users', md, autoload=True, autoload_with=engine)
query = update(users).values(count=count).where(users.columns.name == user)
return conn.execute(query)
def get_user(id):
users = Table('users', md, autoload=True, autoload_with=engine)
query = select([users.columns]).where(users.columns.name == id)
output = conn.execute(query)
return output.fetchone()[0]
def get_users():
users = Table('users', md, autoload=True, autoload_with=engine)
query = select([users.columns.name])
output = conn.execute(query)
user_list = output.fetchall()
user_list_formatted = []
for u in user_list:
user_list_formatted += [str(u[0])]
return user_list_formatted
def get_users_list(partition):
users = Table('users', md, autoload=True, autoload_with=engine)
query = select([users.columns.name, users.columns.count]).order_by(users.columns.name.asc()).offset(partition*10).limit(10)
output = conn.execute(query)
user_list = output.fetchall()
user_list_formatted = []
for u in user_list:
user_list_formatted += [(str(u[0]), str(u[1]))]
return user_list_formatted
def get_all():
users = Table('users', md, autoload=True, autoload_with=engine)
query = select([users])
output = conn.execute(query)
return output.fetchall()