Commit 20df4ab3 authored by Sean Solari's avatar Sean Solari

Refactored code into isolated modules

parent 165c87f7
@@ -5,7 +5,7 @@ from setuptools.extension import Extension
from Cython.Build import cythonize
import numpy as np
-EXPAM_VERSION = (0, 0, 9)
+EXPAM_VERSION = (1, 0, 0)
SOURCE = os.path.dirname(os.path.abspath(__file__))
@@ -16,21 +16,21 @@ with open(os.path.join(SOURCE, "README.md"), mode="r", encoding="utf-8") as f:
 # Extension instances for Cython scripts.
 extensions = [
     Extension(
-        "map",
-        sources=["src/expam/c/map.pyx"],
+        "expam.ext.kmers._build",
+        sources=["src/expam/ext/kmers/extract.pyx", "src/expam/ext/kmers/kmers.c", "src/expam/ext/kmers/jellyfish.c"],
         include_dirs=[np.get_include()],
         extra_compile_args=["-std=c99"],
         define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")]
     ),
     Extension(
-        "extract",
-        sources=["src/expam/c/extract.pyx", "src/expam/c/kmers.c", "src/expam/c/jellyfish.c"],
+        "expam.ext.map._build",
+        sources=["src/expam/ext/map/map.pyx"],
         include_dirs=[np.get_include()],
         extra_compile_args=["-std=c99"],
         define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")]
     ),
     Extension(
-        "sets",
-        sources=["src/expam/c/sets.pyx", "src/expam/c/mfil.c"],
+        "expam.ext.sets._build",
+        sources=["src/expam/ext/sets/sets.pyx", "src/expam/ext/sets/mfil.c"],
         include_dirs=[np.get_include()],
         extra_compile_args=["-std=c99"],
         define_macros=[("NPY_NO_DEPRECATED_API", "NPY_1_7_API_VERSION")]
@@ -82,7 +82,7 @@ setup(
     #
     # Cython modules.
     #
-    ext_package="expam.c",
+    ext_package="expam.ext",
     ext_modules=cythonize(extensions, language_level="3"),
     #
     # Make main callable from console.
#!/usr/bin/env python3
def main():
    ...
if __name__ == "__main__":
    ...
\ No newline at end of file
import expam.classification
import expam.processes
import expam.run
import expam.sequences
import expam.stores
import expam.tree
import gzip
COMPRESSION_EXTNS = ['.tar.gz', '.tar', '.gz']
DEFAULT_MODE = "rb"
DEFAULT_OPENER = open
COMP_PARSE = {
    ".tar.gz": {"mode": "rb", "opener": gzip.open},
    ".gz": {"mode": "rb", "opener": gzip.open}
}
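For orientation, a minimal sketch of how these constants would drive file opening; the helper `open_by_extension` is hypothetical, not part of this commit:

def open_by_extension(path):
    # Hypothetical helper: pick mode/opener from COMP_PARSE, else fall back to defaults.
    mode, opener = DEFAULT_MODE, DEFAULT_OPENER
    for extn, conf in COMP_PARSE.items():
        if path.endswith(extn):
            mode, opener = conf["mode"], conf["opener"]
            break
    return opener(path, mode=mode)

Note that ".tar" appears in COMPRESSION_EXTNS but has no COMP_PARSE entry, so under this sketch it would fall back to the plain binary opener.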
from .main import ExpamOptions, clear_logs, CommandGroup, PlotLogs
[Three source diffs omitted here — too large to display; view the blobs instead.]
from collections import namedtuple
import os
ENTREZ_EMAIL = "sean.solari@monash.edu"
ENTREZ_TOOL = "expam"
ENTREZ_API = "8e5edfa1413576bca4f48b4a5520f6295308"
_STEP = 500 # NCBI Database searches at a time.
PHY_RESULTS = "phy"
TAX_RESULTS = "tax"
RAW_RESULTS = "raw"
TEMP_RESULTS = "temp"
CLASSIFIED_NAME = "classified.tsv"
SPLIT_NAME = "split.tsv"
PHY_RAW = os.path.join(PHY_RESULTS, RAW_RESULTS)
TAX_RAW = os.path.join(TAX_RESULTS, RAW_RESULTS)
PHY_CLASSIFIED_FILE = os.path.join(PHY_RESULTS, CLASSIFIED_NAME)
PHY_SPLIT_FILE = os.path.join(PHY_RESULTS, SPLIT_NAME)
TAX_CLASSIFIED_FILE = os.path.join(TAX_RESULTS, CLASSIFIED_NAME)
TAX_SPLIT_FILE = os.path.join(TAX_RESULTS, SPLIT_NAME)
ResultsPathConfig = namedtuple(
    'ResultsPathConfig',
    [
        'phy', 'tax',
        'temp',
        'phy_raw', 'tax_raw',
        'phy_classified', 'phy_split',
        'tax_classified', 'tax_split'
    ]
)
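A minimal sketch of how this namedtuple might be populated from an output directory using the constants above; `make_results_config` is illustrative only (the commit's own loader is `expam.classify.config.load_results_config`):

def make_results_config(out_dir):
    # Illustrative: join each relative path constant onto the chosen output directory.
    return ResultsPathConfig(
        phy=os.path.join(out_dir, PHY_RESULTS),
        tax=os.path.join(out_dir, TAX_RESULTS),
        temp=os.path.join(out_dir, TEMP_RESULTS),
        phy_raw=os.path.join(out_dir, PHY_RAW),
        tax_raw=os.path.join(out_dir, TAX_RAW),
        phy_classified=os.path.join(out_dir, PHY_CLASSIFIED_FILE),
        phy_split=os.path.join(out_dir, PHY_SPLIT_FILE),
        tax_classified=os.path.join(out_dir, TAX_CLASSIFIED_FILE),
        tax_split=os.path.join(out_dir, TAX_SPLIT_FILE)
    )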
from genericpath import isfile
from math import floor
from multiprocessing import shared_memory
import os
import re
import requests
import time
from ctypes import *
from math import floor
from multiprocessing import Array, shared_memory, Value
from os.path import join
from typing import Union
import numpy as np
import pandas as pd
from expam.processes import ReadWorker, ClassifyWorker
from expam.run import EXPAM_TEMP_EXT, TIMEOUT, UNION_RATIO, ControlCenter
from expam.sequences import ls
from expam.stores import TablesDb
from expam.tree import Index
#
# Constants.
#
from expam.run import KEYS_DATA_TYPE, \
    KEYS_DATA_TYPE_SIZE, \
    MAP_DATA_TYPE, \
    MAP_DATA_TYPE_SIZE, \
    LCA_MATRIX_PATH, \
    DB_PATH, \
    ACCESSION_ID_PATH
ENTREZ_EMAIL = "ssol0002@student.monash.edu"
ENTREZ_TOOL = "expam"
ENTREZ_API = "8e5edfa1413576bca4f48b4a5520f6295308"
_STEP = 500 # NCBI Database searches at a time.
PHY_RESULTS = "phy"
TAX_RESULTS = "tax"
RAW_RESULTS = "raw"
TEMP_RESULTS = "temp"
CLASSIFICATIONS_NAME = "classifications.csv"
SPLITS_NAME = "splits.csv"
FINAL_NAME = "results.csv"
TAXID_LINEAGE_MAP_NAME = os.path.join("phylogeny", "taxid_lineage.csv")
TAXON_RANK_MAP_NAME = os.path.join("phylogeny", "taxa_rank.csv")
RANK_ORDER = ["0", "root", "top", "superkingdom", "kingdom", "subkingdom", "superclade", "clade", "subclade",
              "superphylum", "phylum", "subphylum", "superclass", "class", "subclass", "superorder", "order",
              "suborder", "superfamily", "family", "subfamily", "supergenus", "genus", "subgenus", "species group",
              "species subgroup", "superspecies", "species", "subspecies", "serogroup", "serotype", "superstrain",
              "strain", "substrain", "no rank"]
TAX_RESULTS_HEADER = ["taxid", "%ClassifiedCumulative", "ClassifiedCumulative", "ClassifiedCount",
                      "%SplitCumulative", "SplitCumulative", "SplitCount", "Rank", "Lineage"]
#
# Functions.
#
def prepare_results(n):
    # Create a shared array to put fully classified reads.
    classifications = Array(c_uint32, n, lock=True)
    splits = Array(c_uint32, n, lock=True)
    unclassified = Value(c_uint32, lock=True)
    return classifications, splits, unclassified
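The returned objects are process-safe counters (Array/Value with lock=True wrap their buffers in a lock). A usage sketch; the worker bookkeeping shown is invented for illustration:

classifications, splits, unclassified = prepare_results(16)
with classifications.get_lock():
    classifications[3] += 1   # e.g. a read classified at node 3
with unclassified.get_lock():
    unclassified.value += 1   # e.g. a read that hit nothing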
def name_to_id(phylogeny_path):
    # Load Index from Newick file.
    with open(phylogeny_path, "r") as f:
        newick_str = f.read().strip()
    _, indexMap = Index.from_newick(newick_str)
    # Define the index dictionary and phi mapping.
    index = {}
    for i, loc in enumerate(indexMap.pool):
        # Define the name mapping.
        index[loc.name] = i
    return index, indexMap
def string_to_tuple(st):
    opener = st.split("(")[1]
    number_string = opener.split(")")[0]
    return tuple(int(i) for i in number_string.split(",") if i != '')
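Illustrative examples of the format this parser accepts (the input strings are invented):

assert string_to_tuple("(1, 0, 0)") == (1, 0, 0)
assert string_to_tuple("expam version (0,0,9)") == (0, 0, 9)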
from expam.classify import ResultsPathConfig
from expam.classify.config import load_results_config
from expam.classify.process import ExpamClassifierProcesses
from expam.classify.taxonomy import TaxonomyNCBI
from expam.database import EXPAM_TEMP_EXT, TIMEOUT, UNION_RATIO, FileLocationConfig
from expam.database.config import load_database_config
from expam.database.db import SharedDB
from expam.process.classify import ClassifyWorker
from expam.process.manager import ControlCenter
from expam.process.reads import ReadWorker
from expam.tree.tree import Index
from expam.utils import ls, yield_csv
-def run_classifier(read_paths, out_dir, db_dir, k, n, phylogeny_path, keys_shape, values_shape, logging_dir,
-                   taxonomy=False, cutoff=0.0, groups=None, keep_zeros=False, cpm=0.0, use_node_names=True,
-                   phyla=False, name_taxa=None, colour_list=None, circle_scale=1.0,
-                   paired_end=False, alpha=1.0, log_scores=False, itol_mode=False):
+def run_classifier(
+        read_paths: str,
+        out_dir: str,
+        db_dir: str,
+        k: int,
+        n: int,
+        phylogeny_path: str,
+        keys_shape: tuple[int],
+        values_shape: tuple[int],
+        logging_dir: str,
+        taxonomy: bool = False, cutoff: float = 0.0, groups: Union[None, list[tuple]] = None,
+        keep_zeros: bool = False, cpm: float = 0.0, use_node_names: bool = True,
+        phyla: bool = False, name_taxa: Union[dict[str, str], None] = None,
+        colour_list: list[str] = None, paired_end: bool = False, alpha: float = 1.0,
+        log_scores: bool = False, itol_mode: bool = False
+):
+    output_config: ResultsPathConfig = load_results_config(out_dir)
+    database_config: FileLocationConfig = load_database_config(db_dir)
     # Verify results path.
     if not os.path.exists(out_dir):
         print("Creating results path %s." % out_dir)
         os.mkdir(out_dir)
     else:
         print("Using path %s for results." % out_dir)
-    print("Loading the map and phylogeny.\n")
-    # Load the kmer dictionary.
-    db_kmers = SharedDB(db_dir, keys_shape, values_shape)
-    phy_results_path = os.path.join(out_dir, PHY_RESULTS)
-    raw_results_path = os.path.join(phy_results_path, RAW_RESULTS)
-    temp_results_path = os.path.join(out_dir, TEMP_RESULTS)
-    for new_path in (phy_results_path, raw_results_path, temp_results_path):
-        os.mkdir(new_path)
+    print("Loading the map and phylogeny.\n")
+    db_kmers: SharedDB = SharedDB(db_dir, keys_shape, values_shape)
     try:
         # Get name mapping.
         index, phylogeny_index = name_to_id(phylogeny_path)
         # Load LCA Matrix.
-        lca_matrix = np.load(join(db_dir, LCA_MATRIX_PATH))
+        lca_matrix = np.load(database_config.lca_matrix)
         # Initialise the distribution.
         d = Distribution(
@@ -127,7 +60,7 @@ def run_classifier(read_paths, out_dir, db_dir, k, n, phylogeny_path, keys_shape
             read_paths=read_paths,
             out_dir=out_dir,
             logging_dir=logging_dir,
-            temp_dir=temp_results_path,
+            temp_dir=output_config.temp,
             paired_end=paired_end,
             alpha=alpha
         )
@@ -136,11 +69,7 @@ def run_classifier(read_paths, out_dir, db_dir, k, n, phylogeny_path, keys_shape
         d.run(n)
         # Load accession ids.
-        with open(join(db_dir, ACCESSION_ID_PATH), 'r') as f:
-            sequence_ids = [
-                line.strip().split(",")
-                for line in f
-            ]
+        sequence_ids = list(yield_csv(database_config.accession_id))
         # Update accession ids in map.
         for (fname, accn_id, tax_id) in sequence_ids:
@@ -154,7 +83,7 @@ def run_classifier(read_paths, out_dir, db_dir, k, n, phylogeny_path, keys_shape
         results = ClassificationResults(
             index=index,
             phylogeny_index=phylogeny_index,
-            in_dir=phy_results_path,
+            in_dir=output_config.phy,
             out_dir=out_dir,
             groups=groups,
             keep_zeros=keep_zeros,
@@ -164,24 +93,22 @@ def run_classifier(read_paths, out_dir, db_dir, k, n, phylogeny_path, keys_shape
             phyla=phyla,
             name_taxa=name_taxa,
             colour_list=colour_list,
-            circle_scale=circle_scale,
             log_scores=log_scores
         )
         if taxonomy:
             # Attempt to update taxon ids.
-            accession_to_taxonomy(db_dir)
+            tax_obj: TaxonomyNCBI = TaxonomyNCBI(database_config)
+            tax_obj.accession_to_taxonomy(db_dir)
-            tax_results_path = os.path.join(out_dir, TAX_RESULTS)
-            os.mkdir(tax_results_path)
+            tax_results_path = os.path.join(out_dir, output_config.tax)
+            os.mkdir(output_config.tax)
-            name_to_lineage, taxon_to_rank = load_taxonomy_map(db_dir)
+            name_to_lineage, taxon_to_rank = tax_obj.load_taxonomy_map(db_dir)
             results.to_taxonomy(name_to_lineage, taxon_to_rank, tax_results_path)
         results.draw_results(itol_mode=itol_mode)
     finally:
         # Close shared memory.
         keys_shm = shared_memory.SharedMemory(
             name=db_kmers.keys_shm_name,
@@ -196,609 +123,21 @@ def run_classifier(read_paths, out_dir, db_dir, k, n, phylogeny_path, keys_shape
         values_shm.unlink()
         values_shm.close()
-        os.rmdir(temp_results_path)
+        os.rmdir(output_config.temp)
def load_taxonomy_map(out_dir, convert_to_name=True):
    def yield_lines(fh, watch=False):
        for line in fh:
            line = line.strip()
            if line:
                line = line.split(",")
                yield line

    name_tax_id_url = os.path.join(out_dir, ACCESSION_ID_PATH)
    tax_id_lineage_url = os.path.join(out_dir, TAXID_LINEAGE_MAP_NAME)
    taxa_rank_url = os.path.join(out_dir, TAXON_RANK_MAP_NAME)
    # Create map from scientific name --> (taxid, rank).
    taxon_data = {}
    with open(taxa_rank_url, "r") as f:
        for data in yield_lines(f):
            taxon_data[",".join(data[0:-2])] = tuple(data[-2:])
    # Create map from tax_id --> lineage (tuple).
    tax_id_to_lineage = {}
    with open(tax_id_lineage_url, "r") as f:
        for data in yield_lines(f):
            tax_id_to_lineage[data[0]] = tuple(data[1:])
    if not convert_to_name:
        return tax_id_to_lineage, taxon_data
    # Create map from name --> lineage (tuple).
    name_to_lineage = {}
    with open(name_tax_id_url, "r") as f:
        for data in yield_lines(f):
            name_to_lineage[data[0]] = tax_id_to_lineage[data[2]]
    return name_to_lineage, taxon_data
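The three CSVs read here follow the layouts written back out by accession_to_taxonomy below. A hedged illustration of one row of each (field layout inferred from the parsing above; values invented):

# ACCESSION_ID_PATH row:        sequence_name,accession_id,taxon_id
# TAXID_LINEAGE_MAP_NAME row:   taxon_id,name_at_rank_1,...,name_at_rank_n
# TAXON_RANK_MAP_NAME row:      scientific_name,taxon_id,rank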
def load_sequence_map(out_dir):
    sequence_data_url = os.path.join(out_dir, ACCESSION_ID_PATH)
    if not os.path.exists(sequence_data_url):
        return []
    with open(sequence_data_url, "r") as f:
        sequences_ids = [
            line.strip().split(",")
            for line in f
        ]
    return sequences_ids
def load_taxid_lineage_map(out_dir):
    map_url = os.path.join(out_dir, TAXID_LINEAGE_MAP_NAME)
    if not os.path.exists(map_url):
        return []
    with open(map_url, "r") as f:
        taxid_map = [
            line.strip().split(",")
            for line in f
        ]
    return taxid_map
def load_rank_map(out_dir):
    rank_url = os.path.join(out_dir, TAXON_RANK_MAP_NAME)
    name_to_rank = {}
    if not os.path.exists(rank_url):
        return name_to_rank
    with open(rank_url, "r") as f:
        for line in f:
            line = line.strip()
            try:
                name_index = line.index(",")
                name_to_rank[line[:name_index]] = line[name_index + 1:]
            except ValueError:
                continue
    return name_to_rank
def to_first_tab(string):
    name = re.findall(r'^[^\t]+(?=\t)', string)
    if not name:
        print(string)
        raise ValueError("No name found!")
    else:
        return name[0]
def accession_to_taxonomy(out_dir):
    """
    Map accession IDs to taxonomic labels.
    :sequence_ids: List - (sequence_id, accession_id, taxon_id)
    :taxon_ranks: List - (taxon_id, taxonomic ranks)
    :taxa_to_rank: Dict - taxon --> rank
    """
    def tuples_to_disk(lst):
        return "\n".join([",".join(item) for item in lst])

    def dict_to_disk(dct):
        return "\n".join([",".join((key, value)) for key, value in dct.items()])

    sequence_ids = load_sequence_map(out_dir)
    taxon_ranks = load_taxid_lineage_map(out_dir)
    taxa_to_rank = load_rank_map(out_dir)
    # Collect taxon ids for unknown organisms.
    accessions_to_be_mapped = []
    taxa_to_be_collected = []
    for (sequence_id, accession_id, taxon_id) in sequence_ids:
        if taxon_id == "None":
            accessions_to_be_mapped.append(accession_id)
        else:
            taxa_to_be_collected.append(taxon_id)
    if accessions_to_be_mapped:
        accession_to_tax = request_tax_ids("nuccore", accessions_to_be_mapped)
        print("Received %d response(s) for ESummary TaxID request!"
              % len(accession_to_tax))
        for i in range(len(sequence_ids)):
            if sequence_ids[i][1] in accession_to_tax:
                sequence_ids[i][2] = accession_to_tax[sequence_ids[i][1]]
                taxa_to_be_collected.append(sequence_ids[i][2])
    # Collect taxonomic lineages for taxa.
    current_taxa = {taxa[0] for taxa in taxon_ranks}
    taxa_to_be_collected = {  # Set so that we collect unique values.
        taxon_id
        for taxon_id in taxa_to_be_collected
        if taxon_id not in current_taxa
    }
    if taxa_to_be_collected:
        taxid_to_taxon, taxon_to_rank = request_labels("taxonomy", "xml", list(taxa_to_be_collected))
        print("Received %d response(s) for EFetch Taxon request!"
              % len(taxid_to_taxon))
        taxon_ranks.extend(taxid_to_taxon)
        taxa_to_rank.update(taxon_to_rank)
    # Save updated maps to disk.
    with open(os.path.join(out_dir, ACCESSION_ID_PATH), "w") as f:
        f.write(tuples_to_disk(sequence_ids))
    with open(os.path.join(out_dir, TAXID_LINEAGE_MAP_NAME), "w") as f:
        f.write(tuples_to_disk(taxon_ranks))
    print("Taxonomic lineages written to %s!" % os.path.join(out_dir, TAXID_LINEAGE_MAP_NAME))
    with open(os.path.join(out_dir, TAXON_RANK_MAP_NAME), "w") as f:
        f.write(dict_to_disk(taxa_to_rank))
    print("Taxonomic ranks written to %s!" % os.path.join(out_dir, TAXON_RANK_MAP_NAME))
def request_tax_ids(db, id_list):
    POST_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esummary.fcgi"
    taxids, upto = {}, 0
    PARAMS = {
        "tool": ENTREZ_TOOL,
        "email": ENTREZ_EMAIL,
        "api_key": ENTREZ_API,
        "db": db,
    }
    while upto < len(id_list):
        next_requests = id_list[upto:upto + _STEP]
        print("Posting %d UIDs to NCBI Entrez %s."
              % (len(next_requests), db))
        PARAMS["id"] = ",".join(next_requests)
        esummary_request = requests.post(
            url=POST_URL,
            data=PARAMS
        )
        # Parse TaxonIDs from raw results.
        accn_id = tax_id = None
        for line in esummary_request.text.split("\n"):
            if "<DocSum>" in line:  # Start new record.
                accn_id = tax_id = None
            elif 'AccessionVersion' in line:
                accn_id = parse_id(line)
            elif 'TaxId' in line:
                tax_id = parse_tax_id(line)
            elif "</DocSum>" in line:  # Only save complete records.
                if accn_id is not None and tax_id is not None:
                    taxids[accn_id] = tax_id
        upto += _STEP
        time.sleep(1.0)  # Allow the server time to breathe.
    return taxids
def parse_id(string):
    new_id = re.findall(r'\<Item Name\="AccessionVersion" Type\="String"\>(.*?)\<\/Item\>', string)
    if not new_id:
        raise ValueError("No accession ids found!")
    else:
        return new_id[0]
def parse_tax_id(string):
    taxids = re.findall(r'\<Item Name\="TaxId" Type\="Integer"\>(.*?)\<\/Item\>', string)
    if not taxids:
        raise ValueError("No taxids found!")
    else:
        return taxids[0]
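Both parsers target the one-item-per-line XML that ESummary returns inside each <DocSum> block. Illustrative inputs (the record values are invented for this example, though NC_000913.3/511145 are a real accession/taxid pair):

assert parse_id('<Item Name="AccessionVersion" Type="String">NC_000913.3</Item>') == "NC_000913.3"
assert parse_tax_id('<Item Name="TaxId" Type="Integer">511145</Item>') == "511145"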
def request_labels(db, retmode, id_list):
    POST_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
    taxon_labels, ranks, upto = [], {}, 0
    PARAMS = {
        "tool": ENTREZ_TOOL,
        "email": ENTREZ_EMAIL,
        "api_key": ENTREZ_API,
        "db": db,
        "retmode": retmode,
    }
    while upto < len(id_list):