# Handles the database interactions.
|
|
import os, json, datetime, re
|
|
import sqlite3 as sql
|
|
from random import sample
|
|
|
|
# Directory that holds the SQLite database file; create_connection() joins it with "matteo.db".
data_dir = "data"
|
|
|
|
def create_connection():
    """Open a connection to the SQLite database, creating the file if needed.

    The database file is ``matteo.db`` inside ``data_dir``. Write-ahead
    logging is requested on every new connection.

    Returns:
        An open ``sqlite3.Connection``, or ``None`` when the database could
        not be opened. If the connection opens but enabling WAL fails, the
        already-open connection is still returned.
    """
    conn = None
    try:
        conn = sql.connect(os.path.join(data_dir, "matteo.db"))

        # enable write-ahead log for performance and resilience
        conn.execute('pragma journal_mode=wal')
    except sql.Error:
        # Only database errors are handled here; a bare ``except`` would also
        # swallow KeyboardInterrupt and SystemExit.
        print("oops, db connection no work")
    return conn
|
|
|
|
|
|
def initialcheck():
    """Create every table used by this module if it does not exist yet.

    Does nothing when the database connection cannot be opened. The
    connection is always closed, even if one of the statements fails.
    """
    soulscream_table_check_string = """ CREATE TABLE IF NOT EXISTS soulscreams (
                                            counter integer PRIMARY KEY,
                                            name text NOT NULL,
                                            soulscream text NOT NULL,
                                            timestamp text NOT NULL
                                        ); """

    player_cache_table_check_string = """ CREATE TABLE IF NOT EXISTS players (
                                            counter integer PRIMARY KEY,
                                            name text NOT NULL,
                                            json_string text NOT NULL,
                                            timestamp text NOT NULL
                                        ); """

    player_table_check_string = """ CREATE TABLE IF NOT EXISTS user_designated_players (
                                        user_id integer PRIMARY KEY,
                                        user_name text,
                                        player_id text NOT NULL,
                                        player_name text NOT NULL,
                                        player_json_string text NOT NULL
                                    );"""

    player_stats_table_check_string = """ CREATE TABLE IF NOT EXISTS stats (
                                            counter integer PRIMARY KEY,
                                            id text,
                                            name text,
                                            json_string text,
                                            outs_pitched integer DEFAULT 0,
                                            walks_allowed integer DEFAULT 0,
                                            hits_allowed integer DEFAULT 0,
                                            strikeouts_given integer DEFAULT 0,
                                            runs_allowed integer DEFAULT 0,
                                            plate_appearances integer DEFAULT 0,
                                            walks_taken integer DEFAULT 0,
                                            sacrifices integer DEFAULT 0,
                                            hits integer DEFAULT 0,
                                            home_runs integer DEFAULT 0,
                                            total_bases integer DEFAULT 0,
                                            rbis integer DEFAULT 0,
                                            strikeouts_taken integer DEFAULT 0
                                        );"""

    teams_table_check_string = """ CREATE TABLE IF NOT EXISTS teams (
                                        counter integer PRIMARY KEY,
                                        name text NOT NULL,
                                        team_json_string text NOT NULL,
                                        timestamp text NOT NULL,
                                        owner_id integer
                                    ); """

    one_big_league_check_string = """ CREATE TABLE IF NOT EXISTS one_big_league (
                                        counter integer PRIMARY KEY,
                                        team_name text NOT NULL,
                                        teams_beaten_list text,
                                        current_opponent_pool text,
                                        obl_points int DEFAULT 0,
                                        rival_name text
                                    );"""

    # Executed in the same order as before.
    table_check_strings = (
        soulscream_table_check_string,
        player_cache_table_check_string,
        player_table_check_string,
        player_stats_table_check_string,
        teams_table_check_string,
        one_big_league_check_string,
    )

    conn = create_connection()
    if conn is None:
        # Without this guard, commit()/close() were called on None.
        return

    try:
        c = conn.cursor()
        for table_check_string in table_check_strings:
            c.execute(table_check_string)
        conn.commit()
    finally:
        conn.close()
|
|
|
|
def get_stats(player_name):
    """Return the cached JSON string for ``player_name``, or ``None``.

    ``None`` is returned when the database is unavailable, when no row is
    cached under that name, when the stored timestamp cannot be compared
    with the current UTC time, or when the entry is seven or more days old.
    In the last case every cached row for that name is deleted.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return None

    try:
        c = conn.cursor()
        c.execute("SELECT * FROM players WHERE name=?", (player_name,))
        player = c.fetchone()  # (counter, name, json_string, timestamp)
        if player is None:
            return None

        try:
            cachetime = datetime.datetime.fromisoformat(player[3])
            expired = (datetime.datetime.now(datetime.timezone.utc) - cachetime
                       >= datetime.timedelta(days=7))
        except TypeError:
            # e.g. a timezone-naive timestamp cannot be subtracted from an aware one
            return None

        if expired:
            # delete old cache
            c.execute("DELETE FROM players WHERE name=?", (player_name,))
            conn.commit()
            return None

        return player[2]  # returns a json_string
    finally:
        conn.close()
|
|
|
|
def cache_stats(name, json_string):
    """Insert a player's JSON string into the ``players`` cache table.

    The row is stamped with the current UTC time. Does nothing when the
    database connection cannot be opened.
    """
    conn = create_connection()
    if conn is None:
        # The old code called close() on None in this case.
        return

    store_string = """ INSERT INTO players(name, json_string, timestamp)
                            VALUES (?,?, ?) """
    # Serialise explicitly instead of relying on sqlite3's implicit datetime
    # adapter (deprecated since Python 3.12). isoformat(" ") produces the same
    # text that adapter wrote, so existing rows and readers stay compatible.
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat(" ")

    try:
        c = conn.cursor()
        c.execute(store_string, (name, json_string, timestamp))
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def get_soulscream(username):
    """Return the cached soulscream for ``username``, or ``None``.

    ``None`` is returned when the database is unavailable, when nothing is
    cached for that name, when the stored timestamp cannot be compared with
    the current UTC time, or when the entry is seven or more days old. In
    the last case every cached row for that name is deleted.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return None

    try:
        c = conn.cursor()
        c.execute("SELECT * FROM soulscreams WHERE name=?", (username,))
        scream = c.fetchone()  # (counter, name, soulscream, timestamp)
        if scream is None:
            return None

        try:
            cachetime = datetime.datetime.fromisoformat(scream[3])
            # The previous comment said 3 days, but the code has always used 7.
            expired = (datetime.datetime.now(datetime.timezone.utc) - cachetime
                       >= datetime.timedelta(days=7))
        except TypeError:
            # e.g. a timezone-naive timestamp cannot be subtracted from an aware one
            return None

        if expired:
            # delete old cache
            c.execute("DELETE FROM soulscreams WHERE name=?", (username,))
            conn.commit()
            return None

        return scream[2]
    finally:
        conn.close()
|
|
|
|
|
|
|
|
def cache_soulscream(username, soulscream):
    """Insert a soulscream for ``username`` into the ``soulscreams`` table.

    The row is stamped with the current UTC time. Does nothing when the
    database connection cannot be opened.
    """
    conn = create_connection()
    if conn is None:
        # The old code called close() on None in this case.
        return

    store_string = """ INSERT INTO soulscreams(name, soulscream, timestamp)
                            VALUES (?,?, ?) """
    # Serialise explicitly instead of relying on sqlite3's implicit datetime
    # adapter (deprecated since Python 3.12). isoformat(" ") produces the same
    # text that adapter wrote, so existing rows and readers stay compatible.
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat(" ")

    try:
        c = conn.cursor()
        c.execute(store_string, (username, soulscream, timestamp))
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def designate_player(user, player_json):
    """Store ``player_json`` as the designated player of ``user``.

    Any previous designation for the same user is replaced.

    Args:
        user: object exposing ``id`` and ``name`` attributes.
        player_json: mapping with at least the keys ``"id"`` and ``"name"``;
            the whole mapping is stored as a JSON string.

    Does nothing when the database connection cannot be opened.
    """
    conn = create_connection()
    if conn is None:
        return

    store_string = """ INSERT INTO user_designated_players(user_id, user_name, player_id, player_name, player_json_string)
                        VALUES (?, ?, ?, ?, ?)"""

    try:
        c = conn.cursor()
        # user_id is the primary key, so any existing row must go first.
        # The old "is not None" check on get_user_player_conn() was always true
        # (that helper returns False, never None), so the delete always ran;
        # doing it unconditionally keeps that behaviour and avoids a helper
        # that could close the connection underneath us.
        c.execute("DELETE FROM user_designated_players WHERE user_id=?", (user.id,))  # delete player if already exists
        c.execute(store_string, (user.id, user.name, player_json["id"], player_json["name"], json.dumps(player_json)))
        conn.commit()
    finally:
        conn.close()
|
|
|
|
def get_user_player_conn(conn, user):
    """Look up the designated player of ``user`` on an existing connection.

    Args:
        conn: open ``sqlite3.Connection`` or ``None``. The connection belongs
            to the caller and is never closed here.
        user: object exposing an ``id`` attribute.

    Returns:
        The decoded player JSON, or ``False`` when there is no connection,
        no row for the user, or the lookup/decoding fails.
    """
    if conn is None:
        # The old code called conn.close() on None here, which raised again
        # from inside its own bare except handler.
        return False

    try:
        c = conn.cursor()
        c.execute("SELECT player_json_string FROM user_designated_players WHERE user_id=?", (user.id,))
        row = c.fetchone()
        if row is None:
            return False
        return json.loads(row[0])
    except (sql.Error, TypeError, ValueError):
        # Database failure or undecodable JSON (JSONDecodeError is a ValueError).
        return False
|
|
|
|
def get_user_player(user):
    """Return the designated player of ``user`` using a fresh connection.

    Returns whatever ``get_user_player_conn`` returns, or ``False`` when the
    database connection cannot be opened.
    """
    conn = create_connection()
    if conn is None:
        # The old code went on to call close() on None.
        return False

    try:
        return get_user_player_conn(conn, user)
    finally:
        conn.close()
|
|
|
|
def save_team(name, team_json_string, user_id):
    """Insert a new team row.

    The name is stored with every character other than ASCII letters, digits
    and spaces removed; the row is stamped with the current UTC time.

    Returns:
        ``True`` when the row was committed, ``False`` when the database is
        unavailable or the insert fails.
    """
    conn = create_connection()
    if conn is None:
        return False

    store_string = """ INSERT INTO teams(name, team_json_string, timestamp, owner_id)
                        VALUES (?,?, ?, ?) """
    # Same text sqlite3's implicit datetime adapter (deprecated since Python
    # 3.12) would have written.
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat(" ")

    try:
        c = conn.cursor()
        c.execute(store_string, (re.sub('[^A-Za-z0-9 ]+', '', name), team_json_string, timestamp, user_id))  # this regex removes all non-standard characters
        conn.commit()
        return True
    except sql.Error:
        return False
    finally:
        # The old exception path returned without closing the connection.
        conn.close()
|
|
|
|
def update_team(name, team_json_string):
    """Replace the stored JSON of the team called ``name``.

    The name is sanitised the same way as when teams are saved (only ASCII
    letters, digits and spaces are kept).

    Returns:
        ``True`` when the UPDATE was committed (even if it matched no row),
        ``False`` when the database is unavailable or the statement fails.
    """
    conn = create_connection()
    if conn is None:
        # The old code called close() on None, both in the body and again in
        # its except handler.
        return False

    store_string = "UPDATE teams SET team_json_string = ? WHERE name=?"
    try:
        c = conn.cursor()
        c.execute(store_string, (team_json_string, re.sub('[^A-Za-z0-9 ]+', '', name)))  # this regex removes all non-standard characters
        conn.commit()
        return True
    except sql.Error:
        return False
    finally:
        conn.close()
|
|
|
|
def get_team(name, owner=False):
    """Fetch the first team row whose name matches the sanitised ``name``.

    Args:
        name: team name; characters other than ASCII letters, digits and
            spaces are stripped before the lookup.
        owner: when true, the whole row is selected instead of only the JSON.

    Returns:
        ``None`` when the database is unavailable or no team matches.
        Otherwise the row as returned by ``fetchone()``: a 1-tuple
        ``(team_json_string,)`` when ``owner`` is false, or
        ``(counter, name, team_json_string, timestamp, owner_id)`` when it
        is true.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return None

    if owner:
        query = "SELECT * FROM teams WHERE name=?"
    else:
        query = "SELECT team_json_string FROM teams WHERE name=?"

    try:
        c = conn.cursor()
        c.execute(query, (re.sub('[^A-Za-z0-9 ]+', '', name),))  # strip non-standard characters
        return c.fetchone()
    finally:
        conn.close()
|
|
|
|
def delete_team(team):
    """Delete the row(s) of ``team`` from the ``teams`` table.

    Args:
        team: object exposing a ``name`` attribute; the name is sanitised
            (ASCII letters, digits and spaces only) before matching.

    Returns:
        ``True`` when the DELETE was committed, ``False`` when the database
        is unavailable or the statement fails.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return False

    try:
        c = conn.cursor()
        c.execute("DELETE FROM teams WHERE name=?", (re.sub('[^A-Za-z0-9 ]+', '', team.name),))
        conn.commit()
        return True
    except sql.Error:
        return False
    finally:
        conn.close()
|
|
|
|
def assign_owner(team_name, owner_id):
    """Set ``owner_id`` on the team called ``team_name``.

    The name is sanitised (ASCII letters, digits and spaces only) before
    matching.

    Returns:
        ``True`` when the UPDATE was committed (even if it matched no row),
        ``False`` when the database is unavailable or the statement fails.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return False

    try:
        c = conn.cursor()
        c.execute("UPDATE teams SET owner_id = ? WHERE name = ?", (owner_id, re.sub('[^A-Za-z0-9 ]+', '', team_name)))
        conn.commit()
        return True
    except sql.Error:
        return False
    finally:
        conn.close()
|
|
|
|
def get_all_teams():
    """Return the JSON of every stored team.

    Returns:
        A list of 1-tuples ``(team_json_string,)`` as produced by
        ``fetchall()``, or ``None`` when the database is unavailable.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return None

    try:
        c = conn.cursor()
        c.execute("SELECT team_json_string FROM teams")
        return c.fetchall()
    finally:
        conn.close()
|
|
|
|
def get_all_team_names():
    """Return the names of every stored team.

    Returns:
        A list of name strings, or ``None`` when the database is
        unavailable.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return None

    try:
        c = conn.cursor()
        c.execute("SELECT name FROM teams")
        return [name for (name,) in c.fetchall()]
    finally:
        conn.close()
|
|
|
|
def get_filtered_teams(i_filter_list):
    """Return all stored team names except those listed in ``i_filter_list``.

    Names are compared after removing every character other than ASCII
    letters, digits, spaces and ``%``; the returned names are the stored
    (unmodified) ones, in the order the database returned them.

    Returns an empty list when the team names cannot be read.
    """
    teams_list = get_all_team_names()
    if teams_list is None:
        # Database unavailable: nothing to offer rather than iterating None.
        return []

    # A set gives O(1) membership tests instead of scanning a list per team.
    excluded = {re.sub('[^A-Za-z0-9 %]+', '', filter_team) for filter_team in i_filter_list}
    return [team for team in teams_list
            if re.sub('[^A-Za-z0-9 %]+', '', team) not in excluded]
|
|
|
|
def search_teams(search_string):
    """Find teams whose name contains ``search_string``.

    The pattern ``%<search_string>%`` is sanitised so that only ASCII
    letters, digits, spaces and ``%`` remain, then matched with SQL ``LIKE``.
    A ``%`` inside ``search_string`` therefore still acts as a wildcard.

    Returns:
        A list of ``(team_json_string, counter)`` tuples, or ``None`` when
        the database is unavailable.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return None

    try:
        c = conn.cursor()
        c.execute("SELECT team_json_string, counter FROM teams WHERE name LIKE ?", (re.sub('[^A-Za-z0-9 %]+', '', f"%{search_string}%"),))
        return c.fetchall()
    finally:
        conn.close()
|
|
|
|
def add_stats(player_game_stats_list):
    """Add one game's worth of stats to the running totals in ``stats``.

    Args:
        player_game_stats_list: iterable of ``(name, stats_dict)`` pairs,
            where each key of ``stats_dict`` is a column of the ``stats``
            table and each value is the amount to add.

    Players without a row get one inserted first. All changes are committed
    together; nothing happens when the database is unavailable.
    """
    conn = create_connection()
    if conn is None:
        # The old code called close() on None in this case.
        return

    try:
        c = conn.cursor()
        for (name, player_stats_dic) in player_game_stats_list:
            c.execute("SELECT * FROM stats WHERE name=?", (name,))
            this_player = c.fetchone()
            if this_player is not None:
                for stat, value in player_stats_dic.items():
                    # NOTE(review): column names cannot be bound as parameters, so
                    # ``stat`` is interpolated into the SQL text. Keys must come
                    # from trusted code, never from user input.
                    c.execute(f"SELECT {stat} FROM stats WHERE name=?", (name,))
                    old_value = int(c.fetchone()[0])
                    c.execute(f"UPDATE stats SET {stat} = ? WHERE name=?", (value + old_value, name))
            else:
                c.execute("INSERT INTO stats(name) VALUES (?)", (name,))
                for stat, value in player_stats_dic.items():
                    c.execute(f"UPDATE stats SET {stat} = ? WHERE name=?", (value, name))
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def add_team_obl(team):
    """Register ``team`` in the ``one_big_league`` table.

    The team gets a randomly drawn opponent pool of up to five other teams,
    stored as a newline-separated string.

    Args:
        team: object exposing a ``name`` attribute.

    Does nothing when the database connection cannot be opened.
    """
    conn = create_connection()
    if conn is None:
        # The old code called close() on None in this case.
        return

    try:
        candidates = get_filtered_teams([team.name])
        # sample() raises ValueError when asked for more items than exist;
        # with fewer than five other teams, take them all instead of failing
        # (save_obl_results already tolerates a short pool).
        opponents = sample(candidates, min(5, len(candidates)))

        c = conn.cursor()
        c.execute("INSERT INTO one_big_league(team_name, current_opponent_pool) VALUES (?, ?)", (team.name, list_to_newline_string(opponents)))
        conn.commit()
    finally:
        conn.close()
|
|
|
|
def save_obl_results(winning_team, losing_team, xvi_team = None):
    """Record the outcome of a game in the ``one_big_league`` table.

    If the loser is in the winner's current opponent pool, the loser is
    appended to the winner's beaten list, a new pool is drawn from the
    teams not yet beaten, and the winner gains one point. The winner's row
    is rewritten either way.

    Args:
        winning_team: object exposing ``name``.
        losing_team: object exposing ``name``.
        xvi_team: optional team that receives one extra point via
            ``add_obl_point`` after the winner's row has been saved.

    Nothing is recorded (including the ``xvi_team`` point) when the database
    is unavailable or the winner has no ``one_big_league`` row.
    """
    conn = create_connection()
    if conn is None:
        return

    try:
        c = conn.cursor()
        try:
            c.execute("SELECT teams_beaten_list, current_opponent_pool, obl_points FROM one_big_league WHERE team_name = ?", (winning_team.name,))
            beaten_string, opponents_string, obl_points = c.fetchone()
        except (TypeError, sql.Error):
            # TypeError: fetchone() returned None, i.e. the winner is not in the
            # league. The old bare except returned here without closing the
            # connection; the finally below now takes care of that.
            return

        beaten_teams = newline_string_to_list(beaten_string)
        opponent_teams = newline_string_to_list(opponents_string)
        if re.sub('[^A-Za-z0-9 %]+', '', losing_team.name) in opponent_teams:
            beaten_teams.append(losing_team.name)
            candidates = get_filtered_teams([winning_team.name] + beaten_teams)
            # Fewer than five candidates: use them all, in stored order
            # (same result as the old ValueError fallback, without a second query).
            opponent_teams = sample(candidates, 5) if len(candidates) >= 5 else candidates
            obl_points += 1

        c.execute("UPDATE one_big_league SET teams_beaten_list = ?, current_opponent_pool = ?, obl_points = ? WHERE team_name = ?", (list_to_newline_string(beaten_teams), list_to_newline_string(opponent_teams), obl_points, winning_team.name))
        conn.commit()
    finally:
        conn.close()

    if xvi_team is not None:
        add_obl_point(xvi_team)
    return
|
|
|
|
def add_obl_point(team):
    """Give ``team`` one additional point in ``one_big_league``.

    Args:
        team: object exposing a ``name`` attribute.

    A team without a row is left untouched (previously this raised
    ``TypeError`` and leaked the connection). Nothing happens when the
    database is unavailable.
    """
    conn = create_connection()
    if conn is None:
        return

    try:
        c = conn.cursor()
        # Increment inside the UPDATE so it is a single atomic statement rather
        # than a SELECT followed by a write of a possibly stale value.
        c.execute("UPDATE one_big_league SET obl_points = COALESCE(obl_points, 0) + 1 WHERE team_name = ?", (team.name,))
        conn.commit()
    finally:
        conn.close()
    return
|
|
|
|
def get_obl_stats(team, full = False):
    """Return the ``one_big_league`` standing of ``team``.

    A team without a row is registered through ``add_team_obl`` first.

    Args:
        team: object exposing a ``name`` attribute.
        full: selects the shape of the result (see below).

    Returns:
        ``(obl_points, opponent_teams, rank)`` when ``full`` is false, or
        ``(obl_points, beaten_teams, opponent_teams, rank, rival_name)`` when
        it is true. ``opponent_teams`` is the stored pool exactly as read
        from the database (a newline-separated string, or ``None`` if the
        column is NULL); ``beaten_teams`` is a list; ``rank`` is the 1-based
        position in ``obl_leaderboards()``.
        ``(None, None)`` when the database is unavailable or the team could
        not be registered.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return (None, None)

    query = "SELECT teams_beaten_list, current_opponent_pool, rival_name, obl_points FROM one_big_league WHERE team_name = ?"
    try:
        c = conn.cursor()
        c.execute(query, (team.name,))
        row = c.fetchone()
        if row is None:  # add team to OBL
            add_team_obl(team)
            # Re-read once. The old ``while`` loop retried forever if the row
            # never appeared or its pool column stayed NULL.
            c.execute(query, (team.name,))
            row = c.fetchone()
    finally:
        # The old success path returned without closing the connection.
        conn.close()

    if row is None:
        return (None, None)

    beaten_string, opponents_string, rival_name, obl_points = row
    beaten_teams = newline_string_to_list(beaten_string)
    opponent_teams = opponents_string

    teams_list = [name for name, points in obl_leaderboards()]
    rank = teams_list.index(team.name) + 1
    if not full:
        return (obl_points, opponent_teams, rank)
    return (obl_points, beaten_teams, opponent_teams, rank, rival_name)
|
|
|
|
def obl_leaderboards():
    """Return every ``one_big_league`` team ordered by points, highest first.

    Returns:
        A list of ``(team_name, obl_points)`` tuples, or ``False`` when the
        database is unavailable.
    """
    conn = create_connection()
    if conn is None:
        # The old fall-through called close() on None here.
        return False

    try:
        c = conn.cursor()
        c.execute("SELECT team_name, obl_points FROM one_big_league ORDER BY obl_points DESC")
        return c.fetchall()  # element (team_name, obl_points)
    finally:
        # The old success path returned without closing the connection.
        conn.close()
|
|
|
|
def set_obl_rival(base_team, rival):
    """Store ``rival`` as the rival of ``base_team`` in ``one_big_league``.

    Args:
        base_team: object exposing ``name``; selects the row to update.
        rival: object exposing ``name``; written to ``rival_name``.

    Does nothing when the database connection cannot be opened.
    """
    conn = create_connection()
    if conn is None:
        return

    try:
        c = conn.cursor()
        c.execute("UPDATE one_big_league SET rival_name = ? WHERE team_name = ?", (rival.name, base_team.name))
        conn.commit()
    finally:
        # Close even if the statement raises, so the connection is not leaked.
        conn.close()
|
|
|
|
def clear_obl():
    """Delete every row of the ``one_big_league`` table.

    Does nothing when the database connection cannot be opened.
    """
    conn = create_connection()
    if conn is None:
        return

    try:
        c = conn.cursor()
        c.execute("DELETE FROM one_big_league")
        conn.commit()
    finally:
        # Close even if the statement raises, so the connection is not leaked.
        conn.close()
|
|
|
|
def list_to_newline_string(list):
    """Join an iterable of strings into one newline-separated string.

    An empty iterable yields ``""``.

    The old loop used "accumulator is still empty" to detect the first
    element, so the separator after a leading empty string was dropped
    (``["", "a"]`` became ``"a"``). ``str.join`` keeps every separator and
    avoids repeated string concatenation.
    """
    # The parameter shadows the builtin ``list``; the name is kept so that
    # callers passing it by keyword continue to work.
    return "\n".join(list)
|
|
|
|
def newline_string_to_list(string):
    """Split a newline-separated string into a list of its parts.

    ``None`` and the empty string both produce an empty list.
    """
    if string is None or string == "":
        return []
    return string.split("\n")