import os, json, re, jsonpickle import sqlite3 as sql data_dir = "data" league_dir = "leagues" statements_file = os.path.join(data_dir, "sql_statements.xvi") def create_connection(league_name): #create connection, create db if doesn't exist conn = None try: if not os.path.exists(os.path.join(data_dir, league_dir, league_name)): os.makedirs(os.path.join(data_dir, league_dir, league_name)) conn = sql.connect(os.path.join(data_dir, league_dir, league_name, f"{league_name}.db")) # enable write-ahead log for performance and resilience conn.execute('pragma journal_mode=wal') modifications_table_check_string = """ CREATE TABLE IF NOT EXISTS mods ( counter integer PRIMARY KEY, name text NOT NULL, team_name text NOT NULL, modifications_json text );""" conn.execute(modifications_table_check_string) return conn except: print("oops, db connection no work") return conn def statements(): if not os.path.exists(os.path.dirname(statements_file)): os.makedirs(os.path.dirname(statements_file)) if not os.path.exists(statements_file): #generate default statements: bat_base and pitch_base to be appended with a relevant ORDER BY statement config_dic = { "bat_base" : """SELECT name, team_name, plate_appearances - (walks_taken + sacrifices) as ABs, ROUND(hits*1.0 / (plate_appearances - (walks_taken + sacrifices)*1.0),3) as BA, ROUND(total_bases*1.0 / (plate_appearances - (walks_taken + sacrifices)*1.0),3) as SLG, ROUND((walks_taken + hits)*1.0/plate_appearances*1.0,3) as OBP, ROUND((walks_taken + hits)*1.0/plate_appearances*1.0,3) + ROUND(total_bases*1.0 / (plate_appearances - (walks_taken + sacrifices)*1.0),3) as OPS FROM stats WHERE plate_appearances > """, "bat_base_req": 3, "avg" : ["ORDER BY BA DESC;", "bat_base"], "slg" : ["ORDER BY SLG DESC;", "bat_base"], "obp" : ["ORDER BY OBP DESC;", "bat_base"], "ops" : ["ORDER BY OPS DESC;", "bat_base"], "bat_count_base": "SELECT name, team_name,\n\tplate_appearances - (walks_taken + sacrifices) as ABs,\nwalks_taken as BB,\nhits as H,\nhome_runs as HR,\nrbis as RBIs,\nstrikeouts_taken as K,\nsacrifices\nFROM stats WHERE plate_appearances > 8", "home runs": ["ORDER BY HR DESC;", "bat_count_base"], "walks drawn": ["ORDER BY BB DESC;", "bat_count_base"], "bat_count_base_req" : 3, "pitch_base" : """SELECT name, team_name, ROUND(((outs_pitched*1.0)/3.0),1) as IP, ROUND(runs_allowed*27.0/(outs_pitched*1.0),3) as ERA, ROUND((walks_allowed+hits_allowed)*3.0/(outs_pitched*1.0),3) as WHIP, ROUND(walks_allowed*27.0/(outs_pitched*1.0),3) as BBper9, ROUND(strikeouts_given*27.0/(outs_pitched*1.0),3) as Kper9, ROUND(strikeouts_given*1.0/walks_allowed*1.0,3) as KperBB FROM stats WHERE outs_pitched > """, "pitch_base_req": 2, "era" : ["ORDER BY ERA ASC;", "pitch_base"], "whip" : ["ORDER BY WHIP ASC;", "pitch_base"], "kper9" : ["ORDER BY Kper9 DESC;", "pitch_base"], "bbper9" : ["ORDER BY BBper9 ASC;", "pitch_base"], "kperbb" : ["ORDER BY KperBB DESC;", "pitch_base"] } with open(statements_file, "w") as config_file: json.dump(config_dic, config_file, indent=4) return config_dic else: with open(statements_file) as config_file: return json.load(config_file) def create_season_connection(league_name, season_num): #create connection, create db if doesn't exist conn = None try: if not os.path.exists(os.path.join(data_dir, league_dir, league_name)): os.makedirs(os.path.join(data_dir, league_dir, league_name)) conn = sql.connect(os.path.join(data_dir, league_dir, league_name, season_num, f"{league_name}.db")) # enable write-ahead log for performance and resilience conn.execute('pragma journal_mode=wal') return conn except: print("oops, db connection no work") return conn def state(league_name): if not os.path.exists(os.path.dirname(os.path.join(data_dir, league_dir, league_name, f"{league_name}.state"))): os.makedirs(os.path.dirname(os.path.join(data_dir, league_dir, league_name, f"{league_name}.state"))) with open(os.path.join(data_dir, league_dir, league_name, f"{league_name}.state")) as state_file: return json.load(state_file) def init_league_db(league): if os.path.exists(os.path.join(data_dir, league_dir, league.name, f"{league.name}.db")): os.remove(os.path.join(data_dir, league_dir, league.name, f"{league.name}.db")) conn = create_connection(league.name) player_stats_table_check_string = """ CREATE TABLE IF NOT EXISTS stats ( counter integer PRIMARY KEY, id text, name text, team_name text, outs_pitched integer DEFAULT 0, walks_allowed integer DEFAULT 0, hits_allowed integer DEFAULT 0, strikeouts_given integer DEFAULT 0, runs_allowed integer DEFAULT 0, plate_appearances integer DEFAULT 0, walks_taken integer DEFAULT 0, sacrifices integer DEFAULT 0, hits integer DEFAULT 0, home_runs integer DEFAULT 0, total_bases integer DEFAULT 0, rbis integer DEFAULT 0, strikeouts_taken integer DEFAULT 0 );""" teams_table_check_string = """ CREATE TABLE IF NOT EXISTS teams ( counter integer PRIMARY KEY, name text NOT NULL, wins integer DEFAULT 0, losses integer DEFAULT 0, run_diff integer DEFAULT 0 ); """ modifications_table_check_string = """ CREATE TABLE IF NOT EXISTS mods ( counter integer PRIMARY KEY, name text NOT NULL, team_name text NOT NULL, modifications_json text );""" if conn is not None: c = conn.cursor() c.execute(player_stats_table_check_string) c.execute(teams_table_check_string) c.execute(modifications_table_check_string) for team in league.teams_in_league(): c.execute("INSERT INTO teams (name) VALUES (?)", (team.name,)) player_string = "INSERT INTO stats (name, team_name) VALUES (?,?)" for batter in team.lineup: c.execute(player_string, (batter.name, team.name)) for pitcher in team.rotation: c.execute(player_string, (pitcher.name, team.name)) conn.commit() conn.close() def save_league(league): if league_exists(league.name): state_dic = { "season" : league.season, "day" : league.day, "subs" : league.subbed_channels, "last_weather_event" : league.last_weather_event_day, "constraints" : league.constraints, "game_length" : league.game_length, "series_length" : league.series_length, "games_per_hour" : league.games_per_hour, "owner" : league.owner, "champion" : league.champion, "schedule" : league.schedule, "forecasts" : league.weather_forecast, "historic" : league.historic } with open(os.path.join(data_dir, league_dir, league.name, f"{league.name}.state"), "w") as state_file: json.dump(state_dic, state_file, indent=4) def add_stats(league_name, player_game_stats_list): conn = create_connection(league_name) if conn is not None: c=conn.cursor() for team_name in player_game_stats_list.keys(): for (name, player_stats_dic) in player_game_stats_list[team_name]: c.execute("SELECT * FROM stats WHERE name=? AND team_name=?",(name, team_name)) this_player = c.fetchone() if this_player is not None: for stat in player_stats_dic.keys(): c.execute(f"SELECT {stat} FROM stats WHERE name=? AND team_name=?",(name, team_name)) old_value = int(c.fetchone()[0]) c.execute(f"UPDATE stats SET {stat} = ? WHERE name=? AND team_name=?",(player_stats_dic[stat]+old_value, name, team_name)) else: c.execute("INSERT INTO stats(name) VALUES (?)",(name,)) for stat in player_stats_dic.keys(): c.execute(f"UPDATE stats SET {stat} = ? WHERE name=? AND team_name=?",(player_stats_dic[stat], name, team_name)) conn.commit() conn.close() def get_stats(league_name, stat, is_batter=True, day = 10): conn = create_connection(league_name) stats = None if conn is not None: conn.row_factory = sql.Row c=conn.cursor() if stat in statements().keys(): req_number = str(day * int(statements()[statements()[stat][1]+"_req"])) c.execute(statements()[statements()[stat][1]]+req_number+"\n"+statements()[stat][0]) stats = c.fetchall() conn.close() return stats def get_mods(league_name, player:str, team:str): """returns a player's modifications dict""" conn = create_connection(league_name) if conn is not None: c = conn.cursor() c.execute("SELECT * FROM stats WHERE name=? AND team_name=?", (player, team)) #check stats table to make sure player actually exists in the league row = c.fetchone() if row is None: return False c.execute("SELECT modifications_json FROM mods WHERE name=? AND team_name=?", (player, team)) mod_string = c.fetchone() if mod_string is None: return None mods = json.loads(mod_string[0]) conn.close() return mods return False def get_team_mods(league_name, team:str): """returns a dictionary of player modifications belonging to a team""" conn = create_connection(league_name) if conn is not None: c = conn.cursor() c.execute("SELECT * FROM stats WHERE team_name=?", (team,)) #check stats table to make sure team actually exists in the league row = c.fetchone() if row is None: return False c.execute("SELECT name, modifications_json FROM mods WHERE team_name=?",(team,)) rows = c.fetchall() if len(rows) == 0: return None mods_dic = {} for row in rows: mods_dic[row[0]] = json.loads(row[1]) return mods_dic def set_mods(league_name, player:str, team:str, modifications:dict): """Overwrites a player's modifications with an entirely new set""" conn = create_connection(league_name) if conn is not None: c = conn.cursor() c.execute("SELECT * FROM stats WHERE name=? AND team_name=?", (player, team)) #check stats table to make sure player actually exists in the league row = c.fetchone() if row is None: return False mod_string = json.dumps(modifications) c.execute("SELECT counter FROM mods WHERE name=? AND team_name=?", (player, team)) #check stats table to make sure player actually exists in the league counter = c.fetchone() if counter is None: c.execute("INSERT INTO mods(name, team_name, modifications_json) VALUES (?,?,?)", (player, team, mod_string)) else: c.execute("UPDATE mods SET modifications_json = ? WHERE counter=?", (mod_string, counter)) conn.commit() conn.close return True return False def update_standings(league_name, update_dic): if league_exists(league_name): conn = create_connection(league_name) if conn is not None: c = conn.cursor() for team_name in update_dic.keys(): for stat_type in update_dic[team_name].keys(): #wins, losses, run_diff c.execute(f"SELECT {stat_type} FROM teams WHERE name = ?", (team_name,)) old_value = int(c.fetchone()[0]) c.execute(f"UPDATE teams SET {stat_type} = ? WHERE name = ?", (update_dic[team_name][stat_type]+old_value, team_name)) conn.commit() conn.close() def get_standings(league_name): if league_exists(league_name): conn = create_connection(league_name) if conn is not None: c = conn.cursor() c.execute("SELECT name, wins, losses, run_diff FROM teams",) standings_array = c.fetchall() conn.close() return standings_array def season_save(league): if league_exists(league.name): seasons = 1 with os.scandir(os.path.join(data_dir, league_dir, league.name)) as folder: for item in folder: if "." not in item.name: seasons += 1 new_dir = os.path.join(data_dir, league_dir, league.name, str(seasons)) os.makedirs(new_dir) with os.scandir(os.path.join(data_dir, league_dir, league.name)) as folder: for item in folder: if "." in item.name: os.rename(os.path.join(data_dir, league_dir, league.name, item.name), os.path.join(new_dir, item.name)) def season_restart(league): if league_exists(league.name): with os.scandir(os.path.join(data_dir, league_dir, league.name)) as folder: for item in folder: if "." in item.name: os.remove(os.path.join(data_dir, league_dir, league.name, item.name)) def get_past_standings(league_name, season_num): if league_exists(league_name): with os.scandir(os.path.join(data_dir, league_dir, league_name)) as folder: for item in folder: if item.name == str(season_num): conn = create_season_connection(league_name, str(item.name)) if conn is not None: c = conn.cursor() c.execute("SELECT name, wins, losses, run_diff FROM teams",) standings_array = c.fetchall() conn.close() return standings_array def get_past_champion(league_name, season_num): if league_exists(league_name): with os.scandir(os.path.join(data_dir, league_dir, league_name)) as folder: for item in folder: if item.name == str(season_num): with open(os.path.join(data_dir, league_dir, league_name, item.name, f"{league_name}.state")) as state_file: state_dic = json.load(state_file) return state_dic["champion"] def league_exists(league_name): with os.scandir(os.path.join(data_dir, league_dir)) as folder: for subfolder in folder: if league_name == subfolder.name: with os.scandir(subfolder.path) as league_folder: for item in league_folder: if item.name == f"{league_name}.db": return True return False