# General-purpose helper functions; this file can be reused across projects.
import argparse
import logging
import os
import sys
from datetime import datetime
from urllib.parse import urlparse, urlunsplit
import pandas as pd
# import yaml
## CHECKS
[docs]
def assert_path(filepath: str):
    """
    Verify that a path is given as a string and refers to something on disk.

    Parameters
    ----------
    filepath : str
        File or folder path to verify. Relative paths are resolved against
        the current working directory before the existence check.

    Raises
    ------
    TypeError
        If ``filepath`` is not a string.
    FileNotFoundError
        If nothing exists at ``filepath``.

    Example
    -------
    >>> assert_path("..")
    >>> assert_path("./tests")
    """
    if isinstance(filepath, str):
        resolved = os.path.abspath(filepath)
        if os.path.exists(resolved):
            return
        # the message reports the path exactly as the caller supplied it
        raise FileNotFoundError(f"The specified path does not exist: {filepath}")
    raise TypeError(f"filepath must be a string: {type(filepath)}")
[docs]
def create_folder(directory_path: str, is_nested: bool = False) -> bool:
    """
    Make sure a folder exists, creating it when necessary.

    Parameters
    ----------
    directory_path : str
        Path of the folder to create.
    is_nested : bool, optional
        When True, missing parent folders are created too (``os.makedirs``);
        when False only the last path component is created (``os.mkdir``).
        Defaults to False.

    Returns
    -------
    bool
        True when the folder was created by this call, False when it was
        already present.

    Raises
    ------
    TypeError
        If ``directory_path`` is not a string.
    ValueError
        If ``directory_path`` points to an existing file.
    OSError
        If the folder could not be created.
    """
    # guard: the argument must be a string
    if not isinstance(directory_path, str):
        raise TypeError(f"filepath must be a string: {type(directory_path)}")

    resolved = os.path.abspath(directory_path)

    # guard: refuse to shadow an existing file
    if os.path.isfile(resolved):
        raise ValueError(
            f"directory_path is an existing file when it should be a folder/foldername: {resolved}"
        )

    # nothing to do when the folder is already there
    if os.path.isdir(resolved):
        return False

    try:
        if is_nested:
            # builds every missing parent on the way down
            os.makedirs(directory_path, exist_ok=True)
        else:
            # fails if the parent folder is missing
            os.mkdir(directory_path)
    except OSError as e:
        raise OSError(f"Error creating directory '{directory_path}': {e}") from e
    return True
[docs]
def assert_nonempty_keys(dictionary: dict):
    """
    Check that no string key of a dictionary is empty or whitespace-only.

    Non-string keys are ignored.

    Parameters
    ----------
    dictionary : dict
        A dictionary (e.g., a loaded config file).

    Raises
    ------
    TypeError
        If ``dictionary`` is not a dict.
    AssertionError
        If any string key is empty (``""``) or blank (e.g., ``" "``).
    """
    # PRECONDITIONS
    if not isinstance(dictionary, dict):
        raise TypeError(f"dictionary must be a dict, got {type(dictionary)}")

    # MAIN FUNCTION
    for key in dictionary:
        # isinstance (rather than an exact type match) also covers str subclasses
        if not isinstance(key, str):
            continue
        # raise explicitly: bare `assert` statements are removed under `python -O`,
        # which would silently disable this validation
        if not key:
            raise AssertionError(
                f'There is an empty key (e.g., ""): {key, dictionary.keys()}'
            )
        if not key.strip():
            raise AssertionError(
                f'There is a blank key (e.g., space, " "): {key, dictionary.keys()}'
            )
[docs]
def assert_nonempty_vals(dictionary: dict):
    """
    Check that no string value of a dictionary is empty or whitespace-only.

    Non-string values are ignored.

    Parameters
    ----------
    dictionary : dict
        A dictionary (e.g., a loaded config file).

    Raises
    ------
    TypeError
        If ``dictionary`` is not a dict.
    AssertionError
        If any string value is empty (``""``) or blank (e.g., ``" "``).
    """
    # PRECONDITIONS
    if not isinstance(dictionary, dict):
        raise TypeError(f"dictionary must be a dict, got {type(dictionary)}")

    # MAIN FUNCTION
    # iterate over the values themselves; iterating over .items() yields
    # (key, value) tuples, which are never strings, so nothing would be checked
    for value in dictionary.values():
        if not isinstance(value, str):
            continue
        # raise explicitly: bare `assert` statements are removed under `python -O`
        if not value:
            raise AssertionError(
                f'There is an empty value (e.g., ""): {value, dictionary.items()}'
            )
        if not value.strip():
            raise AssertionError(
                f'There is a blank value (e.g., space, " "): {value, dictionary.items()}'
            )
[docs]
def normalize_url(host: str, port: int, scheme: str = "http") -> str:
    """
    Build a ``scheme://host:port`` URL from a host and a port.

    When ``host`` has no network location according to ``urlparse`` (i.e. it
    is a bare hostname), ``scheme`` is prepended; otherwise ``host`` is used
    as given. Trailing slashes are stripped and ``:port`` is appended.

    Parameters
    ----------
    host : str
        Host name or URL to normalize.
    port : int
        Port number to append.
    scheme : str, optional
        URL scheme used when ``host`` does not carry one (default "http").

    Returns
    -------
    str
        The normalized URL.

    Raises
    ------
    TypeError
        If ``host``, ``port`` or ``scheme`` has the wrong type, or if the
        resulting URL has no network location.

    Examples
    --------
    >>> normalize_url("localhost", 7474)
    'http://localhost:7474'
    >>> normalize_url("example.com", 80, "bolt")
    'bolt://example.com:80'
    """
    ## PRECONDITIONS
    if not isinstance(host, str):
        raise TypeError(f"host should be a str e.g., 'localhost': {type(host)}")
    if not isinstance(port, int):
        raise TypeError(f"port must be int e.g., '7474': {type(port)}")
    if not isinstance(scheme, str):
        raise TypeError(f"scheme must be str: {type(scheme)}")

    ## MAIN FUNCTION
    # a bare hostname parses with an empty netloc, so give it a scheme
    already_has_netloc = bool(urlparse(host).netloc)
    base = host if already_has_netloc else urlunsplit([scheme, host, "", "", ""])
    # drop trailing slashes, then attach the port
    normalized = f"{base.rstrip('/')}:{port!s}"

    ## POSTCOND CHECKS
    if urlparse(normalized).netloc:
        return normalized
    raise TypeError(f"Unable to normalize url: {normalized}")
[docs]
def get_args(prog_name: str, others: dict = None):
    """
    Build an ``argparse.ArgumentParser`` with the common options and parse
    the command line.

    The parser currently defines a single option, ``-c/--config``, whose
    default is ``"demo/config.yaml"``.

    Parameters
    ----------
    prog_name : str
        The name of the program.
    others : dict, optional
        Extra keyword arguments forwarded to ``ArgumentParser``. ``None``
        (the default) means no extra arguments.

    Returns
    -------
    argparse.Namespace
        Parsed command-line arguments.

    Raises
    ------
    TypeError
        If ``prog_name`` is not a string or ``others`` is neither a dict
        nor None.
    """
    ### PRECONDITIONS
    if not isinstance(prog_name, str):
        raise TypeError(f"prog_name should be a string: {type(prog_name)}")
    if others is not None and not isinstance(others, dict):
        raise TypeError(f"other kwargs must be a dict: {type(others)}")

    ## MAIN FUNCTION
    parser_kwargs = {} if others is None else others
    cli = argparse.ArgumentParser(prog=prog_name, **parser_kwargs)

    # path to the config file
    cli.add_argument(
        "-c",
        "--config",
        action="store",
        default="demo/config.yaml",
        help="provide path to config yaml file",
    )
    return cli.parse_args()
[docs]
def get_basename(fname: None | str = None) -> str:
"""
Get the basename of a given filename, without file extension.
If no filename is given, returns the basename of the current script.
Parameters
----------
fname : str or None, optional
The filename to get basename of, or None (default is None).
Returns
-------
str
Basename of the given filepath or the current file the function is executed in.
"""
if fname is not None:
# PRECONDITION
assert_path(fname)
# MAIN FUNCTIONS
return os.path.splitext(os.path.basename(fname))[0]
else:
return os.path.splitext(os.path.basename(sys.argv[0]))[0]
[docs]
def get_time(incl_time: bool = True, incl_timezone: bool = True) -> str:
    """
    Get current date, time (optional), and timezone (optional) for file naming.

    Every numeric field is zero-padded, so the date part is always 8 digits
    (``yyyyMMdd``) and the time part is always 6 digits (``hhmmss``). This
    keeps the generated names unambiguous and lexicographically sortable.

    Parameters
    ----------
    incl_time : bool, optional
        Whether to include the time of day in the string (default is True).
    incl_timezone : bool, optional
        Whether to include the local timezone name in the string
        (default is True).

    Returns
    -------
    str
        Underscore-joined string with the date and, depending on the flags,
        the time and/or timezone, e.g. ``'yyyyMMdd_hhmmss_timezone'``.

    Raises
    ------
    TypeError
        If ``incl_time`` or ``incl_timezone`` are not bool.
    AssertionError
        If the output does not have the expected number of
        underscore-separated parts.
    """
    # PRECONDITIONALS
    if not isinstance(incl_time, bool):
        raise TypeError("incl_time must be True or False")
    if not isinstance(incl_timezone, bool):
        raise TypeError("incl_timezone must be True or False")

    # MAIN FUNCTION
    # take one timezone-aware snapshot so date, time and timezone all
    # describe the same instant
    now = datetime.now().astimezone()

    # strftime zero-pads each field; plain str() would turn 2024-01-05
    # into the ambiguous "202415"
    parts = [now.strftime("%Y%m%d")]
    if incl_time:
        parts.append(now.strftime("%H%M%S"))
    if incl_timezone:
        parts.append(now.tzname())
    fname = "_".join(parts)

    # POSTCONDITIONALS
    # one part for the date plus one per enabled flag
    expected_parts = 1 + int(incl_time) + int(incl_timezone)
    assert (
        len(fname.split("_")) == expected_parts
    ), f"time/timezone inclusion issue: {fname}"
    return fname
[docs]
def generate_log_filename(folder: str = "logs", suffix: str = "") -> str:
    """
    Build the path of a timestamped log file, creating its folder if needed.

    The file name has the form ``<timestamp>_<suffix>.log``, where the
    timestamp comes from ``get_time`` without the timezone.

    Parameters
    ----------
    folder : str, optional
        Folder that will hold the log file (default is "logs"). It is
        created via ``create_folder`` when it does not exist yet.
    suffix : str, optional
        Text appended after the timestamp in the file name (default is "").

    Returns
    -------
    str
        Path to the log file inside ``folder``.
    """
    # PRECONDITIONS
    # make sure the destination folder is there before naming the file
    create_folder(folder)

    # MAIN FUNCTION
    timestamp = get_time(incl_timezone=False)
    log_name = timestamp + "_" + suffix + ".log"
    return os.path.join(folder, log_name)
[docs]
def init_log(filename: str, display: bool = False, logger_id: str | None = None):
"""
Configure a custom Python logger with file and optional stdout handlers.
Parameters
----------
filename : str
Filepath to log record file.
display : bool, optional
Whether to print the logs to standard output (default is False).
logger_id : str or None, optional
An optional identifier for the logger. If None, defaults to 'root'.
Returns
-------
logging.Logger
Configured logger object.
Raises
------
TypeError
If filename is not a string or logger_id is not a string or None.
"""
# PRECONDITIONS
if not isinstance(filename, str):
raise TypeError(f"filename must be a string: {filename}")
if not (isinstance(logger_id, str) or logger_id is None):
raise TypeError("logger_id must be a string or None")
# MAIN FUNCTION
# init handlers
file_handler = logging.FileHandler(filename=filename)
stdout_handler = logging.StreamHandler(stream=sys.stdout)
if display:
handlers = [file_handler, stdout_handler]
else:
handlers = [file_handler]
# logger configuration
logging.basicConfig(
# level=logging.DEBUG,
format="[%(asctime)s] %(name)s: %(levelname)s - %(message)s",
handlers=handlers,
)
logging.getLogger("matplotlib.font_manager").disabled = True
# instantiate the logger
logger = logging.getLogger(logger_id)
logger.setLevel(logging.DEBUG)
return logger
[docs]
def get_logger():
    """
    Create a logger whose log file is named after the current script.

    The log file name is produced by ``generate_log_filename`` using the
    script's base name (from ``get_basename``) as suffix; the logger is then
    set up by ``init_log`` with output to stdout enabled. The log file path
    is written to the log as the first message.

    Returns
    -------
    logging.Logger
        The configured logger object.
    """
    # the current script's base file name becomes the log file suffix
    script_name = get_basename()
    log_path = generate_log_filename(suffix=script_name)

    # set up the logger, echoing records to stdout as well
    script_logger = init_log(log_path, display=True)

    # record where the log is being written
    script_logger.info(f"Path to log file: {log_path}")
    return script_logger
# FUNCTIONS FOR CONFIG
# def config_loader(filepath: str) -> dict:
# """
# Load a YAML config file as a dictionary.
# Parameters
# ----------
# filepath : str
# Path to the config file.
# Returns
# -------
# dict
# Configuration parameters as a dictionary.
# """
# # PRECONDITIONS
# assert_path(filepath)
# # MAIN FUNCTION
# with open(filepath, "r") as f:
# contents = yaml.safe_load(f)
# # POSTCONDITIONS
# assert isinstance(contents, dict), "content not returned as a dict"
# return contents
[docs]
def df_joiner(
    df_dict: dict[str, pd.DataFrame],
    on: str,
    how: str = "outer",
) -> pd.DataFrame:
    """
    Join multiple dataframes on a common column.

    The dataframes are merged left to right in the dictionary's iteration
    order. When the dictionary holds a single dataframe, that dataframe is
    returned as is (not copied).

    Parameters
    ----------
    df_dict : dict of pandas.DataFrame
        Dictionary of dataframes to join. The keys are only used in error
        messages.
    on : str
        Name of the column (or index level) to join on. It must be present
        in every dataframe.
    how : str, optional
        Type of join: "left", "right", "outer" or "inner".
        Defaults to "outer".

    Returns
    -------
    pandas.DataFrame
        Joined dataframe.

    Raises
    ------
    TypeError
        If ``df_dict`` is not a dict, ``on`` is not a string, or any value of
        ``df_dict`` is not a dataframe.
    ValueError
        If ``df_dict`` is empty, ``on`` is missing from one of the
        dataframes, or ``how`` is not a supported join type.
    """
    ## PRECONDITION CHECKS
    if not isinstance(df_dict, dict):
        raise TypeError(f"df_dict must be a dict: {type(df_dict)}")
    if not isinstance(on, str):
        raise TypeError(f"on must be a str: {type(on)}")
    # fail with a clear message instead of an IndexError further down
    if not df_dict:
        raise ValueError("df_dict must contain at least one dataframe")
    for key, df in df_dict.items():
        if not isinstance(df, pd.DataFrame):
            raise TypeError(f"df_dict values must be pd.DataFrame: {type(df)}")
        # the join key may be a regular column or a named index level
        if (on not in df.columns) and (on not in df.index.names):
            raise ValueError(f"Column '{on}' not found in dataframe with key '{key}'")
    if how not in ("left", "right", "outer", "inner"):
        raise ValueError(f"how must be one of 'left', 'right', 'outer', 'inner': {how}")

    ## MAIN FUNCTION
    frames = iter(df_dict.values())
    # start from the first dataframe, then fold the remaining ones in
    df_merged = next(frames)
    for df in frames:
        df_merged = pd.merge(df_merged, df, on=on, how=how)
    return df_merged