""" ``SNPs`` reads, writes, merges, and remaps genotype / raw data files.
"""
"""
BSD 3-Clause License
Copyright (c) 2019, Andrew Riha
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
import copy
from itertools import groupby, count
import logging
import os
import re
import warnings
import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype
from snps.ensembl import EnsemblRestClient
from snps.resources import Resources
from snps.io import Reader, Writer, get_empty_snps_dataframe
from snps.utils import Parallelizer
logger = logging.getLogger(__name__)
[docs]
class SNPs:
[docs]
def __init__(
self,
file="",
only_detect_source=False,
assign_par_snps=False,
output_dir="output",
resources_dir="resources",
deduplicate=True,
deduplicate_XY_chrom=True,
deduplicate_MT_chrom=True,
parallelize=False,
processes=os.cpu_count(),
rsids=(),
):
"""Object used to read, write, and remap genotype / raw data files.
Parameters
----------
file : str or bytes
path to file to load or bytes to load
only_detect_source : bool
only detect the source of the data
assign_par_snps : bool
assign PAR SNPs to the X and Y chromosomes
output_dir : str
path to output directory
resources_dir : str
name / path of resources directory
deduplicate : bool
deduplicate RSIDs and make SNPs available as `SNPs.duplicate`
deduplicate_MT_chrom : bool
deduplicate alleles on MT; see `SNPs.heterozygous_MT`
deduplicate_XY_chrom : bool or str
deduplicate alleles in the non-PAR regions of X and Y for males; see `SNPs.discrepant_XY`
if a `str` then this is the sex determination method to use X Y or XY
parallelize : bool
utilize multiprocessing to speedup calculations
processes : int
processes to launch if multiprocessing
rsids : tuple, optional
rsids to extract if loading a VCF file
"""
self._file = file
self._only_detect_source = only_detect_source
self._snps = get_empty_snps_dataframe()
self._duplicate = get_empty_snps_dataframe()
self._discrepant_XY = get_empty_snps_dataframe()
self._heterozygous_MT = get_empty_snps_dataframe()
self._discrepant_vcf_position = get_empty_snps_dataframe()
self._low_quality = get_empty_snps_dataframe().index
self._discrepant_merge_positions = pd.DataFrame()
self._discrepant_merge_genotypes = pd.DataFrame()
self._source = []
self._phased = False
self._build = 0
self._build_detected = False
self._output_dir = output_dir
self._resources = Resources(resources_dir=resources_dir)
self._parallelizer = Parallelizer(parallelize=parallelize, processes=processes)
self._cluster = ""
self._chip = ""
self._chip_version = ""
if file:
d = self._read_raw_data(file, only_detect_source, rsids)
# Replace multiple rsids separated by commas in index with the first rsid. E.g. rs1,rs2 -> rs1
multi_rsids = {
multi_rsid: multi_rsid.split(",")[0]
for multi_rsid in list(
filter(lambda x: len(x.split(",")) > 1, d["snps"].index)
)
}
d["snps"].rename(index=multi_rsids, inplace=True)
self._snps = d["snps"]
self._source = (
d["source"].split(", ") if ", " in d["source"] else [d["source"]]
)
self._phased = d["phased"]
self._build = d["build"]
self._build_detected = True if d["build"] else False
if not self._snps.empty:
self.sort()
if deduplicate:
self._deduplicate_rsids()
# use build detected from `read` method or comments, if any
# otherwise use SNP positions to detect build
if not self._build_detected:
self._build = self.detect_build()
self._build_detected = True if self._build else False
if not self._build:
self._build = 37 # assume Build 37 / GRCh37 if not detected
else:
self._build_detected = True
if assign_par_snps:
self._assign_par_snps()
self.sort()
if deduplicate_XY_chrom:
if (
deduplicate_XY_chrom is True and self.determine_sex() == "Male"
) or self.determine_sex(chrom=deduplicate_XY_chrom) == "Male":
self._deduplicate_XY_chrom()
if deduplicate_MT_chrom:
self._deduplicate_MT_chrom()
else:
logger.warning("no SNPs loaded...")
def __len__(self):
return self.count
def __repr__(self):
if isinstance(self._file, str):
return f"SNPs('{os.path.basename(self._file)}')"
else:
return "SNPs(<bytes>)"
@property
def source(self):
"""Summary of the SNP data source(s).
Returns
-------
str
Data source(s) for this ``SNPs`` object, separated by ", ".
"""
return ", ".join(self._source)
@property
def snps(self):
"""Normalized SNPs.
Notes
-----
Throughout ``snps``, the "normalized ``snps`` dataframe" is defined as follows:
============= =================================== ===============
Column Description `pandas` dtype
============= =================================== ===============
rsid [*]_ SNP ID object (string)
chrom Chromosome of SNP object (string)
pos Position of SNP (relative to build) uint32
genotype [*]_ Genotype of SNP object (string)
============= =================================== ===============
.. [*] Dataframe index
.. [*] Genotype can be null, length 1, or length 2. Specifically, genotype is null if not
called or unavailable. Otherwise, for autosomal chromosomes, genotype is two alleles.
For the X and Y chromosomes, male genotypes are one allele in the non-PAR regions
(assuming `deduplicate_XY_chrom`). For the MT chromosome, genotypes are one allele
(assuming `deduplicate_MT_chrom`).
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
return self._snps
@property
def snps_qc(self):
"""Normalized SNPs, after quality control.
Any low quality SNPs, identified per
:meth:`identify_low_quality_snps() <snps.snps.SNPs.identify_low_quality_snps>`,
are not included in the result.
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
if len(self._low_quality) == 0:
# ensure low quality SNPs, if any, are identified
self.identify_low_quality_snps()
if len(self._low_quality) > 0:
# filter out low quality SNPs
return self._snps.drop(self._low_quality)
else:
# no low quality SNPs to filter
return self._snps
@property
def duplicate(self):
"""Duplicate SNPs.
A duplicate SNP has the same RSID as another SNP. The first occurrence
of the RSID is not considered a duplicate SNP.
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
return self._duplicate
@property
def discrepant_XY(self):
"""Discrepant XY SNPs.
A discrepant XY SNP is a heterozygous SNP in the non-PAR region of the X
or Y chromosome found during deduplication for a detected male genotype.
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
return self._discrepant_XY
@property
def heterozygous_MT(self):
"""Heterozygous SNPs on the MT chromosome found during deduplication.
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
return self._heterozygous_MT
@property
def discrepant_vcf_position(self):
"""SNPs with discrepant positions discovered while saving VCF.
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
return self._discrepant_vcf_position
@property
def low_quality(self):
"""SNPs identified as low quality, if any, per
:meth:`identify_low_quality_snps() <snps.snps.SNPs.identify_low_quality_snps>`.
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
if len(self._low_quality) == 0:
# ensure low quality SNPs, if any, are identified
self.identify_low_quality_snps()
return self._snps.loc[self._low_quality]
@property
def discrepant_merge_positions(self):
"""SNPs with discrepant positions discovered while merging SNPs.
Notes
-----
Definitions of columns in this dataframe are as follows:
============== ===========
Column Description
============== ===========
rsid SNP ID
chrom Chromosome of existing SNP
pos Position of existing SNP
genotype Genotype of existing SNP
chrom_added Chromosome of added SNP
pos_added Position of added SNP (discrepant with pos)
genotype_added Genotype of added SNP
============== ===========
Returns
-------
pandas.DataFrame
"""
return self._discrepant_merge_positions
@property
def discrepant_merge_genotypes(self):
"""SNPs with discrepant genotypes discovered while merging SNPs.
Notes
-----
Definitions of columns in this dataframe are as follows:
=============== ===========
Column Description
=============== ===========
rsid SNP ID
chrom Chromosome of existing SNP
pos Position of existing SNP
genotype Genotype of existing SNP
chrom_added Chromosome of added SNP
pos_added Position of added SNP
genotype_added Genotype of added SNP (discrepant with genotype)
=============== ===========
Returns
-------
pandas.DataFrame
"""
return self._discrepant_merge_genotypes
@property
def discrepant_merge_positions_genotypes(self):
"""SNPs with discrepant positions and / or genotypes discovered while merging SNPs.
Notes
-----
Definitions of columns in this dataframe are as follows:
=============== ===========
Column Description
=============== ===========
rsid SNP ID
chrom Chromosome of existing SNP
pos Position of existing SNP
genotype Genotype of existing SNP
chrom_added Chromosome of added SNP
pos_added Position of added SNP (possibly discrepant with pos)
genotype_added Genotype of added SNP (possibly discrepant with genotype)
=============== ===========
Returns
-------
pandas.DataFrame
"""
df = pd.concat(
[self._discrepant_merge_positions, self._discrepant_merge_genotypes]
)
if len(df) > 1:
df = df.drop_duplicates()
return df
@property
def build(self):
"""Build of SNPs.
Returns
-------
int
"""
return self._build
@property
def build_detected(self):
"""Status indicating if build of SNPs was detected.
Returns
-------
bool
"""
return self._build_detected
@property
def assembly(self):
"""Assembly of SNPs.
Returns
-------
str
"""
if self.build == 37:
return "GRCh37"
elif self.build == 36:
return "NCBI36"
elif self.build == 38:
return "GRCh38"
else:
return ""
@property
def count(self):
"""Count of SNPs.
Returns
-------
int
"""
return self.get_count()
@property
def chromosomes(self):
"""Chromosomes of SNPs.
Returns
-------
list
list of str chromosomes (e.g., ['1', '2', '3', 'MT'], empty list if no chromosomes
"""
if not self._snps.empty:
return list(pd.unique(self._snps["chrom"]))
else:
return []
@property
def chromosomes_summary(self):
"""Summary of the chromosomes of SNPs.
Returns
-------
str
human-readable listing of chromosomes (e.g., '1-3, MT'), empty str if no chromosomes
"""
if not self._snps.empty:
chroms = list(pd.unique(self._snps["chrom"]))
int_chroms = [int(chrom) for chrom in chroms if chrom.isdigit()]
str_chroms = [chrom for chrom in chroms if not chrom.isdigit()]
# https://codereview.stackexchange.com/a/5202
def as_range(iterable):
l = list(iterable)
if len(l) > 1:
return f"{l[0]}-{l[-1]}"
else:
return f"{l[0]}"
# create str representations
int_chroms = ", ".join(
as_range(g)
for _, g in groupby(int_chroms, key=lambda n, c=count(): n - next(c))
)
str_chroms = ", ".join(str_chroms)
if int_chroms != "" and str_chroms != "":
int_chroms += ", "
return int_chroms + str_chroms
else:
return ""
@property
def sex(self):
"""Sex derived from SNPs.
Returns
-------
str
'Male' or 'Female' if detected, else empty str
"""
sex = self.determine_sex(chrom="X")
if not sex:
sex = self.determine_sex(chrom="Y")
return sex
@property
def unannotated_vcf(self):
"""Indicates if VCF file is unannotated.
Returns
-------
bool
"""
if self.count == 0 and self.source == "vcf":
return True
return False
@property
def phased(self):
"""Indicates if genotype is phased.
Returns
-------
bool
"""
return self._phased
@property
def cluster(self):
"""Detected chip cluster, if any, per
:meth:`compute_cluster_overlap <snps.snps.SNPs.compute_cluster_overlap>`.
Notes
-----
Refer to :meth:`compute_cluster_overlap <snps.snps.SNPs.compute_cluster_overlap>`
for more details about chip clusters.
Returns
-------
str
detected chip cluster, e.g., 'c1', else empty str
"""
if not self._cluster:
self.compute_cluster_overlap()
return self._cluster
@property
def chip(self):
"""Detected deduced genotype / chip array, if any, per
:meth:`compute_cluster_overlap <snps.snps.SNPs.compute_cluster_overlap>`.
Returns
-------
str
detected chip array, else empty str
"""
if not self._chip:
self.compute_cluster_overlap()
return self._chip
@property
def chip_version(self):
"""Detected genotype / chip array version, if any, per
:meth:`compute_cluster_overlap <snps.snps.SNPs.compute_cluster_overlap>`.
Notes
-----
Chip array version is only applicable to 23andMe (v3, v4, v5) and AncestryDNA
(v1, v2) files.
Returns
-------
str
detected chip array version, e.g., 'v4', else empty str
"""
if not self._chip_version:
self.compute_cluster_overlap()
return self._chip_version
[docs]
def heterozygous(self, chrom=""):
"""Get heterozygous SNPs.
Parameters
----------
chrom : str, optional
chromosome (e.g., "1", "X", "MT")
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
df = self._filter(chrom)
return df.loc[
(df.genotype.notnull())
& (df.genotype.str.len() == 2)
& (df.genotype.str[0] != df.genotype.str[1])
]
[docs]
def homozygous(self, chrom=""):
"""Get homozygous SNPs.
Parameters
----------
chrom : str, optional
chromosome (e.g., "1", "X", "MT")
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
df = self._filter(chrom)
return df.loc[
(df.genotype.notnull())
& (df.genotype.str.len() == 2)
& (df.genotype.str[0] == df.genotype.str[1])
]
[docs]
def notnull(self, chrom=""):
"""Get not null genotype SNPs.
Parameters
----------
chrom : str, optional
chromosome (e.g., "1", "X", "MT")
Returns
-------
pandas.DataFrame
normalized ``snps`` dataframe
"""
df = self._filter(chrom)
return df.loc[df.genotype.notnull()]
@property
def summary(self):
"""Summary of SNPs.
Returns
-------
dict
summary info if ``SNPs`` is valid, else {}
"""
if not self.valid:
return {}
else:
return {
"source": self.source,
"assembly": self.assembly,
"build": self.build,
"build_detected": self.build_detected,
"count": self.count,
"chromosomes": self.chromosomes_summary,
"sex": self.sex,
}
@property
def valid(self):
"""Determine if ``SNPs`` is valid.
``SNPs`` is valid when the input file has been successfully parsed.
Returns
-------
bool
True if ``SNPs`` is valid
"""
if self.snps.empty:
return False
else:
return True
def save(
self,
filename="",
vcf=False,
atomic=True,
vcf_alt_unavailable=".",
vcf_qc_only=False,
vcf_qc_filter=False,
**kwargs,
):
warnings.warn(
"Method `save` has been replaced by `to_csv`, `to_tsv`, and `to_vcf`.",
DeprecationWarning,
)
return self._save(
filename,
vcf,
atomic,
vcf_alt_unavailable,
vcf_qc_only,
vcf_qc_filter,
**kwargs,
)
def _save(
self,
filename="",
vcf=False,
atomic=True,
vcf_alt_unavailable=".",
vcf_chrom_prefix="",
vcf_qc_only=False,
vcf_qc_filter=False,
**kwargs,
):
"""Save SNPs to file.
References
----------
1. Fluent Python by Luciano Ramalho (O'Reilly). Copyright 2015 Luciano Ramalho,
978-1-491-94600-8.
"""
if "sep" not in kwargs:
kwargs["sep"] = "\t"
w = Writer(
snps=self,
filename=filename,
vcf=vcf,
atomic=atomic,
vcf_alt_unavailable=vcf_alt_unavailable,
vcf_chrom_prefix=vcf_chrom_prefix,
vcf_qc_only=vcf_qc_only,
vcf_qc_filter=vcf_qc_filter,
**kwargs,
)
path, *extra = w.write()
if len(extra) == 1 and not extra[0].empty:
self._discrepant_vcf_position = extra[0]
self._discrepant_vcf_position.set_index("rsid", inplace=True)
logger.warning(
f"{len(self.discrepant_vcf_position)} SNP positions were found to be discrepant when saving VCF"
)
return path
[docs]
def to_csv(self, filename="", atomic=True, **kwargs):
"""Output SNPs as comma-separated values.
Parameters
----------
filename : str or buffer
filename for file to save or buffer to write to
atomic : bool
atomically write output to a file on local filesystem
**kwargs
additional parameters to `pandas.DataFrame.to_csv`
Returns
-------
str
path to file in output directory if SNPs were saved, else empty str
"""
kwargs["sep"] = ","
return self._save(filename=filename, atomic=atomic, **kwargs)
[docs]
def to_tsv(self, filename="", atomic=True, **kwargs):
"""Output SNPs as tab-separated values.
Note that this results in the same default output as `save`.
Parameters
----------
filename : str or buffer
filename for file to save or buffer to write to
atomic : bool
atomically write output to a file on local filesystem
**kwargs
additional parameters to `pandas.DataFrame.to_csv`
Returns
-------
str
path to file in output directory if SNPs were saved, else empty str
"""
kwargs["sep"] = "\t"
return self._save(filename=filename, atomic=atomic, **kwargs)
[docs]
def to_vcf(
self,
filename="",
atomic=True,
alt_unavailable=".",
chrom_prefix="",
qc_only=False,
qc_filter=False,
**kwargs,
):
"""Output SNPs as Variant Call Format.
Parameters
----------
filename : str or buffer
filename for file to save or buffer to write to
atomic : bool
atomically write output to a file on local filesystem
alt_unavailable : str
representation of ALT allele when ALT is not able to be determined
chrom_prefix : str
prefix for chromosomes in VCF CHROM column
qc_only : bool
output only SNPs that pass quality control
qc_filter : bool
populate FILTER column based on quality control results
**kwargs
additional parameters to `pandas.DataFrame.to_csv`
Returns
-------
str
path to file in output directory if SNPs were saved, else empty str
Notes
-----
Parameters `qc_only` and `qc_filter`, if true, will identify low quality SNPs per
:meth:`identify_low_quality_snps() <snps.snps.SNPs.identify_low_quality_snps>`,
if not done already. Moreover, these parameters have no effect if this SNPs
object does not map to a cluster per
:meth:`compute_cluster_overlap() <snps.snps.SNPs.compute_cluster_overlap>`.
References
----------
1. The Variant Call Format (VCF) Version 4.2 Specification, 8 Mar 2019,
https://samtools.github.io/hts-specs/VCFv4.2.pdf
"""
return self._save(
filename=filename,
vcf=True,
atomic=atomic,
vcf_alt_unavailable=alt_unavailable,
vcf_chrom_prefix=chrom_prefix,
vcf_qc_only=qc_only,
vcf_qc_filter=qc_filter,
**kwargs,
)
def _filter(self, chrom=""):
return self.snps.loc[self.snps.chrom == chrom] if chrom else self.snps
def _read_raw_data(self, file, only_detect_source, rsids):
r = Reader(file, only_detect_source, self._resources, rsids)
return r.read()
def _assign_par_snps(self):
"""Assign PAR SNPs to the X or Y chromosome using SNP position.
References
-----
1. National Center for Biotechnology Information, Variation Services, RefSNP,
https://api.ncbi.nlm.nih.gov/variation/v0/
2. Yates et. al. (doi:10.1093/bioinformatics/btu613),
`<http://europepmc.org/search/?query=DOI:10.1093/bioinformatics/btu613>`_
3. Zerbino et. al. (doi.org/10.1093/nar/gkx1098), https://doi.org/10.1093/nar/gkx1098
4. Sherry ST, Ward MH, Kholodov M, Baker J, Phan L, Smigielski EM, Sirotkin K.
dbSNP: the NCBI database of genetic variation. Nucleic Acids Res. 2001 Jan 1;
29(1):308-11.
5. Database of Single Nucleotide Polymorphisms (dbSNP). Bethesda (MD): National Center
for Biotechnology Information, National Library of Medicine. dbSNP accession:
rs28736870, rs113313554, and rs758419898 (dbSNP Build ID: 151). Available from:
http://www.ncbi.nlm.nih.gov/SNP/
"""
rest_client = EnsemblRestClient(
server="https://api.ncbi.nlm.nih.gov", reqs_per_sec=1
)
for rsid in self._snps.loc[self._snps["chrom"] == "PAR"].index.values:
if "rs" in rsid:
response = self._lookup_refsnp_snapshot(rsid, rest_client)
if response is not None:
for item in response["primary_snapshot_data"][
"placements_with_allele"
]:
if "NC_000023" in item["seq_id"]:
assigned = self._assign_snp(rsid, item["alleles"], "X")
elif "NC_000024" in item["seq_id"]:
assigned = self._assign_snp(rsid, item["alleles"], "Y")
else:
assigned = False
if assigned:
if not self._build_detected:
self._build = self._extract_build(item)
self._build_detected = True
break
def _lookup_refsnp_snapshot(self, rsid, rest_client):
id = rsid.split("rs")[1]
response = rest_client.perform_rest_action("/variation/v0/refsnp/" + id)
if "merged_snapshot_data" in response:
# this RefSnp id was merged into another
# we'll pick the first one to decide which chromosome this PAR will be assigned to
merged_id = "rs" + response["merged_snapshot_data"]["merged_into"][0]
logger.info(f"SNP id {rsid} has been merged into id {merged_id}")
return self._lookup_refsnp_snapshot(merged_id, rest_client)
elif "nosnppos_snapshot_data" in response:
logger.warning(f"Unable to look up SNP id {rsid}")
return None
else:
return response
def _assign_snp(self, rsid, alleles, chrom):
# only assign SNP if positions match (i.e., same build)
for allele in alleles:
allele_pos = allele["allele"]["spdi"]["position"]
# ref SNP positions seem to be 0-based...
if allele_pos == self._snps.loc[rsid].pos - 1:
self._snps.loc[rsid, "chrom"] = chrom
return True
return False
def _extract_build(self, item):
assembly_name = item["placement_annot"]["seq_id_traits_by_assembly"][0][
"assembly_name"
]
assembly_name = assembly_name.split(".")[0]
return int(assembly_name[-2:])
[docs]
def detect_build(self):
"""Detect build of SNPs.
Use the coordinates of common SNPs to identify the build / assembly of a genotype file
that is being loaded.
Notes
-----
* rs3094315 : plus strand in 36, 37, and 38
* rs11928389 : plus strand in 36, minus strand in 37 and 38
* rs2500347 : plus strand in 36 and 37, minus strand in 38
* rs964481 : plus strand in 36, 37, and 38
* rs2341354 : plus strand in 36, 37, and 38
* rs3850290 : plus strand in 36, 37, and 38
* rs1329546 : plus strand in 36, 37, and 38
Returns
-------
int
detected build of SNPs, else 0
References
----------
1. Yates et. al. (doi:10.1093/bioinformatics/btu613),
`<http://europepmc.org/search/?query=DOI:10.1093/bioinformatics/btu613>`_
2. Zerbino et. al. (doi.org/10.1093/nar/gkx1098), https://doi.org/10.1093/nar/gkx1098
3. Sherry ST, Ward MH, Kholodov M, Baker J, Phan L, Smigielski EM, Sirotkin K.
dbSNP: the NCBI database of genetic variation. Nucleic Acids Res. 2001
Jan 1;29(1):308-11.
4. Database of Single Nucleotide Polymorphisms (dbSNP). Bethesda (MD): National Center
for Biotechnology Information, National Library of Medicine. dbSNP accession: rs3094315,
rs11928389, rs2500347, rs964481, rs2341354, rs3850290, and rs1329546
(dbSNP Build ID: 151). Available from: http://www.ncbi.nlm.nih.gov/SNP/
"""
def lookup_build_with_snp_pos(pos, s):
try:
return int(s.loc[s == pos].index[0])
except:
return 0
build = 0
rsids = [
"rs3094315",
"rs11928389",
"rs2500347",
"rs964481",
"rs2341354",
"rs3850290",
"rs1329546",
]
df = pd.DataFrame(
{
36: [
742429,
50908372,
143649677,
27566744,
908436,
22315141,
135302086,
],
37: [
752566,
50927009,
144938320,
27656823,
918573,
23245301,
135474420,
],
38: [
817186,
50889578,
148946169,
27638706,
983193,
22776092,
136392261,
],
},
index=rsids,
)
for rsid in rsids:
if rsid in self._snps.index:
build = lookup_build_with_snp_pos(
self._snps.loc[rsid].pos, df.loc[rsid]
)
if build:
break
return build
[docs]
def get_count(self, chrom=""):
"""Count of SNPs.
Parameters
----------
chrom : str, optional
chromosome (e.g., "1", "X", "MT")
Returns
-------
int
"""
return len(self._filter(chrom))
[docs]
def determine_sex(
self,
heterozygous_x_snps_threshold=0.03,
y_snps_not_null_threshold=0.3,
chrom="X",
):
"""Determine sex from SNPs using thresholds.
Parameters
----------
heterozygous_x_snps_threshold : float
percentage heterozygous X SNPs; above this threshold, Female is determined
y_snps_not_null_threshold : float
percentage Y SNPs that are not null; above this threshold, Male is determined
chrom : {"X", "Y"}
use X or Y chromosome SNPs to determine sex
Returns
-------
str
'Male' or 'Female' if detected, else empty str
"""
if not self._snps.empty:
if chrom == "X":
return self._determine_sex_X(heterozygous_x_snps_threshold)
elif chrom == "Y":
return self._determine_sex_Y(y_snps_not_null_threshold)
return ""
def _determine_sex_X(self, threshold):
x_snps = self.get_count("X")
if x_snps > 0:
if len(self.heterozygous("X")) / x_snps > threshold:
return "Female"
else:
return "Male"
else:
return ""
def _determine_sex_Y(self, threshold):
y_snps = self.get_count("Y")
if y_snps > 0:
if len(self.notnull("Y")) / y_snps > threshold:
return "Male"
else:
return "Female"
else:
return ""
def _get_non_par_start_stop(self, chrom):
# get non-PAR start / stop positions for chrom
pr = self.get_par_regions(self.build)
np_start = pr.loc[(pr.chrom == chrom) & (pr.region == "PAR1")].stop.values[0]
np_stop = pr.loc[(pr.chrom == chrom) & (pr.region == "PAR2")].start.values[0]
return np_start, np_stop
def _get_non_par_snps(self, chrom, heterozygous=True):
np_start, np_stop = self._get_non_par_start_stop(chrom)
df = self._filter(chrom)
if heterozygous:
# get heterozygous SNPs in the non-PAR region (i.e., discrepant XY SNPs)
return df.loc[
(df.genotype.notnull())
& (df.genotype.str.len() == 2)
& (df.genotype.str[0] != df.genotype.str[1])
& (df.pos > np_start)
& (df.pos < np_stop)
].index
else:
# get homozygous SNPs in the non-PAR region
return df.loc[
(df.genotype.notnull())
& (df.genotype.str.len() == 2)
& (df.genotype.str[0] == df.genotype.str[1])
& (df.pos > np_start)
& (df.pos < np_stop)
].index
def _deduplicate_rsids(self):
# Keep first duplicate rsid.
duplicate_rsids = self._snps.index.duplicated(keep="first")
# save duplicate SNPs
self._duplicate = pd.concat([self._duplicate, self._snps.loc[duplicate_rsids]])
# deduplicate
self._snps = self._snps.loc[~duplicate_rsids]
def _deduplicate_alleles(self, rsids):
# remove duplicate allele
self._snps.loc[rsids, "genotype"] = self._snps.loc[rsids, "genotype"].apply(
lambda x: x[0]
)
def _deduplicate_sex_chrom(self, chrom):
"""Deduplicate a chromosome in the non-PAR region."""
discrepant_XY_snps = self._get_non_par_snps(chrom)
# save discrepant XY SNPs
self._discrepant_XY = pd.concat(
[self._discrepant_XY, self._snps.loc[discrepant_XY_snps]]
)
# drop discrepant XY SNPs since it's ambiguous for which allele to deduplicate
self._snps.drop(discrepant_XY_snps, inplace=True)
# get remaining non-PAR SNPs with two alleles
non_par_snps = self._get_non_par_snps(chrom, heterozygous=False)
self._deduplicate_alleles(non_par_snps)
def _deduplicate_XY_chrom(self):
"""Fix chromosome issue where some data providers duplicate male X and Y chromosomes"""
self._deduplicate_sex_chrom("X")
self._deduplicate_sex_chrom("Y")
def _deduplicate_MT_chrom(self):
"""Deduplicate MT chromosome."""
heterozygous_MT_snps = self._snps.loc[self.heterozygous("MT").index].index
# save heterozygous MT SNPs
self._heterozygous_MT = pd.concat(
[self._heterozygous_MT, self._snps.loc[heterozygous_MT_snps]]
)
# drop heterozygous MT SNPs since it's ambiguous for which allele to deduplicate
self._snps.drop(heterozygous_MT_snps, inplace=True)
self._deduplicate_alleles(self.homozygous("MT").index)
[docs]
@staticmethod
def get_par_regions(build):
"""Get PAR regions for the X and Y chromosomes.
Parameters
----------
build : int
build of SNPs
Returns
-------
pandas.DataFrame
PAR regions for the given build
References
----------
1. Genome Reference Consortium, https://www.ncbi.nlm.nih.gov/grc/human
2. Yates et. al. (doi:10.1093/bioinformatics/btu613),
`<http://europepmc.org/search/?query=DOI:10.1093/bioinformatics/btu613>`_
3. Zerbino et. al. (doi.org/10.1093/nar/gkx1098), https://doi.org/10.1093/nar/gkx1098
"""
if build == 37:
return pd.DataFrame(
{
"region": ["PAR1", "PAR2", "PAR1", "PAR2"],
"chrom": ["X", "X", "Y", "Y"],
"start": [60001, 154931044, 10001, 59034050],
"stop": [2699520, 155260560, 2649520, 59363566],
},
columns=["region", "chrom", "start", "stop"],
)
elif build == 38:
return pd.DataFrame(
{
"region": ["PAR1", "PAR2", "PAR1", "PAR2"],
"chrom": ["X", "X", "Y", "Y"],
"start": [10001, 155701383, 10001, 56887903],
"stop": [2781479, 156030895, 2781479, 57217415],
},
columns=["region", "chrom", "start", "stop"],
)
elif build == 36:
return pd.DataFrame(
{
"region": ["PAR1", "PAR2", "PAR1", "PAR2"],
"chrom": ["X", "X", "Y", "Y"],
"start": [1, 154584238, 1, 57443438],
"stop": [2709520, 154913754, 2709520, 57772954],
},
columns=["region", "chrom", "start", "stop"],
)
else:
return pd.DataFrame()
[docs]
def sort(self):
"""Sort SNPs based on ordered chromosome list and position."""
sorted_list = sorted(
(str(x) for x in self._snps["chrom"].unique()), key=self._natural_sort_key
)
# move PAR and MT to the end of the dataframe
if "PAR" in sorted_list:
sorted_list.remove("PAR")
sorted_list.append("PAR")
if "MT" in sorted_list:
sorted_list.remove("MT")
sorted_list.append("MT")
# convert chrom column to category for sorting
# https://stackoverflow.com/a/26707444
self._snps["chrom"] = self._snps["chrom"].astype(
CategoricalDtype(categories=sorted_list, ordered=True)
)
# sort based on ordered chromosome list and position
snps = self._snps.sort_values(["chrom", "pos"])
# convert chromosome back to object
snps["chrom"] = snps["chrom"].astype(object)
self._snps = snps
[docs]
def remap(self, target_assembly, complement_bases=True):
"""Remap SNP coordinates from one assembly to another.
This method uses the assembly map endpoint of the Ensembl REST API service (via
``Resources``'s ``EnsemblRestClient``) to convert SNP coordinates / positions from one
assembly to another. After remapping, the coordinates / positions for the
SNPs will be that of the target assembly.
If the SNPs are already mapped relative to the target assembly, remapping will not be
performed.
Parameters
----------
target_assembly : {'NCBI36', 'GRCh37', 'GRCh38', 36, 37, 38}
assembly to remap to
complement_bases : bool
complement bases when remapping SNPs to the minus strand
Returns
-------
chromosomes_remapped : list of str
chromosomes remapped
chromosomes_not_remapped : list of str
chromosomes not remapped
Notes
-----
An assembly is also know as a "build." For example:
Assembly NCBI36 = Build 36
Assembly GRCh37 = Build 37
Assembly GRCh38 = Build 38
See https://www.ncbi.nlm.nih.gov/assembly for more information about assemblies and
remapping.
References
----------
1. Ensembl, Assembly Map Endpoint,
http://rest.ensembl.org/documentation/info/assembly_map
2. Yates et. al. (doi:10.1093/bioinformatics/btu613),
`<http://europepmc.org/search/?query=DOI:10.1093/bioinformatics/btu613>`_
3. Zerbino et. al. (doi.org/10.1093/nar/gkx1098), https://doi.org/10.1093/nar/gkx1098
"""
chromosomes_remapped = []
chromosomes_not_remapped = []
snps = self.snps
if snps.empty:
logger.warning("No SNPs to remap")
return chromosomes_remapped, chromosomes_not_remapped
else:
chromosomes = snps["chrom"].unique()
chromosomes_not_remapped = list(chromosomes)
valid_assemblies = ["NCBI36", "GRCh37", "GRCh38", 36, 37, 38]
if target_assembly not in valid_assemblies:
logger.warning("Invalid target assembly")
return chromosomes_remapped, chromosomes_not_remapped
if isinstance(target_assembly, int):
if target_assembly == 36:
target_assembly = "NCBI36"
else:
target_assembly = "GRCh" + str(target_assembly)
if self.build == 36:
source_assembly = "NCBI36"
else:
source_assembly = "GRCh" + str(self.build)
if source_assembly == target_assembly:
return chromosomes_remapped, chromosomes_not_remapped
assembly_mapping_data = self._resources.get_assembly_mapping_data(
source_assembly, target_assembly
)
if not assembly_mapping_data:
return chromosomes_remapped, chromosomes_not_remapped
tasks = []
for chrom in chromosomes:
if chrom in assembly_mapping_data:
chromosomes_remapped.append(chrom)
chromosomes_not_remapped.remove(chrom)
mappings = assembly_mapping_data[chrom]
tasks.append(
{
"snps": snps.loc[snps["chrom"] == chrom],
"mappings": mappings,
"complement_bases": complement_bases,
}
)
else:
logger.warning(
f"Chromosome {chrom} not remapped; "
f"removing chromosome from SNPs for consistency"
)
snps = snps.drop(snps.loc[snps["chrom"] == chrom].index)
# remap SNPs
remapped_snps = self._parallelizer(self._remapper, tasks)
remapped_snps = pd.concat(remapped_snps)
# update SNP positions and genotypes
snps.loc[remapped_snps.index, "pos"] = remapped_snps["pos"]
snps.loc[remapped_snps.index, "genotype"] = remapped_snps["genotype"]
snps.pos = snps.pos.astype(np.uint32)
self._snps = snps
self.sort()
self._build = int(target_assembly[-2:])
return chromosomes_remapped, chromosomes_not_remapped
def _remapper(self, task):
"""Remap SNPs for a chromosome.
Parameters
----------
task : dict
dict with `snps` to remap per `mappings`, optionally `complement_bases`
Returns
-------
pandas.DataFrame
remapped SNPs
"""
temp = task["snps"].copy()
mappings = task["mappings"]
complement_bases = task["complement_bases"]
temp["remapped"] = False
pos_start = int(temp["pos"].describe()["min"])
pos_end = int(temp["pos"].describe()["max"])
for mapping in mappings["mappings"]:
orig_start = mapping["original"]["start"]
orig_end = mapping["original"]["end"]
mapped_start = mapping["mapped"]["start"]
mapped_end = mapping["mapped"]["end"]
orig_region = mapping["original"]["seq_region_name"]
mapped_region = mapping["mapped"]["seq_region_name"]
# skip if mapping is outside of range of SNP positions
if orig_end < pos_start or orig_start > pos_end:
continue
# find the SNPs that are being remapped for this mapping
snp_indices = temp.loc[
~temp["remapped"]
& (temp["pos"] >= orig_start)
& (temp["pos"] <= orig_end)
].index
# if there are no snp here, skip
if not len(snp_indices):
continue
orig_range_len = orig_end - orig_start
mapped_range_len = mapped_end - mapped_start
# if this would change chromosome, skip
# TODO allow within normal chromosomes
# TODO flatten patches
if orig_region != mapped_region:
logger.warning(
f"discrepant chroms for {len(snp_indices)} SNPs from {orig_region} to {mapped_region}"
)
continue
# if there is any stretching or squashing of the region
# observed when mapping NCBI36 -> GRCh38
# TODO disallow skipping a version when remapping
if orig_range_len != mapped_range_len:
logger.warning(
f"discrepant coords for {len(snp_indices)} SNPs from {orig_region}:{orig_start}-{orig_end} to {mapped_region}:{mapped_start}-{mapped_end}"
)
continue
# remap the SNPs
if mapping["mapped"]["strand"] == -1:
# flip and (optionally) complement since we're mapping to minus strand
diff_from_start = temp.loc[snp_indices, "pos"] - orig_start
temp.loc[snp_indices, "pos"] = mapped_end - diff_from_start
if complement_bases:
temp.loc[snp_indices, "genotype"] = temp.loc[
snp_indices, "genotype"
].apply(self._complement_bases)
else:
# mapping is on same (plus) strand, so just remap based on offset
offset = mapped_start - orig_start
temp.loc[snp_indices, "pos"] = temp["pos"] + offset
# mark these SNPs as remapped
temp.loc[snp_indices, "remapped"] = True
return temp
def _complement_bases(self, genotype):
if pd.isnull(genotype):
return np.nan
complement = ""
for base in list(genotype):
if base == "A":
complement += "T"
elif base == "G":
complement += "C"
elif base == "C":
complement += "G"
elif base == "T":
complement += "A"
else:
complement += base
return complement
# https://stackoverflow.com/a/16090640
@staticmethod
def _natural_sort_key(s, natural_sort_re=re.compile("([0-9]+)")):
return [
int(text) if text.isdigit() else text.lower()
for text in re.split(natural_sort_re, s)
]
[docs]
def merge(
self,
snps_objects=(),
discrepant_positions_threshold=100,
discrepant_genotypes_threshold=500,
remap=True,
chrom="",
):
"""Merge other ``SNPs`` objects into this ``SNPs`` object.
Parameters
----------
snps_objects : list or tuple of ``SNPs``
other ``SNPs`` objects to merge into this ``SNPs`` object
discrepant_positions_threshold : int
threshold for discrepant SNP positions between existing data and data to be loaded;
a large value could indicate mismatched genome assemblies
discrepant_genotypes_threshold : int
threshold for discrepant genotype data between existing data and data to be loaded;
a large value could indicated mismatched individuals
remap : bool
if necessary, remap other ``SNPs`` objects to have the same build as this ``SNPs`` object
before merging
chrom : str, optional
chromosome to merge (e.g., "1", "Y", "MT")
Returns
-------
list of dict
for each ``SNPs`` object to merge, a dict with the following items:
merged (bool)
whether ``SNPs`` object was merged
common_rsids (pandas.Index)
SNPs in common
discrepant_position_rsids (pandas.Index)
SNPs with discrepant positions
discrepant_genotype_rsids (pandas.Index)
SNPs with discrepant genotypes
References
----------
1. Fluent Python by Luciano Ramalho (O'Reilly). Copyright 2015 Luciano Ramalho,
978-1-491-94600-8.
"""
def init(s):
# initialize this SNPs object with properties of the SNPs object being merged
self._snps = s.snps
self._duplicate = s.duplicate
self._discrepant_XY = s.discrepant_XY
self._heterozygous_MT = s.heterozygous_MT
self._discrepant_vcf_position = s.discrepant_vcf_position
self._discrepant_merge_positions = s.discrepant_merge_positions
self._discrepant_merge_genotypes = s.discrepant_merge_genotypes
self._source = s._source
self._phased = s.phased
self._build = s.build
self._build_detected = s.build_detected
def ensure_same_build(s):
# ensure builds match when merging
if not s.build_detected:
logger.warning(
f"Build not detected for {s.__repr__()}, assuming Build {s.build}"
)
if self.build != s.build:
logger.info(
f"{s.__repr__()} has Build {s.build}; remapping to Build {self.build}"
)
s.remap(self.build)
def merge_properties(s):
if not s.build_detected:
# can no longer assume build has been detected for all SNPs after merge
self._build_detected = False
if not s.phased:
# can no longer assume all SNPs are phased after merge
self._phased = False
self._source.extend(s._source)
def merge_dfs(s):
# append dataframes created when a ``SNPs`` object is instantiated
self._duplicate = pd.concat([self.duplicate, s.duplicate])
self._discrepant_XY = pd.concat([self.discrepant_XY, s.discrepant_XY])
self._heterozygous_MT = pd.concat([self.heterozygous_MT, s.heterozygous_MT])
self._discrepant_vcf_position = pd.concat(
[self.discrepant_vcf_position, s.discrepant_vcf_position]
)
def merge_snps(s, positions_threshold, genotypes_threshold, merge_chrom):
# merge SNPs, identifying those with discrepant positions and genotypes; update NAs
# identify common SNPs (i.e., any rsids being added that already exist in self.snps)
df = (
self.snps.join(s.snps, how="inner", rsuffix="_added")
if not merge_chrom
else self.snps.loc[self.snps.chrom == merge_chrom].join(
s.snps.loc[s.snps.chrom == merge_chrom],
how="inner",
rsuffix="_added",
)
)
common_rsids = df.index
discrepant_positions = df.loc[
(df.chrom != df.chrom_added) | (df.pos != df.pos_added)
]
if len(discrepant_positions) >= positions_threshold:
logger.warning(
"Too many SNPs differ in position; ensure SNPs have same build"
)
return (False,)
# remove null genotypes
df = df.loc[~df.genotype.isnull() & ~df.genotype_added.isnull()]
# discrepant genotypes are where alleles are not equivalent (i.e., alleles are not the
# same and not swapped)
discrepant_genotypes = df.loc[
(df.genotype.str.len() != df.genotype_added.str.len())
| (
(df.genotype.str.len() == 1)
& (df.genotype_added.str.len() == 1)
& (df.genotype != df.genotype_added)
)
| (
(df.genotype.str.len() == 2)
& (df.genotype_added.str.len() == 2)
& ~(
(df.genotype.str[0] == df.genotype_added.str[0])
& (df.genotype.str[1] == df.genotype_added.str[1])
)
& ~(
(df.genotype.str[0] == df.genotype_added.str[1])
& (df.genotype.str[1] == df.genotype_added.str[0])
)
)
]
if len(discrepant_genotypes) >= genotypes_threshold:
logger.warning(
"Too many SNPs differ in their genotype; ensure file is for same "
"individual"
)
return (False,)
# add new SNPs
self._snps = (
self.snps.combine_first(s.snps)
if not merge_chrom
else self.snps.combine_first(s.snps.loc[s.snps.chrom == merge_chrom])
)
# combine_first converts position to float64, so convert it back to uint32
self._snps["pos"] = self.snps["pos"].astype(np.uint32)
if 0 < len(discrepant_positions) < positions_threshold:
logger.warning(
f"{str(len(discrepant_positions))} SNP positions were discrepant; keeping original positions"
)
if 0 < len(discrepant_genotypes) < genotypes_threshold:
logger.warning(
f"{str(len(discrepant_genotypes))} SNP genotypes were discrepant; marking those as null"
)
# set discrepant genotypes to null
self._snps.loc[discrepant_genotypes.index, "genotype"] = np.nan
# append discrepant positions dataframe
self._discrepant_merge_positions = pd.concat(
[self._discrepant_merge_positions, discrepant_positions], sort=True
)
# append discrepant genotypes dataframe
self._discrepant_merge_genotypes = pd.concat(
[self._discrepant_merge_genotypes, discrepant_genotypes], sort=True
)
return (
True,
{
"common_rsids": common_rsids,
"discrepant_position_rsids": discrepant_positions.index,
"discrepant_genotype_rsids": discrepant_genotypes.index,
},
)
results = []
for snps_object in snps_objects:
d = {
"merged": False,
"common_rsids": pd.Index([], name="rsid"),
"discrepant_position_rsids": pd.Index([], name="rsid"),
"discrepant_genotype_rsids": pd.Index([], name="rsid"),
}
if not snps_object.valid:
logger.warning("No SNPs to merge...")
results.append(d)
continue
if not self.valid:
logger.info(f"Loading {snps_object.__repr__()}")
init(snps_object)
d.update({"merged": True})
else:
logger.info(f"Merging {snps_object.__repr__()}")
if remap:
ensure_same_build(snps_object)
if self.build != snps_object.build:
logger.warning(
f"{snps_object.__repr__()} has Build {snps_object.build}; this SNPs object has Build {self.build}"
)
merged, *extra = merge_snps(
snps_object,
discrepant_positions_threshold,
discrepant_genotypes_threshold,
chrom,
)
if merged:
merge_properties(snps_object)
merge_dfs(snps_object)
self.sort()
d.update({"merged": True})
d.update(extra[0])
results.append(d)
return results
def sort_snps(self):
warnings.warn("This method has been renamed to `sort`.", DeprecationWarning)
self.sort()
def remap_snps(self, target_assembly, complement_bases=True):
warnings.warn("This method has been renamed to `remap`.", DeprecationWarning)
return self.remap(target_assembly, complement_bases)
def save_snps(self, filename="", vcf=False, atomic=True, **kwargs):
warnings.warn(
"Method `save_snps` has been replaced by `to_csv`, `to_tsv`, and `to_vcf`.",
DeprecationWarning,
)
return self._save(filename, vcf, atomic, **kwargs)
@property
def snp_count(self):
warnings.warn("This property has been renamed to `count`.", DeprecationWarning)
return self.count
def get_snp_count(self, chrom=""):
warnings.warn(
"This method has been renamed to `get_count`.", DeprecationWarning
)
return self.get_count(chrom)
def not_null_snps(self, chrom=""):
warnings.warn("This method has been renamed to `notnull`.", DeprecationWarning)
return self.notnull(chrom)
def get_summary(self):
warnings.warn(
"This method has been renamed to `summary` and is now a property.",
DeprecationWarning,
)
return self.summary
def get_assembly(self):
warnings.warn("See the `assembly` property.", DeprecationWarning)
return self.assembly
def get_chromosomes(self):
warnings.warn("See the `chromosomes` property.", DeprecationWarning)
return self.chromosomes
def get_chromosomes_summary(self):
warnings.warn("See the `chromosomes_summary` property.", DeprecationWarning)
return self.chromosomes_summary
@property
def duplicate_snps(self):
warnings.warn(
"This property has been renamed to `duplicate`.", DeprecationWarning
)
return self.duplicate
@property
def discrepant_XY_snps(self):
warnings.warn(
"This property has been renamed to `discrepant_XY`.", DeprecationWarning
)
return self.discrepant_XY
@property
def heterozygous_MT_snps(self):
warnings.warn(
"This property has been renamed to `heterozygous_MT`.", DeprecationWarning
)
return self.heterozygous_MT
def heterozygous_snps(self, chrom=""):
warnings.warn(
"This method has been renamed to `heterozygous`.", DeprecationWarning
)
return self.heterozygous(chrom)
def homozygous_snps(self, chrom=""):
warnings.warn(
"This method has been renamed to `homozygous`.", DeprecationWarning
)
return self.homozygous(chrom)
@property
def discrepant_positions(self):
warnings.warn(
"This property has been renamed to `discrepant_merge_positions`.",
DeprecationWarning,
)
return self.discrepant_merge_positions
@property
def discrepant_genotypes(self):
warnings.warn(
"This property has been renamed to `discrepant_merge_genotypes`.",
DeprecationWarning,
)
return self.discrepant_merge_genotypes
@property
def discrepant_snps(self):
warnings.warn(
"This property has been renamed to `discrepant_merge_positions_genotypes`.",
DeprecationWarning,
)
return self.discrepant_merge_positions_genotypes
def is_valid(self):
warnings.warn(
"This method has been renamed to `valid` and is now a property.",
DeprecationWarning,
)
return self.valid
[docs]
def predict_ancestry(
self,
output_directory=None,
write_predictions=False,
models_directory=None,
aisnps_directory=None,
aisnps_set=None,
):
"""Predict genetic ancestry for SNPs.
Predictions by `ezancestry <https://github.com/arvkevi/ezancestry>`_.
Notes
-----
Populations below are described `here <https://www.internationalgenome.org/faq/what-do-the-population-codes-mean/>`_.
Parameters
----------
various : optional
See the available settings for `predict` at `ezancestry <https://github.com/arvkevi/ezancestry>`_.
Returns
-------
dict
dict with the following keys:
`population_code` (str)
max predicted population for the sample
`population_percent` (float)
predicted probability for the max predicted population
`superpopulation_code` (str)
max predicted super population (continental) for the sample
`superpopulation_percent` (float)
predicted probability for the max predicted super population
`ezancestry_df` (pandas.DataFrame)
pandas.DataFrame with the following columns:
`component1`, `component2`, `component3`
The coordinates of the sample in the dimensionality-reduced component space. Can be
used as (x, y, z,) coordinates for plotting in a 3d scatter plot.
`predicted_ancestry_population`
The max predicted population for the sample.
`ACB`, `ASW`, `BEB`, `CDX`, `CEU`, `CHB`, `CHS`, `CLM`, `ESN`, `FIN`, `GBR`, `GIH`, `GWD`, `IBS`, `ITU`, `JPT`, `KHV`, `LWK`, `MSL`, `MXL`, `PEL`, `PJL`, `PUR`, `STU`, `TSI`, `YRI`
Predicted probabilities for each of the populations. These sum to 1.0.
`predicted_ancestry_superpopulation`
The max predicted super population (continental) for the sample.
`AFR`, `AMR`, `EAS`, `EUR`, `SAS`
Predicted probabilities for each of the super populations. These sum to 1.0.
"""
if not self.valid:
return {}
try:
from ezancestry.commands import predict
except ModuleNotFoundError:
raise ModuleNotFoundError(
"Ancestry prediction requires the ezancestry package; please install it using pip install ezancestry"
)
def max_pop(row):
popcode = row["predicted_ancestry_population"]
poppct = row[popcode]
superpopcode = row["predicted_ancestry_superpopulation"]
superpoppct = row[superpopcode]
return {
"population_code": popcode,
"population_percent": poppct,
"superpopulation_code": superpopcode,
"superpopulation_percent": superpoppct,
}
predictions = predict(
self.snps,
output_directory,
write_predictions,
models_directory,
aisnps_directory,
aisnps_set,
)
d = dict(predictions.apply(max_pop, axis=1).iloc[0])
d["ezancestry_df"] = predictions
return d
[docs]
def compute_cluster_overlap(self, cluster_overlap_threshold=0.95):
"""Compute overlap with chip clusters.
Chip clusters, which are defined in [1]_, are associated with deduced genotype /
chip arrays and DTC companies.
This method also sets the values returned by the `cluster`, `chip`, and
`chip_version` properties, based on max overlap, if the specified threshold is
satisfied.
Parameters
----------
cluster_overlap_threshold : float
threshold for cluster to overlap this SNPs object, and vice versa, to set
values returned by the `cluster`, `chip`, and `chip_version` properties
Returns
-------
pandas.DataFrame
pandas.DataFrame with the following columns:
`company_composition`
DTC company composition of associated cluster from [1]_
`chip_base_deduced`
deduced genotype / chip array of associated cluster from [1]_
`snps_in_cluster`
count of SNPs in cluster
`snps_in_common`
count of SNPs in common with cluster (inner merge with cluster)
`overlap_with_cluster`
percentage overlap of `snps_in_common` with cluster
`overlap_with_self`
percentage overlap of `snps_in_common` with this SNPs object
References
----------
.. [1] Chang Lu, Bastian Greshake Tzovaras, Julian Gough, A survey of
direct-to-consumer genotype data, and quality control tool
(GenomePrep) for research, Computational and Structural
Biotechnology Journal, Volume 19, 2021, Pages 3747-3754, ISSN
2001-0370, https://doi.org/10.1016/j.csbj.2021.06.040.
"""
# information from Lu et. al (Ref. [1]_), Table 2 and Fig. 2
df = pd.DataFrame(
data={
"cluster_id": ["c1", "c3", "c4", "c5", "v5"],
"company_composition": [
"23andMe-v4",
"AncestryDNA-v1, FTDNA, MyHeritage",
"23andMe-v3",
"AncestryDNA-v2",
"23andMe-v5, LivingDNA",
],
"chip_base_deduced": [
"HTS iSelect HD",
"OmniExpress",
"OmniExpress plus",
"OmniExpress plus",
"Illumina GSAs",
],
"snps_in_cluster": [0] * 5,
"snps_in_common": [0] * 5,
}
)
df.set_index("cluster_id", inplace=True)
if self.build != 37:
to_remap = copy.deepcopy(self)
to_remap.remap(37) # clusters are relative to Build 37
self_snps = to_remap.snps[["chrom", "pos"]].drop_duplicates()
else:
self_snps = self.snps[["chrom", "pos"]].drop_duplicates()
chip_clusters = self._resources.get_chip_clusters()
for cluster in df.index.values:
cluster_snps = chip_clusters.loc[
chip_clusters.clusters.str.contains(cluster)
][["chrom", "pos"]]
df.loc[cluster, "snps_in_cluster"] = len(cluster_snps)
df.loc[cluster, "snps_in_common"] = len(
self_snps.merge(cluster_snps, how="inner")
)
df["overlap_with_cluster"] = df.snps_in_common / df.snps_in_cluster
df["overlap_with_self"] = df.snps_in_common / len(self_snps)
max_overlap = df.overlap_with_cluster.idxmax()
if (
df.overlap_with_cluster.loc[max_overlap] > cluster_overlap_threshold
and df.overlap_with_self.loc[max_overlap] > cluster_overlap_threshold
):
self._cluster = max_overlap
self._chip = df.chip_base_deduced.loc[max_overlap]
company_composition = df.company_composition.loc[max_overlap]
if self.source in company_composition:
if self.source == "23andMe" or self.source == "AncestryDNA":
i = company_composition.find("v")
self._chip_version = company_composition[i : i + 2]
else:
logger.warning(
"Detected SNPs data source not found in cluster's company composition"
)
return df
[docs]
def identify_low_quality_snps(self):
"""Identify low quality SNPs based on chip clusters.
Any low quality SNPs are removed from the
:meth:`snps_qc <snps.snps.SNPs.snps_qc>` dataframe and are made
available as :meth:`low_quality <snps.snps.SNPs.low_quality>`.
Notes
-----
Chip clusters, which are defined in [1]_, are associated with low quality SNPs.
As such, low quality SNPs will only be identified when this SNPs object corresponds
to a cluster per
:meth:`compute_cluster_overlap() <snps.snps.SNPs.compute_cluster_overlap>`.
"""
if self.build != 37:
to_remap = copy.deepcopy(self)
to_remap.remap(37) # clusters are relative to Build 37
self_snps = to_remap._snps[["chrom", "pos"]]
else:
self_snps = self._snps[["chrom", "pos"]]
low_quality_snps = self._resources.get_low_quality_snps()
if self.cluster:
cluster_snps = low_quality_snps.loc[
low_quality_snps.cluster.str.contains(self.cluster)
][["chrom", "pos"]]
# keep index after merge; https://stackoverflow.com/a/11982843
merged = (
self_snps.reset_index()
.merge(cluster_snps, how="inner")
.set_index("rsid")
)
self._low_quality = merged.index